SpringBoot怎么实现异步处理
这篇文章给大家分享的是有关SpringBoot怎么实现异步处理的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
异步请求与同步请求我们先通过一张图来区分一下异步请求和同步请求的区别:
在上图中有三个角色:客户端、Web容器和业务处理线程。
两个流程中客户端对Web容器的请求,都是同步的。因为它们在请求客户端时都处于阻塞等待状态,并没有进行异步处理。
在Web容器部分,第一个流程采用同步请求,第二个流程采用异步回调的形式。
通过异步处理,可以先释放容器分配给请求的线程与相关资源,减轻系统负担,从而增加了服务器对客户端请求的吞吐量。但并发请求量较大时,通常会通过负载均衡的方案来解决,而不是异步。
Servlet3.0中的异步Servlet 3.0之前,Servlet采用Thread-Per-Request的方式处理请求,即每一次Http请求都由一个线程从头到尾处理。当涉及到耗时操作时,性能问题便比较明显。
Servlet 3.0中提供了异步处理请求。可以先释放容器分配给请求的线程与相关资源,减轻系统负担,从而增加服务的吞吐量。
Servlet 3.0的异步是通过AsyncContext对象来完成的,它可以从当前线程传给另一个线程,并归还初始线程。新的线程处理完业务可以直接返回结果给客户端。
AsyncContext对象可以从HttpServletRequest中获取:
@RequestMapping("/async")publicvoidasync(HttpServletRequestrequest){AsyncContextasyncContext=request.getAsyncContext();}
在AsyncContext中提供了获取ServletRequest、ServletResponse和添加监听(addListener)等功能:
publicinterfaceAsyncContext{ServletRequestgetRequest();ServletResponsegetResponse();voidaddListener(AsyncListenervar1);voidsetTimeout(longvar1);//省略其他方法}
不仅可以通过AsyncContext获取Request和Response等信息,还可以设置异步处理超时时间。通常,超时时间(单位毫秒)是需要设置的,不然无限等下去不就与同步处理一样了。
通过AsyncContext的addListener还可以添加监听事件,用来处理异步线程的开始、完成、异常、超时等事件回调。
addListener方法的参数AsyncListener的源码如下:
publicinterfaceAsyncListenerextendsEventListener{//异步执行完毕时调用voidonComplete(AsyncEventvar1)throwsIOException;//异步线程执行超时调用voidonTimeout(AsyncEventvar1)throwsIOException;//异步线程出错时调用voidonError(AsyncEventvar1)throwsIOException;//异步线程开始时调用voidonStartAsync(AsyncEventvar1)throwsIOException;}
通常,异常或超时时返回调用方错误信息,而异常时会处理一些清理和关闭操作或记录异常日志等。
基于Servlet方式实现异步请求下面直接看一个基于Servlet方式的异步请求示例:
@GetMapping(value="/email/send")publicvoidservletReq(HttpServletRequestrequest){AsyncContextasyncContext=request.startAsync();//设置监听器:可设置其开始、完成、异常、超时等事件的回调处理asyncContext.addListener(newAsyncListener(){@OverridepublicvoidonTimeout(AsyncEventevent){System.out.println("处理超时了...");}@OverridepublicvoidonStartAsync(AsyncEventevent){System.out.println("线程开始执行");}@OverridepublicvoidonError(AsyncEventevent){System.out.println("执行过程中发生错误:"+event.getThrowable().getMessage());}@OverridepublicvoidonComplete(AsyncEventevent){System.out.println("执行完成,释放资源");}});//设置超时时间asyncContext.setTimeout(6000);asyncContext.start(newRunnable(){@Overridepublicvoidrun(){try{Thread.sleep(5000);System.out.println("内部线程:"+Thread.currentThread().getName());asyncContext.getResponse().getWriter().println("asyncprocessing");}catch(Exceptione){System.out.println("异步处理发生异常:"+e.getMessage());}//异步请求完成通知,整个请求完成asyncContext.complete();}});//此时request的线程连接已经释放了System.out.println("主线程:"+Thread.currentThread().getName());}
启动项目,访问对应的URL,打印日志如下:
主线程:http-nio-8080-exec-4
内部线程:http-nio-8080-exec-5
执行完成,释放资源
可以看出,上述代码先执行完了主线程,也就是程序的最后一行代码的日志打印,然后才是内部线程的执行。内部线程执行完成,AsyncContext的onComplete方法被调用。
如果通过浏览器访问对应的URL,还可以看到该方法的返回值“async processing”。说明内部线程的结果同样正常的返回到客户端了。
基于Spring实现异步请求基于Spring可以通过Callable、DeferredResult或者WebAsyncTask等方式实现异步请求。
基于Callable实现对于一次请求(/email),基于Callable的处理流程如下:
1、Spring MVC开启副线程处理业务(将Callable提交到TaskExecutor);
2、DispatcherServlet和所有的Filter退出Web容器的线程,但是response保持打开状态;
3、Callable返回结果,SpringMVC将原始请求重新派发给容器(再重新请求一次/email),恢复之前的处理;
4、DispatcherServlet重新被调用,将结果返回给用户;
代码实现示例如下:
@GetMapping("/email")publicCallable<String>order(){System.out.println("主线程开始:"+Thread.currentThread().getName());Callable<String>result=()->{System.out.println("副线程开始:"+Thread.currentThread().getName());Thread.sleep(1000);System.out.println("副线程返回:"+Thread.currentThread().getName());return"success";};System.out.println("主线程返回:"+Thread.currentThread().getName());returnresult;}
访问对应URL,控制台输入日志如下:
主线程开始:http-nio-8080-exec-1
主线程返回:http-nio-8080-exec-1
副线程开始:task-1
副线程返回:task-1
通过日志可以看出,主线程已经完成了,副线程才进行执行。同时,URL返回结果“success”。这也说明一个问题,服务器端的异步处理对客户端来说是不可见的。
Callable默认使用SimpleAsyncTaskExecutor类来执行,这个类非常简单而且没有重用线程。在实践中,需要使用AsyncTaskExecutor类来对线程进行配置。
这里通过实现WebMvcConfigurer接口来完成线程池的配置。
@ConfigurationpublicclassWebConfigimplementsWebMvcConfigurer{@ResourceprivateThreadPoolTaskExecutormyThreadPoolTaskExecutor;/***配置线程池*/@Bean(name="asyncPoolTaskExecutor")publicThreadPoolTaskExecutorgetAsyncThreadPoolTaskExecutor(){ThreadPoolTaskExecutortaskExecutor=newThreadPoolTaskExecutor();taskExecutor.setCorePoolSize(2);taskExecutor.setMaxPoolSize(10);taskExecutor.setQueueCapacity(25);taskExecutor.setKeepAliveSeconds(200);taskExecutor.setThreadNamePrefix("thread-pool-");//线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者taskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());taskExecutor.initialize();returntaskExecutor;}@OverridepublicvoidconfigureAsyncSupport(finalAsyncSupportConfigurerconfigurer){//处理callable超时configurer.setDefaultTimeout(60*1000);configurer.setTaskExecutor(myThreadPoolTaskExecutor);configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor());}@BeanpublicTimeoutCallableProcessingInterceptortimeoutCallableProcessingInterceptor(){returnnewTimeoutCallableProcessingInterceptor();}}
为了验证打印的线程,我们将实例代码中的System.out.println替换成日志输出,会发现在使用线程池之前,打印日志如下:
2021-02-21 09:45:37.144 INFO 8312 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 主线程开始:http-nio-8080-exec-1
2021-02-21 09:45:37.144 INFO 8312 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 主线程返回:http-nio-8080-exec-1
2021-02-21 09:45:37.148 INFO 8312 --- [ task-1] c.s.learn.controller.AsynController : 副线程开始:task-1
2021-02-21 09:45:38.153 INFO 8312 --- [ task-1] c.s.learn.controller.AsynController : 副线程返回:task-1
线程名称为“task-1”。让线程池生效之后,打印日志如下:
2021-02-21 09:50:28.950 INFO 8339 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 主线程开始:http-nio-8080-exec-1
2021-02-21 09:50:28.951 INFO 8339 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 主线程返回:http-nio-8080-exec-1
2021-02-21 09:50:28.955 INFO 8339 --- [ thread-pool-1] c.s.learn.controller.AsynController : 副线程开始:thread-pool-1
2021-02-21 09:50:29.956 INFO 8339 --- [ thread-pool-1] c.s.learn.controller.AsynController : 副线程返回:thread-pool-1
线程名称为“thread-pool-1”,其中前面的“thread-pool”正是我们配置的线程池前缀。
除了线程池的配置,还可以配置统一异常处理,这里就不再演示了。
基于WebAsyncTask实现Spring提供的WebAsyncTask是对Callable的包装,提供了更强大的功能,比如:处理超时回调、错误回调、完成回调等。
@GetMapping("/webAsyncTask")publicWebAsyncTask<String>webAsyncTask(){log.info("外部线程:"+Thread.currentThread().getName());WebAsyncTask<String>result=newWebAsyncTask<>(60*1000L,newCallable<String>(){@OverridepublicStringcall(){log.info("内部线程:"+Thread.currentThread().getName());return"success";}});result.onTimeout(newCallable<String>(){@OverridepublicStringcall(){log.info("timeoutcallback");return"timeoutcallback";}});result.onCompletion(newRunnable(){@Overridepublicvoidrun(){log.info("finishcallback");}});returnresult;}
访问对应请求,打印日志:
基于DeferredResult实现2021-02-21 10:22:33.028 INFO 8547 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 外部线程:http-nio-8080-exec-1
2021-02-21 10:22:33.033 INFO 8547 --- [ thread-pool-1] c.s.learn.controller.AsynController : 内部线程:thread-pool-1
2021-02-21 10:22:33.055 INFO 8547 --- [nio-8080-exec-2] c.s.learn.controller.AsynController : finish callback
DeferredResult使用方式与Callable类似,但在返回结果时不一样,它返回的时实际结果可能没有生成,实际的结果可能会在另外的线程里面设置到DeferredResult中去。
DeferredResult的这个特性对实现服务端推技术、订单过期时间处理、长轮询、模拟MQ的功能等高级应用非常重要。
关于DeferredResult的使用先来看一下官方的例子和说明:
@RequestMapping("/quotes")@ResponseBodypublicDeferredResult<String>quotes(){DeferredResult<String>deferredResult=newDeferredResult<String>();//SavethedeferredResultinin-memoryqueue...returndeferredResult;}//Insomeotherthread...deferredResult.setResult(data);
上述示例中我们可以发现DeferredResult的调用并不一定在Spring MVC当中,它可以是别的线程。官方的解释也是如此:
In this case the return value will also be produced from a separate thread. However, that thread is not known to Spring MVC. For example the result may be produced in response to some external event such as a JMS message, a scheduled task, etc.
也就是说,DeferredResult返回的结果也可能是由MQ、定时任务或其他线程触发。来个实例:
@Controller@RequestMapping("/async/controller")publicclassAsyncHelloController{privateList<DeferredResult<String>>deferredResultList=newArrayList<>();@ResponseBody@GetMapping("/hello")publicDeferredResult<String>helloGet()throwsException{DeferredResult<String>deferredResult=newDeferredResult<>();//先存起来,等待触发deferredResultList.add(deferredResult);returndeferredResult;}@ResponseBody@GetMapping("/setHelloToAll")publicvoidhelloSet()throwsException{//让所有hold住的请求给与响应deferredResultList.forEach(d->d.setResult("sayhellotoall"));}}
第一个请求/hello,会先将deferredResult存起来,前端页面是一直等待(转圈)状态。直到发第二个请求:setHelloToAll,所有的相关页面才会有响应。
整个执行流程如下:
controller返回一个DeferredResult,把它保存到内存里或者List里面(供后续访问);
Spring MVC调用request.startAsync(),开启异步处理;与此同时将DispatcherServlet里的拦截器、Filter等等都马上退出主线程,但是response仍然保持打开的状态;
应用通过另外一个线程(可能是MQ消息、定时任务等)给DeferredResult#setResult值。然后SpringMVC会把这个请求再次派发给servlet容器;
DispatcherServlet再次被调用,然后处理后续的标准流程;
通过上述流程可以发现:利用DeferredResult可实现一些长连接的功能,比如当某个操作是异步时,可以先保存对应的DeferredResult对象,当异步通知回来时,再找到这个DeferredResult对象,在setResult处理结果即可。从而提高性能。
SpringBoot中的异步实现在SpringBoot中将一个方法声明为异步方法非常简单,只需两个注解即可@EnableAsync和@Async。其中@EnableAsync用于开启SpringBoot支持异步的功能,用在SpringBoot的启动类上。@Async用于方法上,标记该方法为异步处理方法。
需要注意的是@Async并不支持用于被@Configuration注解的类的方法上。同一个类中,一个方法调用另外一个有@Async的方法,注解也是不会生效的。
@EnableAsync的使用示例:
@SpringBootApplication@EnableAsyncpublicclassApp{publicstaticvoidmain(String[]args){SpringApplication.run(App.class,args);}}
@Async的使用示例:
@ServicepublicclassSyncService{@AsyncpublicvoidasyncEvent(){//业务处理}}
@Async注解的使用与Callable有类似之处,在默认情况下使用的都是SimpleAsyncTaskExecutor线程池,可参考Callable中的方式来自定义线程池。
下面通过一个实例来验证一下,启动类上使用@EnableAsync,然后定义Controller类:
@RestControllerpublicclassIndexController{@ResourceprivateUserServiceuserService;@RequestMapping("/async")publicStringasync(){System.out.println("--IndexController--1");userService.sendSms();System.out.println("--IndexController--4");return"success";}}
定义Service及异步方法:
@ServicepublicclassUserService{@AsyncpublicvoidsendSms(){System.out.println("--sendSms--2");IntStream.range(0,5).forEach(d->{try{Thread.sleep(1000);}catch(InterruptedExceptione){e.printStackTrace();}});System.out.println("--sendSms--3");}}
如果先注释掉@EnableAsync和@Async注解,即正常情况下的业务请求,打印日志为:
--IndexController--1
--sendSms--2
--sendSms--3
--IndexController--4
使用@EnableAsync和@Async注解时,打印日志如下:
--IndexController--1
--IndexController--4
--sendSms--2
--sendSms--3
通过日志的对比我们可以看出,使用了@Async的方法,会被当成一个子线程。所以,整个sendSms方法会在主线程执行完了之后执行。
这样的效果是不是跟我们上面使用的其他形式的异步异曲同工?所以在文章最开始已经说到,网络上所谓的“异步调用与异步请求的区别”是并不存储在的,本质上都是一回事,只不过实现形式不同而已。这里所提到异步方法,也就是将方法进行异步处理而已。
@Async、WebAsyncTask、Callable、DeferredResult的区别所在的包不同:
@Async:org.springframework.scheduling.annotation;
WebAsyncTask:org.springframework.web.context.request.async;
Callable:java.util.concurrent;
DeferredResult:org.springframework.web.context.request.async;
通过所在的包,我们应该隐隐约约感到一些区别,比如@Async是位于scheduling包中,而WebAsyncTask和DeferredResult是用于Web(Spring MVC)的,而Callable是用于concurrent(并发)处理的。
对于Callable,通常用于Controller方法的异步请求,当然也可以用于替换Runable的方式。在方法的返回上与正常的方法有所区别:
//普通方法publicStringaMethod(){}//对照Callable方法publicCallable<String>aMethod(){}
而WebAsyncTask是对Callable的封装,提供了一些事件回调的处理,本质上区别不大。
DeferredResult使用方式与Callable类似,重点在于跨线程之间的通信。
@Async也是替换Runable的一种方式,可以代替我们自己创建线程。而且适用的范围更广,并不局限于Controller层,而可以是任何层的方法上。
当然,大家也可以从返回结果,异常处理等角度来分析一下,这里就不再展开了。
感谢各位的阅读!关于“SpringBoot怎么实现异步处理”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。