目录

一、Flume:大数据世界的 "数据管道系统"

1.1 Flume 的核心定位与应用场景

1.2 核心架构:三组件驱动的数据流转

二、Flume 安装与环境配置:从 0 到 1 的实战部署

2.1 环境准备与安装步骤

2.2 关键配置文件修改

三、Flume 数据模型与核心组件解析

3.1 基础数据流转模型

3.2 核心组件详解

四、实战案例:从入门到企业级应用

4.1 基础案例:Avro+Memory+Logger(测试验证,没有实战意义)

4.2 生产级案例:Taildir+File+HDFS(实时日志采集)

五、Flume 拦截器:数据清洗的瑞士军刀

5.1 内置拦截器应用

 5.2 自定义拦截器开发(企业级必备技能)

如何自定义时间戳拦截器


一、Flume:大数据世界的 "数据管道系统"

在大数据领域,日志数据就像城市地下的自来水,源源不断地产生却需要有序管理。Apache Flume 作为 Apache 顶级项目,正是这样一套专业的 "数据管道系统",它能将分散在各处的日志数据高效收集、聚合并传输到中央数据仓库。

参考网址:Flume 1.9用户手册中文版 — 可能是目前翻译最完整的版本了

1.1 Flume 的核心定位与应用场景

  • 专业定位:分布式、高可靠、高可用的日志收集框架
  • 数据类型覆盖
    • 文本日志(服务器访问日志、应用运行日志)
    • 业务系统数据(数据库变更记录)
    • 第三方数据(API 接口数据、爬虫数据)
  • 典型场景

1.2 核心架构:三组件驱动的数据流转

Flume 的架构设计遵循 "生产者 - 消费者" 模式,由三个核心组件构成:Source、Channel、Sink。

Agent :  是Flume中的基本单位,一个Flume配置文件,可以有多个Agent.
每一个Agent中有三个组件组成,缺一不可:
1、Source  来源,数据过来的地方
2、channel 通道   传递数据用的通道,一般比较的长,一个Channel中可以存储多个数据
3、Sink    数据下沉的地方
4、event   一个event就是一个数据单元(一条数据) 

关键概念:Event(数据单元),每个 Event 包含 headers(元数据)和 body(数据内容),就像快递包裹上的面单信息和包裹内容。 

二、Flume 安装与环境配置:从 0 到 1 的实战部署

2.1 环境准备与安装步骤

生产环境推荐配置

  • JDK 1.8+(必须先安装并配置 JAVA_HOME)
  • 安装路径规范:/opt/installs/(企业级惯例)

执行安装命令

# 解压安装包
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/installs/
# 重命名目录
mv /opt/installs/apache-flume-1.9.0-bin /opt/installs/flume
# 配置环境变量
echo "export FLUME_HOME=/opt/installs/flume" >> /etc/profile
echo "export PATH=$PATH:$FLUME_HOME/bin" >> /etc/profile
# 使配置生效
source /etc/profile

 验证安装

flume-ng version

正常输出应包含:

Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
...

2.2 关键配置文件修改

核心配置文件:flume-env.sh

cd $FLUME_HOME/conf
# 复制模板文件
cp flume-env.sh.template flume-env.sh
# 编辑配置文件
vi flume-env.sh

 修改内容:

# 指向实际JDK路径
export JAVA_HOME=/opt/installs/jdk
# 分配合理内存(生产环境建议2GB+)
export JAVA_OPTS="-Xms2048m -Xmx2048m -Dcom.sun.management.jmxremote"

三、Flume 数据模型与核心组件解析

3.1 基础数据流转模型

单 Agent 模型(适合测试场景)

Source → Channel → Sink

多 Agent 级联模型(生产环境常用) 

Agent1(Source→Channel) → Agent2(Source→Channel) → Sink

3.2 核心组件详解

常用 Source 类型

类型 适用场景 核心配置参数
Avro Source 分布式数据采集 bind (绑定 IP)、port (端口)
Exec Source 执行命令获取数据 command (执行命令)
Spooling Directory 监控文件夹新文件 directory (监控目录)
Taildir Source 监控文件追加内容 filegroups (文件组)

常用 Channel 类型

类型 特点 适用场景
Memory Channel 速度快但不持久 测试环境或短暂缓冲
File Channel 持久化存储 生产环境可靠性要求高
Kafka Channel 与 Kafka 集成 数据需要多系统消费

常用 Sink 类型

类型 适用场景 核心配置参数
Logger Sink 日志输出(测试用) 无特殊参数
HDFS Sink 数据存入 Hadoop 分布式文件系统 hdfs.path (存储路径)
Kafka Sink 数据发送到 Kafka bootstrap.servers (集群地址)

四、实战案例:从入门到企业级应用

在自己flume安装路径下创建自己的配置文件文件夹myconf 

mkdir /opt/installs/flume/myconf

4.1 基础案例:Avro+Memory+Logger(测试验证,没有实战意义)

配置文件(avro_test.conf)

# 定义Agent名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 配置Avro Source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.138.139.128#自己的IP
a1.sources.r1.port = 44444

# 配置Memory Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置Logger Sink
a1.sinks.k1.type = logger

# 绑定组件关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动命令

flume-ng agent -n a1 -c $FLUME_HOME/conf -f $FLUME_HOME/myconf/avro_test.conf -Dflume.root.logger=INFO,console

# -c  后面跟上 配置文件的路径
# -f  跟上自己编写的conf文件
# -n  agent的名字
# -Dflume.root.logger=INFO,console   INFO 日志输出级别  Debug,INFO,warn,error 等

测试

# 使用telnet发送测试数据
telnet localhost 44444
> {"message":"hello flume"}

4.2 生产级案例:Taildir+File+HDFS(实时日志采集)

tailDir 是用来监控多个文件夹下的多个文件的,只要文件内容发生变化,就会再次的进行数据的抽取

配置文件(taildir_hdfs.conf)

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
# . 代表的意思是一个任意字符   * 代表前面的字符出现0到多次
a1.sources.r1.filegroups.f1 = /home/scripts/datas/.*txt.*


a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume3/logs

tailDir 可以采集一个不断变化的文件,只要采集的文件内容不断变化,我们就可以将其不断的上传至hdfs。

flume-ng agent -c ./ -f taildir_memory_hdfs.conf -n a1 -Dflume.root.logger=INFO,console
由于我们刚才的一些数据非常的大,根据如下参数可以疯狂创建文件:
hdfs.rollInterval 30 
#当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
hdfs.rollSize  1024 
#当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
hdfs.rollCount 10 
#当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)

五、Flume 拦截器:数据清洗的瑞士军刀

Flume支持在运行时对Event进行修改或丢弃,可以通过拦截器来实现。

Event 就是数据,拦截住数据之后,可以修改它,也可以删除它。

5.1 内置拦截器应用

时间戳拦截器(Timestamp Interceptor)

这个拦截器会向每个Event的header中添加一个时间戳属性进去,key默认是“timestamp ”(也可以通过headerName参数来自定义修改),value就是当前的毫秒值(其实就是用System.currentTimeMillis()方法得到的)。 如果Event已经存在同名的属性,可以选择是否保留原始的值。


a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/c.txt

a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
# round 用于控制含有时间转义符的文件夹的生成规则
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# i1 是拦截器的名字,自己定义的
a1.sources.r1.interceptors = i1    
a1.sources.r1.interceptors.i1.type = timestamp

 5.2 自定义拦截器开发(企业级必备技能)

需求:

处理数据样例:
log='{
  "host":"www.baidu.com",
  "user_id":"13755569427",
  "items":[
    {
      "item_type":"eat",
      "active_time":156234
    },
    {
      "item_type":"car",
      "active_time":156233
    }
  ]
}'

结果样例:
[{"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"},
  {"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}]

开发步骤

  1. 创建 Maven 项目,添加依赖:
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.78</version>
    </dependency>
</dependencies>

2.实现拦截器接口

public class JsonParserInterceptor implements Interceptor {
    
    private static final Logger LOG = LoggerFactory.getLogger(JsonParserInterceptor.class);
    
    @Override
    public void initialize() {
        // 初始化操作
    }
    
    @Override
    public Event intercept(Event event) {
        try {
            // 获取数据内容
            byte[] body = event.getBody();
            String json = new String(body, StandardCharsets.UTF_8);
            
            // 解析JSON(示例:提取user_id)
            JSONObject jsonObject = JSONObject.parseObject(json);
            String userId = jsonObject.getString("user_id");
            
            // 添加到headers
            Map<String, String> headers = event.getHeaders();
            headers.put("user_id", userId);
            
            return event;
        } catch (Exception e) {
            LOG.error("JSON解析失败", e);
            return null; // 解析失败则丢弃该Event
        }
    }
    
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> result = new ArrayList<>();
        for (Event event : events) {
            Event intercepted = intercept(event);
            if (intercepted != null) {
                result.add(intercepted);
            }
        }
        return result;
    }
    
    @Override
    public void close() {
        // 资源释放
    }
    
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new JsonParserInterceptor();
        }
        
        @Override
        public void configure(Context context) {
            // 配置参数获取
        }
    }
}
package com.bigdata;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import java.util.ArrayList;

public class Main {
    public static void main(String[] args) {
        // 学习fastjson的用法
        String log = "{\n" +
                "\"host\":\"www.baidu.com\",\n" +
                "\"user_id\":\"13755569427\",\n" +
                "\"items\":[\n" +
                "    {\n" +
                "        \"item_type\":\"eat\",\n" +
                "        \"active_time\":156234\n" +
                "    },\n" +
                "    {\n" +
                "        \"item_type\":\"car\",\n" +
                "        \"active_time\":156233\n" +
                "    }\n" +
                " ]\n" +
                "}";
        // json {}  [{},{}]
        // 将该json 字符串变为  一个个json 对象
        // 将一个json字符串转换为json 对象
        JSONObject jsonObject = JSON.parseObject(log);
        // 从json 对象中获取值
        String host = jsonObject.getString("host");
        String userId = jsonObject.getString("user_id");
        System.out.println(host);
        System.out.println(userId);
        // 从json 对象中获取json数组
        JSONArray items = jsonObject.getJSONArray("items");

        // 定义一个集合,将来把封装好的json 对象,放入进去
        ArrayList<JSONObject> list = new ArrayList<>();
        // 循环这个Array数组,得到更加详细的数据
        for (Object item : items) {


            JSONObject obj2 = JSON.parseObject(item.toString());
            String active_time = obj2.getString("active_time");
            String item_type = obj2.getString("item_type");


            // 将解析出来的4个属性,全部放入一个json 对象中
            JSONObject newObj = new JSONObject();

            newObj.put("host", host);
            newObj.put("user_id", userId);
            newObj.put("active_time", active_time);
            newObj.put("item_type", item_type);
            list.add(newObj);
        }
        // 如何将一个集合,变为json 字符串
        String jsonString = JSON.toJSONString(list);
        System.out.println(jsonString);

    }
}

打包,上传至 flume 下的lib 下。

cp target/flume-interceptor-1.0-SNAPSHOT.jar $FLUME_HOME/lib/

编写一个flume脚本文件 testInter.conf

a1.sources = s1
a1.channels = c1
a1.sinks = r1

a1.sources.s1.type = TAILDIR

#以空格分隔的文件组列表。每个文件组表示要跟踪的一组文件
a1.sources.s1.filegroups = f1
#文件组的绝对路径
a1.sources.s1.filegroups.f1=/home/b.log

#使用自定义拦截器
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = com.bigdata.DemoInterceptor$BuilderEvent

a1.channels.c1.type = file
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


a1.sinks.r1.type = hdfs
a1.sinks.r1.hdfs.path = /flume/202409
a1.sinks.r1.hdfs.fileSuffix= .log

# 将上传的数据格式使用text类型,便于查看
a1.sinks.r1.hdfs.fileType=DataStream
a1.sinks.r1.hdfs.writeFormat=Text


a1.sources.s1.channels = c1
a1.sinks.r1.channel = c1

如何自定义时间戳拦截器

只需要在方法中添加如下代码

        HashMap<String, String> hashMap = new HashMap<>();
        hashMap.put("timestamp",System.currentTimeMillis()+"");
        event.setHeaders(hashMap);

六、总结与进阶方向

Flume 作为大数据生态中的 "数据搬运工",其核心价值在于:

  1. 提供标准化的数据采集接口
  2. 实现数据的可靠传输与缓冲
  3. 无缝对接多种数据源与数据目标

对于进阶学习者,建议从以下方向深入:

  • 源码级理解 Flume 的事务机制
  • 自定义组件开发(Source/Channel/Sink)
  • 与 Kafka、Spark 等组件的深度集成
  • 大规模集群下的性能调优与故障排查

通过本文的系统讲解,相信无论是零基础的萌新还是有一定经验的开发者,都能对 Flume 有全面的认识。在实际工作中,建议结合具体业务场景进行灵活配置与优化,让这一强大的日志收集工具发挥最大价值。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐