Kafka实战
01、Kafka:为什么要使用消息队列
02、Kafka:消息队列的流派
03、Kafka:安装Kafka服务器
04、Kafka:实现生产者和消费者
05、Kafka:消息的偏移量和顺序消费原理
06、Kafka:单播和多播消息的实现
07、Kafka:主题和分区的概念
08、Kafka:搭建Kafka集群
09、Kafka:副本的概念
10、Kafka:集群消费问题
11、Kafka:Java中Kafka生产者的基本实现
12、Kafka:生产者端的同步发送和异步发送
13、Kafka:生产者中的ack配置
14、Kafka:发送消息的缓冲区机制
15、Kafka:消费者消费消息的基本实现
16、Kafka:Offset的自动提交和手动提交
17、Kafka:消费者poll消息的细节与消费者心跳配置
18、Kafka:指定分区和偏移量,时间消费
19、Kafka:新消费组的消费offset规则
20、Kafka:SpringBoot中使用Kafka的基本实现
21、Kafka:消费者的配置细节
22、Kafka:Kafka中Controller,Rebalance,HW,LEO的概念
23、Kafka:Kafka优化之防止消息丢失和重复消费
24、Kafka:Kafka优化之顺序消费的实现
25、Kafka:Kafka优化之解决消息积压问题
26、Kafka:Kafka优化之实现延时队列
27、Kafka:Kafka-eagle监控平台
28、Kafka:Linux部署Kafka集群
29、Kafka:Docker-compose部署Kafka集群
本文档使用 MrDoc 发布
-
+
首页
17、Kafka:消费者poll消息的细节与消费者心跳配置
### **消费者poll消息的细节与消费者心跳配置** **长轮询poll消息** 默认情况下,消费者⼀次会poll500条消息。 ```python //⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); ``` 代码中设置了⻓轮询的时间是1000毫秒 ```python while (true) { /* * 3.poll() API 是拉取消息的⻓轮询 */ ConsumerRecords< String, String > records = consumer.poll(Duration.ofMillis(10000)); for (ConsumerRecord< String, String > record: records) { //4.打印消息 System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n ", record.partition(), record.offset(), record.key(), record.value()); } //所有的消息已消费完 if (records.count() > 0) { //有消息 // ⼿动同步提交offset,当前线程会阻塞直到offset提交成功 // ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了 consumer.commitSync();//=======阻塞=== 提交成功 } } ``` 意味着: - 如果⼀次poll到500条,就直接执⾏for循环 - 如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500条,要么到1s - 如果多次poll都没达到500条,且1秒时间到了,那么直接执⾏for循环 - 如果两次poll的间隔超过30s,集群会认为该消费者的消费能⼒过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让⼀次poll的消息条数少⼀点 ```python //⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); //如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢出消费组。将分区分配给其他消费者。-rebalance props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000); ``` ### **消费者心跳检测配置** ```python //consumer给broker发送⼼跳的间隔时间 props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); //kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏ rebalance, //把分区分配给其他消费者。 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000); ```
李智
2025年3月17日 13:29
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
分享
链接
类型
密码
更新密码