Kafka实战
01、Kafka:为什么要使用消息队列
02、Kafka:消息队列的流派
03、Kafka:安装Kafka服务器
04、Kafka:实现生产者和消费者
05、Kafka:消息的偏移量和顺序消费原理
06、Kafka:单播和多播消息的实现
07、Kafka:主题和分区的概念
08、Kafka:搭建Kafka集群
09、Kafka:副本的概念
10、Kafka:集群消费问题
11、Kafka:Java中Kafka生产者的基本实现
12、Kafka:生产者端的同步发送和异步发送
13、Kafka:生产者中的ack配置
14、Kafka:发送消息的缓冲区机制
15、Kafka:消费者消费消息的基本实现
16、Kafka:Offset的自动提交和手动提交
17、Kafka:消费者poll消息的细节与消费者心跳配置
18、Kafka:指定分区和偏移量,时间消费
19、Kafka:新消费组的消费offset规则
20、Kafka:SpringBoot中使用Kafka的基本实现
21、Kafka:消费者的配置细节
22、Kafka:Kafka中Controller,Rebalance,HW,LEO的概念
23、Kafka:Kafka优化之防止消息丢失和重复消费
24、Kafka:Kafka优化之顺序消费的实现
25、Kafka:Kafka优化之解决消息积压问题
26、Kafka:Kafka优化之实现延时队列
27、Kafka:Kafka-eagle监控平台
28、Kafka:Linux部署Kafka集群
29、Kafka:Docker-compose部署Kafka集群
本文档使用 MrDoc 发布
-
+
首页
12、Kafka:生产者端的同步发送和异步发送
### **生产者端的同步发送和异步发送** ------------ **生产者的同步发送**  如果⽣产者发送消息没有收到ack,⽣产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进⾏重试。重试的次数3次。 ```python RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("同步⽅式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); ``` **生产者的异步发送消息**  异步发送,⽣产者发送完消息后就可以执⾏之后的业务,broker在收到消息后异步调⽤⽣产者提供的callback回调⽅法。 ```python //5.异步发送消息 producer.send(producerRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.err.println("发送消息失败:" + exception.getStackTrace()); } if (metadata != null) { System.out.println("异步⽅式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); } } }); ``` **测试** ```python package com.example.kafka; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; public class MySimpleProducer { private final static String TOPIC_NAME = "test"; public static void main(String[] args) throws ExecutionException, InterruptedException { //1.设置参数 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "81.68.232.188:9092,81.68.232.188:9093,81.68.232.188:9094"); //把发送的key从字符串序列化为字节数组 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //把发送消息value从字符串序列化为字节数组 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //2.创建⽣产消息的客户端,传⼊参数 Producer<String, String> producer = new KafkaProducer<>(props); //3.创建消息 //key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容 ProducerRecord<String, String> producerRecord = new ProducerRecord<> (TOPIC_NAME,1, "key1", "hello, kafka"); //4.发送消息,得到消息发送的元数据并输出 // RecordMetadata metadata = producer.send(producerRecord).get(); // System.out.println("同步⽅式发送消息结果:" + "topic-" + // metadata.topic() + "|partition-" + metadata.partition() // + "|offset-" + metadata.offset()); //5.异步发送消息 producer.send(producerRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.err.println("发送消息失败:" + exception.getStackTrace()); } if (metadata != null) { System.out.println("异步⽅式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); } } }); Thread.sleep(100000); } } ```  **注意**  ### **总结** 异步发送会存在数据丢失的问题,同步发送更为常用
李智
2025年3月17日 13:29
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
分享
链接
类型
密码
更新密码