文章目录 一、Redisson简单介绍二、Redisson简单使用1. maven引用2. RedisConfig配置3. StockRedissonService4. 测试 三、Redisson源码1. 加锁2. 解锁3. 自
Redisson是一个在Redis的基础上实现的Java驻内存数据网格,可参考Redisson官方文档使用,它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.19.1</version></dependency>
@Configurationpublic class RedissonConfig { @Value("${spring.redis.host}") private String redisAddress; @Bean public RedissonClient redissonClient(){ // 默认连接地址 127.0.0.1:6379 RedissonClient redisson = Redisson.create();// Config config = new Config();// config.useSingleServer().setAddress("redis://" + redisAddress); //redis服务器地址// .setDatabase(0) //制定redis数据库编号// .setUsername("").setPassword() // redis用户名密码// .setConnectionMinimumIdleSize(10) //连接池最小空闲连接数// .setConnectionPoolSize(50) //连接池最大线程数// .setIdleConnectionTimeout(60000) //线程的超时时间// .setConnectTimeout(6000) //客户端程序获取redis链接的超时时间// .setTimeout(60000) //响应超时时间// RedissonClient redisson = Redisson.create(config); return redisson; }}
@Servicepublic class StockRedissonService { @Autowired private StringRedisTemplate redisTemplate; @Autowired private RedissonClient redissonClient; public void deduct() { RLock rlock = redissonClient.getLock("lock"); rlock.lock(); try{ // 1。 查询库存 String stockStr = redisTemplate.opsForValue().get("stock"); if (!StringUtil.isNullOrEmpty(stockStr)) { int stock = Integer.parseInt(stockStr); // 2。 判断条件是否满足 if (stock > 0) { // 3 更新redis redisTemplate.opsForValue().set("stock", String.valueOf(stock - 1)); } } }finally { rlock.unlock(); } }}
简单来说,就是直接
RLock rlock = redissonClient.getLock(“lock”);
获取到锁,然后lock()和unlock()即可。
并发达到了660,比之前自己写的redis lua脚本还要高。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LonGCodec.INSTANCE, command, "if ((redis.call('exists', KEYS[1]) == 0) " +"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
这就是加锁的核心代码,其实本质上也就是LUA脚本+hash数据类型
// 如果这个key不存在,或者这个key是我自己的,那么state+1,并且重置失效时间"if ((redis.call('exists', KEYS[1]) == 0) " +"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 如果不满足,则以毫秒为单位,返回key的过期时间"return redis.call('pttl', KEYS[1]);",
pttl: 以毫秒为单位返回 key 的剩余过期时间。
当 key 不存在时,返回 -2 。
当 key 存在但没有设置剩余生存时间时,返回 -1 。
否则,以毫秒为单位,返回 key 的剩余生存时间。
这其实和我们之前手写的lua脚本并没有什么太大差别,只是它这里return nil表示的是加锁成功
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
解锁的核心代码,和加锁一样,本质上也是LUA脚本+hash数据类型
// 如果key不是我的,直接返回nil,解锁失败"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +// 如果key是我的,那么state-1"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +// state-1后的值,如果>0,说明重入过了,那更新下失效时间"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +// state-1后的值,如果=0,说明全部解锁成功了,删除锁,并且publish"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +"return nil;",
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } CompletionStage<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock {} expiration", getRawName(), e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { // reschedule itself renewExpiration(); } else { cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
它也是使用的new TimerTask() ,同样也是一次性续期,若res=1,再次续期。
官方文档:如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
不同的是,即使redisson节点宕机,它还是可以自动续期,保证你的业务逻辑正常结束。
其实和我们之前手写实现的redis分布式锁原理差不多,但是它是配合着CompletableFuture去异步实现,更加高效
本质上还是LUA脚本+hash数据类型
非公平锁:来个线程就先试试能不能插队,不能插队才去后面排队
公平锁:线程都乖乖去后面排队去,不准插队
StockController新增一个接口,用于调用公平锁
@GetMapping("fair/lock/{id}") public String fairLock(@PathVariable("id")Long id){ stockService.fairLock(id); return "hello fair lock"; }
StockRedissonService
public void fairLock(Long id) { RLock fairLock = redissonClient.getFairLock("fairLock"); fairLock.lock(); try { TimeUnit.SECONDS.sleep(3); System.out.println("-----测试公平锁-----"); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { fairLock.unlock(); } }
redis中有个等待队列
访问是顺序执行。
RLock fairLock = redissonClient.getLock("fairLock");
ps:若使用Nginx转发会超时重发,修改conf即可
添加如下:
proxy_connect_timeout 12000;
proxy_send_timeout 12000;
proxy_read_timeout 12000;
读写 不能并发 加锁
写写 不能并发 加锁
读读 能并发 不加锁
StockController
@GetMapping("read/lock") public String readLock(){ stockService.readLock(); return "read read lock"; } @GetMapping("write/lock") public String writeLock(){ stockService.writeLock(); return "write write lock"; }
StockRedissonService
public void readLock() { RReadWriteLock lock = redissonClient.getReadWriteLock("rwLock"); lock.readLock().lock(10, TimeUnit.SECONDS); // TODO 疯狂的读// lock.readLock().unlock(); } public void writeLock() { RReadWriteLock lock = redissonClient.getReadWriteLock("rwLock"); lock.writeLock().lock(10, TimeUnit.SECONDS); // TODO 疯狂的写// lock.writeLock().unlock(); }
简单测试下,两个页面分别访问
http://localhost:10010/write/lock 写操作
http://localhost:10010/read/lock 读操作
写写: 多次写操作明显等待
写读:明显等待
读写:明显等待
读锁有多个,但写锁只有一个
读读:无等待现象
可以用来控制同时访问特定资源的线程数量,常用于限流场景。
Semaphore接收一个int整型值,表示 许可证数量。
线程通过调用acquire()获取许可证,执行完成之后通过调用release()归还许可证。
只有获取到许可证的线程才能运行,获取不到许可证的线程将会阻塞。
public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 6; i++) { new Thread(()->{ try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + ":抢到了停车位"); TimeUnit.SECONDS.sleep(new Random().nextInt(10)); System.out.println(Thread.currentThread().getName() + "停了会儿就开走了"); semaphore.release(); } catch (InterruptedException e) { throw new RuntimeException(e); } }, i + "号车").start(); } }
每次只会停三辆车,走一辆后才可以停下一辆,完美实现限流。
StockRedissonService
public void semaphoreLock() { RSemaphore semaphore = redissonClient.getSemaphore("semaphore"); semaphore.trySetPermits(3); //设置资源量,限流的线程数 try { semaphore.acquire(); // 获取资源,获取资源成功的线程可以继续处理业务操作,否则会被阻塞住 redisTemplate.opsForList().rightPush("log",port + "端口获取了资源,开始处理业务逻辑" + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(new Random().nextInt(10)); redisTemplate.opsForList().rightPush("log",port + "端口处理完,释放了资源" + Thread.currentThread().getName()); semaphore.release(); // 手动释放资源,后续请求线程可以获取该资源 } catch (InterruptedException e) { throw new RuntimeException(e); } }
用redis的list数据类型来记录日志,方便集群部署的日志查看。
semaphore.trySetPermits(3);//设置资源量,限流的线程数
这会在redis中成semaphore的key,如果要修改资源量,必须手动把redis中该key删除,否则只在代码中修改,重启后无法生效。
简单测试下,多次访问后
集群部署下,共用了信号量的限流,两个端口启动,同一时间限流了3个线程。
public static void main(String[] args) throws InterruptedException { // 班长线程 CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 0; i < 6; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + "准备出门了"); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "睡了一会,终于出门了"); countDownLatch.countDown(); }, i + "号同学" ).start(); } countDownLatch.await(); System.out.println("班长锁门了"); }
班长等六个同学全部出门后才可以锁门
@GetMapping("test/countDown") public String countDown(){ stockService.countDown(); return "出来了一位同学"; } @GetMapping("test/latch") public String testLatch(){ stockService.testLatch(); return "班长锁门了"; }
public void countDown() { RCountDownLatch cdl = redissonClient.getCountDownLatch("cdl"); // TODO 出门准备完成 cdl.countDown(); }
public void testLatch() { RCountDownLatch cdl = redissonClient.getCountDownLatch("cdl"); cdl.trySetCount(6); try { cdl.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } // TODO 准备锁门 }
RCountDownLatch cdl = redissonClient.getCountDownLatch("cdl");
其中的“cdl"即是存放在redis的key,两个方法必须命名一致。
分别访问接口
http://localhost/test/countDown 出门
http://localhost/test/latch 班长锁门
只有访问六次出门后,才可以成功锁门
redis中存放着cdl的key,也就是目前的等待线程数。
redisson: redis的java客户端,分布式锁
玩法:
RLock rlock = redissonClient.getLock("xxx");rlock.lock()/unlock()
RLock fairLock = redissonClient.getLock("xxx");fairLock.lock()/unlock()
RReadWriteLock lock = redissonClient.getReadWriteLock("xxx");lock.readLock().lock()/unlock();lock.writeLock().lock()/unlock();
RSemaphore semaphore = redissonClient.getSemaphore("xxx");semaphore.trySetPermits(3); //设置资源量,限流的线程数semaphore.acquire()/release()
RCountDownLatch cdl = redissonClient.getCountDownLatch("xxx");cdl.trySetCount(6);cdl.await()/countDown()
--结束END--
本文标题: Redisson分布式锁
本文链接: https://www.lsjlt.com/news/375070.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-04-01
2024-04-03
2024-04-03
2024-01-21
2024-01-21
2024-01-21
2024-01-21
2023-12-23
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
一口价域名售卖能注册吗?域名是网站的标识,简短且易于记忆,为在线用户提供了访问我们网站的简单路径。一口价是在域名交易中一种常见的模式,而这种通常是针对已经被注册的域名转售给其他人的一种方式。
一口价域名买卖的过程通常包括以下几个步骤:
1.寻找:买家需要在域名售卖平台上找到心仪的一口价域名。平台通常会为每个可售的域名提供详细的描述,包括价格、年龄、流
443px" 443px) https://www.west.cn/docs/wp-content/uploads/2024/04/SEO图片294.jpg https://www.west.cn/docs/wp-content/uploads/2024/04/SEO图片294-768x413.jpg 域名售卖 域名一口价售卖 游戏音频 赋值/切片 框架优势 评估指南 项目规模
0