Simple Kafka

为什么要用消息系统

  1. 解耦
  2. 冗余
  3. 扩展性
  4. 灵活性和峰值处理能力
  5. 可恢复性
  6. 送达保证
  7. 顺序保证
  8. 缓冲
  9. 理解数据流
  10. 异步通信

常用消息框架对比

RabbitMQ

使用Erlang编写的开源消息队列,本身支持很多协议,AMQP,XMPP,SMTP,STOMP,也正因如此,它非常重量级,更适合于企业级的开发.同时实现了Broker架构,意味着消息发送给客户端是先在中心队列排队.对路由,负载均衡和持久化有很好的支持.

Redis

Redis是一个基于KV对的NoSQL数据库,开发维护活跃.虽然他是一个KV数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用.对于RabbitMQ和Redis的入队和出队操作,个执行100万次,每10万次记录一次执行时间.测试数据分别为128b,512b,1k和10k四种不同大小的数据.实验表明,入队时,数据小是Redis性能高于RabbitMQ,而数据超过了10K则Redis无法忍受,出队时,无论数据大小,Redis都表现出非常好的性能,RabbitMQ出队性能则远低于Redis.

ZeroMQ

号称最快的消息队列,尤其针对大吞吐量的需求场景.ZMQ能够实现RabbitMQ不擅长的高级/复杂队列,但是开发人员需要自己组合各种技术框架,技术上的复杂度是对ZMQ更高应用成功的挑战.ZMQ有一个独特的非中间件的模式,只需要简单的引用ZMQ程序库,可以使用NuGet安装,然后就可以在应用程序之间发送消息了.但是ZMQ仅支持非持久性队列,如果宕机数据则会丢失.

ActiveMQ

是Apache下的一个子项目,类似于ZMQ,它能够以代理人和点对点的技术实现队列,同时类似于RabbitMQ,它少量的代码就能高效的实现高级应用场景.

Kafka/Jafka

Kafka简介

Kafka是一个分布式发布-订阅消息系统,是一个分布式的,可划分的,冗余备份的持久性的日志服务.它主要用于处理活跃的流式数据.

主要起到两个作用:

  1. 降低系统组网复杂度
  2. 降低编程复杂度,各个子系统不再是各个协商接口,各个子系统类似插口插在插座上,Kafka承担高速数据总线的任务

主要特点

  1. 同时为发布和订阅提供高吞吐量.据了解,Kafka每秒可以生产约25万消息(50MB),每秒处理55万消息(110MB)
  2. 可进行持久化操作.将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序.通过将数据持久化到硬盘,以及replication防止数据丢失
  3. 分布式系统,易于向外扩展.所有的producer,broker,consumer都会有多个,均为分布式.无需停机即可扩展服务.
  4. 消息被处理的状态实在consumer端维护,而不是在server端维护,当失败时能自动平衡
  5. 支持online和offline场景

架构

Kafka架构

Kafka的整体架构非常简单,是显式分布式架构.producer,broker,consumer均有多个.producer和consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用.broker分发注册到系统中的consumer.broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存.客户端可服务端的通信是基于简单高性能,且与编程语言无关的TCP协议.

基本概念:

  1. Topic: 特质Kafka处理的消息源(feeds of messages)的不同分类
  2. Partition: Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列.其中每条消息都会分配一个有序的ID(offset)
  3. Message: 消息,是通信的基本单位,每个producer可以向一个Topic发送发布一些消息
  4. Producers: 消息和数据生产者,想Kafka的一个topic发布消息的过程称为producers
  5. Consumers: 消息和数据消费者,订阅topic并处理其发布的消息的过程称为consumers
  6. Broker: 缓存代理,Kafka中一台或多台服务器统称为Broker

消息发送的流程

消息流程

  1. Producer 根据指定的partition方法(round-roubin,hash等)将消息发送到指定topic的partition中
  2. Kafka集群接收到Producer发送过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关心消息是否被消费
  3. Consumer从Kafka集群pull数据,并控制取消息的offset

Kafka的设计

吞吐量

高吞吐是Kakfa需要实现的核心目标之一:

  1. 数据磁盘持久化: 消息不在内存中cache,直接写入磁盘,充分利用磁盘的顺序读写性能
  2. zero-copy: 减少磁盘IO操作
  3. 数据批量发送
  4. 数据压缩
  5. Topic划分为多个partition,提高并行性(parallelism)

负载均衡

  1. Producer根据用户指定的算法,将消息发送的指定的partition
  2. 存在多个partition,每个partition有自己的replication,每个replication分布在不同的Broker节点上
  3. 多个partition需要选取出lead partition,lead复制读写,并由zookeeper负责failover
  4. 通过zookeeper管理broker与consumer的动态加入与离开

拉取系统

由于Kafka Broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据,具有以下几点好处:

  1. 简化Kafka设计
  2. Consumer根据消费能力自助控制拉取速速
  3. Consumer根据自身情况自主选择消费方式,例如,批量,重复,从尾端开始消费等

可扩展性

当需要增加broker节点时新增的broker会向zookeeper注册,而producer和consumer会根据注册在zookeeper上的watcher感知到这些变化,并及时作出调整.

Kafka的应用场景

消息队列

行为跟踪

实时记录用户行为,以发布-订阅的方式存在topic中以便分析.

元信息监控

日志收集

流处理

事件源

持久性日志

Kafka的设计要点

  1. 直接利用Linux文件系统的cache来高速缓存数据

  2. 采用zero-copy提高发送性能.传统的数据发送需要发生4次上下文切换,采用sendfile系统调用之后,数据直接在内核态交换,系统上下文切换减少为2次,可以提高60%的发送性能.

  3. 数据在磁盘上存取代价为O(1).Kafka以topic来进行消息管理,每个topic有多个part,每个part对应一个逻辑log,由多个segment组成,每个segment中存储多条消息,消息ID由其逻辑位置决定,即从消息ID可以直接定位到其存储位置,避免ID到位置的额外映射.每个part在内存中对应一个index,记录每个segment中的第一条消息偏移.发布者发布到topic上的消息会被均匀的分布在每个part上(随机或根据用户指定的回调函数进行分布),broker收到发布消息后想对应part的最后一个segment上添加该消息,当某个setment上的消息数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息才能被订阅者订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建一个新的segment.

  4. 显示分布式,所有的producer,broker,consumer都会为多个,均为分布式.producer和broker之间没有负载均衡.broker和consumer之间通过zookeeper进行负载均衡.所有的broker和consumer都会在zookeeper中进行注册,且zookeeper会保存他们的元数据信息.如果某个broker或consumer发生变化,所有其他的broker和consumer都会得到通知.

与Flume的区别

  1. Kafka是一个通用型的系统.可有有很多的生产者和消费者分享这个主题.而Flume被设计成特定用途的工作,特定的向HDFS或HBase发送,Flume为了更好的为HDFS服务而做了特定的优化,并且与Hadoop的安全体系整合在了一起.基于这样的结论,若果被多个应用程序消费的话使用Kafka,面向单一或HDFS的话使用Flume.

  2. Flume拥有许多配置的来源(source)和存储池(sink).Kafka拥有的是非常小的生产-消费体系.如果你的数据来源稳定,不需要额外的编码,那你可以使用Flume提供的source和sink,反之,如果你需要准备自己的生产者和消费者,则使用Kafka.

  3. Flume可以在拦截器里面实时处理数据(比较局限,功能单一).这个特性对于过滤数据非常有用,而Kafka需要一个外部系统帮助处理数据.

  4. 无论Flume和Kafka,两个系统都可以保证不丢失数据.Flume不会复制事件.相应的,即使我们在使用一个可以信赖的文件通道,如果Flume Agent这个节点宕机了,你会失去所有的事件访问能力直到你修复这个节点.使用Kafka的管道特性不会有这个问题.

  5. Flume和Kakfa可以配合使用.

配置集群

  1. 下载并解压稳定版本,这里是kafka_2.11-0.9.0.1,创建安装目录一般是/usr/lib/kafka,将解压后的文件复制到安装目录.

  2. 配置环境变量,编辑~/.bash_profile文件,添加上面的安装目录:

    vim ~/.bash_profile
    
    export KAFKA_HOME=/usr/lib/kafka/kafka_2.11-0.9.0.1/bin
    export PATH=$KAFKA_HOME:$PATH
    
    source ~/.bash_profile
    
  3. 创建数据目录用于存放zookeeper数据,由于系统盘容量限制,在挂载数据盘中创建一个单独的目录,/data/pro/kafka/data/zookeeper,同时创建日志目录/data/pro/kafka/log.

  4. 配置Kafka的配置文件:

    ## 分别在各个节点的安装目录`config`下编辑`server.properties`文件
    ## 这里配置3个节点: 192.168.0.1,192.168.0.2,192.168.0.3
    vim /usr/lib/kafka/kafka_2.11-0.9.0.1/config/server.properties
    
    broker.id=1                 // 每个节点配置一个单独的ID,可按顺序递增
    host.name=192.168.0.1
    log.dirs=/data/pro/kafka/log
    zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
    
  5. 配置zookeeper配置文件:

    ## 分别在各个节编辑配置文件,添加zookeeper集群的信息
    vim /usr/lib/kafka/kafka_2.11-0.9.0.1/config/zookeeper.properties
    
    dataDir=/data/pro/kafka/data/zookeeper      # 直接指认到zookeeper的数据路径
    server.1=192.168.0.1:2888:3888
    server.2=192.168.0.2:2888:3888
    server.3=192.168.0.3:2888:3888
    
  6. 配置生产者配置文件:

    ## 分别在三个节点
    vim /usr/lib/kafka/kafka_2.11-0.9.0.1/config/producer.properties
    
    broker.list=192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092
    producer.type=async
    
  7. 配置消费者配置文件:

    ## 分别在三个节点
    vim /usr/lib/kafka/kafka_2.11-0.9.0.1-bin/config/consumer.properties
    
    zookeeper.contact=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
    
  8. 启动:

    ## 分别在三个节点启动,在安装目录
    kafka-server-start.sh config/server.properties &
    
    ## 关闭
    kafka-server-stop.sh
    
    ## 创建topic, 一个副本,一个分区,topic的name: test
    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    
    ##创建topic, 三个副本,一个分区topic的name: test1
    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    
    ## 查看所有主题
    kafka-topics.sh --list --zookeeper localhost:2181
    
    ## 查看主题信息
    kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    
    ## 在本地单节点启动一个生产者
    kafka-console-producer.sh --broker-list localhost:9092 --topic test
    # 输入一些消息,然后关闭
    
    ## 在本地单节点启动一个消费者
    kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    # 这时会受到刚才通过生产者发出的消息,或者集群部署时可以在其他节点启动消费者进行测试
    
    ## 为主题添加分块
    kafka-topics.sh --zookeeper localhost:2181/chroot --alter --topic my_topic_name --partitions 40
    
    ## 删除主题,需要在配置文件中添加: delete.topic.enable=true
    kafka-topics.sh --zookeeper localhost:2181/chroot --delete --topic my_topic_name
    
    ## 优雅的关闭服务
    controlled.shutdown.enable=true
    

一些其他配置(Twitter的生产环境配置参考):

# Replication configurations
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000

controller.socket.timeout.ms=30000
controller.message.queue.size=10

# Log configuration
num.partitions=8
message.max.bytes=1000000
auto.create.topics.enable=true
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=168
log.flush.interval.ms=10000
log.flush.interval.messages=20000
log.flush.scheduler.interval.ms=2000
log.roll.hours=168
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824

# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000

# Socket server configuration
num.io.threads=8
num.network.threads=8
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=16
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100

-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80

标点符: 分布式消息系统:Kafka
悠悠香草: Kafka 文件存储机制那些事
周明耀: Apache kafka 工作原理介绍
sstutu: kafka入门:简介、使用场景、设计原理、主要配置及集群搭建
Jason’s Blog: Kafka深度解析