Redis进阶
01、Redis进阶:Nosql数据库简介
02、Redis进阶:多种方式安装Redis6.2.2
03、Redis进阶:Redis常用五大数据类型
04、Redis进阶:Redis三种特殊数据类型之Bitmaps、HyperLogLog、Geospatial
05、Redis进阶:Redis持久化规则之RDB及AOF
06、Redis进阶:Redis发布订阅及SpringBoot集成Redis实现发布订阅消息
07、Redis进阶:Redis事务详解
08、Redis进阶:Redis之主从复制详解
09、Redis进阶:Redis之哨兵模式(sentinel)详解
10、Redis进阶:Redis集群搭建
11、Redis进阶:Redis6新特性
12、Redis进阶:SpringBoot集成Redis环境搭建及配置详解
13、Redis进阶:RedisTemplate操作Redis详解之连接Redis及自定义序列化
14、Redis进阶:RedisTemplate操作Redis之API详解
15、Redis进阶:Redis之面试常问缓存穿透+缓存击穿+缓存雪崩
本文档使用 MrDoc 发布
-
+
首页
06、Redis进阶:Redis发布订阅及SpringBoot集成Redis实现发布订阅消息
### 发布订阅 ------------ 在软件架构中,发布/订阅是一种消息模式,消息的发送者不会将消息直接发送给特定的接收者,而是通过消息通道广播出去,让订阅该消息主题的订阅者消费到。 客户端可以订阅频道如下图:  当给这个频道发布消息后,消息就会发送给订阅的客户端:  Redis提供了发布订阅功能,可以用于消息的传输,Redis的发布订阅机制包括三个部分:发布者(Publisher),订阅者(Subscriber)和频道(Channel)。 ### 命令行实现 ------------ `SUBSCRIBE channel [channel …]`:订阅给指定频道的信息。一旦客户端进入订阅状态,客户端就只可接受订阅相关的命令SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE和PUNSUBSCRIBE除了这些命令,其他命令一律失效。 `PUBLISH channel message`:将信息 message 发送到指定的频道 channel。 ```c # 订阅频道 127.0.0.1:6379> subscribe demo-channel Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "demo-channel" 3) (integer) 1 # 开启另外一个客户端,发送信息 127.0.0.1:6379> publish demo-channel hello (integer) 1 # 订阅的客户端会接收到消息 1) "message" 2) "demo-channel" 3) "hello" ``` ### Spring Boot集成Redis实现发布订阅消息 [代码地址](https://gitee.com/pearl88/study-demo/tree/master/spring-boot-redis-demo "代码地址") Spring Data为Redis提供了专用的消息传递集成,其功能和命名与Spring Framework中的JMS集成相似。 Redis消息传递可以大致分为两个功能区域: - 消息的发布或产生 - 订阅或消费消息 这是通常称为“发布/订阅”(简称“发布/订阅”)的模式的示例。所述RedisTemplate类用于消息生成。对于类似于Java EE的消息驱动bean样式的异步接收,Spring Data提供了一个专用的消息侦听器容器,该容器用于创建消息驱动的POJO(MDP),并用于同步接收RedisConnection。 在org.springframework.data.redis.connection和org.springframework.data.redis.listener软件包提供了对Redis的消息的核心功能。 ### 初始化项目 1、 创建一个普通SpringBoot项目; 2、 添加pom; ```c <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.5</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>org.pearl</groupId> <artifactId>spring-boot-redis-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-boot-redis-demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> ``` 添加配置; ```c server.port=9000 spring.application.name=spring-boot-redis-demo spring.redis.host=127.0.0.1 spring.redis.port=6379 spring.redis.password=123456 ``` 初始化RedisTemplate; ```c @Configuration @EnableScheduling public class RedisConfig { /** * 创建RedisTemplate * * @param redisConnectionFactory 连接工厂 * @return RedisTemplate */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 序列化 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } } ``` ### 消息发布 要发布消息,您可以与其他操作一起使用低级RedisConnection或高级RedisTemplate。这两个实体都提供了该publish方法,该方法接受消息和目标通道作为参数。虽然RedisConnection需要原始数据(字节数组),但是RedisTemplate让任意对象作为消息传递 创建一个定时任务循环发送消息; ```c @Component public class PubMsgTask { @Autowired RedisTemplate<String, Object> redisTemplate; /** * 每秒向redis-channel同调发送一Hello World */ @Scheduled(cron = "* * * * * ?") public void pubMsg() { System.out.println("redisTemplate正在发送消息"); redisTemplate.convertAndSend("redis-channel", "Hello World"); } } ``` ### 消息订阅 在接收方,可以通过直接命名一个频道或多个频道或使用模式匹配来订阅一个或多个频道。 由于其阻塞性质,低级订阅并不吸引人,因为它需要每个侦听器都进行连接和线程管理。为了减轻这个问题,Spring Data提供了RedisMessageListenerContainer,它可以完成所有繁重的工作。 RedisMessageListenerContainer充当消息侦听器容器。它用于接收来自Redis通道的消息并驱动MessageListener注入到该通道中的实例。侦听器容器负责消息接收的所有线程,并分派到侦听器中进行处理。消息侦听器容器是MDP与消息传递提供程序之间的中介,并负责注册接收消息,资源获取和释放,异常转换等。 创建消息监听类; ```c @Component public class SubMsgListener implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { System.out.println("收到消息:" + new String(message.getBody())); } } ``` 创建消息侦听器容器、消息监听适配器; ```c @Autowired SubMsgListener subMsgListener; @Autowired MessageListenerAdapter messageListener; /** * 消息侦听器容器 * * @param factory 连接工厂 * @return RedisMessageListenerContainer */ @Bean RedisMessageListenerContainer redisContainer(final RedisConnectionFactory factory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(factory); container.addMessageListener(messageListener, new ChannelTopic("redis-channel")); return container; } /** * 消息监听适配器 * MessageListenerAdapter * * @return */ @Bean MessageListenerAdapter messageListener() { return new MessageListenerAdapter(subMsgListener); } ``` 启动项目,验证;  ### 消息丢失问题 **1. Redis的输出缓冲限制** Redis为每个客户端分配了输出缓冲区(output buffer)。处理完客户端的命令后,Redis将返回结果临时储存在输出缓冲区中,然后将这些数据发送给客户端。如果不对输出缓冲区的大小做出限制,输出缓冲区可能积累大量数据,甚至达到最大内存限制,导致服务崩溃。以下是出现该问题的两种典型场景。 - 客户端命令的返回值过大。 - 发布者(publisher)发布消息的速度大于订阅者(subscriber)消费消息的速度。 当缓冲数据超过限制时,Redis将断开客户端的连接,防止这些数据占用过多的内存,影响Redis服务的性能。换言之,消费消息的速度过慢,会导致输出缓冲区堆积数据过多,默认大小限制是8M,当输出缓冲区超过8M时,会关闭连接,导致消息丢失。 通过自定义`client-output-buffer-limit pubsub`的值,您可以调整Redis为发布订阅客户端分配的缓冲区大小。 `client-output-buffer-limit pubsub`包含三个选项:`hard limit`、`soft limit`和`soft seconds`。 - hard limit指定一个强限制值,单位为Byte。当一个发布订阅客户端的输出缓冲区占用内存达到或超过hard limit的限制时,Redis断开该客户端的连接。 - soft limit指定一个弱限制值,单位为Byte;soft seconds指定持续触发弱限制的时间,单位为s。当一个发布订阅客户端的输出缓冲区占用内存达到或超过soft limit的限制,且该状态的持续时间达到soft seconds限定的秒数时,Redis断开该客户端的连接。 **2. 订阅者断开连接** 发布/订阅模式时,channel只接收publish发送的消息,自身不存储消息,如果channel没有被订阅,则消息丢弃,订阅的消费者需要一直在线,阻塞获取消息,连接断开表示立即退订。如果订阅者不在线(服务没有启动)消息将丢失,消息没有持久化。  ### 总结 ------------ Redis发布订阅 提供了简单的消息队列功能,但实际应用较少,因为其模式单一、消息丢失性高,但其简单易用,常用于日志等不重要的数据传输。
李智
2025年3月17日 13:34
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
分享
链接
类型
密码
更新密码