Kafka实战
01、Kafka:为什么要使用消息队列
02、Kafka:消息队列的流派
03、Kafka:安装Kafka服务器
04、Kafka:实现生产者和消费者
05、Kafka:消息的偏移量和顺序消费原理
06、Kafka:单播和多播消息的实现
07、Kafka:主题和分区的概念
08、Kafka:搭建Kafka集群
09、Kafka:副本的概念
10、Kafka:集群消费问题
11、Kafka:Java中Kafka生产者的基本实现
12、Kafka:生产者端的同步发送和异步发送
13、Kafka:生产者中的ack配置
14、Kafka:发送消息的缓冲区机制
15、Kafka:消费者消费消息的基本实现
16、Kafka:Offset的自动提交和手动提交
17、Kafka:消费者poll消息的细节与消费者心跳配置
18、Kafka:指定分区和偏移量,时间消费
19、Kafka:新消费组的消费offset规则
20、Kafka:SpringBoot中使用Kafka的基本实现
21、Kafka:消费者的配置细节
22、Kafka:Kafka中Controller,Rebalance,HW,LEO的概念
23、Kafka:Kafka优化之防止消息丢失和重复消费
24、Kafka:Kafka优化之顺序消费的实现
25、Kafka:Kafka优化之解决消息积压问题
26、Kafka:Kafka优化之实现延时队列
27、Kafka:Kafka-eagle监控平台
28、Kafka:Linux部署Kafka集群
29、Kafka:Docker-compose部署Kafka集群
本文档使用 MrDoc 发布
-
+
首页
20、Kafka:SpringBoot中使用Kafka的基本实现
### **SpringBoot中使用Kafka的基本实现** **POM** ```python <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.8</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>kafka-springboot</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafka-springboot</name> <description>kafka-springboot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project> ``` **YML** ```python server: port: 80 spring: kafka: bootstrap-servers: 81.68.232.188:9092,81.68.232.188:9093,81.68.232.188:9094 producer: # ⽣产者 retries: 3 # 设置⼤于0的值,则客户端会将发送失败的记录重新发送 batch-size: 16384 buffer-memory: 33554432 acks: 1 # 指定消息key和消息体的编解码⽅式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 500 listener: # 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交 # RECORD # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交 # BATCH # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上 # 次提交时间⼤于TIME时提交 # TIME # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理 # record数量⼤于等于COUNT时提交 # COUNT # TIME | COUNT 有⼀个条件满⾜时提交 # COUNT_TIME # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调 # ⽤Acknowledgment.acknowledge()后提交 # MANUAL # ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种 # MANUAL_IMMEDIATE ack-mode: MANUAL_IMMEDIATE # redis: # host: 172.16.253.21 ``` **LOGBACK** ```python <?xml version="1.0" encoding="UTF-8"?> <configuration> <logger name="org.apache.kafka.clients" level="info" /> </configuration> ``` **生产者** ```python package com.example.kafka.springboot.controller; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController @RequestMapping("/msg") public class MyKafkaController { private final static String TOPIC_NAME = "test"; @Resource private KafkaTemplate<String,String> kafkaTemplate; @RequestMapping("/send") public String sendMessage(){ kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!"); return "send success!"; } } ``` **消费者** ```python package com.example.kafka.springboot.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component public class MyConsumer { @KafkaListener(topics = "test",groupId = "default-group") public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); //⼿动提交offset ack.acknowledge(); } } ``` **测试** 通过Restful向/msg/send接口发请求,消费者会实时收到消息,可以做简单的即时通讯软件 
李智
2025年3月17日 13:29
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
分享
链接
类型
密码
更新密码