Priest Tomb



从源码学习 Logstash 内存式队列的运行原理

duration_in_millis 是什么

在上篇博客《LogIQ —— 一个 Logstash 监控数据展示页面》中关于管道状态数据展示中有提到执行耗时,在 Logstash 的监控 API 中能拿到的数据就是这个 duration_in_millis ,单位是毫秒,更确切的说,这个数据指的是过滤器和输出插件的执行耗时。对于输入插件而言,是另一个 queue_push_duration_in_millis

关于为什么要研究这两个数据,起因主要是在对 Logstash 做监控时,不只是需要看 Logstash 处理了多少数据,还想关注 Logstash 处理的快不快,如果慢,是哪里慢,因为一般我们会使用不止一个过滤器插件甚至输出插件。

比如之前项目中有一个应用场景,配置了五六个过滤器,输出时使用了一个第三方插件 logstash-output-jdbc 。使用者发现日志采集比较慢,反馈到我这里时,下意识觉得是不是过滤器配置太多了,排查后发现其中一个过滤器确实比其他的慢很多,但更多的是因为 jdbc 插件默认不支持批量入库,一条一条入库当然会慢。考虑到过滤器是业务层面必须的配置,最后是魔改了 jdbc 插件,使其支持批量入库,将入库效率提升了近 4 倍,相比之下,一个过滤器的“缓慢”已经可以忽略不计。

比较插件的耗时问题,主要就是观察监控 API 中的 duration_in_millis ,最早接触监控 API 时,对这些参数不是很了解,怕对字面意思的理解和实际有偏差,虽然当时在官方论坛中找到了官方人员对参数的解释,因为一会儿说读客户端写客户端,一会儿说持久化队列内存式队列,让一个新手看得满脸问号,最后还是翻看了源码来验证自己的猜想和理解。

下面的解析部分以 Logstash 的内存式队列为例,会涉及 Logstash 启动、各插件间的数据传递、监控 API 中的个别性能指标


版本说明

Logstash 6.4.0


太长不看

  • Logstash 启动时根据配置的队列类型进行初始化,内存式队列将根据 pipeline.batch.sizepipeline.workers 初始化一个有界阻塞队列(ArrayBlockingQueue)
  • 利用阻塞队列,初始化一个写客户端和读客户端,实现一个生产者+消费者模型
  • 写客户端即输入插件的核心,输入插件接收数据,转化为 Logstash 内的 event 对象,生产资源
  • 读客户端即过滤器和输出插件的核心,消费资源,交由过滤器和输出插件依次执行
  • queue_push_duration_in_millis 是写客户端生产资源时被阻塞的时间,该值越大,说明消费者性能越低
  • duration_in_millis 是读客户端获取资源后,所有过滤器和输出插件执行业务处理的时间

相关源码解析

0. pipeline.rb

该脚本即为 Logstash 启动环节的关键,从代码中可以看到 pipeline 的初始化和启动,以及启动、运行各工作线程(包括输入、过滤、输出插件)的关键方法。

这里先附上 pipeline 初始化和工作线程运行的部分:

def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
  super
  # 初始化队列
  open_queue

  # 一些实例变量的初始化
  @worker_threads = []
  @signal_queue = java.util.concurrent.LinkedBlockingQueue.new
  ...
end

def worker_loop(batch_size, batch_delay)
  filter_queue_client.set_batch_dimensions(batch_size, batch_delay)
  output_events_map = Hash.new { |h, k| h[k] = [] }
  while true
    signal = @signal_queue.poll || NO_SIGNAL
    # 从队列中读取一批数据,并启动监测
    batch = filter_queue_client.read_batch.to_java
    batch_size = batch.filteredSize
    if batch_size > 0
      @events_consumed.add(batch_size)
      # 依次执行各过滤器插件
      filter_batch(batch)
    end
    # 刷新过滤器,将处理完的 event 传递给输出插件
    flush_filters_to_batch(batch, :final => false) if signal.flush?
    if batch.filteredSize > 0
      # 依次执行各输出插件
      output_batch(batch, output_events_map)
      filter_queue_client.close_batch(batch)
    end
    # keep break at end of loop, after the read_batch operation, some pipeline specs rely on this "final read_batch" before shutdown.
    break if (@worker_shutdown.get && !draining_queue?)
  end
  # 收到停止信号后的 pipeline 排空操作
  ...
end

1. open_queue

上面 pipeline.rb 中可以看到调用到了 open_queue 方法进行队列的初始化,该方法的方法体是 Java 代码,定义在 AbstractPipelineExt.openQueue(ThreadContext)

private AbstractWrappedQueueExt queue;
private JRubyAbstractQueueWriteClientExt inputQueueClient;
private QueueReadClientBase filterQueueClient;

@JRubyMethod(name = "open_queue")
public final IRubyObject openQueue(final ThreadContext context) throws IOException {
    try {
        queue = QueueFactoryExt.create(context, null, settings);
    } catch (final Exception ex) {
        LOGGER.error("Logstash failed to create queue.", ex);
        throw new IllegalStateException(ex);
    }
    inputQueueClient = queue.writeClient(context);
    filterQueueClient = queue.readClient();

    // 初始化性能监测指标
    ...
    return context.nil;
}

先看 QueueFactoryExt.create(ThreadContext, IRubyObject, IRubyObject)

@JRubyMethod(meta = true)
public static AbstractWrappedQueueExt create(final ThreadContext context, final IRubyObject recv,
    final IRubyObject settings) throws IOException {
    final String type = getSetting(context, settings, "queue.type").asJavaString();
    if ("persisted".equals(type)) {
        // 初始化持久化队列
        ...
    } else if ("memory".equals(type)) {
        return new JrubyWrappedSynchronousQueueExt(
            context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS
        ).initialize(
            context, context.runtime.newFixnum(
                getSetting(context, settings, "pipeline.batch.size")
                    .convertToInteger().getIntValue()
                    * getSetting(context, settings, "pipeline.workers")
                    .convertToInteger().getIntValue()
            )
        );
    } else {
        // 异常处理
        ...
    }
}

启动内存式队列时,最终初始化的队列对象为 JrubyWrappedSynchronousQueueExt ,而队列的核心为一个有界阻塞队列,其长度为 Logstash 配置文件中的 worker 数量乘上每个批量的大小。请记住这个阻塞队列

private BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue;

@JRubyMethod
@SuppressWarnings("unchecked")
public JrubyWrappedSynchronousQueueExt initialize(final ThreadContext context,
    IRubyObject size) {
    int typedSize = ((RubyNumeric)size).getIntValue();
    this.queue = new ArrayBlockingQueue<>(typedSize);
    return this;
}

2. init write client & read client

回到上面 open_queue 方法,初始化队列完成后,接着初始化两个 Client :inputQueueClient —— 写客户端、filterQueueClient —— 读客户端。

这两个客户端的初始化方法也在上面的 JrubyWrappedSynchronousQueueExt 类中:

@Override
protected JRubyAbstractQueueWriteClientExt getWriteClient(final ThreadContext context) {
    return JrubyMemoryWriteClientExt.create(queue);
}

@Override
protected QueueReadClientBase getReadClient() {
    // batch size and timeout are currently hard-coded to 125 and 50ms as values observed
    // to be reasonable tradeoffs between latency and throughput per PR #8707
    return JrubyMemoryReadClientExt.create(queue, 125, 50);
}

可以看出,读、写客户端的初始化均使用到该阻塞队列,所以可以得到这样一个结论:Logstash pipeline 的核心使用了一个有界的阻塞队列,输入插件作为写客户端和输出插件(包括过滤器)作为读客户端,实现了一个生产者和消费者的模型

所以接下来继续看生产者和消费者的实现细节。

3. write client push events to queue

先从 pipeline.rb 中输入插件的初始化看起:

def start_inputs
  ...
  # 注册所有输入插件
  register_plugins(@inputs)
  # 启动所有输入插件
  @inputs.each { |input| start_input(input) }
end

def start_input(plugin)
  @input_threads << Thread.new { inputworker(plugin) }
end

def inputworker(plugin)
  Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
  begin
    # 执行输入插件的 run 方法,传入一个 writeClient
    plugin.run(wrapped_write_client(plugin.id.to_sym))
  rescue => e
    ...
  ensure
    plugin.do_close
  end
end

这个 wrapped_write_client 同样也是一个 Java 方法,定义在上面提到的 AbstractPipelineExt 类中,它最终初始化了一个 JRubyWrappedWriteClientExt 对象并返回,而该 WrappedWriteClient 初始化时,也传入了上面步骤 2 中初始化过的 QueueWriteClient。

@JRubyMethod(name = "wrapped_write_client", visibility = Visibility.PROTECTED)
public final JRubyWrappedWriteClientExt wrappedWriteClient(final ThreadContext context,
    final IRubyObject pluginId) {
    return new JRubyWrappedWriteClientExt(context.runtime, RubyUtil.WRAPPED_WRITE_CLIENT_CLASS)
        .initialize(inputQueueClient, pipelineId.asJavaString(), metric, pluginId);
}

这里使用委托模式实现了两种写客户端,WrappedWriteClient 是封装给 Ruby 实现的输入插件直接使用的,其内部还包括了对各种监控指标的计算;而 QueueWriteClient 是 Logstash 内部真正用于队列操作的客户端实现,有 push 单条数据、push 批量数据等方法。

关于输入插件如何将数据写入队列,这里以 logstash-input-beats 插件为例:

#------------------ beats.rb ------------------
def run(output_queue)
  # 上面提到的执行各输入插件的 run 方法,这里传入的正是一个 WrappedWriteClient
  # 初始化一个监听器(由 netty 实现)并启动监听
  message_listener = MessageListener.new(output_queue, self)
  @server.setMessageListener(message_listener)
  @server.listen
end

#------------------ message_listener.rb ------------------
def initialize(queue, input)
  @connections_list = ThreadSafe::Hash.new
  @queue = queue
  # 其他初始化
  ...
end

def onNewMessage(ctx, message)
  hash = message.getData
  # 将核心数据内容取出
  target_field = extract_target_field(hash)

  if target_field.nil?
    # 将接收到的内容直接初始化一个 event 对象,并 push 进队列
    event = LogStash::Event.new(hash)
    @nocodec_transformer.transform(event)
    @queue << event
  else
    # 使用自定义的 codec callback 处理接收到的内容,后续同样是初始化一个 event
    # 对象,并 push 进队列
    codec(ctx).accept(CodecCallbackListener.new(target_field,
                                                hash,
                                                message.getIdentityStream(),
                                                @codec_transformer,
                                                @queue))
  end
end

其余的插件理论上来说应该都差不多,这里还有个有意思的地方,@queue << event 中的 << 是 Ruby 中向数组中追加元素的操作符,在 Logstash 中对该操作符进行了一个“重写”,也就是 WrappedWriteClient 最终向队列生产数据的代码:

@JRubyMethod(name = {"push", "<<"}, required = 1)
public IRubyObject push(final ThreadContext context, final IRubyObject event)
    throws InterruptedException {
    final long start = System.nanoTime();
    // 计数 + 1
    incrementCounters(1L);
    // 调用 QueueWriteClient 向阻塞队列中 put 一条消息
    final IRubyObject res = writeClient.doPush(context, (JrubyEventExtLibrary.RubyEvent) event);
    // 累加耗时
    incrementTimers(start);
    return res;
}

这里每条消息 put 成功后累加的耗时,就是每个输入插件的性能指标里名为 queue_push_duration_in_millis 的耗时。正是由于队列为有界阻塞队列,如果队列已满,put 操作会阻塞等待,所以该值越大,说明 Logstash 的输出环节处理越慢,也就是性能越差。

至此,理清楚了 Logstash 输入插件的大致逻辑和 queue_push_duration_in_millis 的含义。

4. read client read events from queue

同样,读客户端的源头还要从 pipeline.rb 中看起:

def worker_loop(batch_size, batch_delay)
  filter_queue_client.set_batch_dimensions(batch_size, batch_delay)
  output_events_map = Hash.new { |h, k| h[k] = [] }
  while true
    signal = @signal_queue.poll || NO_SIGNAL

    # 读客户端拉取一批数据
    batch = filter_queue_client.read_batch.to_java
    batch_size = batch.filteredSize
    if batch_size > 0
      @events_consumed.add(batch_size)
      # 执行所有的 filter 插件的逻辑
      filter_batch(batch)
    end
    flush_filters_to_batch(batch, :final => false) if signal.flush?
    if batch.filteredSize > 0
      # 执行所有的 output 插件的逻辑
      output_batch(batch, output_events_map)
      filter_queue_client.close_batch(batch)
    end
    break if (@worker_shutdown.get && !draining_queue?)
  end
  # 收到关停信号后的逻辑
  ...
end

def filter_batch(batch)
  filter_func(batch.to_a).each do |e|
    batch.merge(e) unless e.cancelled?
  end
  filter_queue_client.add_filtered_metrics(batch.filtered_size)
  @events_filtered.add(batch.filteredSize)
rescue Exception => e
  ...
end

def output_batch(batch, output_events_map)
  batch.to_a.each do |event|
    output_func(event).each do |output|
      output_events_map[output].push(event)
    end
  end
  output_events_map.each do |output, events|
    output.multi_receive(events)
    events.clear
  end
  filter_queue_client.add_output_metrics(batch.filtered_size)
end

整体分为两步,第一步,读客户端从队列中拉取一部分数据,第二步,执行过滤器、输出插件的处理逻辑。

先看读客户端是怎么拉取数据的,filter_queue_client 同样是定义在上面提到的 AbstractPipelineExt 类中,它返回了步骤 2 中初始化的 JrubyMemoryReadClientExt 对象(这里额外插一句,初始化时代码中写死了 batchSize 和 waitForMillis 两个参数,就是说 Logstash 配置文件中的该参数不会生效,原因是开发人员经过一定的测试,选定了一个他们认为性能较好配置)

— update 2021-04-10 —

关于 Java 代码中写死的 batchSize 和 waitForMillis 两个参数,经过一些测试后,发现 logstash.yml 中配置了之后,实际是会生效的,于是回过头来继续看看是怎么回事。

这个文件的第一次提交是源自他们在 Logstash 6.0 版本后从 Ruby 代码更新成 Java 实现,Ruby代码里最早就写成了两个固定值,batchSize = 125,waitForMillis = 5,而 Issue #8707PR #8702 当时测试的主要是 waitForMillis 由 5 毫秒调整为 50 毫秒,batchSize 125 还是延续了以前的设定。

那么重点来了,这两个值在代码里初始化时写死,可以说是一个历史遗留问题,那为什么没人去改正它们,因为对 pipeline.batch.sizepipeline.batch.delay 这两个参数来说,其实是在 Ruby 代码里设置的,也就在上面的 pipeline.rb 文件的 worker_loop 方法的第一行 filter_queue_client.set_batch_dimensions(batch_size, batch_delay),看代码的时候不小心忽视了这一行,翻看了其 Java 部分的代码,该 set_batch_dimensions 方法是在 JrubyMemoryReadClientExt.java 类的父类中定义的。

所以,简单来说,读客户端初始化时,代码中直接搬老版 Ruby 代码的做法,写死了两个参数,但最终在插件运行时,还会重新将两个参数设置成 logstash.yml 文件中配置的数值。

就是这样,错误修正结束,下面回到原文。

该类的 readBatch 方法如下:

@Override
public QueueBatch readBatch() throws InterruptedException {
    MemoryReadBatch batch = MemoryReadBatch.create(
            LsQueueUtils.drain(queue, batchSize, waitForNanos));
    startMetrics(batch);
    return batch;
}

使用工具类从队列中排出数据,并转换成 Batch 对象返回。因为是阻塞队列,可能会遇到队列中没有数据的情况,所以 drain 方法还有一个等待超时时间,Logstash 有其自己的设计,这里就把实现部分摘抄出来,不做过多的解释了(感觉解释起来更乱,不如直接读源码):

public static LinkedHashSet<JrubyEventExtLibrary.RubyEvent> drain(
    final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue, final int count, final long nanos
) throws InterruptedException {
    int left = count;
    //todo: make this an ArrayList once we remove the Ruby pipeline/execution
    final LinkedHashSet<JrubyEventExtLibrary.RubyEvent> collection =
        new LinkedHashSet<>(4 * count / 3 + 1);
    do {
        final int drained = drain(queue, collection, left, nanos);
        if (drained == 0) {
            break;
        }
        left -= drained;
    } while (left > 0);
    return collection;
}

private static int drain(final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue,
    final Collection<JrubyEventExtLibrary.RubyEvent> collection, final int count,
    final long nanos) throws InterruptedException {
    int added = 0;
    do {
        added += queue.drainTo(collection, count - added);
        if (added < count) {
            final JrubyEventExtLibrary.RubyEvent event =
                queue.poll(nanos, TimeUnit.NANOSECONDS);
            if (event == null) {
                break;
            }
            collection.add(event);
            added++;
        }
    } while (added < count);
    return added;
}

从队列中取到数据,接着就是第二步,由配置的插件们处理数据,因为没怎么研究 Ruby 部分怎么根据配置文件初始化过滤器和输出插件的(涉及 config_ast.rb),这里就只从 Java 代码部分简单描述下输出插件的实现。

在代码中能找到 output 相关的三个类:AbstractOutputDelegatorExt、OutputDelegatorExt、OutputStrategyExt,从命名可以看出,输出插件使用了委托模式和策略模式实现。

在 OutputStrategyExt 类中有三种实现,分别是:Legacy —— 一个 Logstash v6 之后被废弃的实现、Single —— 现在版本中的单线程输出插件、Shared —— 现在版本中的多线程输出插件。

而如果搜索 Ruby 代码中 output 调用的 multi_receive 方法,会在 AbstractOutputDelegatorExt 类中找到实现,代码如下:

@JRubyMethod(name = OUTPUT_METHOD_NAME)
public IRubyObject multiReceive(final IRubyObject events) {
    final RubyArray batch = (RubyArray) events;
    final int count = batch.size();
    // 增加输入计数
    eventMetricIn.increment((long) count);
    final long start = System.nanoTime();
    // 执行 output 插件处理逻辑
    doOutput(batch);
    // 增加耗时
    eventMetricTime.increment(
        TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)
    );
    // 增加输出计数
    eventMetricOut.increment((long) count);
    return this;
}

继续跟 doOutput 方法可以在 OutputStrategyExt 类中发现,它最终又会调用 Ruby 代码中每个输出插件实现的 multi_receive 方法。

而上面的代码中涉及的三个指标数据就是输出插件的 inoutduration_in_millis,所以每个输出插件的 duration_in_millis 可以说就是其执行业务处理的真正耗时。

至此,理清楚了 Logstash 输出插件(包括过滤器)的大致逻辑和 duration_in_millis 的含义。


最后

这是在 Logstash v6.4.0 版本的源码基础上进行的梳理学习,因为没能在本地启动源码进行更直接的调试分析,所以可能会有部分解析存在错误,希望可以在评论中指出和讨论。

另外,梳理这部分源码逻辑的初衷是为了查清楚 queue_push_duration_in_millisduration_in_millis 两个监控指标的真实含义,来确定自己对官方监控 API 中的数据的理解,所以文章中的解析没有非常深入和专业(描述过多会显得啰嗦,也没有写这种源码解析类博客的经验),更多的是为了记录下自己觉得关键的代码细节,方便以后翻看。

最后的最后,还是忍不住想要吐槽 Logstash 这种源码中混杂着 Java 和 Ruby 的情况,一会儿代码在 Java 里,一会儿代码又在 Ruby 里,真的是让人晕头转向。不过官方在 7.0 版本之后推出了 Java 版本的插件开发,不知道未来会不会把剩下的那些部分也全改成 Java 实现。

最后的最后的最后,这篇博客拖拖拉拉从12月写到了新年,真是太难了。。。