Python 官方文档:入门教程 => 点击学习
目录spring动态管理定时任务ThreadPoolTaskScheduler实现思路ThreadPoolTaskScheduler 定时任务实现总结Spring动态管理定时任务Th
Spring任务调度核心类ThreadPoolTaskScheduler,api文档解释如下:
Implementation of Spring's TaskScheduler interface, wrapping a native java.util.concurrent.ScheduledThreadPoolExecutor.
Spring的TaskScheduler接口的实现,包装了一个本地java.util.concurrent.ScheduledThreadPoolExecutor。
注入调度类bean,初始化一个ConcurrentHashMap容器,用来保存多个定时任务的状态,每一个任务的运行状态被封装在ScheduledFuture中,借此类可取消对应的定时任务。
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.WEB.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.cjia.spidercommon.model.SpiderJob;
import com.cjia.spiderjob.mapper.SpiderJobMapper;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RestController
@RequestMapping("spiderJob/cron")
public class CronJobController extends SpiderJobController {
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
private Map<Integer, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
@Resource
private SpiderJobMapper spiderJobMapper;
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
return new ThreadPoolTaskScheduler();
}
@RequestMapping("/start/{jobId}")
public String start(@PathVariable Integer jobId) {
SpiderJob job = spiderJobMapper.selectById(jobId);
if (job == null) {
log.warn("任务[{}]已不存在,无法启动!", jobId);
return "任务[" + jobId + "]已不存在,无法启动!";
}
int enable = job.getEnable();
if (enable == 0) {
log.warn("任务[{}]已被禁用,无法启动!", jobId);
return "任务[" + jobId + "]已被禁用,无法启动!";
}
// 检测该任务是否已在运行调度中
if (futureMap.get(jobId) != null) {
log.warn("任务[{}]已在调度运行,无法重复启动!", jobId);
return "任务[" + jobId + "]已在调度运行,无法重复启动!";
}
String cron = job.getCron();
// TODO check cron
ScheduledFuture<?> future = threadPoolTaskScheduler.schedule(new MyRunnable(job), new CronTrigger(cron));
log.info("任务[{}]已被启动!", jobId);
futureMap.put(jobId, future);
return "任务[" + jobId + "]已被启动!";
}
@RequestMapping("/startBatch/{jobIds}")
public String startBatch(@PathVariable String jobIds) {
// TODO jobIds valid
String[] jobIdsArr = jobIds.split(",");
StringBuffer sb = new StringBuffer();
for (String jobId : jobIdsArr) {
String result = start(Integer.valueOf(jobId));
sb.append(result).append("<br>");
}
return sb.toString();
}
@RequestMapping("/stop/{jobId}")
public String stop(@PathVariable Integer jobId) {
// 检测该任务是否已在运行调度中
ScheduledFuture<?> future = futureMap.get(jobId);
if (future == null) {
log.warn("任务[{}]已不在调度中,无法停止!", jobId);
return "任务[" + jobId + "]已不在调度中,无法停止!";
} else {
future.cancel(true);
futureMap.remove(jobId);
log.info("任务[{}]已被停止!", jobId);
return "任务[" + jobId + "]已被停止!";
}
}
@RequestMapping("/stopBatch/{jobIds}")
public String stopBatch(@PathVariable String jobIds) {
// TODO jobIds valid
String[] jobIdsArr = jobIds.split(",");
StringBuffer sb = new StringBuffer();
for (String jobId : jobIdsArr) {
String result = stop(Integer.valueOf(jobId));
sb.append(result).append("<br>");
}
return sb.toString();
}
@RequestMapping("/status")
public String getAllStatus() {
Set<Integer> runningKeys = futureMap.keySet();
return "当前正在调度的任务列表:" + runningKeys.toString();
}
@Data
private class MyRunnable implements Runnable {
private SpiderJob job;
public MyRunnable(SpiderJob job) {
this.job = job;
}
@Override
public void run() {
log.info("运行定时任务[{}: {}] at {}!", job.getId(), job.getBizName(), LocalDateTime.now());
executeIncrementJob(job.getBizName());
}
}
}
org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler定时任务调度线程池
CREATE TABLE `sys_job` (
`id` bigint(20) NOT NULL COMMENT '任务key',
`job_name` varchar(64) NOT NULL COMMENT '任务名称',
`bean_class` varchar(128) NOT NULL COMMENT '类路径',
`cron_expression` varchar(64) NOT NULL COMMENT 'cron表达式',
`status` tinyint(1) NOT NULL COMMENT '状态值 @JobStatusEnum 详见具体枚举类',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT '删除标识 1是 0否',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
@Configuration
@Slf4j
public class SchedulinGConfigure {
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
log.info("开始创建定时任务调度线程池");
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(20);
threadPoolTaskScheduler.setThreadNamePrefix("schedule-task-");
threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskScheduler.setAwaitTerminationSeconds(60);
log.info("创建定时任务调度线程池完成!");
return threadPoolTaskScheduler;
}
}
public enum JobStatusEnum {
NOT_SCHEDULE(0, "未加入调度器"),
SCHEDULED_BUT_NOT_RUNNING(1, "加入调度器,但未运行"),
DELETED(2, "从调度器中已删除"),
;
private Integer status;
private String detail;
JobStatusEnum(Integer status, String detail) {
this.status = status;
this.detail = detail;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public String getDetail() {
return detail;
}
public void setDetail(String detail) {
this.detail = detail;
}
}
@Component
@Slf4j
public class ScheduledJobService {
private final ReentrantLock lock = new ReentrantLock();
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
@Autowired
private SysJobService jobService;
@Autowired
private SpringBeanUtils springBeanUtils;
private final ConcurrentHashMap<Long, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>();
public void initAllJob(List<SysJob> sysJobs) {
if (CollectionUtils.isEmpty(sysJobs)) {
return;
}
for (SysJob sysJob : sysJobs) {
if (JobStatusEnum.NOT_SCHEDULE.getStatus().equals(sysJob.getStatus())
|| JobStatusEnum.DELETED.getStatus().equals(sysJob.getStatus())
|| this.isScheduled(sysJob.getId())) {
// 任务初始化状态或已删除或已加载到调度器中
continue;
}
// 将任务加入调度器
this.doScheduleJob(sysJob);
}
}
public void start(Long jobId) {
log.info("启动任务:-> jobId_{}", jobId);
// 加入调度器
schedule(jobId);
log.info("启动任务结束:-> jobId_{}", jobId);
// 更新任务状态
jobService.updateJobStatus(jobId, JobStatusEnum.SCHEDULED_BUT_NOT_RUNNING.getStatus());
}
public void stop(Long jobId) {
log.info("停止任务:-> jobId_{}", jobId);
// 取消任务
cancel(jobId);
log.info("停止任务结束:-> jobId_{}", jobId);
// 更新表中任务状态为已停止
jobService.updateJobStatus(jobId, JobStatusEnum.NOT_SCHEDULE.getStatus());
}
public void remove(Long jobId) {
log.info("移除任务:-> jobId_{}", jobId);
// 取消任务
cancel(jobId);
log.info("移除任务结束:-> jobId_{}", jobId);
// 更新表中任务状态为已删除
jobService.updateJobStatus(jobId, JobStatusEnum.DELETED.getStatus());
}
private void cancel(Long jobId) {
// 任务是否存在
if (scheduledFutureMap.containsKey(jobId)) {
ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(jobId);
if (!scheduledFuture.isCancelled()) {
// 取消调度
scheduledFuture.cancel(true);
}
}
}
private void schedule(Long jobId) {
// 添加锁,只允许单个线程访问,防止任务启动多次
lock.lock();
try {
if (isScheduled(jobId)) {
log.error("任务jobId_{}已经加入调度器,无需重复操作", jobId);
return;
}
// 通过jobKey查询jobBean对象
SysJob sysJob = jobService.getById(jobId);
// 启动定时任务
doScheduleJob(sysJob);
} finally {
// 释放锁资源
lock.unlock();
}
}
private void doScheduleJob(SysJob sysJob) {
Long jobId = sysJob.getId();
String beanClass = sysJob.getBeanClass();
String jobName = sysJob.getJobName();
String cron = sysJob.getCronExpression();
// 从Spring中获取目标的job业务实现类
ScheduledJob scheduledJob = parseFrom(beanClass);
if (scheduledJob == null) {
return;
}
scheduledJob.setJobId(jobId);
scheduledJob.setJobName(jobName);
ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduledJob,
triggerContext -> {
CronTrigger cronTrigger = new CronTrigger(cron);
return cronTrigger.nextExecutionTime(triggerContext);
});
log.info("任务加入调度器 -> jobId:{},jobName:{}", jobId, jobName);
// 将启动的任务放入map
assert scheduledFuture != null;
scheduledFutureMap.put(jobId, scheduledFuture);
}
private Boolean isScheduled(Long jobId) {
if (scheduledFutureMap.containsKey(jobId)) {
return !scheduledFutureMap.get(jobId).isCancelled();
}
return false;
}
private ScheduledJob parseFrom(String beanClass) {
try {
Class<?> clazz = Class.forName(beanClass);
return (ScheduledJob) springBeanUtils.getBean(clazz);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}
}
@Component
public class SpringBeanUtils implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringBeanUtils.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public Object getBean(String name) {
return getApplicationContext().getBean(name);
}
public <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
public <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
}
@Data
public abstract class ScheduledJob implements Runnable {
private Long jobId;
private String jobName;
}
@Component
public class SchedulerTestDemo extends ScheduledJob {
@Override
public void run() {
System.out.println("我是定时任务要执行的类..");
System.out.println(SchedulerTestDemo.class.getName() + ":" + LocalDateTime.now());
}
}
@Component
public class GrapeApplicationListener {
private final ScheduledJobService scheduledJobService;
private final ISysJobService sysJobService;
public GrapeApplicationListener(ISysJobService sysJobService, ScheduledJobService scheduledJobService) {
this.sysJobService = sysJobService;
this.scheduledJobService = scheduledJobService;
}
@PostConstruct
public void initStartJob() {
// 初始化job
scheduledJobService.initAllJob(sysJobService.list());
}
}
@SpringBootApplication(scanBasePackages = {"com.example.grape"})
@MapperScan("com.example.grape.dao.mapper")
@EnableScheduling
public class GrapeApplication {
public static void main(String[] args) {
SpringApplication.run(GrapeApplication.class, args);
}
}
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。
--结束END--
本文标题: Spring动态管理定时任务之ThreadPoolTaskScheduler解读
本文链接: https://www.lsjlt.com/news/176073.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-03-01
2024-03-01
2024-03-01
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0