Fluent Bit 中文文档
  • Fluent Bit 中文文档
  • 关于
    • Fluent Bit 是什么?
    • Fluent Bit 简要历史
    • Fluentd & Fluent Bit
    • 协议
  • 概念
    • 核心概念
    • 缓冲
    • 数据管道
      • 输入
      • 解析器
      • 过滤器
      • 缓冲
      • 路由
      • 输出
  • 安装
    • 升级说明
    • 平台支持
    • 依赖
    • 源码安装
      • 下载源码
      • 构建和安装
      • 以静态配置构建
    • Linux 软件包
      • Amazon Linux
      • Redhat / CentOS
      • Debian
      • Ubuntu
      • 树莓派
    • Docker
    • Kubernetes
    • Windows
  • 管理
    • 配置 Fluent Bit
      • 格式与模式
      • 配置文件
      • 变量
      • 命令
      • 上游服务负载均衡
      • 单位
    • 安全性
    • 缓冲与存储
    • 积压
    • 调度与重试
    • 内存管理
    • 监控
    • 内部状态导出/信号
  • 数据管道
    • 输入插件
      • Dummy
      • Memory Metics
      • Standard Input
      • Systemd
      • tail
    • 解析器
      • JSON
      • Regular Expression
      • LTSV
      • Logfmt
    • 过滤器
      • Grep
      • Kubernetes
      • Rewrite Tag
      • Modify
      • Nest
    • 输出插件
      • Counter
      • Elasticsearch
      • File
      • FlowCounter
      • HTTP
      • Kafka
      • NULL
      • PostgreSQL
      • Standard Output
      • TCP & TLS
  • 流处理
    • 简介
    • 总览
    • 快速开始
      • Fluent Bit + SQL
      • Check Keys and NULL values
      • Hands On! 101
  • FLUENT BIT FOR DEVELOPERS
    • C Library API
    • 手动提取记录
    • Golang 输出插件
Powered by GitBook
On this page
  • Requirements
  • 在命令行进行流处理
  • 1. 查看 Fluent Bit 版本
  • 2. 解析样例日志文件
  • 3. 选择带有指定键的记录
  • 4. 计算平均值
  • 5. 对结果分组和窗口
  • 6. 接收流处理结果作为新的数据流
  • F.A.Q
  • Where STREAM name comes from?

Was this helpful?

  1. 流处理
  2. 快速开始

Hands On! 101

PreviousCheck Keys and NULL valuesNextC Library API

Last updated 5 years ago

Was this helpful?

本文通过非常具体且简单的步骤来学习流处理器的工作原理。为了简单起见,我们使用了一个自定义 Docker 镜像,其中包含测试需要的相关组件。

Requirements

以下教程需要以下软件组件:

  • >= v1.2.0

  • (如果您的系统中已经安装了 Fluent Bit 二进制文件,则此项是非必须的)

另外,下载以下数据样本文件(130KB):

在命令行进行流处理

接下来的所有步骤,我们将从命令行运行 Fluent Bit,为简单起见,我们将使用官方的 Docker 镜像。

1. 查看 Fluent Bit 版本

$ docker run -ti fluent/fluent-bit:1.4 /fluent-bit/bin/fluent-bit --version
Fluent Bit v1.4.0

2. 解析样例日志文件

样本文件包含 JSON 记录。在此命令中,我们将添加解析器配置文件,并指示 tail 输入插件将内容解析为 json:

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log      \
     fluent/fluent-bit:1.4                                          \
     /fluent-bit/bin/fluent-bit -R /fluent-bit/etc/parsers.conf     \
                                -i tail -p path=/sp-samples-1k.log  \
                                        -p parser=json              \
                                -o stdout -f 1

上面的命令将把已解析的内容打印到标准输出。内容将打印与每个记录关联的 Tag 以及具有两个字段的数组: 记录时间戳和记录的键值对映射

Fluent Bit v1.4.0
* Copyright (C) 2019-2020 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2019/05/08 13:34:16] [ info] [storage] initializing...
[2019/05/08 13:34:16] [ info] [storage] in-memory
[2019/05/08 13:34:16] [ info] [storage] normal synchronization mode, checksum disabled
[2019/05/08 13:34:16] [ info] [engine] started (pid=1)
[2019/05/08 13:34:16] [ info] [sp] stream processor started
[0] tail.0: [1557322456.315513208, {"date"=>"22/abr/2019:12:43:51 -0600", "ip"=>"73.113.230.135", "word"=>"balsamine", "country"=>"Japan", "flag"=>false, "num"=>96}]
[1] tail.0: [1557322456.315525280, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"242.212.128.227", "word"=>"inappendiculate", "country"=>"Chile", "flag"=>false, "num"=>15}]
[2] tail.0: [1557322456.315532364, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"85.61.182.212", "word"=>"elicits", "country"=>"Argentina", "flag"=>true, "num"=>73}]
[3] tail.0: [1557322456.315538969, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"124.192.66.23", "word"=>"Dwan", "country"=>"Germany", "flag"=>false, "num"=>67}]
[4] tail.0: [1557322456.315545150, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"18.135.244.142", "word"=>"chesil", "country"=>"Argentina", "flag"=>true, "num"=>19}]
[5] tail.0: [1557322456.315550927, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"132.113.203.169", "word"=>"fendered", "country"=>"United States", "flag"=>true, "num"=>53}]

到目前为止,还没有流处理,在步骤 #3 中,我们将开始进行一些基本查询。

3. 选择带有指定键的记录

如下命令通过 -T 选项引入一个流处理器(SP)查询,并将输出插件更改为 null,其目的是在标准输出接口中获得 流处理结果,避免混淆。

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log           \
     fluent/fluent-bit:1.2                                               \
     /fluent-bit/bin/fluent-bit                                          \
         -R /fluent-bit/etc/parsers.conf                                 \
         -i tail                                                         \
             -p path=/sp-samples-1k.log                                  \
             -p parser=json                                              \
         -T "SELECT word, num FROM STREAM:tail.0 WHERE country='Chile';" \
         -o null -f 1

上面的查询旨在查询 country 键的值与 Chile 匹配的所有记录,并且对于每个匹配记录,仅使用 word 和 num 键字段组成一条记录并输出:

[0] [1557322913.263534, {"word"=>"Candide", "num"=>94}]
[0] [1557322913.263581, {"word"=>"delightfulness", "num"=>99}]
[0] [1557322913.263607, {"word"=>"effulges", "num"=>63}]
[0] [1557322913.263690, {"word"=>"febres", "num"=>21}]
[0] [1557322913.263706, {"word"=>"decasyllables", "num"=>76}]

4. 计算平均值

以下查询与上一步中的查询类似,但这次我们将使用 AVG() 的聚合函数来获取输入记录的平均值:

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log           \
     fluent/fluent-bit:1.2                                               \
     /fluent-bit/bin/fluent-bit                                          \
         -R /fluent-bit/etc/parsers.conf                                 \
         -i tail                                                         \
             -p path=/sp-samples-1k.log                                  \
             -p parser=json                                              \
         -T "SELECT AVG(num) FROM STREAM:tail.0 WHERE country='Chile';"  \
         -o null -f 1

输出:

[0] [1557323573.940149, {"AVG(num)"=>61.230770}]
[0] [1557323573.941890, {"AVG(num)"=>47.842106}]
[0] [1557323573.943544, {"AVG(num)"=>40.647060}]
[0] [1557323573.945086, {"AVG(num)"=>56.812500}]
[0] [1557323573.945130, {"AVG(num)"=>99.000000}]

为什么我们得到了多个记录?答案是: 当 Fluent Bit 处理数据时,记录以数据块的方式输入,流处理器对数据块进行处理,输入插件提取了 5 个记录块,流处理器会独立地处理每个块的查询。要一次处理多个块,我们必须在窗口时间内对结果进行分组。

5. 对结果分组和窗口

结果分组旨在简化数据处理,且在定义的时间范围内使用时,我们可以完成很多事情。如下查询示例按 country 关键字将结果分组,并计算 num 值的平均值,处理窗口时间为 1 秒意味着: 处理 1 秒窗口时间内传入的所有数据块:

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log      \
     fluent/fluent-bit:1.2                                          \
     /fluent-bit/bin/fluent-bit                                     \
         -R /fluent-bit/etc/parsers.conf                            \
         -i tail                                                    \
             -p path=/sp-samples-1k.log                             \
             -p parser=json                                         \
         -T "SELECT country, AVG(num) FROM STREAM:tail.0            \
             WINDOW TUMBLING (1 SECOND)                             \
             WHERE country='Chile'                                  \
             GROUP BY country;"                                     \
         -o null -f 1

输出:

[0] [1557324239.003211, {"country"=>"Chile", "AVG(num)"=>53.164558}]

6. 接收流处理结果作为新的数据流

现在,我们看到了一个更真实的用例。将数据流处理结果发送到标准输出对于学习很有帮助,现在我们将指示流处理器将结果作为输入,成为 Fluent Bit 数据管道的一部分,并在其上附加一个 Tag。

可以使用 CREATE STREAM 语句完成此操作,该语句还将使用 sp-results 作为结果的标签。请注意,现在输出插件为 stdout,且带有匹配所有带有 sp-results 标签记录的参数:

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log      \
     fluent/fluent-bit:1.2                                          \
     /fluent-bit/bin/fluent-bit                                     \
         -R /fluent-bit/etc/parsers.conf                            \
         -i tail                                                    \
             -p path=/sp-samples-1k.log                             \
             -p parser=json                                         \
         -T "CREATE STREAM results WITH (tag='sp-results')          \
             AS                                                     \
               SELECT country, AVG(num) FROM STREAM:tail.0          \
               WINDOW TUMBLING (1 SECOND)                           \
               WHERE country='Chile'                                \
               GROUP BY country;"                                   \
         -o stdout -m 'sp-results' -f 1

输出:

[0] sp-results: [1557325032.000160100, {"country"=>"Chile", "AVG(num)"=>53.164558}]

F.A.Q

Where STREAM name comes from?

Fluent Bit 具有流的概念,且每个输入插件实例都有一个默认名称。您可以通过设置别名来覆盖默认名称。注意以下示例中 alias 参数和新的 stream 名称。

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log      \
     fluent/fluent-bit:1.4                                          \
     /fluent-bit/bin/fluent-bit                                     \
         -R /fluent-bit/etc/parsers.conf                            \
         -i tail                                                    \
             -p path=/sp-samples-1k.log                             \
             -p parser=json                                         \
             -p alias=samples                                       \
         -T "CREATE STREAM results WITH (tag='sp-results')          \
             AS                                                     \
               SELECT country, AVG(num) FROM STREAM:samples         \
               WINDOW TUMBLING (1 SECOND)                           \
               WHERE country='Chile'                                \
               GROUP BY country;"                                   \
         -o stdout -m 'sp-results' -f 1
Fluent Bit
Docker Engine
https://fluentbit.io/samples/sp-samples-1k.log