Priest Tomb



写了个简单的logstash-output-rocketmq插件

写在前面

近期参与一个新的项目,姑且是负责了数据存储相关模块的需求分析和设计(其实就是参考学习网络上流传的其他大厂分享的成熟方案),目前的方案是用 FileBeat + LogStash 做收集,之后再存到 HDFS、HBase、MySQL

本来是看到 LogStash 有现成的直接输出到 HDFS 的插件(logstash-output-webhdfs),就计划先把数据落到 HDFS,后续通过定时程序从 HDFS 取数据再分别存到 HBase 和 MySQL 去来着,不过被领导否了,说从 LogStash 一步到位,同时把数据扔到 HDFS、HBase、MySQL 去,那这样的话就得来一个消息队列在中间,这样一方面也方便我们自己对数据做一个把控,毕竟数据还要分别存到三个地方去,另一方面也方便以后做其他扩展,三下五除二就敲定了要用 Rocketmq(之前的项目中有使用,所以不想再多费精力用其他的消息队列)

然而网上搜了很久,LogStash 的官方输出插件里没有 Rocketmq,连个民间第三方的都没搜到,所以不得不想办法自己来写一个了。。

花了两天写了个超级简陋的版本,现在丢 github 上去了:logstash-output-rocketmq,还煞有其事地准备了英文的 README (其实就是机翻)


环境说明

Ruby 2.6

LogStash 6.4

Rocketmq 4.2


自己动手

0. 学 Ruby

因为 LogStash 是用 Ruby 写的,又恰好不会这个语言,索性借此机会学习一下

看官方文档和菜鸟教程学了几天,把基本的语法概念了解了个大概,于是准备开始第二步

1. 学源码

先前在做方案设计的时候在网上找到了 LogStash 的一个第三方插件 logstash-output-jdbc,当时想着能不能像 LogStash 直接输出 HDFS 那样,也直接输出到数据库去,就找到了这个。在自己写插件的过程中,参考了这个插件的源码居多(核心思路来自于此)

另外也看了官方的另外两个插件 logstash-output-kafkalogstash-output-rabbitmq

偷偷吐槽一句,Ruby 的代码风格可真”飘逸”。。

2. 写插件

先介绍思路吧,因为 Rocketmq 没有 JRuby 实现的客户端(logstash-output-rabbitmq 就是借助了 JRuby 实现的客户端来做的),所以这里使用了官方的客户端 jar 包,将使用客户端所需的关键 jar 包放到某个目录下,在初始化插件时加载,就可以在插件中直接调用 jar 包中的类和方法实现生产者的初始化和消息发送了

2.1. 准备 example

参考官方文档 How to write a Logstash output plugin,下载 logstash-output-example

打开看一下会发现还是有很多乱七八糟的文件的,最关键的几个文件是:

  • lib/logstash/outputs/example.rb 插件的核心代码文件,主要用于定义插件名称、向 LogStash 注册插件、接收事件(并处理)等

  • spec/outputs/example_spec.rb 插件的测试文件,使用 RSpec 语法,主要是用于功能测试

  • logstash-output-example.gemspec 插件的说明文件,描述插件的基本信息、配置插件运行依赖的文件等,在看其他插件源码时甚至看到能在这里引入 Maven,不明觉厉

将一些 example 都修改成想实现的插件的名字就可以了,比如我这里就是叫 rocketmq

2.2. 写核心文件

因为整个脚本的内容不多,我这里就直接附代码了,在代码注释中进行详细的介绍说明

注:这篇博客里的代码不再更新,最新的插件代码可以到库里

# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "java"

# 自己写的插件类需要继承对应的 Base 类
class LogStash::Outputs::Rocketmq < LogStash::Outputs::Base

  # 设置插件可多线程并发执行
  # 默认为 single,即使配置多个 workers,也只能依次运行 multi_receive 方法(性能上来说变成单线程)
  concurrency :shared

  config_name "rocketmq"

  # 使用 config 定义插件运行所需要的基本参数
  # 本地 Logstash 的路径,必需,如 C:/ELK/logstash、/usr/local/logstash
  config :logstash_path, :validate => :string, :required => true

  # Rocketmq 的 NameServer 地址,必需,如 192.168.10.10:5678
  config :name_server_addr, :validate => :string, :required => true

  # Rocketmq 的 producer group
  config :producer_group, :validate => :string, :default => "defaultProducerGroup"

  # Message 的 topic,必需
  config :topic, :validate => :string, :required => true

  # Message 的 tag
  config :tag, :validate => :string, :default => "defaultTag"

  # Message 的 key
  config :key, :validate => :string, :default => "defaultKey"

  # 发送异常后的重试次数,默认 2 次
  config :retry_times, :validate => :number, :default => 2

  # 注册方法,在 LogStash 启动时会调用,相当于对插件进行初始化
  def register
    load_jar_files

    @stopping = Concurrent::AtomicBoolean.new(false)

    # 创建生产者对象
    @producer = org.apache.rocketmq.client.producer.DefaultMQProducer.new(producer_group)
    @producer.setNamesrvAddr(name_server_addr)
    @producer.start
  end

  # 接收事件方法,接收 events 对象,在其他插件里还有看到 multi_receive_encoded 方法
  # 入参是 events_and_data 对象,这两个方法有什么不同我反正是不懂
  def multi_receive(events)
    return if events.empty?
    events.each do |event|
      retrying_send(event)
    end
  end

  # 到我们指定的目录下加载依赖的 jar 文件
  def load_jar_files
    jarpath = logstash_path + "/vendor/jar/rocketmq/*.jar"
    @logger.info("RocketMq plugin required jar files are loadding... Jar files path: ", path: jarpath)

    jars = Dir[jarpath]
    raise LogStash::ConfigurationError, 'RocketMq plugin init error, no jars found! Please check the jar files path!' if jars.empty?

    jars.each do |jar|
      @logger.trace('RocketMq plugin loaded a jar: ', jar: jar)
      require jar
    end
  end

  # 生产者客户端核心功能方法
  def retrying_send(event)
    sent_times = 0

    begin
      # 配置 message 对象
      mq_message = org.apache.rocketmq.common.message.Message.new
      mq_message.setTopic(topic)
      mq_message.setTags(tag)
      mq_message.setKeys(key)
      mq_message.setBody("#{event}".bytes)
      result = @producer.send(mq_message)

      if result.nil?
        raise "Send message error! Result is null."
      end

      if org.apache.rocketmq.client.producer.SendStatus::SEND_OK != result.getSendStatus
        status_name = result.getSendStatus.name
        raise "Send message error! Result code is #{status_name}"
      end
    rescue => e
      @logger.error('An Exception Occured!!',
                    :message => e.message,
                    :exception => e.class)
      if @stopping.false? and (sent_times < retry_times)
        # 重试
        sent_times += 1
        retry
      else
        # 根据实际需求处理没发送成功的消息
        puts "Message send failed: #{event}"
      end
    end
  end

  # LogStash 停止时会调用该方法
  def close
    @stopping.make_true
    @producer.shutdown
  end
end

3. 打包安装测试

一方面懒得学 RSpec 写测试代码,一方面没用过 LogStash,不知道影响插件运行的最关键的 multi_receive 方法的入参 events 到底是什么内容,所以写了点测试输出的代码,直接打包安装进行测试

为了方便,我在自己本机上安装了 Filebeat 和 LogStash(幸亏这两个软件对 windows 都有不错的支持),然后用命令 gem build logstash-output-rocketmq.gemspec 对插件进行打包,得到 gem 文件后,再到 LogStash 的目录下用命令 bin/logstash-plugin install logstash-output-rocketmq-0.1.0.gem 进行安装

安装成功后对 FileBeat 和 LogStash 进行简单的配置,测试了一些日志文件,发现插件正常可用,于是暂时收工

4. 注意

真实的安装环境可能会没有网络,需要离线安装插件,所以要用其他命令

打包

bin/logstash-plugin prepare-offline-pack logstash-output-rocketmq

安装

bin/logstash-plugin install file:///path/to/logstash-offline-plugins-6.4.0.zip


最后

在 github 上也有说,这个版本写出来只能称得上是个 demo,毕竟也没考虑其他 LogStash 版本和 Rocketmq 版本,写出来目前可能够我的工作需要(后期按需再调整优化),因为没搜到网上有关于 Rocketmq 版的 LogStash output 插件,所以发出来做个分享

可能实际写得很烂哈哈,还请见谅,毕竟这几个东西我之前都还没用过。。如果有更好的思路或者代码有写错的地方,请发 issue 或者直接评论告诉我,thx