广告
返回顶部
首页 > 资讯 > 数据库 >Java中怎么利用Redis 实现一个分布式任务调度器
  • 739
分享到

Java中怎么利用Redis 实现一个分布式任务调度器

2024-04-02 19:04:59 739人浏览 泡泡鱼
摘要

Java中怎么利用Redis 实现一个分布式任务调度器,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。代码实例在深入讲解实现方法之前,我们先

Java中怎么利用Redis 实现一个分布式任务调度器,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

代码实例

在深入讲解实现方法之前,我们先来看看这个调度器是如何使用的

class Demo {
    public static void main(String[] args) {
        var redis = new RedisStore();
        // sample 为任务分组名称
        var store = new RedisTaskStore(redis, "sample");
        // 5s 为任务寿命
        var scheduler = new DistributedScheduler(store, 5);
        // 注册一个单次任务
        scheduler.reGISter(Trigger.onceOfDelay(5), Task.of("once1", () -> {
            System.out.println("once1");
        }));
        // 注册一个循环任务
        scheduler.register(Trigger.periodOfDelay(5, 5), Task.of("period2", () -> {
            System.out.println("period2");
        }));
        // 注册一个 CRON 任务
        scheduler.register(Trigger.cronOfMinutes(1), Task.of("cron3", () -> {
            System.out.println("cron3");
        }));
        // 设置全局版本号
        scheduler.version(1);
        // 注册监听器
        scheduler.listener(ctx -> {
            System.out.println(ctx.task().name() + " is complete");
        });
        // 启动调度器
        scheduler.start();
    }
}

当代码升级任务需要增加减少时(或者变更调度时间),只需要递增全局版本号,现有的进程中的任务会自动被重新调度,那些没有被注册的任务(任务减少)会自动清除。新增的任务(新任务)在老代码的进程里是不会被调度的(没有新任务的代码无法调度),被清除的任务(老任务)在老代码的进程里会被取消调度。

比如我们要取消 period2 任务,增加 period4 任务

class Demo {
    public static void main(String[] args) {
        var redis = new RedisStore();
        // sample 为任务分组名称
        var store = new RedisTaskStore(redis, "sample");
        // 5s 为任务锁寿命
        var scheduler = new DistributedScheduler(store, 5);
        // 注册一个单次任务
        scheduler.register(Trigger.onceOfDelay(5), Task.of("once1", () -> {
            System.out.println("once1");
        }));
        // 注册一个 CRON 任务
        scheduler.register(Trigger.cronOfMinutes(1), Task.of("cron3", () -> {
            System.out.println("cron3");
        }));
        // 注册一个循环任务
        scheduler.register(Trigger.periodOfDelay(5, 10), Task.of("period4", () -> {
            System.out.println("period4");
        }));
        // 递增全局版本号
        scheduler.version(2);
        // 注册监听器
        scheduler.listener(ctx -> {
            System.out.println(ctx.task().name() + " is complete");
        });
        // 启动调度器
        scheduler.start();
    }
}

cron4j

<dependency>
    <groupId>it.sauronsoftware.cron4j</groupId>
    <artifactId>cron4j</artifactId>
    <version>2.2.5</version>
</dependency>

这个开源的 library 包含了基础的 cron 表达式解析功能,它还提供了任务的调度功能,不过这里并不需要使用它的调度器。我只会用到它的表达式解析功能,以及一个简单的方法用来判断当前的时间是否匹配表达式(是否该运行任务了)。

我们对 cron 的时间精度要求很低,1 分钟判断一次当前的时间是否到了该运行任务的时候就可以了。

class SchedulingPattern {
    // 表达式是否有效
    boolean validate(String cronExpr);
    // 是否应该运行任务了(一分钟判断一次)
    boolean match(long nowTs);
}

任务的互斥性

因为是分布式任务调度器,多进程环境下要控制同一个任务在调度的时间点只能有一个进程运行。使用 Redis 分布式锁很容易就可以搞定。锁需要保持一定的时间(比如默认 5s)。

所有的进程都会在同一时间调度这个任务,但是只有一个进程可以抢到锁。因为分布式环境下时间的不一致性,不同机器上的进程会有较小的时间差异窗口,锁必须保持一个窗口时间,这里我默认设置为 5s(可定制),这就要求不同机器的时间差不能超过 5s,超出了这个值就会出现重复调度。

public boolean grabTask(String name) {
    var holder = new Holder<Boolean>();
    redis.execute(jedis -> {
        var lockKey = keyFor("task_lock", name);
        var ok = jedis.set(lockKey, "true", SetParams.setParams().nx().ex(lockAge));
        holder.value(ok != null);
    });
    return holder.value();
}

全局版本号

我们给任务列表附上一个全局的版本号,当业务上需要增加或者减少调度任务时,通过变更版本号来触发进程的任务重加载。这个重加载的过程包含轮询全局版本号(Redis 的一个key),如果发现版本号变动,立即重新加载任务列表配置并重新调度所有的任务。

private void scheduleReload() {
    // 1s 对比一次
    this.scheduler.scheduleWithFixedDelay(() -> {
        try {
            if (this.reloadIfChanged()) {
                this.rescheduleTasks();
            }
        } catch (Exception e) {
            LOG.error("reloading tasks error", e);
        }
    }, 0, 1, TimeUnit.SECONDS);
}

重新调度任务先要取消当前所有正在调度的任务,然后调度刚刚加载的所有任务。

private void rescheduleTasks() {
    this.cancelAllTasks();
    this.scheduleTasks();
}

private void cancelAllTasks() {
    this.futures.forEach((name, future) -> {
        LOG.warn("cancelling task {}", name);
        future.cancel(false);
    });
    this.futures.clear();
}

因为需要将任务持久化,所以设计了一套任务的序列化格式,这个也很简单,使用文本符号分割任务配置属性就行。

// 一次性任务(startTime)
ONCE@2019-04-29T15:26:29.946+0800
// 循环任务,(startTime,endTime,period),这里任务的结束时间是天荒地老
PERIOD@2019-04-29T15:26:29.949+0800|292278994-08-17T15:12:55.807+0800|5
// cron 任务,一分钟一次
CRON@*/1 * * * *

$ redis-cli
127.0.0.1:6379> hgetall sample_triggers
1) "task3"
2) "CRON@*/1 * * * *"
3) "task2"
4) "PERIOD@2019-04-29T15:26:29.949+0800|292278994-08-17T15:12:55.807+0800|5"
5) "task1"
6) "ONCE@2019-04-29T15:26:29.946+0800"
7) "task4"
8) "PERIOD@2019-04-29T15:26:29.957+0800|292278994-08-17T15:12:55.807+0800|10"

线程池

时间调度会有一个单独的线程(单线程线程池),任务的运行由另外一个线程池来完成(数量可定制)。

class DistributedScheduler {
    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private ExecutorService executor = Executors.newFixedThreadPool(threads);
}

之所以要将线程池分开,是为了避免任务的执行(IO)影响了时间的精确调度。

支持无互斥任务

互斥任务要求任务的单进程运行,无互斥任务就是没有加分布式锁的任务,可以多进程同时运行。默认需要互斥。

class Task {
    
    private boolean concurrent;
    private String name;
    private Runnable runner;
    ...
    public static Task of(String name, Runnable runner) {
        return new Task(name, false, runner);
    }

    public static Task concurrent(String name, Runnable runner) {
        return new Task(name, true, runner);
    }
}

增加回调接口

考虑到调度器的使用者可能需要对任务运行状态进行监控,这里增加了一个简单的回调接口,目前功能比较简单。能汇报运行结果(成功还是异常)和运行的耗时

class TaskContext {
    private Task task;
    private long cost;  // 运行时间
    private boolean ok;
    private Throwable e;
}

interface ISchedulerListener {
    public void onComplete(TaskContext ctx);
}

支持存储扩展

目前只实现了 Redis 和 Memory 形式的任务存储,扩展到 zk、etcd、关系数据库也是可行的,实现下面的接口即可。

interface ITaskStore {
  public long getRemoteVersion();
  public Map<String, String> getAllTriggers();
  public void saveAllTriggers(long version, Map<String, String> triggers);
  public boolean grabTask(String name);
}

关于Java中怎么利用Redis 实现一个分布式任务调度器问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注编程网数据库频道了解更多相关知识。

您可能感兴趣的文档:

--结束END--

本文标题: Java中怎么利用Redis 实现一个分布式任务调度器

本文链接: https://www.lsjlt.com/news/58609.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Java中怎么利用Redis 实现一个分布式任务调度器
    Java中怎么利用Redis 实现一个分布式任务调度器,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。代码实例在深入讲解实现方法之前,我们先...
    99+
    2022-10-18
  • 利用Redis实现分布式任务调度
    利用Redis实现分布式任务调度随着业务的扩展和系统的发展,很多业务都需要实现分布式任务调度,以确保任务能够在多个节点上同时执行,从而提高系统的稳定性和可用性。而Redis作为一款高性能的内存数据存储产品,具备分布式、高可用、高性能等特点,...
    99+
    2023-11-07
    分布式 redis 任务调度
  • java分布式任务调度怎么实现
    实现Java分布式任务调度可以使用以下方法:1. 使用Quartz:Quartz是一个功能强大的开源任务调度框架,可以在Java应用...
    99+
    2023-10-23
    java
  • 分布式系统中如何利用Java关键字和API实现分布式任务调度和监控?
    随着互联网的快速发展,分布式系统的应用越来越广泛,而分布式任务调度和监控是分布式系统中的重要组成部分。Java作为一种高级编程语言,提供了丰富的API和关键字,可以帮助我们更好地实现分布式任务调度和监控。 本文将介绍Java关键字和API...
    99+
    2023-10-23
    关键字 分布式 api
  • 利用MongoDB实现分布式任务调度与执行的经验分享
    MongoDB是一个开源的NoSQL数据库,具有高性能、伸缩性和灵活性的特点。在分布式系统中,任务调度与执行是一个关键的问题,通过利用MongoDB的特性,可以实现分布式任务调度与执行的方案。一、分布式任务调度的需求分析在分布式系统中,任务...
    99+
    2023-11-02
    分布式 MongoDB 任务调度
  • Redis中怎么利用Redlock实现分布式锁
    Redis中怎么利用Redlock实现分布式锁,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。普通实现说道Redis分布式锁大部分人都会想到:setnx+lua,或者知道se...
    99+
    2023-06-20
  • Spring Boot中怎么利用Redis实现分布式锁
    Spring Boot中怎么利用Redis实现分布式锁,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。分布式锁介绍Spring Boot 实现 Redis 分布式锁在 sprin...
    99+
    2023-06-16
  • Java中怎么使用Redis实现分布式锁
    这篇“Java中怎么使用Redis实现分布式锁”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇...
    99+
    2023-05-25
    java redis
  • java中怎么利用mongodb实现分布式锁
    今天就跟大家聊聊有关java中怎么利用mongodb实现分布式锁,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。原理 通过线程安全findAndModify 实现锁实现 定义...
    99+
    2023-06-20
  • 怎么在Java中使用redis实现分布式锁
    本篇文章给大家分享的是有关怎么在Java中使用redis实现分布式锁,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。原理剖析上述三种分布式锁都是通过各自为依据对各个请求进行上锁,...
    99+
    2023-06-15
  • 如何使用Python中的异步IO和协程实现一个高并发的分布式任务调度系统
    如何使用Python中的异步IO和协程实现一个高并发的分布式任务调度系统在当今高速发展的信息时代,分布式系统变得越来越普遍。而高并发的任务调度系统也成为许多企业和组织中不可或缺的一部分。本文以Python为例,介绍了如何使用异步IO和协程来...
    99+
    2023-10-27
    Python 协程 异步IO
  • android应用中怎么利用onLayout()实现一个流式布局
    这期内容当中小编将会给大家带来有关android应用中怎么利用onLayout()实现一个流式布局,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。在onLayout方法中有四个参数,我画了一个简单的图来分清...
    99+
    2023-05-31
    android roi onlayout()
  • 怎么在Java中利用Redis实现一个高并发计数器功能
    这篇文章主要介绍了怎么在Java中利用Redis实现一个高并发计数器功能,编程网小编觉得不错,现在分享给大家,也给大家做个参考,一起跟随编程网小编来看看吧!Java是什么Java是一门面向对象编程语言,可以编写桌面应用程序、Web应用程序、...
    99+
    2023-06-06
  • 如何在Python中实现一个分布式计算框架,以及任务调度和结果收集的机制和策略
    标题:Python中的分布式计算框架实现及任务调度与结果收集机制摘要:分布式计算是一个有效利用多台计算机资源来加速任务处理的方法。本文将介绍如何使用Python实现一个简单的分布式计算框架,包括任务调度和结果收集的机制与策略,并提供相关代码...
    99+
    2023-10-22
    分布式计算 任务调度 结果收集
  • 怎么在java中利用Semaphore实现一个限流器
    怎么在java中利用Semaphore实现一个限流器?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Java可以用来干什么Java主要应用于:1. web开发;2...
    99+
    2023-06-14
  • 使用Java怎么实现一个Web应用中的定时任务
    使用Java怎么实现一个Web应用中的定时任务?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。定时任务,是指定一个未来的时间范围执行一定任务的功能。在当前WEB应...
    99+
    2023-05-30
    java web
  • 怎么在Java中利用JDBC实现一个事务功能
    本文章向大家介绍怎么在Java中利用JDBC实现一个事务功能,主要包括怎么在Java中利用JDBC实现一个事务功能的使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。Java是什么Java是一门面...
    99+
    2023-05-30
  • 怎么在java中利用GUI实现一个加法计算器
    怎么在java中利用GUI实现一个加法计算器?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。java基本数据类型有哪些Java的基本数据类型分为:1、整数类型,用来表示整数的数据...
    99+
    2023-06-14
  • java中怎么利用gui实现一个计算器小程序
    java中怎么利用gui实现一个计算器小程序,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。代码:package gui;  imp...
    99+
    2023-06-20
  • 怎么在Java中利用AQS实现一个自定义同步器
    这篇文章主要为大家详细介绍了怎么在Java中利用AQS实现一个自定义同步器,文中示例代码介绍的非常详细,具有一定的参考价值,发现的小伙伴们可以参考一下:Java的特点有哪些Java的特点有哪些1.Java语言作为静态面向对象编程语言的代表,...
    99+
    2023-06-06
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作