apache kafka 原理

分区副本机制

kafka 有三层结构:kafka 有多个主题,每个主题有多个分区,每个分区又有多条消息。

分区机制:主要解决了单台服务器存储容量有限和单台服务器并发数限制的问题。一个分片的不同副本不能放到同一个 broker 上。

当主题数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,存放在多台服务器上。每个服务器上的数据,叫做一个分片。

分区对于 kafka 集群的好处是:实现负载均衡,高存储能力、高伸缩性。分区对于消费者来说,可以提高并发度,提高效率。

副本:副本备份机制解决了数据存储的高可用问题。

当数据只保存一份的时候,有丢失的风险。为了更好的容错和容灾,将数据拷贝几份,保存到不同的机器上。

多个 follower 副本通常存放在和 leader 副本不同的 broker 中。通过这样的机制实现了高可用,当某台机器挂掉后,其他 follower 副本也能迅速”转正“,开始对外提供服务。

kafka 副本的作用:在 kafka 中,实现副本的目的就是冗余备份,且仅仅是冗余备份,所有的读写请求都是由 leader 副本进行处理的。follower 副本仅有一个功能,那就是从 leader 副本拉取消息,尽量让自己跟 leader 副本的内容一致。

follower 副本不对外提供服务,这样可以防止出现一些类似于数据库事务的幻读脏读的问题。为了提高一些性能而导致出现数据不一致问题,显然是不值得的。

kafka 保证数据不丢失机制

从 kafka 的大体角度上可以分为数据生产者,kafka 集群,还有就是消费者,而要保证数据的不丢失也要从这三个角度去考虑。

消息生产者

消息生产者保证数据不丢失 – 消息确认机制(ack 机制),参考值有三个:0,1,-1。

// producer 无需等待来自 broker 的确认而继续发送下一批消息。
// 这种情况下数据传输效率最高,但是数据可靠性确是最低的。       
properties.put(producerconfig.acks_config,"0");

// producer 只要收到一个分区副本成功写入的通知就认为推送消息成功了。
// 这里有一个地方需要注意,这个副本必须是 leader 副本。
// 只有 leader 副本成功写入了,producer 才会认为消息发送成功。
properties.put(producerconfig.acks_config,"1");

// ack=-1,简单来说就是,producer 只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。
properties.put(producerconfig.acks_config,"-1");

消息消费者

kafka 消费消息的模型:

消息队列:0 1 2 3 4 5 6 7 8 9 10 11 12

producers wirtes

consumer a (offset=9) reads
consumer b (offset=11) reads

消费者丢失数据:由于 kafka consumer 默认是自动提交位移的(先更新位移,再消费消息),如果消费程序出现故障,没消费完毕,则丢失了消息,此时,broker 并不知道。

解决方案:

enable.auto.commit=false 关闭自动提交位移。

在消息被完整处理之后再手动提交位移。

properties.put(consumerconfig.enable_auto_commit_config, "false");

消息存储及查询机制

kafka 使用日志文件的方式来保存生产者消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。

kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,一个分片并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是 <topic_name>_<partition_id>。

kafka 容器数据目录:/kafka/kafka-logs-kafka1

消息存储机制

kafka 作为消息中间件,只负责消息的临时存储,并不是永久存储,所以需要删除过期的数据。如果将所有的数据都存储在一个文件中,要删除过期的数据的时候,就变得非常的麻烦。如果将其进行切分成多个文件后,如果要删除过期数据,就可以根据文件的日期属性删除即可。默认只保留 168 小时,即七天之内的数据。因此 kafka 的数据存储方案是多文件存储。

log 分段:

每个分片目录中,kafka 通过分段的方式将数据分为多个 logsegment。

一个 logsegment 对应磁盘上的一个日志文件(00000000000000000000.log)和一个索引文件 (00000000000000000000.index)。

其中日志文件是用来记录消息的,索引文件是用来保存消息的索引。

每个 logsegment 的大小可以在 server.properties 中 log.segment.bytes=107370(设置分段大小,默认是 1 gb)选项进行设置。

当 log 文件等于 1 g 时,新的会写入到下一个 segment 中。

timeindex 文件,是 kafka 的具体时间日志。

通过 offset 查找 message

存储的结构:

一个主题 –> 多个分区 –> 多个日志段(多个文件)。

第一步 – 查询 segment file:

segment file 命名规则跟 offset 有关,根据 segment file 可以知道它的起始偏移量,因为 segment file 的命名规则是上一个 segment 文件最后一条消息的 offset 值。所以只要根据 offset 二分查找文件列表,就可以快速定位到具体文件。

比如,第一个 segment file 是 00000000000000000000.index 表示最开始的文件,起始偏移量 (offset) 为 0。第二个是 00000000000000091932.index – 代表消息量起始偏移量为 91933 = 91932 + 1。那么 offset=5000 时应该定位 00000000000000000000.index。

第二步 – 通过 segment file 查找 message:

通过第一步定位到 segment file,当 offset=5000 时,依次定位到 00000000000000000000.index 的元数据物理位置和 00000000000000000000.log 的物理偏移地址,然后再通过 00000000000000000000.log 顺序查找直到 offset=5000 为止。

生产者消息分发策略

kafka 在数据生产的时候,有一个数据分发策略。默认的情况使用 defaultpartitioner.class 类。

这个类中就定义数据分发的策略:

public interface partitioner extends configurable, closeable {

    /**
     * compute the partition for the given record.
     *
     * @param topic the topic name
     * @param key the key to partition on (or null if no key)
     * @param keybytes the serialized key to partition on( or null if no key)
     * @param value the value to partition on or null
     * @param valuebytes the serialized value to partition on or null
     * @param cluster the current cluster metadata
     */
    public int partition(string topic, object key, byte[] keybytes, object value, byte[] valuebytes, cluster cluster);

    /**
     * this is called when partitioner is closed.
     */
    public void close();

}

默认实现类:org.apache.kafka.clients.producer.internals.defaultpartitioner

1) 如果是用户指定了 partition,生产就不会调用 defaultpartitioner.partition() 方法。

数据分发策略的时候,可以指定数据发往哪个partition。

当 producerrecord 的构造参数中有 partition 的时候,就可以发送到对应 partition 上。

/**
 * creates a record to be sent to a specified topic and partition
 *
 * @param topic the topic the record will be appended to
 * @param partition the partition to which the record should be sent
 * @param key the key that will be included in the record
 * @param value the record contents
 * @param headers the headers that will be included in the record
 */
public producerrecord(string topic, integer partition, k key, v value,  iterable<header> headers) {
    this(topic, partition, null, key, value, headers);
}

2) defaultpartitioner 源码

如果指定 key,是取决于 key 的 hash 值。

如果不指定 key,轮询分发。

public int partition(string topic, object key, byte[] keybytes, object value, byte[] valuebytes, cluster cluster) {
    // 获取该 topic 的分区列表
    list<partitioninfo> partitions = cluster.partitionsfortopic(topic);
    // 获得分区的个数
    int numpartitions = partitions.size();
    // 如果 key 值为 null; 如果没有指定 key,那么就是轮询
    if (keybytes == null) {
        // 维护一个 key 为 topic 的 concurrenthashmap,并通过 cas 操作的方式对 value 值执行递增 +1 操作
        int nextvalue = nextvalue(topic);
        // 获取该 topic 的可用分区列表
        list<partitioninfo> availablepartitions = cluster.availablepartitionsfortopic(topic);
        // 如果可用分区大于 0
        if (availablepartitions.size() > 0) {
            // 执行求余操作,保证消息落在可用分区上
            int part = utils.topositive(nextvalue) % availablepartitions.size();
            return availablepartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return utils.topositive(nextvalue) % numpartitions;
        }
    } else {
        // 指定了 key,key 肯定就不为 null
        // hash the keybytes to choose a partition
        return utils.topositive(utils.murmur2(keybytes)) % numpartitions;
    }
}

消费者负载均衡机制

同一个分区中的数据,只能被一个消费者组中的一个消费者所消费。例如 p0 分区中的数据不能被 consumer group a 中 c1 与 c2 同时消费。

消费组:一个消费组中可以包含多个消费者,properties.put(consumerconfig.group_id_config, “groupname”);

如果该消费组有四个消费者,主题有四个分区,那么每人一个。多个消费组可以重复消费消息。

  • 如果有 3 个 partition,p0/p1/p2,同一个消费组有 3 个消费者,c0/c1/c2,则为一一对应关系。
  • 如果有 3 个 partition,p0/p1/p2,同一个消费组有 2 个消费者,c0/c1,则其中一个消费者消费 2 个分区的数据,另一个消费者消费一个分区的数据。
  • 如果有 2 个 partition, p0/p1,同一个消费组有 3 个消费者,c0/c1/c3,则其中有一个消费者空闲,另外 2 个消费者消费分别各自消费一个分区的数据。

kakfa 配置文件说明

server.properties

1、broker.id = 0

kafka 集群是由多个节点组成的,每个节点称为一个 broker,中文翻译是代理。每个 broker 都有一个不同的 brokerid,由 broker.id 指定,是一个不小于 0 的整数,各 brokerid 必须不同,但不必连续。如果想扩展 kafka 集群,只需引入新节点,分配一个不同的 broker.id 即可。

启动 kafka 集群时,每一个 broker 都会实例化并启动一个 kafkacontroller,并将该 broker 的 brokerid 注册到 zookeeper 的相应节点中。集群各 broker 会根据选举机制选出其中一个 broker 作为 leader,即 leader kafkacontroller。leader kafkacontroller 负责主题的创建与删除、分区和副本的管理等。当 leader kafkacontroller 宕机后,其他 broker 会再次选举出新的 leader kafkacontroller。

2、log.dir = /export/data/kafka/

broker 持久化消息到哪里,数据目录。

3、log.retention.hours = 168

log 文件最小存活时间,默认是 168h,即 7 天。相同作用的还有 log.retention.minutes、log.retention.ms。

数据存储的最大时间超过这个时间会根据 log.cleanup.policy 设置的策略处理数据,也就是消费端能够多久去消费数据。

log.retention.bytes 和 log.retention.hours 任意一个达到要求,都会执行删除,会被 topic 创建时的指定参数覆盖。

4、log.retention.check.interval.ms

多长时间检查一次是否有 log 文件要删除。默认是 300000ms,即 5 分钟。

5、log.retention.bytes

限制单个分区的 log 文件的最大值,超过这个值,将删除旧的 log,以满足 log 文件不超过这个值。默认是 -1,即不限制。

6、log.roll.hours

多少时间会生成一个新的 log segment,默认是 168h,即 7 天。相同作用的还有 log.roll.ms、segment.ms。

7、log.segment.bytes

log segment 多大之后会生成一个新的 log segment,默认是 1073741824,即 1g。

8、log.flush.interval.messages

指定 broker 每收到几个消息就把消息从内存刷到硬盘(刷盘)。默认是 9223372036854775807。

kafka 官方不建议使用这个配置,建议使用副本机制和操作系统的后台刷新功能,因为这更高效。这个配置可以根据不同的 topic 设置不同的值,即在创建 topic 的时候设置值。

在 linux 操作系统中,把数据写入到文件系统之后,数据其实在操作系统的 page cache 里面,并没有刷到磁盘上去。如果此时操作系统挂了,其实数据就丢了。

1、kafka 是多副本的,当配置了同步复制之后。多个副本的数据都在 page cache 里面,出现多个副本同时挂掉的概率比 1 个副本挂掉,概率就小很多了。

2、操作系统有后台线程,定期刷盘。如果应用程序每写入 1 次数据,都调用一次 fsync,那性能损耗就很大,所以一般都会在性能和可靠性之间进行权衡。因为对应一个应用来说,虽然应用挂了,只要操作系统不挂,数据就不会丢。

9、log.flush.interval.ms

指定 broker 每隔多少毫秒就把消息从内存刷到硬盘。默认值同 log.flush.interval.messages 一样, 9223372036854775807。

同 log.flush.interval.messages 一样,kafka 官方不建议使用这个配置。

10、delete.topic.enable=true

是否允许从物理上删除 topic。

kafka 监控与运维

kafka-eagle 概述

在生产环境下,在 kafka 集群中,消息数据变化是被关注的问题,当业务前提不复杂时,可以使用 kafka 命令提供带有 zookeeper 客户端工具的工具,可以轻松完成工作。随着业务的复杂性,增加 group 和 topic,那么使用 kafka 提供命令工具,已经感到无能为力,那么 kafka 监控系统目前尤为重要,需要观察消费者应用的细节。

为了简化开发者和服务工程师维护 kafka 集群的工作有一个监控管理工具,叫做 kafka-eagle。这个管理工具可以很容易地发现分布在集群中的哪些 topic 分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建 topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具。

搭建安装 kafka-eagle

kafka-eagle 在 docker 中没有镜像。

环境要求:需要安装 jdk,启动 zk 以及 kafka 的服务。

# 启动 zookeeper
zkserver.sh start

# 启动 kafka
cd /export/servers/kafka/bin 
nohup ./kafka-server-start.sh /export/servers/kafka/config/server.properties 2>&1 &

windows host 文件:

192.168.186.20 kafka1
192.168.186.20 kafka2
192.168.186.20 kafka3
192.168.186.11 node1
192.168.186.12 node2
192.168.186.13 node3

搭建步骤:

1) 下载 kafka-eagle 的源码包:

kafka-eagle 官网 – http://download.kafka-eagle.org/

可以从官网上面直接下载最新的安装包即可 kafka-eagle-bin-1.3.2.tar.gz 这个版本即可。

代码托管地址 – https://github.com/smartloli/kafka-eagle/releases

2) 上传安装包并解压:

这里选择将 kafak-eagle 安装在 node3 服务器。

如果要解压的是 zip 格式,需要先安装命令支持。

yum install unzip

unzip xxxx.zip

# 将安装包上传至 node01 服务器的 /export/softwares 路径下, 然后解压
cd /export/software/
unzip kafka-eagle.zip
cd kafka-eagle/kafka-eagle-web/target/
tar -zxf kafka-eagle-web-2.0.1-bin.tar.gz -c /export/servers

3) 准备数据库:

kafka-eagle 需要使用一个数据库来保存一些元数据信息,这里直接使用 mysql 数据库来保存即可,在本地执行以下命令创建一个 mysql 数据库即可。

可以使用 sqlite 或者 mysql 数据库。

-- 进入 mysql 客户端
create database if not exists eagle character set utf8mb4;

4) 修改 kafka-eagle 配置文件:

cd /export/servers/kafka-eagle-web-1.3.2/conf
vi system-config.properties

# 修改内容如下 

######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=node1:2181,node2:2181,node3:2181


######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka
# cluster2.kafka.eagle.offset.storage=zk

######################################
# kafka sqlite jdbc driver address
######################################
# kafka.eagle.driver=org.sqlite.jdbc
# kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
# kafka.eagle.username=root
# kafka.eagle.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.driver
kafka.eagle.url=jdbc:mysql://192.168.1.116:3306/eagle?useunicode=true&characterencoding=utf-8&zerodatetimebehavior=converttonull
kafka.eagle.username=root
kafka.eagle.password=password

默认情况下 mysql 只允许本机连接到 mysql 实例中,所以如果要远程访问,必须开放权限。

# 修改权限
update user set host = '%' where user ='root';
# 刷新配置
flush privileges;

5) 配置环境变量:

kafka-eagle 必须配置环境变量,node03 服务器执行以下命令来进行配置环境变量。

vi /etc/profile

# 内容如下:
export ke_home=/export/servers/kafka-eagle-web-1.3.2
export path=:$ke_home/bin:$path

# 让修改立即生效,执行  
source /etc/profile

6) 启动 kakfa-eagle:

cd /export/servers/kafka-eagle-web-1.3.2/bin
chmod u+x ke.sh
./ke.sh start