介绍
Flume是一个分布式可靠可用的系统,它能够将不同数据源的海量日志数据进行高效的收集,聚合,移动,最后存储到一个中心化数据存储系统中.同时支持failover和负载均衡.
Flume使用Java编写,官方推荐运行环境为JDK7(2016.3).
架构
Flume的架构主要有以下核心概念:
- Event: 一个数据单元,带有一个可选的消息头
- Flow: Event从源头到达目标点的迁移过程的抽象
- Client: 操作位于源点处的Event,将其发送到Agent
- Agent: 一个独立的Flume进程,包含组件Source,Channel,Sink
- Source: 用来消费传递到该组件的Event
- Channel: 中转Event的一个临时存储,保存有Source组件传递过来的Event
- Sink: 从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)
数据流
Flume的核心是把数据从数据源收集过来,再送到目的地.为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地之后,删除自己缓存的数据.
数据传输的基本单位是Event,如果是文本文件,通常是一行记录,这也是事务的基本单位.Event从Source,流向Channel,再到Sink,本身为一个byte数组,并可携带headers信息.Event代表一个数据流的最小完整单元,从外部数据源来,向外部目的地去.
其运行核心是Agent,它是一个完整的数据手机工具,含有三个核心组件,分别是source,channel,sink,通过这些组件,Event可以从一个地方流向另一个地方,如下图:
source可以接收外部发送过来的数据.不同的source可以接收不同的数据格式.比如有目录池(spooling directory)数据源,即监控指定文件夹中的新文件变化,如果目录中有新文件产生,就会立刻读取内容.
channel是一个存储池,接收source的输出,知道有sink消费掉channel中的数据.channel中的数据直到进入到下一个channel中h或者进入终端才会被删除.当sink写入失败后,可以自动重启,不会造成数据丢失,因此很可靠.
sink会消费channel中的数据,然后送给外部源或者其他source,可以写入到HDFS或HBase中
核心组件
Source
Client操作数据的来源,支持Avro,log4j,syslog,或http post(json),可以让应用程序和已有的Source直接打交道,如AvroSource,SyslogTcpSource.也可以写一个Source,以IPC或RPC的方式接入自己的应用,Avro和Thrift都可以(分别有NettyAvroRpcClient和ThriftRpcClient实现了RpcClient接口),其中Avro是默认的RPC协议.
对现有程序改动最小的使用方式是直接读取原来程序记录的日志文件,基本可以实现无缝接入,无需对现有程序进行任何改动,对于直接读取文件Source,有两种方式:
ExecSource: 以运行Linux命令的方式,只需的输出最新的数据,如
tail -F 文件名
指令,这种方式下,取的文件名必须是指定的.ExecSource可以实现对日志的实时收集,但是在Flume不运行或这指令执行出错时,将无法收集到数据,无法保证数据的完整性.SpoolSource: 检测配置的目录下新增的文件,并将文件中的数据读取出来,需注意两点: 拷贝到spool目录下的文件不可以再打开编辑,spool目录下不能包含子目录.
SpoolSource虽然无法实现实时的收集数据,但是可以实现以分钟的方式分割文件,趋近于实时.
如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用.在实际使用的过程中,可以结合log4j使用,将log4j的文件分割机制设置为每分钟一次,将文件拷贝到spool的监控目录.
log4j有一个TimeRolling的插件,可以把log4j分割文件到spool目录,基本实现了实时监控.Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED(该后缀可以自定义配置).
Flume Source可以支持的类型:
Source类型 | 说明 |
---|---|
Avro Source | 支持Avro协议(实际上是Avro RPC),内置支持 |
Thrift Source | 支持Thrift协议,内置支持 |
Exec Source | 基于Unix的command在标准输出上生产数据 |
JMS Source | 从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过 |
Spooling Directory Source | 监控指定目录内数据变更 |
Twitter 1% firehose Source | 通过API持续下载Twitter数据,试验性质 |
Netcat Source | 监控某个端口,将流经端口的每一个文本行数据作为Event输入 |
Sequence Generator Source | 序列生成器数据源,生产序列数据 |
Syslog Sources | 读取syslog数据,产生Event,支持UDP和TCP两种协议 |
HTTP Source | 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式 |
Legacy Sources | 兼容老的Flume OG中Source(0.9.x版本) |
Channel
当前有几个Channel可供选择,分别是 Memory Channel,JDBCChannel,File Channel,Psuedo Transaction Channel,比较常见的前三种channel.
- MemoryChannel 可以实现高速的吞吐,但是无法保证数据的完整性.
- MemoryRecoverChannel 在官方文档的建议下已经使用FileChannel替代.
- FileChannel 保证数据的完整性和一致性,在具体配置FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率.
FileChannel是一个持久化隧道,它持久化所有的事件,并将其存储到磁盘中.因此,即使Java虚拟机当掉,或者操作系统崩溃或重启,再或者事件没有在管道中成功的传递到下一个代理(agent),都不会造成数据丢失. Memory Channel是一个不稳定的隧道,原因是它在内存中存储所有事件.如果Java进程死掉,任何存储在内存中的事件将会丢失.另外,内存的空间受到RAM大小的限制,而File Channel这方面是他的优势,只要磁盘空间足够,它就可以将所有的事件数据存储到磁盘上.
Flume Channel支持的类型:
Channel类型 | 说明 |
---|---|
Memory Channel | Event数据存储在内存中 |
JDBC Channel | Event数据存储在持久化存储中,当前Flume Channel内置支持Derby |
File Channel | Event数据存储在磁盘文件中 |
Spillable Memory Channel | Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用) |
Pseudo Transaction Channel | 测试用途 |
Custom Channel | 自定义Channel实现 |
Sink
Sink在设置存储数据时,可以向文件系统,数据库,hadoop存数据,在日志数据较少时,可以将数据存储在文件系统中,并且设定一定的文件间隔保存数据,在日志数据较多时,可以将响应的日志数据存储到Hadoop中,便于日后进行响应的数据分析.
Flume Sink支持的类型:
Channel类型 | 说明 |
---|---|
HDFS Sink | 数据写入HDFS |
Logger Sink | 数据写入日志文件 |
Avro Sink | 数据被转换成Avro Event,然后发送到配置的RPC端口上 |
Thrift Sink | 数据被转换成Thrift Event,然后发送到配置的RPC端口上 |
IRC Sink | 数据在IRC上进行回放 |
File Roll Sink | 存储数据到本地文件系统 |
Null Sink | 丢弃到所有数据 |
HBase Sink | 数据写入HBase数据库 |
Morphline Solr Sink | 数据发送到Solr搜索服务器(集群) |
ElasticSearch Sink | 数据发送到Elastic Search搜索服务器(集群) |
Kite Dataset Sink | 写数据到Kite Dataset,试验性质的 |
Custom Sink | 自定义Sink实现 |
更多Sink参考官方文档.
可靠性
核心是把数据从数据源收集过来,再送到目的地,为了保证输送一定成功,在送达目的地之前,会先缓存数据,带数据真正到达目的地之后,删除自己缓存的数据.
Flume使用事务性的方式保证传送Event整个过程的可靠性.Sink必须在Event被存入Channel后,或者,已经被传到到下一个agent中,又或者,已经被存入外部数据目的地之后,才能把Event从Channel中remove掉.
这样的数据流里的Event无论是在一个agent里还是多个agent之间流转,都能保证可靠,因为以上事务event能被存储起来.而Channel的多种实现在可恢复性上有不同的保证,也保证了event不同程度的可靠性.比如Flume支持在本地保存一份文件channel作为备份,而memory channel将event存储在内存queue里,速度快,但丢失的话无法恢复.
可恢复性
使用场景
下面展示集中Flow Pipeline,及各自适应的场景:
多个agent顺序连接
可以将多个agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中.这是最简单的情况,一般情况下,应该控制这种顺序连接的agent的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上Agent的收集工作.
多个agent的数据汇集到一个agent
这种情况应用的场景比较多,比如要收集Web网站的行为日志,Web网站为了可用性使用的负载均衡的集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据的存储系统,如HDFS.
多路Agent
这种模式有两种方式,一种用来复制,另一种用来分流.复制方式可以将最前端的数据复制多份,分别传递到多个channel中,每个channel中接收到的数据都是相同的.
配置格式实例入下:
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
<Agent>.sources.<Source1>.selector.type = replicating
上面指定了selector的type为replication,其他的配置没有指定,使用的复制方式,Source1会将数据分别存储到channel1和channel2,这两个channel中的数据是相同的,然后数据分别被传送到sink1和sink2.
分流方式,selector可以根据header的值来确定数据传递到哪一个channel,配置格式如下:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
<Agent>.sources.<Source1>.selector.default = <Channel2>
上面的selector的type的值为multiplexing,同时配置selector的header信息,还配置了多个selector的mapping值,即header的值,如果:header的值为value1和value2,数据从Source1路由到channel1,如果header的值为value2和value3,数据从source1路由到channel2.
实现loadbalance功能
Load balancing Sink Processor能够实现loadbalance功能,上图agent1是一个路由节点,负责将channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的agent上,实例配置:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
实现failover功能
Failover Sink Processor能够实现failover功能,具体流程类型loadbalance,但是内部处理机制与loadbalance完全不同,它维护的是一个优先级sink组件列表,只有有一个sink组件可用,Event就被传递到该组件.如果一个Sink能够处理Event,则会加入到一个Pool中,否则会被移除Pool并计算失败次数,设置一个惩罚因子,实例配置如下:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000
安装和使用
安装需要的基本要求:
- JDK 1.6版本以上
- 足够的内存用于配置
- 磁盘空间
文件夹读写权限
首先安装对应版本的JDK,参考JDK的安装.
然后在Flume官方下载最新版本的二进制文件
apache-flume-1.6.0-bin.tar
,创建一个/usr/lib/flume
的目录,并设置读写权限:$ sudo mkdir /usr/lib/flume $ sudo chmod -R 777 /usr/lib/flume $ cp apache-flume-1.6.0-bin.tar /usr/lib/flume/ $ cd /usr/lib/flume $ tar -xvf apache-flume-1.6.0-bin.tar
设置对应的环境变量:
vim ~/.bash_profile ### Flume Home Settings export FLUME_HOME="/usr/lib/flume/apache-flume-1.6.0-bin" export FLUME_CONF_DIR="$FLUME_HOME/conf" export FLUME_CLASSPATH="$FLUME_CONF_DIR" export PATH="$FLUME_HOME/bin:$PATH" source ~/.bash_profile
然后在Flume的配置文件路径,即
FLUME_CONF_DIR
中将flume-env.sh.template
复制一份改名为flume-env.sh
,配置需要的JAVA_HOME
.如果需要使用内存型channel,需要在上面的文件
flume-env.sh
中设置对应的JAVA_OPTS
变量,默认是100MB到200MB,最好根据机器配置设置为500MB到1000MB.cp flume-env.sh.template flume-env.sh export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_77 export JAVA_OPTS="-Xms500m -Xmx1000m -Dcom.sun.management.jmxremote"
配置
log4j.properties
日志路径:flume.log.dir=/data/pro/flume/running_log
安装完成,测试:
flume-ng --help
配置
flume-conf.properties
文件,配置具体的agent:cp flume-conf.properties.template flume-conf.properties
实例: spooldir源发送到控制台用于测试
a1.channels = ch1
a1.sources = src1
a1.sinks = sink_logger
a1.sources.src1.type = spooldir
a1.sources.src1.channels = ch1
a1.sources.src1.spoolDir = /data/pro/flume/testlog
a1.sources.src1.deletePolicy = never
a1.sources.src1.fileHeader = true
a1.channels.ch1.type = memory
a1.channels.ch1.capacity = 1000000
a1.channels.ch1.transactionCapacity = 1000000
a1.channels.ch1.keep-alive = 10
a1.sinks.sink_logger.type = logger
a1.sinks.sink_logger.channel = ch1
flume-ng agent -c /usr/lib/flume/apache-flume-1.6.0-bin/conf -f /usr/lib/flume/apache-flume-1.6.0-bin/conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n a1
实例: avro数据源
安装成功之后,在响应的 /conf
目录创建f1.conf文件,内容如下:
agent-1.channels.ch-1.type = memory
agent-1.sources.avro-source1.channels = ch-1
agent-1.sources.avro-source1.type = avro
agent-1.sources.avro-source1.bind = 0.0.0.0
agent-1.sources.avro-source1.port = 41414
agent-1.sources.avro-source1.threads = 5
agent-1.sinks.log-sink1.channel = ch-1
agent-1.sinks.log-sink1.type = logger
agent-1.channels = ch-1
agent-1.sources = avro-source1
agent-1.sinks = log-sink1
然后启动agent:
$ flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/f1.conf -Dflume.root.logger=DEBUG,console -n agent-1
参数说明:
- -n 指定agent名称
- -c 指定配置文件目录
- -f 指定配置文件
- -Dflume.root.logger=DEBUG,console 设置日志等级
下面启动一个avro-client客户端生产数据:
$ flume-ng avro-client -c /etc/flume-ng/conf -H localhost -p 41414 -F /etc/passwd -Dflume.root.logger=DEBUG,console
spooldir源发送到kafka
a1.channels = ch1
a1.sources = src1
a1.sinks = sink_kafka
## 配置source
a1.sources.src1.type = spooldir # 源类型为文件目录
a1.sources.src1.channels = ch1 # 对应的channel名
a1.sources.src1.spoolDir = /root/log # 监控的目录
a1.sources.src1.deletePolicy= never # 是否删除已完成的文件: never或immediate(立即)
a1.sources.src1.fileHeader = true # 时候设置一个header来保存文件名的绝对路径
a1.sources.src1.interceptors = i1 # 在header中添加timestamp以在hadoop中使用
a1.sources.src1.interceptors.i1.type = timestamp
## 配置channel
a1.channels.ch1.type = file # 类型
a1.channels.ch1.checkpointDir= /root/checkpoint # 检查点目录
a1.channels.ch1.dataDirs= /root/data # 缓存目录
## 配置sink: kafka
a1.sinks.sink_kafka.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink_kafka.topic = mytopic # 对应Kafka中的主题
a1.sinks.sink_kafka.brokerList = host1:9092,host2:9092 # kafka的地址
a1.sinks.sink_kafka.requiredAcks = 1
a1.sinks.sink_kafka.batchSize = 20
a1.sinks.sink_kafka.channel = ch1
a1.sinks.sink_kafka.serializer.class=kafka.serializer.StringEncoder
a1.sinks.sink_kafka.max.message.size=1000000
a1.sinks.sink_kafka.producer.type=sync
a1.sinks.sink_kafka.custom.encoding=UTF-8
手动拷贝一个文件到 ‘/root/log’ 目录,观察日志输出和 ‘/root/log’ 下的变化.
实例: spooldir数据源并写入HDFS
agent-1.channels.ch-1.type = file
agent-1.channels.ch-1.checkpointDir= /root/checkpoint
agent-1.channels.ch-1.dataDirs= /root/data
agent-1.sources.src-1.type = spooldir
agent-1.sources.src-1.channels = ch-1
agent-1.sources.src-1.spoolDir = /root/log
agent-1.sources.src-1.deletePolicy= never
agent-1.sources.src-1.fileHeader = true
agent-1.sources.src-1.interceptors =i1
agent-1.sources.src-1.interceptors.i1.type = timestamp
agent-1.sinks.sink_hdfs.channel = ch-1
agent-1.sinks.sink_hdfs.type = hdfs
agent-1.sinks.sink_hdfs.hdfs.path = hdfs://cdh1:8020/user/root/events/%Y-%m-%d
agent-1.sinks.sink_hdfs.hdfs.filePrefix = logs
agent-1.sinks.sink_hdfs.hdfs.inUsePrefix = .
agent-1.sinks.sink_hdfs.hdfs.rollInterval = 30
agent-1.sinks.sink_hdfs.hdfs.rollSize = 0
agent-1.sinks.sink_hdfs.hdfs.rollCount = 0
agent-1.sinks.sink_hdfs.hdfs.batchSize = 1000
agent-1.sinks.sink_hdfs.hdfs.writeFormat = text
agent-1.sinks.sink_hdfs.hdfs.fileType = DataStream
#agent-1.sinks.sink_hdfs.hdfs.fileType = CompressedStream
#agent-1.sinks.sink_hdfs.hdfs.codeC = lzop
agent-1.channels = ch-1
agent-1.sources = src-1
agent-1.sinks = sink_hdfs
spooldir数据源发送到多个sink
## 定义各组件的名字
agent-1.channels = ch-1 ch-2
agent-1.sources = src-1
agent-1.sinks = sink_kafka sink_logger
## 配置source
agent-1.sources.src-1.type = spooldir # 源类型为文件目录
agent-1.sources.src-1.channels = ch-1 # 对应的channel名
agent-1.sources.src-1.spoolDir = /root/log # 监控的目录
agent-1.sources.src-1.deletePolicy= never # 是否删除已完成的文件: never或immediate(立即)
agent-1.sources.src-1.fileHeader = true # 时候设置一个header来保存文件名的绝对路径
agent-1.sources.src-1.interceptors =i1 # 在header中添加timestamp以在hadoop中使用
agent-1.sources.src-1.interceptors.i1.type = timestamp
## ch-1和sink_kafka一组发送数据到kafka
## 配置channel
agent-1.channels.ch-1.type = file # 类型
agent-1.channels.ch-1.checkpointDir= /root/checkpoint # 检查点目录
agent-1.channels.ch-1.dataDirs= /root/data # 缓存目录
## 配置sink: kafka
agent-1.sinks.sink_kafka.type = org.apache.flume.sink.kafka.KafkaSink
agent-1.sinks.sink_kafka.topic = mytopic # 对应Kafka中的主题
agent-1.sinks.sink_kafka.brokerList = localhost:9092 # kafka的地址
agent-1.sinks.sink_kafka.requiredAcks = 1
agent-1.sinks.sink_kafka.batchSize = 20
agent-1.sinks.sink_kafka.channel = ch-1
## ch-2和sink_logger输出到控制台用于调试
## 配置channel
agent-1.channels.ch-2.type = memory
agent-1.channels.ch-2.capacity = 1000000
agent-1.channels.ch-2.transactionCapacity = 1000000
agent-1.channels.ch-2.keep-alive = 10
## 配置sink: 用于测试的Logger
agent-1.sinks = sink-logger
agent-1.sinks.k1.type = logger
agent-1.sinks.k1.channel = ch-2
说明:
- 通过 interceptors 往 header 里添加 timestamp,这样做,可以在 hdfs.path 引用系统内部的时间变量或者主机的 hostname。
- 通过设置 hdfs.inUsePrefix,例如设置为 .时,hdfs 会把该文件当做隐藏文件,以避免在 mr 过程中读到这些临时文件,引起一些错误
- 如果使用 lzo 压缩,则需要手动创建 lzo 索引,可以通过修改 HdfsSink 的代码,通过代码创建索引
- FileChannel 的目录最好是和 spooldir 的数据目录处于不同磁盘。
最佳实践
- 模块命名规则: Source以src开头,channel以ch开头,sink以sink开头
- 模块之间的内部通信统一使用Avor接口
将日志采集系统分为三层:Agent,Collector,Store.
- 其中Agent层每个机器部署一个进程,负责单机的日志收集工作;
- collector层部署在中心服务器,负责接收agent发送的日志,并且将日志根据路由规则写入响应的Store层;
- Store层负责永久或临时的日志存储服务,或者将日志导流到其他服务器.
扩展MemoryChannel和FileChannel,提供DualChannel实现,以提供高存储和大缓存,即官方中提供的Spillable Memory Channel
- 监控collector HdfsSink写数据到hdfs的速递,FileChannel中拥堵的events数量,以及写hdfs状态(查看是否有 .tmp 文件生成).
美团对 flume 的改进代码见 github.