Explaining the Fluent Bit processor

Written by Phil Wilkins in Fluent Bit on May 7, 2024

This post is an excerpt from the forthcoming book Fluent Bit with Kubernetes by Phil Wilkins and published by Manning Publications. It appears here by agreement with the publisher. You can also download an advance copy of the book for free.

Fluent Bit 2.1.2 introduced additional customization logic for input and output plugins called Processors (not to be confused with the Stream Processor). In many respects, it has a similar feel to using Lua as a custom filter, but there are some notable differences:

  • The processor can be viewed as an 'inline' piece of logic. As a result, the logic we implement directly affects the behavior and performance of the input or output plugin, such as its ability to ingest data, so it should be treated with care.

  • The processor can be configured to interact with the different telemetry types Fluent Bit can handle since version 2.0 (logs, metrics, and traces).

  • The data being manipulated will not yet have been added to the buffer, and the actions of this logic could potentially prevent that from occurring.

  • We can only configure the processor feature using YAML.

As a result, this isn’t a fully-fledged custom plugin capability, but at the same time, it can have more potential impact.
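
The point about keeping data out of the buffer can be sketched with the return-code convention documented for the Lua filter: -1 drops the record, 0 keeps it unchanged, 1 signals a modified timestamp and record, and 2 signals a modified record only. The sketch below assumes a Lua processor honors the same codes; the function name drop_negative, the rule it applies, and the checked attribute are hypothetical and exist only for illustration.

```lua
-- Hypothetical callback: the name drop_negative and its rule are illustrative.
-- Return codes follow the Lua filter convention:
--   -1 drop the record, 0 keep it unchanged,
--    1 timestamp and record modified, 2 only the record modified.
function drop_negative(tag, timestamp, record)
  local value = tonumber(record["rand_value"])
  if value == nil or value < 0 then
    -- Assumption: when dropped in an input processor, the record is
    -- discarded before it is added to the buffer.
    return -1, timestamp, record
  end
  record["checked"] = true   -- hypothetical marker attribute
  return 2, timestamp, record
end

-- Direct calls, runnable with a plain lua or luajit interpreter.
print(drop_negative("test-tag", 0, { rand_value = -5 }))
local code, _, rec = drop_negative("test-tag", 0, { rand_value = 42 })
print(code, rec["checked"])
```

Calling the function directly like this, outside Fluent Bit, is a cheap way to check the logic before it sits inline in a plugin's data path.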

Let’s look at an example of its use.

Processor Example

So that we can easily separate the impact of the processor from that of the plugin, we’re going to work with two very simple plugins. As a source, we’ll use the `Random` input plugin, whose output we’ll manipulate in the following ways:

  • The original random value will be copied to a new event attribute.

  • The random value will be divided by 2 (truncating any fractional parts).

  • We’ll capture the response from pinging Google.com.

As the Random plugin generates log events (rather than traces or metrics), we’ll need to tell the processor we want the events to be presented to us as log events. For the output, we’ll use stdout and again manipulate the contents in the following ways:

  • Truncate the response from the ping invocation.

  • Add an attribute with a value to the log event.

To use the processor, we introduce a new declaration block called processors inside the plugin's configuration. Within it, we first name the type of event we want to interact with (logs in our case, although metrics and traces are also available). We then identify the plugin using the name attribute and supply the same parameters as we would when defining a custom filter. For a Lua filter, that means naming the function to invoke with the call attribute. In this example, we’ve also used YAML's ability to inline the Lua code into the configuration through the code attribute.

We can see this as follows:

service:
  log_level: info
  http_server: on
  flush_interval: 1

pipeline:
  inputs:
    - name: random
      tag: test-tag
      interval_sec: 5                                                     #A
      processors:                                                         #B
        logs:                                                             #C
          - name: lua                                                     #D
            call: modify                                                  #E
            code: |                                                       #F
              function modify(tag, timestamp, record)
                local new_record = record
                new_record["original_val"] = record["rand_value"]         #G
                local num = tonumber(record["rand_value"])
                local newNum = string.format("%d", num/2)                 #H
                new_record["rand_value"] = newNum
                new_record["tag"] = tag
                local handler = io.popen("ping -c 1 -i 0.1 google.com")   #I
                new_record["ping"] = handler:read("*a")
                handler:close()
                return 1, timestamp, new_record
              end
  #filters:                                                               #J
  #  - name: stdout
  #    match: "*"
  outputs:
    - name: stdout
      match: "*"
      processors:                                                         #K
        logs:
          - name: lua
            call: modify_out
            code: |
              function modify_out(tag, timestamp, record)
                local new_record = record
                local search = record["ping"]
                local start = search and string.find(search, " ms", 1, true)  #L
                if start then
                  new_record["ping"] = string.sub(search, 1, start + 2)
                end
                new_record["output"] = "new data"
                return 1, timestamp, new_record
              end

Code notes:

  • #A In this YAML configuration, we’ve set the random number generator input to run every 5 seconds. This means you’ll have time to evaluate the output generated.

  • #B We start the configuration for the Processor.

  • #C This tells Fluent Bit we’re interested in the log-based inputs (the other options being traces and metrics).

  • #D We tell the Processor we want to use the Lua plugin (we could use other plugins here as well).

  • #E As with using Lua as a custom Filter, we need to identify the name of the function to invoke.

  • #F Rather than reference a separate file, we can embed the script into the YAML configuration. But be aware that any errors from LuaJIT will report line numbers based on the start of the Lua code, not the start of the YAML file.

  • #G Copy the random value to a new attribute (original_val).

  • #H Divide the random number by two and write the result back into the data structure.

  • #I Ping google.com once with a very short delay.

  • #J We have this commented-out filter so that, if necessary, we can look at the events produced by the input.

  • #K Inside the output plugin configuration, we declare another processor.

  • #L The output from pinging Google.com is lengthy, so we truncate the response string and update the attribute.
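
Since errors in inlined Lua are reported relative to the script rather than the YAML file, it can help to exercise the logic with a standalone interpreter before embedding it. The following is a minimal sketch of the truncation step from the output processor, written so that it also copes with a ping that produced no " ms" text (for example, when the host cannot be resolved). The helper name truncate_ping and the sample strings are hypothetical, not part of the configuration above.

```lua
-- Hypothetical helper mirroring the truncation performed in modify_out;
-- the name truncate_ping and the sample text below are illustrative only.
function truncate_ping(text)
  if type(text) ~= "string" then
    return text
  end
  -- Plain-text search (4th argument true), starting at index 1.
  local start = string.find(text, " ms", 1, true)
  if not start then
    return text   -- no timing line found: leave the text untouched
  end
  -- Keep everything up to and including the " ms" suffix.
  return string.sub(text, 1, start + 2)
end

-- Illustrative input shaped like the ping response in the sample output.
local sample = "PING google.com (142.250.178.14) 56(84) bytes of data.\n" ..
  "64 bytes from lhr48s27-in-f14.1e100.net (142.250.178.14): " ..
  "icmp_seq=1 ttl=116 time=2.82 ms\n" ..
  "\n--- google.com ping statistics ---\n"

print(truncate_ping(sample))
print(truncate_ping("ping: google.com: Name or service not known"))
```

Run with lua or luajit, the first call prints the response cut off after the timing value, and the second prints the error text unchanged instead of failing on a missing match.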

We can run this scenario very easily with the command:

fluent-bit -c processor-demo.yaml

We’ll see Fluent Bit generating output like this:

[0] test-tag: [[1707683937.869194269, {}], {"rand_value"=>"3468270981094447104", "ping"=>"PING google.com (142.250.178.14) 56(84) bytes of data. #A
64 bytes from lhr48s27-in-f14.1e100.net (142.250.178.14): icmp_seq=1 ttl=116 time=2.82 ms", "output"=>"new data", "tag"=>"test-tag", "original_val"=>6936541962188894208}] #B
[0] test-tag: [[1707683942.869093656, {}], {"rand_value"=>"718867304183793408", "ping"=>"PING google.com (142.250.178.14) 56(84) bytes of data.
64 bytes from lhr48s27-in-f14.1e100.net (142.250.178.14): icmp_seq=1 ttl=116 time=3.41 ms", "output"=>"new data", "tag"=>"test-tag", "original_val"=>1437734608367586816}]
[0] test-tag: [[1707683947.869064092, {}], {"rand_value"=>"-4611686018427387904", "ping"=>"PING google.com (142.250.178.14) 56(84) bytes of data. #C
64 bytes from lhr48s27-in-f14.1e100.net (142.250.178.14): icmp_seq=1 ttl=116 time=3.78 ms", "output"=>"new data", "tag"=>"test-tag", "original_val"=>-9223372036854775808}]

Output notes:

  • #A As a result of our processor, we can see additional elements, including the result of pinging Google.com.

  • #B The random number from the original plugin has been copied to a new attribute, and the original rand_value attribute has had its value modified.

  • #C The text that is normally produced as a result of using ping has been changed prior to completing the output operation.

Note from this console output sample that we have two different generated numbers: the random one and the result of dividing it by two. We can also see the shortened output from the ping command.


