Kafka

Kafka

Kafka 概述

Kafka 定义

Kafka:一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要用于大数据实时处理领域。也是一个开源分布式事件流平台(Event Streaming Platform),用于高性能数据管道、流分析、数据集成和关键任务应用。

发布/订阅: 消息的发布者不会将消息直接发送给特定的订阅者, 而是将发布的消息分为不同的类别, 订阅者只接收感兴趣的消息。

消息队列的应用场景

缓冲/消峰

有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

image-20220719103226806

解耦

允许你独立的扩展或修改两边的处理过程, 只要确保它们遵守同样的接口约束。

image-20220719103424612

异步通信

允许用户把一个消息放入队列, 但并不立即处理它, 然后在需要的时候再去处理它们。

image-20220719103448821

消息队列的两种模式

image-20220719103512602

点对点模式

消费者主动拉取数据,消息收到后清除消息

发布/订阅模式

可以有多个topic主题(浏览、点赞、收藏、评论等)

消费者消费数据之后,不删除数据

每个消费者相互独立,都可以消费到数据

Kafka 架构

基础架构

  1. 为方便扩展,并提高吞吐量,一个topic分为多个partition
  2. 配合分区的设计,提出消费者组的概念,组内每个消费者并行消费
  3. 为提高可用性,为每个partition增加若干副本,类似NameNode HA
  4. ZK中记录谁是leader, Kafka2.8.0以后也可以配置不采用ZK
Kafka architecture
  1. Producer: 消息生产者,就是向 Kafka broker 发消息的客户端。
  2. Consumer: 消息消费者,向 Kafka broker 取消息的客户端。
  3. Consumer Group(CG): 消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
  4. Broker: 一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
  5. Topic: 可以理解为一个队列, 生产者和消费者面向的都是一个 topic。
  6. Partition: 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上, 一个 topic 可以分为多个 partition(如按时间(day,hour)),每个 partition 是一个有序的队列。
  7. Replica: 副本。 一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个 Follower。
  8. Leader: 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。
  9. Follower: 每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。 Leader 发生故障时,某个 Follower 会成为新的 Leader。

Kafka 快速入门

集群规划

hadoop102 hadoop103 hadoop104
zk zk zk
kafka kafka kafka

注意: 停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息, Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。

topic命令行操作

bin/kafka-topics.sh + :

参数 描述
–bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
–topic <String: topic> 操作的 topic 名称。
–create 创建主题。
–delete 删除主题。
–alter 修改主题。
–list 查看所有主题。
–describe 查看主题详细描述。
–partitions <Integer: # of partitions> 设置分区数。
–replication-factor<Integer: replication factor> 设置分区副本。
–config <String: name=value> 更新系统默认的配置。

创建 first topic :

1
[test@hadoop102 kafka]$  bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first

producer 命令行操作

bin/kafka-console-producer.sh +

参数 描述
–bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
–topic <String: topic> 操作的 topic 名称。

发送消息:

1
2
3
[test@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>hello world
>flink spark

consumer 命令行操作

bin/kafka-console-consumer.sh +

参数 描述
–bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
–topic <String: topic> 操作的 topic 名称。
–from-beginning 从头开始消费。
–group <String: consumer group id> 指定消费者组名称。

消费 first 主题中的数据:

1
[test@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

把主题中所有的数据都读取出来(包括历史数据):

1
[test@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

发送原理

在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。 main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

principle of sending

生产者重要参数列表

参数名称 描述
bootstrap.servers 生 产 者 连 接 集 群 所 需 的 broker 地 址 清 单 。 例 如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以 设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。
key.serializer 和 value.serializer 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memory RecordAccumulator 缓冲区总大小, 默认 32m
batch.size 缓冲区一批数据最大值, 默认 16k。适当增加该值,可 以提高吞吐量,但是如果该值设置太大,会导致数据 传输延迟增加。
linger.ms 如果数据迟迟未达到 batch.size, sender 等待 linger.time 之后就会发送数据。单位 ms, 默认值是 0ms,表示没 有延迟。 生产环境建议该值大小为 5-100ms 之间。
acks 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据, Leader 收到数据后应答。 -1(all):生产者发送过来的数据, Leader+和 isr 队列 里面的所有节点收齐数据后应答。 默认值是-1, -1 和 all 是等价的
max.in.flight.requests.per.connection 允许最多没有返回 ack 的次数, 默认为 5,开启幂等性 要保证该值是 1-5 的数字。
retries 当消息发送出现错误的时候,系统会重发消息。 retries 表示重试次数。 默认是 int 最大值, 2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时候,其他的消息可能发送 成功了。
retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms。
enable.idempotence 是否开启幂等性默认 true,开启幂等性。
compression.type 生产者发送的所有数据的压缩方式。 默认是 none,也 就是不压缩。 支持压缩类型: none、 gzip、 snappy、 lz4 和 zstd

例子 : 带回调函数的异步发送

sending with callback

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata) 和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.test.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {
public static void main(String[] args) throws InterruptedException {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// key,value 序列化(必须): key.serializer, value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
// 添加回调
kafkaProducer.send(new ProducerRecord<>("first", "test " + i), new Callback() {
// 该方法在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
// 没有异常,输出信息到控制台
System.out.println(" 主 题 : " + metadata.topic() + "->" + "分区: " + metadata.partition());
} else {
// 出现异常打印
exception.printStackTrace();
}
}
});
// 延迟一会会看到数据发往不同分区
Thread.sleep(2);
}
// 5. 关闭资源
kafkaProducer.close();
}
}

测试:

在 hadoop102 上开启 Kafka 消费者。

1
[test@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

1
2
3
4
5
test 0
test 1
test 2
test 3
test 4

在 IDEA 控制台观察回调信息。

1
2
3
4
5
主题: first->分区: 0
主题: first->分区: 0
主题: first->分区: 1
主题: first->分区: 1
主题: first->分区: 1

同步发送 API

只需在异步发送的基础上,再调用一下 get()方法即可。

1
2
3
4
// 异步发送 默认
kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
// 同步发送
kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();

生产者分区

好处

  1. 便于合理使用存储资源

    每个Partition在一个Broker上存储, 可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。 合理控制分区的任务, 可以实现负载均衡的效果。

  2. 提高并行度

    生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

分区策略

在IDEA中全局查找(ctrl +n) ProducerRecord类, 在类中可以看到如下构造方法:

  1. 指明partition的情况下,直接将指明的值作为partition值;例如partition=0,所有数据写入分区0:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
    ... ...
    }
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
    ... ...
    }
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
    ... ...
    }
    public ProducerRecord(String topic, Integer partition, K key, V value) {
    ... ...
    }
  2. 没有指明partition值但有key的情况下, 将key的hash值与topic的partition数进行取余得到partition值:

    例如: key1的hash值=5, key2的hash值=6 , topic的partition数=2, 那么key1 对应的value1写入1号分区, key2对应的value2写入0号分区。

    1
    2
    3
    public ProducerRecord(String topic, K key, V value) {
    ... ...
    }
  3. 既没有partition值又没有key值的情况下, Kafka采用Sticky Partition(黏性分区器) , 会随机选择一个分区, 并尽可能一直使用该分区, 待该分区的batch已满或者已完成, Kafka再随机一个分区进行使用(和上一次的分区不同) 。

    例如:第一次随机选择0号分区, 等0号分区当前批次满了(默认16k) 或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机) 。

    1
    2
    3
    public ProducerRecord(String topic, V value) {
    ... ...
    }

自定义分区器

如果研发人员可以根据企业需求,自己重新实现分区器。

例如我们实现一个分区器实现, 发送过来的数据中如果包含 test,就发往 0 号分区,不包含 test,就发往 1 号分区。

实现步骤

  1. 定义类实现 Partitioner 接口。

  2. 重写 partition()方法。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    package com.test.kafka.producer;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import java.util.Map;
    /**
    * 1. 实现接口 Partitioner
    * 2. 实现 3 个方法:partition,close,configure
    * 3. 编写 partition 方法,返回分区号
    */
    public class MyPartitioner implements Partitioner {
    /**
    * 返回信息对应的分区
    * @param topic 主题
    * @param key 消息的 key
    * @param keyBytes 消息的 key 序列化后的字节数组
    * @param value 消息的 value
    * @param valueBytes 消息的 value 序列化后的字节数组
    * @param cluster 集群元数据可以查看分区信息
    * @return
    */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // 获取消息
    String msgValue = value.toString();
    // 创建 partition
    int partition;
    // 判断消息是否包含 test
    if (msgValue.contains("test")){
    partition = 0;
    }else {
    partition = 1;
    }
    // 返回分区号
    return partition;
    }
    // 关闭资源
    @Override
    public void close() {

    }
    // 配置方法
    @Override
    public void configure(Map<String, ?> configs) {

    }



  3. 使用分区器的方法,在生产者的配置中添加分区器参数。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    package com.test.kafka.producer;
    import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    public class CustomProducerCallbackPartitions {
    public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // 添加自定义分区器
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.test.kafka.producer.MyPartitioner");
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    for (int i = 0; i < 5; i++) {
    kafkaProducer.send(new ProducerRecord<>("first", "test" + i), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e == null){
    System.out.println(" 主 题 : " + metadata.topic() + "->" + "分区: " + metadata.partition());
    }else {
    e.printStackTrace();
    }
    }
    });
    }
    kafkaProducer.close();
    }
    }

生产者如何提高吞吐量

batch.size:批次大小,默认16k

linger.ms:等待时间,修改为5-100ms

compression.type:压缩snappy

RecordAccumulator:缓冲区大小,修改为64m

数据可靠性

ack 应答原理

ack

0: 生产者发送过来的数据, 不需要等数据落盘应答

数据可靠性分析:丢数

1: 生产者发送过来的数据, Leader收到数据后应答。

数据可靠性分析:丢数

如:应答完成后,还没开始同步副本, Leader挂了

新的Leader不会收到Hello的信息,因为生产者已经认为发送成功了。

-1(all) : 生产者发送过来的数据, Leader和ISR队列里面的所有节点收齐数据后应答。

思考: Leader收到数据, 所有Follower都开始同步数据,但有一个Follower, 因为某种故障, 迟迟不能与Leader进行同步, 那这个问题怎么解决呢?

Leader维护了一个动态的in-sync replica set( ISR) , 意为和Leader保持同步的Follower+Leader集合(leader: 0, isr:0,1,2)。

如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。 该时间阈值由replica.lag.time.max.ms参数设定,默认30s。 例如2超时, (leader:0, isr:0,1)。

这样就不用等长期联系不上或者已经故障的节点。

数据可靠性分析

如果分区副本设置为1个, 或者ISR里应答的最小副本数量( min.insync.replicas 默认为1) 设置为1, 和ack=1的效果是一样的, 仍然有丢数的风险(leader: 0, isr:0) 。

数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

可靠性总结

acks=0, 生产者发送过来数据就不管了, 可靠性差, 效率高;

acks=1, 生产者发送过来数据Leader应答, 可靠性中等, 效率中等;

acks=-1, 生产者发送过来数据Leader和ISR队列里面所有Follwer应答, 可靠性高, 效率低;

在生产环境中, acks=0很少使用; acks=1, 一般用于传输普通日志, 允许丢个别数据; acks=-1, 一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

代码中需要添加:

1
2
3
4
// 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数 retries,默认是 int 最大值, 2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);

数据重复 & 数据去重

Leader 在生产者发送过来的数据, Leader和ISR队列里面的所有节点收齐数据(hello)后应答,在最后应答时,leader 的 ack 还没有发出就挂了,导致 Producer 认为发送失败重传,但是新的 Leader (之前的 follower) 中已经有了该数据(hello)再次发送就导致了数据重复。


数据传递语义

至少一次(At Least Once) = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

最多一次(At Most Once) = ACK级别设置为0

总结

At Least Once可以保证数据不丢失, 但是不能保证数据不重复;

At Most Once可以保证数据不重复, 但是不能保证数据不丢失。

精确一次(Exactly Once) : 对于一些非常重要的信息, 比如和钱相关的数据, 要求数据既不能重复也不丢失。
Kafka 0.11版本以后, 引入了一项重大特性: 幂等性事务


幂等性

指Producer不论向Broker发送多少次重复数据, Broker端都只会持久化一条, 保证了不重复。

精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。

重复数据的判断标准

具有<PID, Partition, SeqNumber>相同主键的消息提交时, Broker只会持久化一条。 其中PID是Kafka每次重启都会分配一个新的; Partition 表示分区号; Sequence Number是单调自增的。

所以幂等性只能保证的是在单分区单会话内不重复。

例如:

(Sequence=0,PID=1000,Value=Hello );(Sequence=1,PID=1000,Value=world ); (Sequence=1,PID=1000,Value=world ):negative_squared_cross_mark:

如何使用幂等性

开启参数 enable.idempotence 默认为 true, false 关闭。


生产者事务

事务原理

幂等性并不能跨多个分区运行,而事务可以弥补这个缺陷。事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在不一致的情况。

对流式应用而言,一个典型的应用模式为“consumer-transform-produce”, 这种模式下消费和生产并存: 应用程序从某个主题中消费消息,然后经过一系列操作写入另一个主题,消费者可能再提交消费位移的过程中出现问题而导致重复消费,也有可能生产者重复生产消息。 kafka中的事务可以使应用程序将消费消息,生产消息、提交消费位移当作原子操作来处理,同时成功或者失败,即使该生产或消费跨越多个分区。

transaction

场景

  1. 最简单的需求是producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见 。
  2. producer可能会给多个topic,多个partition发消息,这些消息也需要能放在一个事务里面,这就形成了一个典型的分布式事务
  3. kafka的应用场景经常是应用先消费一个topic,然后做处理再发到另一个topic,这个consume-transform-produce过程需要放到一个事务里面,比如在消息处理或者发送的过程中如果失败了,消费位点也不能提交。
  4. producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务 。
  5. 流式处理的拓扑可能会比较深,如果下游只有等上游消息事务提交以后才能读到,可能会导致rt非常长吞吐量也随之下降很多,所以需要实现read committed和read uncommitted两种事务隔离级别。

transactionalId与PID一一对应,两者之间所不同的是transactionalId由用户显示设置,而PID是由kafka内部分配的。 为了保证新的生产者启动后,具有相同transactionalId的旧生产者能够立即失效,每个生产者通过transactionalId获取PID的同时,还会获取一个单调递增的producer epoch(对应下面要讲述的kafkaProducer.initTransactions()方法)。 如果使用同一个transactionalId开启两个生产者,那么前一个生产者会报提示有一个新的生产者利用同一个事务id申请了producer epoch。提示老的生产者它再broker里面已经过期了。

从生产者的角度分析,通过事务,Kafka 可以保证跨生产者会话的消息幂等发送,以及跨生产者会话的事务恢复。前者表示具有相同 transactionalId 的新生产者实例被创建且工作的时候,旧的且拥有相同transactionalId的生产者实例将不再工作。后者指当某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事务要么被提交(Commit),要么被中止(Abort),如此可以使新的生产者实例从一个正常的状态开始工作。

而从消费者的角度分析,事务能保证的语义相对偏弱。出于以下原因,Kafka 并不能保证已提交的事务中的所有消息都能够被消费:

  1. 对采用日志压缩策略的主题而言,事务中的某些消息有可能被清理(相同key的消息,后写入的消息会覆盖前面写入的消息)。
  2. 事务中消息可能分布在同一个分区的多个日志分段(LogSegment)中,当老的日志分段被删除时,对应的消息可能会丢失。
  3. 消费者可以通过seek()方法访问任意offset的消息,从而可能遗漏事务中的部分消息。
  4. 消费者在消费时可能没有分配到事务内的所有分区,如此它也就不能读取事务中的所有消息。

说明:开启事务, 必须开启幂等性。

Producer 在使用事务功能前,必须先自定义一个唯一的 transactional.id。 有了 transactional.id,即使客户端挂掉了,它重启后也能继续处理未完成的事务

Kafka 的事务一共有如下 5 个 API :

1
2
3
4
5
6
7
8
9
10
11
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

单个 Producer,使用事务保证消息的仅一次发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.test.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerTransactions {
public static void main(String[] args) throws InterruptedException {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
// key,value 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置事务 id(必须),事务 id 任意起名
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction()
try {
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
// 发送消息
kafkaProducer.send(new ProducerRecord<>("first", "test " + i));
}
// 提交事务
kafkaProducer.commitTransaction();
} catch (Exception e) {
// 终止事务
kafkaProducer.abortTransaction();
}finally {
// 5. 关闭资源
kafkaProducer.close();
}
}
}

数据有序

单分区内, 有序(有条件的) ;

多分区, 分区与分区间无序;


数据乱序

  1. kafka在1.x版本之前保证数据单分区有序:

    max.in.flight.requests.per.connection=1( 不需要考虑是否开启幂等性) 。

  2. kafka在1.x及以后版本保证数据单分区有序:

    1. 未开启幂等性

      max.in.flight.requests.per.connection需要设置为1。

    2. 开启幂等性

      max.in.flight.requests.per.connection需要设置小于等于5。

      原因说明:因为在kafka1.x以后,启用幂等后, kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。

数据乱序

Broker

Zookeeper 存储的 Kafka 信息

Kafka info in zookeeper

在zookeeper的服务端存储的Kafka相关信息:

  1. /kafka/brokers/ids : [0,1,2] 记录有哪些服务器
  2. /kafka/brokers/topics/first/partitions/0/state : {“leader”:1 ,”isr”:[1,0,2] } 记录谁是Leader,有哪些服务器可用
  3. /kafka/controller {“brokerid” :0} 辅助选举Leader

Kafka Broker 总体工作流程

borker working procedure

Broker 重要参数

参数名称 描述
replica.lag.time.max.ms ISR 中, 如果 Follower 长时间未向 Leader 发送通 信请求或同步数据,则该 Follower 将被踢出 ISR。 该时间阈值, 默认 30s。
auto.leader.rebalance.enable 默认是 true。 自动 Leader Partition 平衡。
leader.imbalance.per.broker.percentage **默认是 10%**。 每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器 会触发 leader 的平衡。
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否平衡的间隔时 间。
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是 指 log 日志划分 成块的大小, 默认值 1G
log.index.interval.bytes 默认 4kb, kafka 里面每当写入了 4kb 大小的日志 (.log),然后就往 index 文件里面记录一个索引。
log.retention.hours Kafka 中数据保存的时间, 默认 7 天
log.retention.minutes Kafka 中数据保存的时间, 分钟级别,默认关闭。
log.retention.ms Kafka 中数据保存的时间, 毫秒级别,默认关闭
log.retention.check.interval.ms 检查数据是否保存超时的间隔, 默认是 5 分钟
log.retention.bytes 默认等于-1,表示无穷大。 超过设置的所有日志总大小,删除最早的 segment。
log.cleanup.policy 默认是 delete,表示所有数据启用删除策略;如果设置值为 compact,表示所有数据启用压缩策 略。
num.io.threads 默认是 8。 负责写磁盘的线程数。整个参数值要占 总核数的 50%。
num.replica.fetchers 副本拉取线程数,这个参数占总核数的 50%的 1/3
num.network.threads 默认是 3。 数据传输线程数,这个参数占总核数的 50%的 2/3 。
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是 long 的最 大值, 9223372036854775807。一般不建议修改, 交给系统自己管理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建 议修改,交给系统自己管理。

节点服役和退役

服役新节点

  1. 关闭 hadoop104,并右键执行克隆操作。

  2. 开启 hadoop105,并修改 IP 地址。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    #根据你自己的 Hadoop 集群网络来配置
    [root@hadoop104 ~]# vim /etc/sysconfig/network-scripts/ifcfgens33
    DEVICE=ens33
    TYPE=Ethernet
    ONBOOT=yes
    BOOTPROTO=static
    NAME="ens33"
    IPADDR=192.168.10.105
    PREFIX=24
    GATEWAY=192.168.10.2
    DNS1=192.168.10.2
  3. 在 hadoop105 上,修改主机名称为 hadoop105。

    1
    2
    [root@hadoop104 ~]# vim /etc/hostname
    hadoop105
  4. 重新启动 hadoop104、 hadoop105。

  5. 修改 haodoop105 中 kafka 的 broker.id 为 3。

  6. 删除 hadoop105 中 kafka 下的 datas 和 logs。

  7. 启动 hadoop102、 hadoop103、 hadoop104 上的 kafka 集群

  8. 单独启动 hadoop105 中的 kafka。

    1
    [test@hadoop105 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties

执行负载均衡操作

  1. 创建一个要均衡的主题。

    1
    2
    3
    4
    5
    6
    7
    [test@hadoop102 kafka]$ vim topics-to-move.json
    {
    "topics": [
    {"topic": "first"}
    ],
    "version": 1
    }
  2. 生成一个负载均衡的计划。

    1
    2
    3
    4
    5
    6
    7
    [test@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

    Current partition replica assignment
    {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]}]}

    Proposed partition reassignment configuration
    {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
  3. 创建副本存储计划(所有副本存储在 broker0、 broker1、 broker2、 broker3 中)。

    1
    2
    3
    [test@hadoop102 kafka]$ vim increase-replication-factor.json

    {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
  4. 执行副本存储计划。

    1
    [test@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
  5. 验证副本存储计划。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    [test@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify

    Status of partition reassignment:
    Reassignment of partition first-0 is complete.
    Reassignment of partition first-1 is complete.
    Reassignment of partition first-2 is complete.

    Clearing broker-level throttles on brokers 0,1,2,3
    Clearing topic-level throttles on topic first

退役旧节点

先按照退役一台节点, 生成执行计划,然后按照服役时操作流程执行负载均衡

  1. 创建一个要均衡的主题。

    1
    2
    3
    4
    5
    6
    7
    [test@hadoop102 kafka]$ vim topics-to-move.json
    {
    "topics": [
    {"topic": "first"}
    ],
    "version": 1
    }
  2. 创建执行计划。

    1
    2
    3
    4
    5
    6
    7
    [test@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate

    Current partition replica assignment
    {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,2,3],"log_dirs":["any","any","any"]}]}

    Proposed partition reassignment configuration
    {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}
  3. 创建副本存储计划(所有副本存储在 broker0、 broker1、 broker2 中)。

    1
    2
    3
    [test@hadoop102 kafka]$ vim increase-replication-factor.json

    {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}
  4. 执行副本存储计划。

    1
    [test@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
  5. 验证副本存储计划。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    [test@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify

    Status of partition reassignment:
    Reassignment of partition first-0 is complete.
    Reassignment of partition first-1 is complete.
    Reassignment of partition first-2 is complete.

    Clearing broker-level throttles on brokers 0,1,2,3
    Clearing topic-level throttles on topic first

执行停止命令

在 hadoop105 上执行停止命令即可。

Kafka 副本

副本基本信息

  1. Kafka 副本作用:提高数据可靠性。

  2. Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。

  3. Kafka 中副本分为: Leader 和 Follower。 Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。

  4. Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。

    AR = ISR + OSR

    ISR,表示和 Leader 保持同步的 Follower 集合。 如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Followe 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s。 Leader 发生故障之后,就会从 ISR 中选举新的 Leader。

    OSR, 表示 Follower 与 Leader 副本同步时,延迟过多的副本。

Leader 选举流程

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。

Controller 的信息同步工作是依赖于 Zookeeper 的。

borker working procedure

Leader 和 Follower 故障处理细节

LEO(Log End Offset): 每个副本的最后一个offset, LEO其实就是最新的offset + 1。

HW(High Watermark): 所有副本中最小的LEO 。

Follower ERROR

Follower故障

  1. Follower发生故障后会被临时踢出ISR
  2. 这个期间Leader和Follower继续接收数据
  3. 待该Follower恢复后, Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步
  4. 等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。

Leader ERROR

Leader故障

  1. Leader发生故障之后,会从ISR中选出一个新的Leader
  2. 为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

手动调整分区副本存储

在生产环境中, 每台服务器的配置和性能不一致, 但是Kafka只会根据自己的代码规则创建对应的分区副本, 就会导致个别服务器存储压力较大。 所有需要手动调整分区副本的存储。

副本

broker storage capacity before_replicas after_replicas
broker0 32T 1_Leader 2_Follower 1_Leader 2_Leader 3_Follower 4_Follower
broker1 32T 2_Leader 3_Follower 3_Leader 4_Leader 1_Follower 2_Follower
broker2 4T 3_Leader 4_Follower
broker3 4T 4_Leader 1_Follower

手动调整分区副本存储的步骤如下:

  1. 创建一个新的 topic, 名称为 three。

    1
    [test@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 4 --replication-factor 2 --topic three
  2. 查看分区副本存储情况。

    1
    [test@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
  3. 创建副本存储计划(所有副本都指定存储在 broker0、 broker1 中)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    [test@hadoop102 kafka]$ vim increase-replication-factor.json

    {
    "version":1,
    "partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
    {"topic":"three","partition":1,"replicas":[0,1]},
    {"topic":"three","partition":2,"replicas":[1,0]},
    {"topic":"three","partition":3,"replicas":[1,0]}]
    }
  4. 执行副本存储计划。

    1
    [test@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
  5. 验证副本存储计划。

    1
    [test@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
  6. 查看分区副本存储情况。

    1
    2
    [test@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
    hadoop102:9092 --describe --topic three

Leader Partition 负载平衡

正常情况下, Kafka本身会自动把Leader Partition均匀分散在各个机器上, 来保证每台机器的读写吞吐量都是均匀的。 但是如果某些broker宕机, 会导致Leader Partition过于集中在其他少部分几台broker上, 这会导致少数几台broker的读写请求压力过高, 其他宕机的broker重启之后都是follower partition, 读写请求很低, 造成集群负载不均衡。

配置参数

参数名称 描述
auto.leader.rebalance.enable 默认是 true。 自动 Leader Partition 平衡。 生产环 境中, leader 重选举的代价比较大,可能会带来 性能影响,建议设置为 false 关闭。
leader.imbalance.per.broker.percentage 默认是 10%。 每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器 会触发 leader 的平衡。
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否平衡的间隔 时间。

例子

broker Topic Partition Leader Replicas Isr
2 test 0 0 3,0,2,1 3,0,2,1
1 test 1 1 1,2,3,0 1,2,3,0
0 test 2 2 0,3,1,2 0,3,1,2
3 test 3 3 2,1,0,3 2,1,0,3

针对broker0节点,分区2的AR优先副本是0节点,但是0节点却不是Leader节点,所以不平衡数加1, AR副本总数是4

所以broker0节点不平衡率为1/4>10%,需要再平衡。

broker2和broker3节点和broker0不平衡率一样,需要再平衡。

Broker1的不平衡数为0,不需要再平衡

增加副本因子

在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

  1. 创建 topic

    1
    [test@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 -- topic four
  2. 手动增加副本存储

    创建副本存储计划(所有副本都指定存储在 broker0、 broker1、 broker2 中)。

    1
    2
    3
    [test@hadoop102 kafka]$ vim increase-replication-factor.json

    {"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}

    执行副本存储计划

    1
    [test@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

文件存储

文件存储机制

Topic是逻辑上的概念, 而partition是物理上的概念, 每个partition对应于一个log文件, 该log文件中存储的就是Producer生产的数据。 Producer生产的数据会被不断追加到该log文件末端, 为防止log文件过大导致数据定位效率低下, Kafka采取了分片索引机制,将每个partition分为多个segment。 每个segment包括: “.index”文件、 “.log”文件和.timeindex等文件。 这些文件位于一个文件夹下, 该文件夹的命名规则为: topic名称+分区序号, 例如: first-0。

File Storage

一个topic分为多个partition

一个partition分为多个segment

每个 segment

.log 日志文件
.index 偏移量索引文件
.timeindex 时间戳索引文件
其他文件

说明: index和log文件以当前 segment 的第一条消息的offset命名。

Topic 数据到底存储在什么位置?

  1. 启动生产者,并发送消息。

    1
    2
    [test@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
    >hello world
  2. 查看 hadoop102(或者 hadoop103、 hadoop104)的/opt/module/kafka/datas/first-1 (first-0、 first-2)路径上的文件。

    1
    2
    3
    4
    5
    6
    7
    [test@hadoop104 first-1]$ ls
    00000000000000000092.index
    00000000000000000092.log
    00000000000000000092.snapshot
    00000000000000000092.timeindex
    leader-epoch-checkpoint
    partition.metadata
  3. 通过工具查看 index 和 log 信息。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    [test@hadoop104 first-1]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index

    Dumping ./00000000000000000000.index
    offset: 3 position: 152

    [test@hadoop104 first-1]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log

    Dumping datas/first-0/00000000000000000000.log
    Starting offset: 0
    baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1636338440962 size: 75 magic: 2 compresscodec: none crc: 2745337109 isvalid: true

    baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 75 CreateTime: 1636351749089 size: 77 magic: 2 compresscodec: none crc: 273943004 isvalid: true

    baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 152 CreateTime: 1636351749119 size: 77 magic: 2 compresscodec: none crc: 106207379 isvalid: true

    baseOffset: 4 lastOffset: 8 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 229 CreateTime: 1636353061435 size: 141 magic: 2 compresscodec: none crc: 157376877 isvalid: true

    baseOffset: 9 lastOffset: 13 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 370 CreateTime: 1636353204051 size: 146 magic: 2 compresscodec: none crc: 4058582827 isvalid: true

index 文件和 log 文件详解

index & log

  1. 根据目标offset定位Segment文件
  2. 找到小于等于目标offset的最大offset对应的索引项
  3. 定位到log文件
  4. 向下遍历找到目标Record

注意

  1. .index为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引。参数log.index.interval.bytes默认4kb。
  2. Index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大,
    因此能将offset的值控制在固定大小

日志存储参数配置

参数 描述
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小, 默认值 1G
log.index.interval.bytes 默认 4kb, kafka 里面每当写入了 4kb 大小的日志(.log),然后就 往 index 文件里面记录一个索引。 稀疏索引。

文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间:

log.retention.hours, 最低优先级小时,默认 7 天。

log.retention.minutes, 分钟。

log.retention.ms, 最高优先级毫秒。

log.retention.check.interval.ms, 负责设置检查周期,默认 5 分钟。

Kafka 中提供的日志清理策略deletecompact 两种。

delete 日志删除

将过期数据删除

log.cleanup.policy = delete 所有数据启用删除策略

  1. 基于时间:默认打开。 以 segment 中所有记录中的最大时间戳作为该文件时间戳。
  2. 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。 log.retention.bytes,默认等于-1,表示无穷大。

compact 日志压缩

对于相同key的不同value值, 只保留最后一个版本。

log.cleanup.policy = compact 所有数据启用压缩策略

压缩前:

Offset 0 1 2 3 4 5 6 7 8
key K1 K2 K1 K1 K3 K4 K5 K5 K2
value V1 V2 V3 V4 V5 V6 V7 V8 V9

压缩后:

Offset 3 4 5 7 8
key K1 K3 K4 K5 K2
value V4 V5 V6 V8 V9

压缩后的offset可能是不连续的, 比如上图中没有6, 当从这些offset(6)消费消息时, 将会拿到比这个offset大的offset(7)对应的消息, 实际上会拿到offset为7的消息, 并从这个位置开始消费。

这种策略只适合特殊场景, 比如消息的key是用户ID, value是用户的资料, 通过这种压缩策略, 整个消息集里就保存了所有用户最新的资料。

高效读写数据

  1. Kafka 本身是分布式集群,可以采用分区技术,并行度高
  2. 读数据采用稀疏索引, 可以快速定位要消费的数据
  3. 顺序写磁盘 (追加)
  4. 页缓存 + 零拷贝技术

页缓存 + 零拷贝技术

零拷贝:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。 Kafka Broker应用层不关心存储的数据, 所以就不用走应用层, 传输效率高。

PageCache页缓存: Kafka重度依赖底层操作系统提供的PageCache功能。 当上层有写操作时, 操作系统只是将数据写入PageCache。 当读操作发生时, 先从PageCache中查找, 如果找不到, 再去磁盘中读取。 实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。

页缓存 + 零拷贝技术

参数 描述
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是 long 的最大值, 9223372036854775807。 一般不建议修改,交给系统自己管 理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改, 交给系统自己管理。

Kafka 消费者

Kafka 消费方式

pull( 拉) 模式:

consumer采用从broker中主动拉取数据。 (Kafka采用这种方式。

push(推)模式:

Kafka没有采用这种方式,因为由broker决定消息发送速率, 很难适应所有消费者的
消费速率。 例如推送的速度是50m/s,Consumer1、 Consumer2就来不及处理消息。

pull模式不足之处是, 如果Kafka没有数据, 消费者可能会陷入循环中, 一直返回空数据。

Kafka 消费者总体工作流程

consumer working procedure

消费者组原理

消费者组

Consumer Group(CG) :消费者组, 由多个consumer组成。 形成一个消费者组的条件, 是所有消费者的groupid相同。

消费者组内每个消费者负责消费不同分区的数据, 一个分区只能由一个组内消费者消费

消费者组之间互不影响。 所有的消费者都属于某个消费者组, 即消费者组是逻辑上的一个订阅者。

如果向消费组中添加更多的消费者, 超过主题分区数量, 则有一部分消费者就会闲置, 不会接收任何消息。

消费者组初始化流程

consumer group initial procedure

  1. coordinator:辅助实现消费者组的初始化和分区的分配。

    coordinator节点选择 = groupid的hashcode值 % 50(___consumer_offsets的分区数量)例如: groupid的hashcode值 = 1, 1% 50 = 1,那么 __consumer_offsets topic 的1号分区,哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。 (每个消费者的offset由消费者提交到系统主题保存 )

消费者组详细消费流程

consumer group consuming procedure

消费者重要参数

参数名称 描述
bootstrap.servers 向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer 和 value.deserializer 指定接收消息的 key 和 value 的反序列化类型。一定要写全 类名。
group.id 标记消费者所属的消费者组。
enable.auto.commit 默认值为 true,消费者会自动周期性地向服务器提交偏移 量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了 消费者偏移量向 Kafka 提交的频率, 默认 5s
auto.offset.reset 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了),该如何处理? earliest:自动重置偏 移量到最早的偏移量。 latest: 默认, 自动重置偏移量为最 新的偏移量。 none:如果消费组原来的( previous)偏移量 不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions __consumer_offsets 的分区数, 默认是 50 个分区。
heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间, 默认 3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的 1/3。
session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间, 默认 45s。 超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长, 默认是 5 分钟。超过该值,该 消费者被移除,消费者组执行再平衡。
fetch.min.bytes 默认 1 个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms 默认 500ms。如果没有从服务器端获取到一批数据的最小字 节数。该时间到,仍然会返回数据。
fetch.max.bytes 默认 Default: 52428800( 50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值 (50m)仍然可以拉取回来这批数据,因此,这不是一个绝 对最大值。一批次的大小受 message.max.bytes ( broker config) or max.message.bytes (topic config) 影响。
max.poll.records 一次 poll 拉取数据返回消息的最大条数, 默认是 500 条。

分区的分配以及再平衡(针对消费者组)

  1. 一个consumer group中有多个consumer组成, 一个 topic有多个partition组成, 现在的问题是, 到底由哪个consumer来消费哪个partition的数据。
  2. Kafka有四种主流的分区分配策略: Range、 RoundRobin、 Sticky、 CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。 Kafka可以同时使用多个分区分配策略

参数配置

参数名称 描述
heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间, 默认 3s。 该条目的值必须小于 session.timeout.ms,也不应该高于 session.timeout.ms 的 1/3。
session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间, 默认 45s。超 过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长, 默认是 5 分钟。超过该值,该 消费者被移除,消费者组执行再平衡。
partition.assignment.strategy 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range + CooperativeSticky。 Kafka 可以同时使用多个分区分配策略。 可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、 CooperativeSticky

Range 以及再平衡

Range 分区策略原理

Range

Range 是对每个 topic 而言的。

首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序

假如现在有 7 个分区, 3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。

通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽,那么前面几个消费者将会多消费 1 个分区。

例如, 7/3 = 2 余 1 ,除不尽,那么 消费者 C0 便会多消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个。

注意: 如果只是针对 1 个 topic 而言, C0消费者多消费1个分区影响不是很大。但是如果有 N 多个 topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区, topic越多, C0消费的分区会比其他消费者明显多消费 N 个分区。

容易产生数据倾斜!

Range 分区分配策略案例

  1. 修改主题 first 为 7 个分区。 注意:分区数可以增加,但是不能减少。

  2. 复制 CustomConsumer 类, 创建 CustomConsumer2。这样可以由三个消费者 CustomConsumer、 CustomConsumer1、 CustomConsumer2 组成消费者组,组名都为“ test”,同时启动 3 个消费者。

  3. 启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    package com.test.kafka.producer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    public class CustomProducer {
    public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    for (int i = 0; i < 7; i++) {
    kafkaProducer.send(new ProducerRecord<>("first", i, "test", "atguigu"));
    }
    kafkaProducer.close();
    }
    }

    说明: Kafka 默认的分区分配策略就是 Range + CooperativeSticky, 所以不需要修改策略。

Range 分区分配再平衡案例

  1. 停止掉 0 号消费者, 快速重新发送消息观看结果(45s 以内,越快越好)。

    1 号消费者:消费到 3、 4 号分区数据。

    2 号消费者:消费到 5、 6 号分区数据。

    0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者 。

    说明: 0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

  2. 再次重新发送消息观看结果(45s 以后)。

    1 号消费者:消费到 0、 1、 2、 3 号分区数据。

    2 号消费者:消费到 4、 5、 6 号分区数据

    说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

RoundRobin 以及再平衡

RoundRobin 针对集群中所有Topic而言。

RoundRobin 轮询分区策略,是把所有的 partition所有的 consumer(?同一个消费者组) 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。

RoundRobin

RoundRobin 分区分配策略案例

  1. 依次在 CustomConsumer、 CustomConsumer1、 CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin。

    1
    2
    // 修改分区分配策略
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
  2. 重启 3 个消费者,重复发送消息的步骤,观看分区结果。

RoundRobin 分区分配再平衡案例

  1. 停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

    1 号消费者:消费到 2、 5 号分区数据

    2 号消费者:消费到 4、 1 号分区数据

    0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、 6 和 3 号分区数据,分别由 1 号消费者或者 2 号消费者消费。

    说明: 0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需
    要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

  2. 再次重新发送消息观看结果(45s 以后)。

    1 号消费者:消费到 0、 2、 4、 6 号分区数据

    2 号消费者:消费到 1、 3、 5 号分区数据

    说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

Sticky 以及再平衡

粘性分区定义: 可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销

粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略, 首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化

  1. 设置主题为 first, 7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

  2. 修改分区分配策略为粘性。

    注意: 3 个消费者都应该注释掉,之后重启 3 个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。

    1
    2
    3
    4
    // 修改分区分配策略
    ArrayList<String> startegys = new ArrayList<>();
    startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);
  3. 使用同样的生产者发送 500 条消息。

    可以看到会尽量保持分区的个数近似划分分区。

Sticky 分区分配再平衡案例

  1. 停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

    1 号消费者:消费到 2、 5、 3 号分区数据。

    2 号消费者:消费到 4、 6 号分区数据。

    0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 1 号消费者或者 2 号消费者消费。

    说明: 0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行

  2. 再次重新发送消息观看结果(45s 以后)。

    1 号消费者:消费到 2、 3、 5 号分区数据。

    2 号消费者:消费到 0、 1、 4、 6 号分区数据。

    说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配。

offset 位移

Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始, consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据key 是 group.id+topic+分区号, value 就是当前 offset 的值每隔一段时间, kafka 内部会对这个 topic 进行 compact,也就是每个 group.id+topic+分区号就保留最新数据。

消费 offset 案例

  1. 思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。

  2. 在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

  3. 采用命令行方式, 创建一个新的 topic。

1
[test@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic test --partitions 2 --replication-factor 2
  1. 启动生产者往 test 生产数据。

    1
    [test@hadoop102 kafka]$ bin/kafka-console-producer.sh --topic test --bootstrap-server hadoop102:9092

    注意:指定消费者组名称,更好观察数据存储位置(key 是 group.id+topic+分区号)。

  2. 查看消费者消费主题__consumer_offsets。

    1
    2
    3
    4
    5
    6
    7
    8
    [test@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

    [offset,test,1]::OffsetAndMetadata(offset=7,
    leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203,
    expireTimestamp=None)
    [offset,test,0]::OffsetAndMetadata(offset=8,
    leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203,
    expireTimestamp=None)

自动提交 offset

为了使我们能够专注于自己的业务逻辑, Kafka提供了自动提交offset的功能。
自动提交offset的相关参数

参数名称 描述
enable.auto.commit 默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消 费者偏移量向 Kafka 提交的频率, 默认 5s

auto-commit offsets

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.test.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumerAutoOffset {
public static void main(String[] args) {

// 1. 创建 kafka 消费者配置类
Properties properties = new Properties();
// 2. 添加配置参数
// 添加连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 配置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
//3. 创建 kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//4. 设置消费主题 形参是列表
consumer.subscribe(Arrays.asList("first"));
//5. 消费数据
while (true){
// 读取消息
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 输出消息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.value());
}
}
}
}

手动提交 offset

虽然自动提交offset十分简单便利, 但由于其是基于时间提交的, 开发人员难以把握offset提交的时机。 因此Kafka还提供了手动提交offset的API。

commit offset by hand

手动提交offset的方法有两种:分别是commitSync( 同步提交)commitAsync( 异步提交)

两者的相同点:都会将本次提交一批数据最高的偏移量提交;

不同点: 同步提交阻塞当前线程, 一直到提交成功, 并且会自动失败重试(由不可控因素导致, 也会出现提交失败) ;而异步提交则没有失败重试机制, 故有可能提交失败。

同步提交 offset

由于同步提交 offset 有失败重试机制,故更加可靠, 但是由于一直等待提交结果,提交的效率比较低。 以下为同步提交 offset 的示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.test.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumerByHandSync {
public static void main(String[] args) {
// 1. 创建 kafka 消费者配置类
Properties properties = new Properties();
// 2. 添加配置参数
// 添加连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 配置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//3. 创建 kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//4. 设置消费主题 形参是列表
consumer.subscribe(Arrays.asList("first"));
//5. 消费数据
while (true){
// 读取消息
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 输出消息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.value());
}
// 同步提交 offset
consumer.commitSync();
}
}
}

异步提交 offset

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

以下为异步提交 offset 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.test.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumerByHandSync {
public static void main(String[] args) {
// 1. 创建 kafka 消费者配置类
Properties properties = new Properties();
// 2. 添加配置参数
// 添加连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 配置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//3. 创建 kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//4. 设置消费主题 形参是列表
consumer.subscribe(Arrays.asList("first"));
//5. 消费数据
while (true){
// 读取消息
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 输出消息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.value());
}
// 同步提交 offset
consumer.commitAsync();
}
}
}

指定 Offset 消费

auto.offset.reset = earliest | latest | none 默认是 latest。

当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

  1. earliest:自动将偏移量重置为最早的偏移量, –from-beginning。
  2. latest(默认值):自动将偏移量重置为最新偏移量。
  3. none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

任意指定 offset 位移开始消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.test.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
public class CustomConsumerSeek {
public static void main(String[] args) {
// 0 配置信息
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// key value 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment= new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
// 遍历所有分区,并指定 offset 从 1700 的位置开始消费
for (TopicPartition tp: assignment) {
kafkaConsumer.seek(tp, 1700);
}
// 3 消费该主题数据
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}

注意:每次执行完,需要修改消费者组名;

指定时间消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.test.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
public class CustomConsumerSeek {
public static void main(String[] args) {
// 0 配置信息
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// key value 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment= new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : assignment) {
timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
// 获取从 1 天前开始消费的每个分区的 offset
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
// 根据时间指定开始消费的位置
if (offsetAndTimestamp != null){
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
}
}
// 3 消费该主题数据
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}

漏消费和重复消费

重复消费: 已经消费了数据,但是 offset 没提交。

漏消费: 先提交 offset 后消费,有可能会造成数据的漏消费。

场景1: 重复消费。 自动提交offset引起 (对一个消息接受的ack并未收到)。

Consuming error

场景2: 漏消费。 设置offset为手动提交, 当offset被提交时, 数据还在内存中未落盘, 此时刚好消费者线程被kill掉, 那么offset已经提交, 但是数据未处理, 导致这部分内存中的数据丢失。

既不漏消费也不重复消费 ?消费者事务

生产经验

消费者事务

如果想完成Consumer端的精准一次性消费, 那么需要Kafka消费端将消费过程和提交offset 过程做原子绑定。 此时我们需要将Kafka的offset保存到支持事务的自定义介质( 比如 MySQL) 。

consumer transaction

数据积压(消费者如何提高吞吐量)

  1. 如果是Kafka消费能力不足, 则可以考虑增加Topic的分区数, 并且同时提升消费组的消费者数量, 消费者数 = 分区数。 (两者缺一不可)
  2. 如果是下游的数据处理不及时: 提高每批次拉取的数量。 批次拉取数据过少(拉取数据/处理时间 < 生产速度) ,使处理的数据小于生产的数据, 也会造成数据积压。
参数名称 描述
fetch.max.bytes 默认 Default: 52428800(50 m)。消费者获取服务器端一批 消息最大的字节数。如果服务器端一批次的数据大于该值 (50m)仍然可以拉取回来这批数据,因此,这不是一个绝 对最大值。一批次的大小受 message.max.bytes (broker config) or max.message.bytes (topic config)影响。
max.poll.records 一次 poll 拉取数据返回消息的最大条数, 默认是 500 条

Kafka-Kraft 模式

Kafka 现有架构, 元数据在 zookeeper 中, 运行时动态选举 controller, 由controller 进行 Kafka 集群管理。

kraft 模式架构(实验性), 不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper, 元数据保存在 controller 中, 由 controller 直接进行 Kafka 集群管理。

这样做的好处有以下几个:

Kafka 不再依赖外部框架, 而是能够独立运行;

controller 管理集群时, 不再需要从 zookeeper 中先读取数据, 集群性能上升;

由于不依赖 zookeeper, 集群扩展时不再受到 zookeeper 读写能力限制;

controller 不再动态选举, 而是由配置文件规定。 这样我们可以有针对性的加强 controller 节点的配置, 而不是像以前一样对随机 controller 节点的高负载束手无策。

总结 补充

副本数设定

一般我们设置成2个或3个,很多企业设置为2个。

副本的优势:提高可靠性;副本劣势:增加了网络IO传输

Kafka压测

Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。

Kafka日志保存时间

默认保存7天;生产环境建议3天

Kafka中数据量计算

每天总数据量100g,每天产生1亿条日志,10000万/24/60/60=1150条/每秒钟

平均每秒钟:1150条

低谷每秒钟:50条

高峰每秒钟:1150条 *(2-20倍)= 2300条 - 23000条

每条日志大小:0.5k - 2k(取1k)

每秒多少数据量:2.0M - 20MB

Kafka的硬盘大小

每天的数据量100g * 2个副本 * 3天 / 70%

Kafka监控

公司自己开发的监控器;

开源的监控器:KafkaManager、KafkaMonitor、KafkaEagle

Kakfa分区数

  1. 创建一个只有1个分区的topic

  2. 测试这个topic的producer吞吐量和consumer吞吐量。

  3. 假设他们的值分别是Tp和Tc,单位可以是MB/s。

  4. 然后假设总的目标吞吐量是Tt,那么分区数=Tt / min(Tp,Tc)

    例如:producer吞吐量 = 20m/s;consumer吞吐量 = 50m/s,期望吞吐量100m/s;

    分区数 = 100 / 20 = 5分区

    https://blog.csdn.net/weixin_42641909/article/details/89294698

    分区数一般设置为:3-10个

多少个Topic

通常情况:多少个日志类型就多少个Topic。也有对日志类型进行合并的。

Kafka挂掉

  1. Flume记录
  2. 日志有记录
  3. 短期没事

Kafka数据重复

幂等性 + ack-1 + 事务

Kafka数据重复,可以再下一级:SparkStreaming、redis或者Hive中dwd层去重,去重的手段:分组、按照id开窗只取第一个值;

Kafka参数优化

  1. Broker参数配置(server.properties)

    1
    2
    3
    1、日志保留策略配置
    # 保留三天,也可以更短 (log.cleaner.delete.retention.ms)
    log.retention.hours=72
    1
    2
    2Replica相关配置
    default.replication.factor:1 默认副本1
    1
    2
    3
    3、网络通信延时	
    replica.socket.timeout.ms:30000 #当集群之间网络不稳定时,调大该参数
    replica.lag.time.max.ms= 600000# 如果网络不好,或者kafka集群压力较大,会出现副本丢失,然后会频繁复制副本,导致集群压力更大,此时可以调大该参数
  2. Producer优化(producer.properties)

    1
    2
    compression.type:none                 gzip  snappy  lz4  
    #默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和Broker的存储压力。
  3. Kafka内存调整(kafka-server-start.sh

    默认内存1个G,生产环境尽量不要超过6个G。

    1
    export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"

Kafka单条日志传输大小

Kafka对于消息体的大小默认为单条最大值是1M但是在我们应用场景中,常常会出现一条消息大于1M,如果不对Kafka进行配置。则会出现生产者无法将消息推送到Kafka或消费者无法去消费Kafka里面的数据,这时我们就要对Kafka进行以下配置:server.properties

1
2
replica.fetch.max.bytes: 1048576  broker可复制的消息的最大字节数, 默认为1M
message.max.bytes: 1000012 kafka 会接收单个消息size的最大限制, 默认为1M左右

注意:message.max.bytes必须小于等于replica.fetch.max.bytes,否则就会导致replica之间数据同步失败。

Kafka过期数据清理

保证数据没有被引用(没人消费他)

日志清理保存的策略只有delete和compact两种

log.cleanup.policy = delete启用删除策略

log.cleanup.policy = compact启用压缩策略

https://www.jianshu.com/p/fa6adeae8eb5

Kafka消费者角度考虑是拉取数据还是推送数据

拉取数据

Kafka中的数据是有序的吗

单分区内有序;多分区,分区与分区间无序;

扩展:

kafka producer发送消息的时候,可以指定key:

key

这个key的作用是为消息选择存储分区,key可以为空,当指定key且不为空的时候,Kafka是根据key的hash值与分区数取模来决定数据存储到那个分区。

solution

有序解决方案:同一张表的数据放到同一个分区

​ => ProducerRecord里传入key,会根据key取hash算出分区号

​ => key使用表名,如果有库名,拼接上库名


Kafka
http://example.com/2022/07/17/Kafka/
作者
Zhao Zhuoyue
发布于
2022年7月17日
许可协议