Using the @Async Annotation in Spring Boot: How It Works, with Source Code

2024-04-02 19:04:59 · 安东尼

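Most Java service methods run serially, but some business scenarios do not need the result synchronously: the endpoint can return right away while the actual work runs asynchronously. Other endpoints need to fetch several pieces of data in parallel and aggregate them before returning them to the front end.
We usually implement this with explicit multithreading, but Spring offers a more elegant way: the @Async annotation. Annotate a method with @Async and Spring uses AOP to run it asynchronously.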
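For comparison, the hand-rolled multithreading approach mentioned above might look roughly like the following. This is only a sketch using the JDK's ExecutorService; the class name, the slowLookup helper, and the values and delays are made up for illustration and do not come from the article's project.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ManualThreadingSketch {

    /** Hypothetical slow lookup standing in for a database or remote call. */
    static int slowLookup(int value, long millis) throws InterruptedException {
        Thread.sleep(millis);
        return value;
    }

    /** Fetches two values in parallel and aggregates them before returning. */
    static int fetchAndAggregate(ExecutorService pool)
            throws InterruptedException, ExecutionException {
        Future<Integer> first = pool.submit(() -> slowLookup(40, 200));
        Future<Integer> second = pool.submit(() -> slowLookup(2, 200));
        return first.get() + second.get(); // blocks until both results are in
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Fire-and-forget: the caller continues without waiting for a result.
            pool.execute(() ->
                    System.out.println("background work on " + Thread.currentThread().getName()));
            int total = fetchAndAggregate(pool);
            System.out.println("total=" + total);
        } finally {
            pool.shutdown();
        }
    }
}
```

With @Async, the submit calls and the pool plumbing move out of the business code and into Spring's proxy.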

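1. Enabling @Async

Spring Boot enables the @Async annotation through the @EnableAsync annotation.
The configuration class implements the AsyncConfigurer interface; the pool returned by getAsyncExecutor is the custom default pool, used by @Async methods that do not name an executor.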



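import java.util.concurrent.Executor;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// ExecutorConstant is a project class holding the bean-name constants (not shown here).
@Slf4j
@EnableAsync
@Configuration
public class SimpleExecutorConfig implements AsyncConfigurer {

    // Number of core threads
    @Value("${executor.corePoolSize}")
    private Integer corePoolSize;

    // Maximum number of threads
    @Value("${executor.maxPoolSize}")
    private Integer maxPoolSize;

    // Capacity of the task queue
    @Value("${executor.queueCapacity}")
    private Integer queueCapacity;

    // Thread-name prefix for the first pool
    @Value("${executor.threadNamePrefix}")
    private String threadNamePrefix;

    // Thread-name prefix for the second pool
    @Value("${executor.threadNamePrefix_2}")
    private String threadNamePrefix_2;

    // Default pool: used by @Async methods that do not name an executor
    @Bean(ExecutorConstant.simpleExecutor_1)
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(corePoolSize);
        taskExecutor.setMaxPoolSize(maxPoolSize);
        taskExecutor.setQueueCapacity(queueCapacity);
        taskExecutor.setThreadNamePrefix(threadNamePrefix);
        taskExecutor.initialize();
        return taskExecutor;
    }

    // Second pool: selected explicitly with @Async(ExecutorConstant.simpleExecutor_2)
    @Bean(ExecutorConstant.simpleExecutor_2)
    public Executor asyncExecutor2() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(corePoolSize);
        taskExecutor.setMaxPoolSize(maxPoolSize);
        taskExecutor.setQueueCapacity(queueCapacity);
        taskExecutor.setThreadNamePrefix(threadNamePrefix_2);
        taskExecutor.initialize();
        return taskExecutor;
    }
}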
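The three sizing settings interact in a way that is easy to misread, so a small experiment may help. ThreadPoolTaskExecutor delegates to the JDK's ThreadPoolExecutor, and the sketch below uses that class directly with deliberately tiny, made-up values (core 1, max 2, queue 1); the class and method names are illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingSketch {

    /** Returns "poolSize/queued/rejected" after submitting four blocking tasks. */
    static String submitFourBlockingTasks() throws InterruptedException {
        // corePoolSize=1, maxPoolSize=2, queueCapacity=1 (illustrative values)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean rejected = false;
        try {
            pool.execute(blocking); // 1st task: starts the core thread
            pool.execute(blocking); // 2nd task: core thread is busy, so it waits in the queue
            pool.execute(blocking); // 3rd task: queue is full, so a second thread starts
            pool.execute(blocking); // 4th task: queue full and max threads reached
        } catch (RejectedExecutionException e) {
            rejected = true;        // the default policy rejects the task
        }
        String summary = pool.getPoolSize() + "/" + pool.getQueue().size() + "/" + rejected;
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return summary;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(submitFourBlockingTasks()); // prints 2/1/true
    }
}
```

The point to take away is the order: extra threads beyond the core size are only created once the queue is full, so a large queueCapacity means maxPoolSize is rarely reached.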

2. Using @Async

The code follows.
The TestAsyncService class:


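import java.util.concurrent.Future;

import cn.hutool.core.util.NumberUtil; // NumberUtil looks like Hutool's (assumed)
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class TestAsyncService implements ITestAsyncService {

    // Async method, no return value (default pool)
    @Async
    @Override
    public void asyncFunction_1() {
        handleBusinessTime();
        log.info("asyncFunction_1 current thread name: {}", Thread.currentThread().getName());
    }

    // --- Async methods, no return value (named pool): start ---
    @Async(value = ExecutorConstant.simpleExecutor_2)
    @Override
    public void asyncFunction_2() {
        handleBusinessTime();
        log.info("asyncFunction_2 current thread name: {}", Thread.currentThread().getName());
    }

    @Async(ExecutorConstant.simpleExecutor_2)
    @Override
    public void asyncFunction_3() {
        handleBusinessTime();
        log.info("asyncFunction_3 current thread name: {}", Thread.currentThread().getName());
    }
    // --- Async methods, no return value (named pool): end ---

    // Async method with a return value (default pool)
    @Async
    @Override
    public Future<Integer> asyncReturnDta_1() {
        handleBusinessTime();
        log.info("asyncReturnDta_1 current thread name: {}", Thread.currentThread().getName());
        return new AsyncResult<Integer>(1);
    }

    // Async method with a return value (named pool)
    @Async(ExecutorConstant.simpleExecutor_2)
    @Override
    public Future<Integer> asyncReturnDta_2() {
        handleBusinessTime();
        log.info("asyncReturnDta_2 current thread name: {}", Thread.currentThread().getName());
        return new AsyncResult<Integer>(1);
    }

    // Async method with a return value that deliberately runs long (used for the timeout test)
    @Async
    @Override
    public Future<Integer> asyncReturnDtaTimeOut() {
        handleBusinessTime();
        handleBusinessTime();
        handleBusinessTime();
        handleBusinessTime();
        log.info("asyncReturnDta_3 current thread name: {}", Thread.currentThread().getName());
        return new AsyncResult<Integer>(1);
    }

    // Simulates time spent on business logic or a database query
    public static void handleBusinessTime() {
        // Random duration between 2000 and 5000 ms
        int[] sleepTime = NumberUtil.generateRandomNumber(2000, 5000, 1);
        try {
            // Thread.sleep stands in for the business operation or database request
            Thread.sleep(sleepTime[0]);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see the interruption
            Thread.currentThread().interrupt();
        }
    }
}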

The TestAsyncController class:


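import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import cn.hutool.core.date.DateUtil;      // DateUtil and TimeInterval look like Hutool's (assumed)
import cn.hutool.core.date.TimeInterval;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

// Resp is the project's response wrapper (not shown here).
@Slf4j
@RestController
@RequestMapping(value = "/v1/async")
public class TestAsyncController {

    @Autowired
    ITestAsyncService testAsyncService;

    @ApiOperation(value = "Call the async methods")
    @RequestMapping(value = "/test", method = RequestMethod.GET)
    public Resp<Integer> test() throws ExecutionException, InterruptedException {
        log.info("asyncFunction_1 start");
        testAsyncService.asyncFunction_1();
        // Logs "start" a second time, as in the captured output; "end" was probably intended
        log.info("asyncFunction_1 start");

        log.info("asyncFunction_2 start");
        testAsyncService.asyncFunction_2();
        log.info("asyncFunction_2 end");

        log.info("asyncFunction_3 start");
        testAsyncService.asyncFunction_3();
        log.info("asyncFunction_3 end");

        log.info("asyncReturnDta_1 & asyncReturnDta_2 start");
        Future<Integer> future = testAsyncService.asyncReturnDta_1();
        testAsyncService.asyncReturnDta_2(); // result intentionally ignored
        log.info("asyncReturnDta_1 & asyncReturnDta_2 end");

        // Blocks the request thread until asyncReturnDta_1 has finished
        Integer resp = future.get();
        log.info("future.get() resp:{}", resp);
        return Resp.buildDataSuccess(resp);
    }

    @ApiOperation(value = "Call the async method with a timeout")
    @RequestMapping(value = "/async_timeOut", method = RequestMethod.GET)
    public Resp<Integer> async_timeOut() throws ExecutionException, InterruptedException {
        TimeInterval timeInterval = DateUtil.timer();
        log.info("asyncReturnDtaTimeOut start");
        Future<Integer> future = testAsyncService.asyncReturnDtaTimeOut();
        log.info("asyncReturnDtaTimeOut end");

        Integer resp = null;
        try {
            // Fail if no data comes back within one second
            resp = future.get(1, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            resp = -1; // timed out: fall back to a default, similar to circuit breaking / degradation
            log.error("asyncReturnDtaTimeOut future.get(1, TimeUnit.SECONDS) timeout:", e);
        }

        log.info("future.get() resp:{}  elapsed: {} ms", resp, timeInterval.intervalRestart());
        return Resp.buildDataSuccess(resp);
    }

}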

Log output of the /v1/async/test endpoint:

2021-06-20 21:09:30.490 INFO 14207 --- [nio-8666-exec-7] c.e.m.controller.TestAsyncController : asyncFunction_1 start
2021-06-20 21:09:30.490 INFO 14207 --- [nio-8666-exec-7] c.e.m.controller.TestAsyncController : asyncFunction_1 start
2021-06-20 21:09:30.490 INFO 14207 --- [nio-8666-exec-7] c.e.m.controller.TestAsyncController : asyncFunction_2 start
2021-06-20 21:09:30.491 INFO 14207 --- [nio-8666-exec-7] c.e.m.controller.TestAsyncController : asyncFunction_2 end
2021-06-20 21:09:30.491 INFO 14207 --- [nio-8666-exec-7] c.e.m.controller.TestAsyncController : asyncFunction_3 start
2021-06-20 21:09:30.491 INFO 14207 --- [nio-8666-exec-7] c.e.m.controller.TestAsyncController : asyncFunction_3 end
2021-06-20 21:09:30.491 INFO 14207 --- [nio-8666-exec-7] c.e.m.controller.TestAsyncController : asyncReturnDta_1 & asyncReturnDta_2 start
2021-06-20 21:09:30.492 INFO 14207 --- [nio-8666-exec-7] c.e.m.controller.TestAsyncController : asyncReturnDta_1 & asyncReturnDta_2 end
2021-06-20 21:09:32.679 INFO 14207 --- [le-1-executor-9] c.e.multi.service.impl.TestAsyncService : asyncFunction_1 current thread name: my-simple-1-executor-9
2021-06-20 21:09:33.454 INFO 14207 --- [le-2-executor-8] c.e.multi.service.impl.TestAsyncService : asyncFunction_3 current thread name: my-simple-2-executor-8
2021-06-20 21:09:33.578 INFO 14207 --- [le-2-executor-9] c.e.multi.service.impl.TestAsyncService : asyncReturnDta_2 current thread name: my-simple-2-executor-9
2021-06-20 21:09:34.101 INFO 14207 --- [e-1-executor-10] c.e.multi.service.impl.TestAsyncService : asyncReturnDta_1 current thread name: my-simple-1-executor-10
2021-06-20 21:09:34.102 INFO 14207 --- [nio-8666-exec-7] c.e.m.controller.TestAsyncController : future.get() resp:1
2021-06-20 21:09:34.357 INFO 14207 --- [le-2-executor-7] c.e.multi.service.impl.TestAsyncService : asyncFunction_2 current thread name: my-simple-2-executor-7

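The log shows that every call ran asynchronously: the request thread (nio-8666-exec-7) logged all of its start/end lines within about 2 ms, while the service methods finished 2 to 4 seconds later on the my-simple-1-executor and my-simple-2-executor threads.

Log output of the /v1/async/async_timeOut endpoint: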

2021-06-20 21:20:58.886 INFO 14427 --- [nio-8666-exec-1] c.e.m.controller.TestAsyncController : asyncReturnDtaTimeOut start
2021-06-20 21:20:58.890 INFO 14427 --- [nio-8666-exec-1] c.e.m.controller.TestAsyncController : asyncReturnDtaTimeOut end
2021-06-20 21:20:59.899 ERROR 14427 --- [nio-8666-exec-1] c.e.m.controller.TestAsyncController : asyncReturnDtaTimeOut future.get(1, TimeUnit.SECONDS) timeout:

java.util.concurrent.TimeoutException: null
at java.util.concurrent.FutureTask.get(FutureTask.java:205) ~[na:1.8.0_231]
at com.example.multi.controller.TestAsyncController.async_timeOut(TestAsyncController.java:69) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_231]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_231]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_231]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_231]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) [spring-web-5.3.8.jar:5.3.8]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141) [spring-web-5.3.8.jar:5.3.8]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) [spring-webmvc-5.3.8.jar:5.3.8]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894) [spring-webmvc-5.3.8.jar:5.3.8]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) [spring-webmvc-5.3.8.jar:5.3.8]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) [spring-webmvc-5.3.8.jar:5.3.8]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1063) [spring-webmvc-5.3.8.jar:5.3.8]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) [spring-webmvc-5.3.8.jar:5.3.8]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) [spring-webmvc-5.3.8.jar:5.3.8]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) [spring-webmvc-5.3.8.jar:5.3.8]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:626) [tomcat-embed-core-9.0.46.jar:4.0.FR]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) [spring-webmvc-5.3.8.jar:5.3.8]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:733) [tomcat-embed-core-9.0.46.jar:4.0.FR]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) [tomcat-embed-websocket-9.0.46.jar:9.0.46]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) [spring-web-5.3.8.jar:5.3.8]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.3.8.jar:5.3.8]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) [spring-web-5.3.8.jar:5.3.8]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.3.8.jar:5.3.8]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) [spring-web-5.3.8.jar:5.3.8]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.3.8.jar:5.3.8]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.46.jar:9.0.46]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_231]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_231]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.46.jar:9.0.46]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_231]

2021-06-20 21:20:59.900 INFO 14427 --- [nio-8666-exec-1] c.e.m.controller.TestAsyncController : future.get() resp:-1  elapsed: 1014 ms
2021-06-20 21:21:12.105 INFO 14427 --- [le-1-executor-1] c.e.multi.service.impl.TestAsyncService : asyncReturnDta_3 current thread name: my-simple-1-executor-1

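As the log shows, once future.get(1, TimeUnit.SECONDS) reaches its timeout it throws a TimeoutException and the request thread carries on with the code that follows. The asynchronous task itself is not stopped: it logged its completion about 12 seconds later.
This suits scenarios where data must be returned within a fixed time window; on timeout, depending on the business case, the endpoint can return a default value or an empty result.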
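The timeout-with-fallback pattern can also be tried outside Spring. The following is a minimal sketch on a plain JDK executor; the class name, helper names, and durations are made up for illustration. Unlike the controller above, it also cancels the task on timeout, which is optional and only helps if the task responds to interruption.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutFallbackSketch {

    /** Waits at most timeoutMs for the result; returns fallback if it is not ready in time. */
    static int getOrFallback(Future<Integer> future, long timeoutMs, int fallback)
            throws InterruptedException, ExecutionException {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // optional: interrupt the worker so it can stop early
            return fallback;
        }
    }

    /** Runs a task that sleeps for taskMs and waits timeoutMs for its result (1 on success). */
    static int run(long taskMs, long timeoutMs) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> {
                Thread.sleep(taskMs); // stands in for slow business logic
                return 1;
            });
            return getOrFallback(future, timeoutMs, -1);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(2000, 100)); // slow task: prints -1
        System.out.println(run(10, 2000));  // fast task: prints 1
    }
}
```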

How @Async works, with source code

Principle: it is implemented with Spring AOP plus a thread pool.
Source:
The relevant method is AsyncExecutionInterceptor.invoke.

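The line numbers below refer to the source listing of that method:
Line 107: obtains the thread pool (executor) for the method.
Line 108: throws an exception if no thread pool has been configured.
Line 113: creates a task object (a Callable) whose body calls invocation.proceed(), which runs the actual business code.
Lines 121 and 124: unified exception handling, which mainly ends up calling handleUncaughtException. Because SimpleExecutorConfig implements the AsyncConfigurer interface, it can override getAsyncUncaughtExceptionHandler to supply custom exception handling.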
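The steps described above (look up an executor, fail if there is none, wrap the call in a task, route errors to a handler, submit the task) can be imitated with a JDK dynamic proxy. The following is a toy sketch of the idea and not Spring's code: every name in it (AsyncProxySketch, GreetingService, asyncProxy, the BiConsumer error handler) is hypothetical, and real Spring uses its AOP infrastructure rather than this proxy.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

class AsyncProxySketch {

    /** Hypothetical service: one void method that fails, one method returning a Future. */
    interface GreetingService {
        void fail(String name);
        Future<String> greet(String name);
    }

    /** Wraps target so that each interface method runs on the given executor. */
    static <T> T asyncProxy(T target, Class<T> iface, ExecutorService executor,
                            BiConsumer<Throwable, String> errorHandler) {
        Object proxy = Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
            (self, method, args) -> {
                if (method.getDeclaringClass() == Object.class) {
                    return method.invoke(target, args); // toString/equals/hashCode stay synchronous
                }
                if (executor == null) {
                    throw new IllegalStateException("No executor specified");
                }
                boolean returnsFuture = Future.class.isAssignableFrom(method.getReturnType());
                Callable<Object> task = () -> {
                    try {
                        Object result = method.invoke(target, args); // the real business method
                        return (result instanceof Future) ? ((Future<?>) result).get() : null;
                    } catch (InvocationTargetException ex) {
                        Throwable cause = ex.getCause();
                        if (returnsFuture) {
                            // The caller holds a Future, so surface the failure through it
                            throw (cause instanceof Exception) ? (Exception) cause : new RuntimeException(cause);
                        }
                        // A void method has no channel back to the caller: use the handler
                        errorHandler.accept(cause, method.getName());
                        return null;
                    }
                };
                Future<Object> future = executor.submit(task);
                return returnsFuture ? future : null;
            });
        return iface.cast(proxy);
    }

    /** Calls both methods through the proxy and reports what happened. */
    static String demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> new Thread(r, "demo-async"));
        CompletableFuture<String> handled = new CompletableFuture<>();
        GreetingService real = new GreetingService() {
            public void fail(String name) {
                throw new IllegalStateException("boom for " + name);
            }
            public Future<String> greet(String name) {
                return CompletableFuture.completedFuture(
                        "hello " + name + " on " + Thread.currentThread().getName());
            }
        };
        GreetingService async = asyncProxy(real, GreetingService.class, pool,
                (ex, methodName) -> handled.complete(methodName + ": " + ex.getMessage()));
        try {
            async.fail("bob"); // returns immediately; the error goes to the handler
            String greeting = async.greet("ann").get();
            return greeting + " | " + handled.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints hello ann on demo-async | fail: boom for bob
    }
}
```

The split in the catch block mirrors the reason a separate uncaught-exception handler exists at all: a method that returns a Future can report its failure through that Future, while a void method cannot.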

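The AsyncConfigurer interface declares two methods that can be overridden: getAsyncExecutor() and getAsyncUncaughtExceptionHandler().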

The doSubmit method in effect just calls the thread pool's submit method.

The complete code is uploaded to Gitee; clone it if you are interested.

The @Async source walkthrough draws on:
https://www.jb51.net/article/141542.htm
