Hands On! 101
本文通过非常具体且简单的步骤来学习流处理器的工作原理。为了简单起见,我们使用了一个自定义 Docker 镜像,其中包含测试需要的相关组件。
Requirements
以下教程需要以下软件组件:
Fluent Bit >= v1.2.0
Docker Engine (如果您的系统中已经安装了 Fluent Bit 二进制文件,则此项是非必须的)
另外,下载以下数据样本文件(130KB):
在命令行进行流处理
接下来的所有步骤,我们将从命令行运行 Fluent Bit,为简单起见,我们将使用官方的 Docker 镜像。
1. 查看 Fluent Bit 版本
$ docker run -ti fluent/fluent-bit:1.4 /fluent-bit/bin/fluent-bit --version
Fluent Bit v1.4.02. 解析样例日志文件
样本文件包含 JSON 记录。在此命令中,我们将添加解析器配置文件,并指示 tail 输入插件将内容解析为 json:
上面的命令将把已解析的内容打印到标准输出。内容将打印与每个记录关联的 Tag 以及具有两个字段的数组: 记录时间戳和记录的键值对映射
到目前为止,还没有流处理,在步骤 #3 中,我们将开始进行一些基本查询。
3. 选择带有指定键的记录
如下命令通过 -T 选项引入一个流处理器(SP)查询,并将输出插件更改为 null,其目的是在标准输出接口中获得 流处理结果,避免混淆。
上面的查询旨在查询 country 键的值与 Chile 匹配的所有记录,并且对于每个匹配记录,仅使用 word 和 num 键字段组成一条记录并输出:
4. 计算平均值
以下查询与上一步中的查询类似,但这次我们将使用 AVG() 的聚合函数来获取输入记录的平均值:
输出:
为什么我们得到了多个记录?答案是: 当 Fluent Bit 处理数据时,记录以数据块的方式输入,流处理器对数据块进行处理,输入插件提取了 5 个记录块,流处理器会独立地处理每个块的查询。要一次处理多个块,我们必须在窗口时间内对结果进行分组。
5. 对结果分组和窗口
结果分组旨在简化数据处理,且在定义的时间范围内使用时,我们可以完成很多事情。如下查询示例按 country 关键字将结果分组,并计算 num 值的平均值,处理窗口时间为 1 秒意味着: 处理 1 秒窗口时间内传入的所有数据块:
输出:
6. 接收流处理结果作为新的数据流
现在,我们看到了一个更真实的用例。将数据流处理结果发送到标准输出对于学习很有帮助,现在我们将指示流处理器将结果作为输入,成为 Fluent Bit 数据管道的一部分,并在其上附加一个 Tag。
可以使用 CREATE STREAM 语句完成此操作,该语句还将使用 sp-results 作为结果的标签。请注意,现在输出插件为 stdout,且带有匹配所有带有 sp-results 标签记录的参数:
输出:
F.A.Q
Where STREAM name comes from?
Fluent Bit 具有流的概念,且每个输入插件实例都有一个默认名称。您可以通过设置别名来覆盖默认名称。注意以下示例中 alias 参数和新的 stream 名称。
Last updated
Was this helpful?