为什么要用消息系统
- 解耦
- 冗余
- 扩展性
- 灵活性和峰值处理能力
- 可恢复性
- 送达保证
- 顺序保证
- 缓冲
- 理解数据流
- 异步通信
常用消息框架对比
RabbitMQ
使用Erlang编写的开源消息队列,本身支持很多协议,AMQP,XMPP,SMTP,STOMP,也正因如此,它非常重量级,更适合于企业级的开发.同时实现了Broker架构,意味着消息发送给客户端是先在中心队列排队.对路由,负载均衡和持久化有很好的支持.
Redis
Redis是一个基于KV对的NoSQL数据库,开发维护活跃.虽然他是一个KV数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用.对于RabbitMQ和Redis的入队和出队操作,个执行100万次,每10万次记录一次执行时间.测试数据分别为128b,512b,1k和10k四种不同大小的数据.实验表明,入队时,数据小是Redis性能高于RabbitMQ,而数据超过了10K则Redis无法忍受,出队时,无论数据大小,Redis都表现出非常好的性能,RabbitMQ出队性能则远低于Redis.
ZeroMQ
号称最快的消息队列,尤其针对大吞吐量的需求场景.ZMQ能够实现RabbitMQ不擅长的高级/复杂队列,但是开发人员需要自己组合各种技术框架,技术上的复杂度是对ZMQ更高应用成功的挑战.ZMQ有一个独特的非中间件的模式,只需要简单的引用ZMQ程序库,可以使用NuGet安装,然后就可以在应用程序之间发送消息了.但是ZMQ仅支持非持久性队列,如果宕机数据则会丢失.
ActiveMQ
是Apache下的一个子项目,类似于ZMQ,它能够以代理人和点对点的技术实现队列,同时类似于RabbitMQ,它少量的代码就能高效的实现高级应用场景.
Kafka/Jafka
Kafka简介
Kafka是一个分布式发布-订阅消息系统,是一个分布式的,可划分的,冗余备份的持久性的日志服务.它主要用于处理活跃的流式数据.
主要起到两个作用:
- 降低系统组网复杂度
- 降低编程复杂度,各个子系统不再是各个协商接口,各个子系统类似插口插在插座上,Kafka承担高速数据总线的任务
主要特点
- 同时为发布和订阅提供高吞吐量.据了解,Kafka每秒可以生产约25万消息(50MB),每秒处理55万消息(110MB)
- 可进行持久化操作.将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序.通过将数据持久化到硬盘,以及replication防止数据丢失
- 分布式系统,易于向外扩展.所有的producer,broker,consumer都会有多个,均为分布式.无需停机即可扩展服务.
- 消息被处理的状态实在consumer端维护,而不是在server端维护,当失败时能自动平衡
- 支持online和offline场景
架构
Kafka的整体架构非常简单,是显式分布式架构.producer,broker,consumer均有多个.producer和consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用.broker分发注册到系统中的consumer.broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存.客户端可服务端的通信是基于简单高性能,且与编程语言无关的TCP协议.
基本概念:
- Topic: 特质Kafka处理的消息源(feeds of messages)的不同分类
- Partition: Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列.其中每条消息都会分配一个有序的ID(offset)
- Message: 消息,是通信的基本单位,每个producer可以向一个Topic发送发布一些消息
- Producers: 消息和数据生产者,想Kafka的一个topic发布消息的过程称为producers
- Consumers: 消息和数据消费者,订阅topic并处理其发布的消息的过程称为consumers
- Broker: 缓存代理,Kafka中一台或多台服务器统称为Broker
消息发送的流程
- Producer 根据指定的partition方法(round-roubin,hash等)将消息发送到指定topic的partition中
- Kafka集群接收到Producer发送过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关心消息是否被消费
- Consumer从Kafka集群pull数据,并控制取消息的offset
Kafka的设计
吞吐量
高吞吐是Kakfa需要实现的核心目标之一:
- 数据磁盘持久化: 消息不在内存中cache,直接写入磁盘,充分利用磁盘的顺序读写性能
- zero-copy: 减少磁盘IO操作
- 数据批量发送
- 数据压缩
- Topic划分为多个partition,提高并行性(parallelism)
负载均衡
- Producer根据用户指定的算法,将消息发送的指定的partition
- 存在多个partition,每个partition有自己的replication,每个replication分布在不同的Broker节点上
- 多个partition需要选取出lead partition,lead复制读写,并由zookeeper负责failover
- 通过zookeeper管理broker与consumer的动态加入与离开
拉取系统
由于Kafka Broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据,具有以下几点好处:
- 简化Kafka设计
- Consumer根据消费能力自助控制拉取速速
- Consumer根据自身情况自主选择消费方式,例如,批量,重复,从尾端开始消费等
可扩展性
当需要增加broker节点时新增的broker会向zookeeper注册,而producer和consumer会根据注册在zookeeper上的watcher感知到这些变化,并及时作出调整.
Kafka的应用场景
消息队列
行为跟踪
实时记录用户行为,以发布-订阅的方式存在topic中以便分析.
元信息监控
日志收集
流处理
事件源
持久性日志
Kafka的设计要点
直接利用Linux文件系统的cache来高速缓存数据
采用zero-copy提高发送性能.传统的数据发送需要发生4次上下文切换,采用sendfile系统调用之后,数据直接在内核态交换,系统上下文切换减少为2次,可以提高60%的发送性能.
数据在磁盘上存取代价为O(1).Kafka以topic来进行消息管理,每个topic有多个part,每个part对应一个逻辑log,由多个segment组成,每个segment中存储多条消息,消息ID由其逻辑位置决定,即从消息ID可以直接定位到其存储位置,避免ID到位置的额外映射.每个part在内存中对应一个index,记录每个segment中的第一条消息偏移.发布者发布到topic上的消息会被均匀的分布在每个part上(随机或根据用户指定的回调函数进行分布),broker收到发布消息后想对应part的最后一个segment上添加该消息,当某个setment上的消息数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息才能被订阅者订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建一个新的segment.
显示分布式,所有的producer,broker,consumer都会为多个,均为分布式.producer和broker之间没有负载均衡.broker和consumer之间通过zookeeper进行负载均衡.所有的broker和consumer都会在zookeeper中进行注册,且zookeeper会保存他们的元数据信息.如果某个broker或consumer发生变化,所有其他的broker和consumer都会得到通知.
与Flume的区别
Kafka是一个通用型的系统.可有有很多的生产者和消费者分享这个主题.而Flume被设计成特定用途的工作,特定的向HDFS或HBase发送,Flume为了更好的为HDFS服务而做了特定的优化,并且与Hadoop的安全体系整合在了一起.基于这样的结论,若果被多个应用程序消费的话使用Kafka,面向单一或HDFS的话使用Flume.
Flume拥有许多配置的来源(source)和存储池(sink).Kafka拥有的是非常小的生产-消费体系.如果你的数据来源稳定,不需要额外的编码,那你可以使用Flume提供的source和sink,反之,如果你需要准备自己的生产者和消费者,则使用Kafka.
Flume可以在拦截器里面实时处理数据(比较局限,功能单一).这个特性对于过滤数据非常有用,而Kafka需要一个外部系统帮助处理数据.
无论Flume和Kafka,两个系统都可以保证不丢失数据.Flume不会复制事件.相应的,即使我们在使用一个可以信赖的文件通道,如果Flume Agent这个节点宕机了,你会失去所有的事件访问能力直到你修复这个节点.使用Kafka的管道特性不会有这个问题.
Flume和Kakfa可以配合使用.
配置集群
下载并解压稳定版本,这里是
kafka_2.11-0.9.0.1
,创建安装目录一般是/usr/lib/kafka
,将解压后的文件复制到安装目录.配置环境变量,编辑
~/.bash_profile
文件,添加上面的安装目录:vim ~/.bash_profile export KAFKA_HOME=/usr/lib/kafka/kafka_2.11-0.9.0.1/bin export PATH=$KAFKA_HOME:$PATH source ~/.bash_profile
创建数据目录用于存放zookeeper数据,由于系统盘容量限制,在挂载数据盘中创建一个单独的目录,
/data/pro/kafka/data/zookeeper
,同时创建日志目录/data/pro/kafka/log
.配置Kafka的配置文件:
## 分别在各个节点的安装目录`config`下编辑`server.properties`文件 ## 这里配置3个节点: 192.168.0.1,192.168.0.2,192.168.0.3 vim /usr/lib/kafka/kafka_2.11-0.9.0.1/config/server.properties broker.id=1 // 每个节点配置一个单独的ID,可按顺序递增 host.name=192.168.0.1 log.dirs=/data/pro/kafka/log zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
配置zookeeper配置文件:
## 分别在各个节编辑配置文件,添加zookeeper集群的信息 vim /usr/lib/kafka/kafka_2.11-0.9.0.1/config/zookeeper.properties dataDir=/data/pro/kafka/data/zookeeper # 直接指认到zookeeper的数据路径 server.1=192.168.0.1:2888:3888 server.2=192.168.0.2:2888:3888 server.3=192.168.0.3:2888:3888
配置生产者配置文件:
## 分别在三个节点 vim /usr/lib/kafka/kafka_2.11-0.9.0.1/config/producer.properties broker.list=192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092 producer.type=async
配置消费者配置文件:
## 分别在三个节点 vim /usr/lib/kafka/kafka_2.11-0.9.0.1-bin/config/consumer.properties zookeeper.contact=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
启动:
## 分别在三个节点启动,在安装目录 kafka-server-start.sh config/server.properties & ## 关闭 kafka-server-stop.sh ## 创建topic, 一个副本,一个分区,topic的name: test kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test ##创建topic, 三个副本,一个分区topic的name: test1 kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic ## 查看所有主题 kafka-topics.sh --list --zookeeper localhost:2181 ## 查看主题信息 kafka-topics.sh --describe --zookeeper localhost:2181 --topic test ## 在本地单节点启动一个生产者 kafka-console-producer.sh --broker-list localhost:9092 --topic test # 输入一些消息,然后关闭 ## 在本地单节点启动一个消费者 kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning # 这时会受到刚才通过生产者发出的消息,或者集群部署时可以在其他节点启动消费者进行测试 ## 为主题添加分块 kafka-topics.sh --zookeeper localhost:2181/chroot --alter --topic my_topic_name --partitions 40 ## 删除主题,需要在配置文件中添加: delete.topic.enable=true kafka-topics.sh --zookeeper localhost:2181/chroot --delete --topic my_topic_name ## 优雅的关闭服务 controlled.shutdown.enable=true
一些其他配置(Twitter的生产环境配置参考):
# Replication configurations
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
controller.socket.timeout.ms=30000
controller.message.queue.size=10
# Log configuration
num.partitions=8
message.max.bytes=1000000
auto.create.topics.enable=true
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=168
log.flush.interval.ms=10000
log.flush.interval.messages=20000
log.flush.scheduler.interval.ms=2000
log.roll.hours=168
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824
# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
# Socket server configuration
num.io.threads=8
num.network.threads=8
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=16
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
标点符: 分布式消息系统:Kafka
悠悠香草: Kafka 文件存储机制那些事
周明耀: Apache kafka 工作原理介绍
sstutu: kafka入门:简介、使用场景、设计原理、主要配置及集群搭建
Jason’s Blog: Kafka深度解析