Kafka实战
01、Kafka:为什么要使用消息队列
02、Kafka:消息队列的流派
03、Kafka:安装Kafka服务器
04、Kafka:实现生产者和消费者
05、Kafka:消息的偏移量和顺序消费原理
06、Kafka:单播和多播消息的实现
07、Kafka:主题和分区的概念
08、Kafka:搭建Kafka集群
09、Kafka:副本的概念
10、Kafka:集群消费问题
11、Kafka:Java中Kafka生产者的基本实现
12、Kafka:生产者端的同步发送和异步发送
13、Kafka:生产者中的ack配置
14、Kafka:发送消息的缓冲区机制
15、Kafka:消费者消费消息的基本实现
16、Kafka:Offset的自动提交和手动提交
17、Kafka:消费者poll消息的细节与消费者心跳配置
18、Kafka:指定分区和偏移量,时间消费
19、Kafka:新消费组的消费offset规则
20、Kafka:SpringBoot中使用Kafka的基本实现
21、Kafka:消费者的配置细节
22、Kafka:Kafka中Controller,Rebalance,HW,LEO的概念
23、Kafka:Kafka优化之防止消息丢失和重复消费
24、Kafka:Kafka优化之顺序消费的实现
25、Kafka:Kafka优化之解决消息积压问题
26、Kafka:Kafka优化之实现延时队列
27、Kafka:Kafka-eagle监控平台
28、Kafka:Linux部署Kafka集群
29、Kafka:Docker-compose部署Kafka集群
本文档使用 MrDoc 发布
-
+
首页
11、Kafka:Java中Kafka生产者的基本实现
### **Java中Kafka生产者的基本实现** ------------ **生产者的基本实现** 引入依赖 ```python <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> ```  具体实现 设置参数-》创建生产者客户端-》创建消息-》发送消息 ```python package com.example.kafka; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; public class MySimpleProducer { private final static String TOPIC_NAME = "test"; public static void main(String[] args) throws ExecutionException, InterruptedException { //1.设置参数 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "81.68.232.188:9092,81.68.232.188:9093,81.68.232.188:9094"); //把发送的key从字符串序列化为字节数组 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //把发送消息value从字符串序列化为字节数组 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //2.创建⽣产消息的客户端,传⼊参数 Producer<String, String> producer = new KafkaProducer<>(props); //3.创建消息 //key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容 ProducerRecord<String, String> producerRecord = new ProducerRecord<> (TOPIC_NAME, "key1", "hello, kafka"); //4.发送消息,得到消息发送的元数据并输出 RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("同步⽅式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); } } ```  **指定发送主题分区** ```python ProducerRecord < String, String > producerRecord = new ProducerRecord < String, String > (TOPIC_NAME, 0, order.getOrderId().toString(), JSON.toJSONString(order)); ```
李智
2025年3月17日 13:29
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
分享
链接
类型
密码
更新密码