iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >spring中websocket定时任务实现实时推送
  • 416
分享到

spring中websocket定时任务实现实时推送

Python 官方文档:入门教程 => 点击学习

摘要

有时候业务要求websocket连接后,服务端实时每隔一段时间就将数据推送给客户端进行响应,这时就需要WEBSocket+定时任务一起来实现实时推送数据给客户端了。使用的定时任务方式

有时候业务要求websocket连接后,服务端实时每隔一段时间就将数据推送给客户端进行响应,这时就需要WEBSocket+定时任务一起来实现实时推送数据给客户端了。
使用的定时任务方式为spring的TaskScheduler对象实现任务调度。

TaskScheduler定时任务实现

TaskScheduler接口提供了多种调度方法来实现运行任务的执行。

public interface TaskScheduler {
 
 	//通过触发器来决定task是否执行
    ScheduledFuture schedule(Runnable task, Trigger trigger); 
 
 	//在starttime的时候执行一次
    ScheduledFuture schedule(Runnable task, Date startTime);  
    ScheduledFuture schedule(Runnable task, Instant startTime); 
 
 	//从starttime开始每个period时间段执行一次task
    ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period); 
    ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period); 
 
 	//每隔period执行一次
    ScheduledFuture scheduleAtFixedRate(Runnable task, long period);  
    ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period);  
 
 	//从startTime开始每隔delay长时间执行一次
    ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay); 
 
 	//每隔delay时间执行一次
    ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay); 
}

简单测试一下

import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RequiredArgsConstructor
public class TaskSchedulerTest {

    private final TaskScheduler taskScheduler;

    @Bean
    public void test() {
        //每隔3秒执行一次
        Trigger trigger = new CronTrigger("0/3 * * * * *");
        //每隔1秒执行一次
        //Trigger trigger1 = new PeriodicTrigger(1, TimeUnit.SECONDS);
        taskScheduler.schedule(new MyThread(), trigger);
    }

    private class MyThread implements Runnable {
        @Override
        public void run() {
            log.info("定时执行线程名称=【{}】,执行时间=【{}】", Thread.currentThread().getName(), DateUtil.date());
        }
    }

}

效果就是每个3秒执行一次

在这里插入图片描述

websocket+定时任务实时推送

实现的业务需求如下:客户端连上来以后就每隔3秒向客户端实时推送消息。有关websocket的实现见文章websocket简单实现

TestWebsocket.java

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.JSON.jsONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;


@Slf4j
@Component
@RequiredArgsConstructor
public class TestWebsocket implements WebSocketHandler {

    protected static final CopyOnWriteArrayList<WebSocketSession> WEB_SOCKET_SESSIONS = new CopyOnWriteArrayList<>();

    
    Map<String, ScheduledFuture<?>> stringScheduledFutureMap = new ConcurrentHashMap<>();

    
    private final TaskScheduler taskScheduler;

    
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sendMessage("连接成功~~~~~~,sessionId=" + session.getId());
        WEB_SOCKET_SESSIONS.add(session);
        //设置定时任务,每隔3s执行一次
        Trigger trigger = new CronTrigger("0/3 * * * * *");
        //开启一个定时任务
        ScheduledFuture<?> schedule = taskScheduler.schedule(new CustomizeTask(session.getId()), trigger);
        //根据session连接id定时任务线程存到map中
        stringScheduledFutureMap.put(session.getId(), schedule);
    }

    private class CustomizeTask implements Runnable {
        private final String sessionId;

        CustomizeTask(String sessionId) {
            this.sessionId = sessionId;
        }

        @Override
        public void run() {
            try {
                String message = CharSequenceUtil.fORMat("定时执行线程名称=【{}】,执行时间=【{}】", Thread.currentThread().getName(), DateUtil.date());
                sendMessage(JSONUtil.toJsonStr(message), sessionId);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        sendMessage("接收到的消息为=【" + message + "】,sessionId=【" + session.getId() + "】,回复消息=【你好呀!】");
    }

    
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        if (session.isOpen()) {
            sendMessage("ws连接出错,即将关闭此session,sessionId=【" + session.getId() + "】");
            session.close();
        }
        WEB_SOCKET_SESSIONS.remove(session);
    }

    
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        if (session.isOpen()) {
            sendMessage("ws连接即将关闭此session,sessionId=【" + session.getId() + "】");
            session.close();
        }
        WEB_SOCKET_SESSIONS.remove(session);
        String sessionId = session.getId();
        ScheduledFuture<?> scheduledFuture = MapUtil.get(stringScheduledFutureMap, sessionId, ScheduledFuture.class);
        if (scheduledFuture != null) {
            //暂停对应session的开启的定时任务
            scheduledFuture.cancel(true);
            //集合移除
            stringScheduledFutureMap.remove(sessionId);
        }
    }

    
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    
    public void sendMessage(String message) throws IOException {
        if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
            for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
                webSocketSession.sendMessage(new TextMessage(message));
            }
        }
    }

    
    public void sendMessage(String message, String sessionId) throws IOException {
        if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
            for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
                if (sessionId.equals(webSocketSession.getId())) {
                    webSocketSession.sendMessage(new TextMessage(message));
                }
            }
        }
    }
}

websocket绑定URL

import com.yjj.test.websocket.TestWebsocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerReGIStry;

import javax.annotation.Resource;


@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {

    @Resource
    private TestWebsocket testWebsocket;

    
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(testWebsocket, "/test").setAllowedOrigins("*");
    }
}

websocket与定时任务同时存在时,需要加入配置定义线程池进行线程的管理

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;


@Configuration
public class ScheduledConfig {

    
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduling = new ThreadPoolTaskScheduler();
        scheduling.setPoolSize(20);
        return scheduling;
    }
}

效果如下
连接上以后服务每隔3秒会向客户端实时推送消息

在这里插入图片描述

 到此这篇关于spring中websocket定时任务实现实时推送的文章就介绍到这了,更多相关spring websocket实时推送内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: spring中websocket定时任务实现实时推送

本文链接: https://www.lsjlt.com/news/176918.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作