iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >Spring中怎么实现响应式Redis交互
  • 670
分享到

Spring中怎么实现响应式Redis交互

2023-06-05 03:06:27 670人浏览 薄情痞子
摘要

今天小编给大家分享一下spring中怎么实现响应式Redis交互的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。本文将模拟一个

今天小编给大家分享一下spring中怎么实现响应式Redis交互的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。

本文将模拟一个用户服务,并使用Redis作为数据存储服务器
涉及两个java bean,用户与权益

public class User {    private long id;    private String name;    // 标签    private String label;    // 收货地址经度    private Double deliveryAddressLon;    // 收货地址维度    private Double deliveryAddressLat;    // 最新签到日    private String lastSigninDay;    // 积分    private Integer score;    // 权益    private List<Rights> rights;    ...}public class Rights {    private Long id;    private Long userId;    private String name;    ...}

启动

引入依赖

    <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-data-redis-Reactive</artifactId>    </dependency>

添加Redis配置

spring.redis.host=192.168.56.102spring.redis.port=6379spring.redis.passWord=spring.redis.timeout=5000

SpringBoot启动

@SpringBootApplicationpublic class UserServiceReactive {    public static void main(String[] args) {        new SpringApplicationBuilder(                UserServiceReactive.class)                .WEB(WebApplicationType.REACTIVE).run(args);    }}

应用启动后,Spring会自动生成ReactiveRedisTemplate(它的底层框架是Lettuce)。
ReactiveRedisTemplate与RedisTemplate使用类似,但它提供的是异步的,响应式Redis交互方式。
这里再强调一下,响应式编程是异步的,ReactiveRedisTemplate发送Redis请求后不会阻塞线程,当前线程可以去执行其他任务。
等到Redis响应数据返回后,ReactiveRedisTemplate再调度线程处理响应数据。
响应式编程可以通过优雅的方式实现异步调用以及处理异步结果,正是它的最大的意义。

序列化

ReactiveRedisTemplate默认使用的序列化是jdk序列化,我们可以配置为JSON序列化

@Beanpublic RedisSerializationContext redisSerializationContext() {    RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext();    builder.key(StringRedisSerializer.UTF_8);    builder.value(RedisSerializer.json());    builder.hashKey(StringRedisSerializer.UTF_8);    builder.hashValue(StringRedisSerializer.UTF_8);    return builder.build();}@Beanpublic ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {    RedisSerializationContext serializationContext = redisSerializationContext();    ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext);    return reactiveRedisTemplate;}

builder.hashValue方法指定Redis列表值的序列化方式,由于本文Redis列表值只存放字符串,所以还是设置为StringRedisSerializer.UTF_8。

基本数据类型

ReactiveRedisTemplate支持Redis字符串,散列,列表,集合,有序集合等基本的数据类型。
本文使用散列保存用户信息,列表保存用户权益,其他基本数据类型的使用本文不展开。

public Mono<Boolean>  save(User user) {    ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash();    Mono<Boolean>  userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user));    if(user.getRights() != null) {        ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList();        opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> {            logger.info("add rights:{}", l);        });    }    return userRs;}

beanToMap方法负责将User类转化为map。

HyperLogLog

Redis HyperLogLog结构可以统计一个集合内不同元素的数量。
使用HyperLogLog统计每天登录的用户量

public Mono<Long>  login(User user) {    ReactiveHyperLogLoGoperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog();    return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId());}

BitMap

Redis BitMap(位图)通过一个Bit位表示某个元素对应的值或者状态。由于Bit是计算机存储中最小的单位,使用它进行储存将非常节省空间。
使用BitMap记录用户本周是否有签到

public void addSignInFlag(long userId) {    String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16);    redisTemplate.opsForValue().setBit(            key, userId & 0xffff , true)    .subscribe(b -> logger.info("set:{},result:{}", key, b));}

userId高48位用于将用户划分到不同的key,低16位作为位图偏移参数offset。
offset参数必须大于或等于0,小于2^32(bit 映射被限制在 512 MB 之内)。

Geo

Redis Geo可以存储地理位置信息,并对地理位置进行计算。
如查找给定范围内的仓库信息

public Flux getWarehouseInDist(User u, double dist) {    ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo();    Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist);    RedisGeoCommands.GeoRadiusCommandArgs args =            RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending();    return geo.radius("warehouse:address", circle, args);}

warehouse:address这个集合中需要先保存好仓库地理位置信息。
ReactiveGeoOperations#radius方法可以查找集合中地理位置在给定范围内的元素,它中还支持添加元素到集合,计算集合中两个元素地理位置距离等操作。

lua

ReactiveRedisTemplate也可以执行Lua脚本。
下面通过Lua脚本完成用户签到逻辑:如果用户今天未签到,允许签到,积分加1,如果用户今天已签到,则拒接操作。

public Flux<String> addScore(long userId) {    DefaultRedisScript<String> script = new DefaultRedisScript<>();    script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua")));    List<String> keys = new ArrayList<>();    keys.add(String.valueOf(userId));    keys.add(LocalDateTime.now().toString().substring(0, 10));    return redisTemplate.execute(script, keys);}

signin.lua内容如下

local score=redis.call('hget','user:'..KEYS[1],'score')local day=redis.call('hget','user:'..KEYS[1],'lastSigninDay')if(day==KEYS[2])    then    return '0'else    redis.call('hset','user:'..KEYS[1],'score', score+1,'lastSigninDay',KEYS[2])    return '1'end

Stream

Redis Stream 是 Redis 5.0 版本新增加的数据类型。该类型可以实现消息队列,并提供消息的持久化和主备复制功能,并且可以记住每一个客户端的访问位置,还能保证消息不丢失。

Redis借鉴了kafka的设计,一个Stream内可以存在多个消费组,一个消费组内可以存在多个消费者。
如果一个消费组内某个消费者消费了Stream中某条消息,则这消息不会被该消费组其他消费者消费到,当然,它还可以被其他消费组中某个消费者消费到。

下面定义一个Stream消费者,负责处理接收到的权益数据

@Componentpublic class RightsStreamConsumer implements ApplicationRunner, DisposableBean {    private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class);    @Autowired    private RedisConnectionFactory redisConnectionFactory;    private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container;    // Stream队列    private static final String STREAM_KEY = "stream:user:rights";    // 消费组    private static final String STREAM_GROUP = "user-service";    // 消费者    private static final String STREAM_CONSUMER = "consumer-1";    @Autowired    @Qualifier("reactiveRedisTemplate")    private ReactiveRedisTemplate redisTemplate;    public void run(ApplicationArguments args) throws Exception {        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options =                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()                        .batchSize(100) //一批次拉取的最大count数                        .executor(Executors.newSingleThreadExecutor())  //线程池                        .pollTimeout(Duration.ZERO) //阻塞式轮询                        .targetType(Rights.class) //目标类型(消息内容的类型)                        .build();        // 创建一个消息监听容器        container = StreamMessageListenerContainer.create(redisConnectionFactory, options);        // prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream        prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP)                .subscribe(stream -> {            // 为Stream创建一个消费者,并绑定处理类            container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),                    StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),                    new StreamMessageListener());            container.start();        });    }    @Override    public void destroy() throws Exception {        container.stop();    }    // 查找Stream信息,如果不存在,则创建Stream    private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) {        // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。        return ops.info(stream).onErrorResume(err -> {            logger.warn("query stream err:{}", err.getMessage());            // createGroup方法创建Stream            return ops.createGroup(stream, group).flatMap(s -> ops.info(stream));        });    }    // 消息处理对象    class  StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> {        public void onMessage(ObjectRecord<String, Rights> message) {            // 处理消息            RecordId id = message.getId();            Rights rights = message.getValue();            logger.info("receive id:{},rights:{}", id, rights);            redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> {                logger.info("add rights:{}", l);            });        }    }}

下面看一下如何发送信息

public Mono<RecordId> addRights(Rights r) {    String streamKey = "stream:user:rights";//stream key    ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r);    Mono<RecordId> mono = redisTemplate.opsForStream().add(record);    return mono;}

创建一个消息记录对象ObjectRecord,并通过ReactiveStreamOperations发送信息记录。

Sentinel、Cluster

ReactiveRedisTemplate也支持Redis Sentinel、Cluster集群模式,只需要调整配置即可。
Sentinel配置如下

spring.redis.sentinel.master=mymasterspring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379spring.redis.sentinel.password=

spring.redis.sentinel.nodes配置的是Sentinel节点IP地址和端口,不是Redis实例节点IP地址和端口。

Cluster配置如下

spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379spring.redis.lettuce.cluster.refresh.period=10000spring.redis.lettuce.cluster.refresh.adaptive=true

如Redis Cluster中node2是node1的从节点,Lettuce中会缓存该信息,当node1宕机后,Redis Cluster会将node2升级为主节点。但Lettuce不会自动将请求切换到node2,因为它的缓冲没有刷新。
开启spring.redis.lettuce.cluster.refresh.adaptive配置,Lettuce可以定时刷新Redis Cluster集群缓存信息,动态改变客户端的节点情况,完成故障转移。

暂时未发现ReactiveRedisTemplate实现pipeline,事务的方案。

以上就是“Spring中怎么实现响应式Redis交互”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注编程网精选频道。

--结束END--

本文标题: Spring中怎么实现响应式Redis交互

本文链接: https://www.lsjlt.com/news/240845.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Spring中怎么实现响应式Redis交互
    今天小编给大家分享一下Spring中怎么实现响应式Redis交互的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。本文将模拟一个...
    99+
    2023-06-05
  • python怎么使用redis模块来跟redis实现交互
    本篇内容主要讲解“python怎么使用redis模块来跟redis实现交互”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“python怎么使用redis模块来跟redis实现交互”吧!redis模...
    99+
    2023-07-02
  • Hive怎么实现交互式查询数据
    Hive是一个基于Hadoop的数据仓库工具,它可以让用户通过类SQL语言来进行查询数据。为了实现交互式查询数据,可以使用Hive的...
    99+
    2024-04-03
    Hive
  • css3中怎么实现响应式导航
    这篇文章将为大家详细讲解有关css3中怎么实现响应式导航,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。  html代码:代码如下:<div class...
    99+
    2024-04-02
  • spring redis分布式锁怎么实现
    在Spring中实现Redis分布式锁可以使用RedisTemplate来操作Redis进行加锁和解锁。 首先,我们需要定义一个分布...
    99+
    2023-10-27
    spring redis
  • Flutter中实现交互式Webview的方法
    前言: Flutter是一款强大的跨平台移动应用开发框架,而Webview则是在应用中展示Web内容的重要组件。本文将介绍如何在Flutter应用中实现交互式的Webview,以便为用户提供更加丰...
    99+
    2023-09-10
    flutter
  • vue+element怎么实现响应式
    本篇内容介绍了“vue+element怎么实现响应式”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!一、如何使用Element UI在Vue项...
    99+
    2023-07-05
  • 怎么实现Vue的响应式
    这篇文章主要介绍了怎么实现Vue的响应式,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。简易版 以watch为切入点watch是平时开发中使用...
    99+
    2024-04-02
  • Node中怎么实现前后端交互
    Node中怎么实现前后端交互,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。index.html:<!doctype> <...
    99+
    2024-04-02
  • 怎么在Java中实现内存交互
    这期内容当中小编将会给大家带来有关怎么在Java中实现内存交互,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。Java的特点有哪些Java的特点有哪些1.Java语言作为静态面向对象编程语言的代表,实现了面...
    99+
    2023-06-14
  • VB6.0中怎么实现多窗体交互
    这期内容当中小编将会给大家带来有关VB6.0中怎么实现多窗体交互,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1、保持窗体引用的全局性前面提到,进行 .NET 窗体编程时应该牢牢把握下列原则:在访问窗体之...
    99+
    2023-06-17
  • 怎么在css中实现响应式布局
    今天就跟大家聊聊有关怎么在css中实现响应式布局,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。响应式布局的四种方式总的html代码<body>  &n...
    99+
    2023-06-15
  • Spring Boot中怎么利用Redis实现分布式锁
    Spring Boot中怎么利用Redis实现分布式锁,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。分布式锁介绍Spring Boot 实现 Redis 分布式锁在 sprin...
    99+
    2023-06-16
  • JavaScript与HTML怎么实现交互
    这篇文章主要介绍“JavaScript与HTML怎么实现交互”,在日常操作中,相信很多人在JavaScript与HTML怎么实现交互问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解...
    99+
    2024-04-02
  • PHP与Javascript怎么实现交互
    PHP与Javascript怎么实现交互,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。PHP与Javascript交互的方法:通过Cookie交互。一共是三个文件,分别为:...
    99+
    2023-06-17
  • Vue2响应式系统之深度响应怎么实现
    本文小编为大家详细介绍“Vue2响应式系统之深度响应怎么实现”,内容详细,步骤清晰,细节处理妥当,希望这篇“Vue2响应式系统之深度响应怎么实现”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。1、场景import&...
    99+
    2023-06-30
  • python语音交互怎么实现
    要实现Python语音交互,可以使用第三方库SpeechRecognition。首先,需要安装SpeechRecognition库。...
    99+
    2023-08-31
    python
  • CSS怎么实现响应式布局
    小编给大家分享一下CSS怎么实现响应式布局,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!用CSS实现响应式布局响应式布局感觉很高大上,很难,但实际上只用CSS也能实现响应式布局要用的就是CSS中的没接查询,下面就介绍一下怎...
    99+
    2023-06-08
  • JavaScript中怎么实现各种交互效果
    今天就跟大家聊聊有关JavaScript中怎么实现各种交互效果,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。一、了解HTML5 details, s...
    99+
    2024-04-02
  • bootstrap怎么实现响应式布局
    这篇文章主要介绍“bootstrap怎么实现响应式布局”,在日常操作中,相信很多人在bootstrap怎么实现响应式布局问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”boot...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作