如何在 Spring 使用@Async,@EnableAsync注释进行异步处理:
异步处理适用那些与业务逻辑(横切关注点)不直接相关或者不作为其他业务逻辑输入的部分,也可在分布式系统中解耦。
*译注:横切关注点(cross-cutting concerns)指一些具有横越多个模块的行为,使用传统的软件开发方法不能够达到有效模块化的一类特殊关注点。*
Spring 中,`@Async`注解可以标记异步操作。然而,使用`@Async`时有一些限制,仅仅把它加在方法上并不能确保方法会在独立的线程中执行。如果你只是偶尔用到 `@Async`,需要格外当心。
1. @Async 的工作机制
首先为方法添加 `Async` 注解。接着,Spring 会基于 `proxyTargetClass` 属性,为包含 `Async` 定义的对象创建代理(JDK Proxy/CGlib)。
最后,Spring 会尝试搜索与当前上下文相关的线程池,把该方法作为独立的执行路径提交。确切地说,Spring 会搜索唯一的 `TaskExecutor` bean 或者名为 `taskExecutor` 的 bean。如果找不到,则使用默认的 `SimpleAsyncTaskExecutor`。
要完成上面的过程,使用中需要注意几个限制,否则会出现 `Async` 不起作用的情况。
2. @Async 的限制
1. 必须在标记 `@ComponentScan` 或 `@configuration` 的类中使用 `@Async`。
未来实现类获取异步处理结果
如果想要获取异步处理的结果,可以通过未来接口的实现类调用得到()方法获得。
未来接口的常见实现类有FutureTask。
在SpringBoot中,一般用AsyncResult作为异步结果。
future 缺点:
使用Future
获得初始化执行结果时,可以使用初始化附加方法get()
,或者替换看isDone()
是否为true
,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始约会了CompletableFuture
,它针对Future
了改进之处,可以针对某些对象,当初始化任务完成或发生异常时,自动调用对象的替代方法。下面会详细解释:
示例:spring boot工程初步处理业务类
1.AsyncTaskManager
@Service
@EnableAsync
public class AsyncTaskManager {
/**
* 这个业务注入的类
*/
@Autowired
private MessageDao messageDao;
/**
* @Async注解表示异步,后面的参数对应于线程池配置类ExecutorConfig中的方法名asyncServiceExecutor()
* 如果不写后面的参数,直接使用@Async注解,则是使用默认的线程池
* Future<String>为异步返回的结果。可以通过get()方法获取结果。
* @param s
* @throws Exception
*/
@Async(value = "asyncTaskExecutor")
public void transTask(String s) throws Exception {
messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+"--"+s+" ;time="+ DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
TimeUnit.SECONDS.sleep(6);
}
/**
* 异步调用,有返回值,必须是Future类型,不然报错
* 如果不写后面的参数,直接用@Async,则是使用默认的线程池。
* 使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待
* @param s
* @return
*/
@Async(value = "asyncTaskExecutor")
public Future<String> transTaskForFuture(String s) {
String result=null;
try {
result=messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+" 子线程开始执行...result=" + result);
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
return AsyncResult.forExecutionException(e);
}
return AsyncResult.forValue(result);
}
/**
* 基于回调的listenableFuture比上种子线程直接返回Future优质是,主线程不用等待,任务在完成后会自动执行回调代码。
* 因此在调用时要注册回调代码,包括成功回调和失败回调
* @param s
* @return
*/
@Async(value = "asyncTaskExecutor")
public ListenableFuture<String> transTaskForCallback(String s) {
String result=null;
try {
result=messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+" 子线程开始执行...result=" + result);
TimeUnit.SECONDS.sleep(6);
} catch (InterruptedException e) {
return AsyncResult.forExecutionException(e);
}
return AsyncResult.forValue(result);
}
/**
* 从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法
* 最主要是可以提供复杂的
* CompletableFuture可以指定异步处理流程:
* thenAccept()处理正常结果;
* exceptional()处理异常结果;
* thenApplyAsync()用于串行化另一个CompletableFuture;
* anyOf()和allOf()用于并行化多个CompletableFuture。
* 详解请看 https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650
* @param s
* @return
*/
@Async(value = "asyncTaskExecutor")
public CompletableFuture<Object> transTaskForCompletableFuture(String s) {
Object result=null;
try {
result=messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+" 子线程开始执行...result=" + result);
TimeUnit.SECONDS.sleep(6);
} catch (Exception e) {
return AsyncResult.forExecutionException(e).completable();
}
return AsyncResult.forValue(result).completable();
}
@Async(value = "asyncTaskExecutor")
public CompletableFuture<Object> transTaskForCompletableFuture2(int s) {
Object result=null;
try {
result=messageDao.getUserCode(s);
System.out.println(Thread.currentThread().getName()+" 子线程开始执行...result=" + result);
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
return AsyncResult.forExecutionException(e).completable();
}
return AsyncResult.forValue(result).completable();
}
}
Dao层业务类:
@Repository
public class MessageDao {
public String getMessage(String s){
return s;
}
public String callBackMessage(String s){
return "这是注册回调返回结果s="+s;
}
public String getUserCode(int id){
return "000"+id;
}
public String getUserName(String code){
return "李四";
}
public String getUserDepartment(String code){
return "技术开发部";
}
}
线程池ThreadPoolTaskExecutor
SpringBoot中的线程池一般用ThreadPoolTaskExecutor类
。ThreadPoolTaskExecutor继承关系如下:
ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor
关系结构图为:
2.自定义线程池配置如下:
@Configuration
public class AsyncTaskConfig {
/**
* IO密集型任务 = 一般为2*CPU核心数(常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等)
* CPU密集型任务 = 一般为CPU核心数+1(常出现于线程中:复杂算法)
* 混合型任务 = 视机器配置和复杂度自测而定
*/
@Bean(name = "asyncTaskExecutor")
public ThreadPoolTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//1: 核心线程数目
executor.setCorePoolSize(4);
//2: 指定最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(10);
//3: 队列中最大的数目
executor.setQueueCapacity(200);
//4: 线程名称前缀
executor.setThreadNamePrefix("LocustTask-");
//5:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy: 会在execute 方法的调用线程中运行被拒绝的任务,如果执行程序已关闭,则会丢弃该任务
// AbortPolicy: 抛出java.util.concurrent.RejectedExecutionException异常
// DiscardOldestPolicy: 抛弃旧的任务
// DiscardPolicy: 抛弃当前的任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//6: 线程空闲后的最大存活时间(默认值 60),当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
//7:线程空闲时间,当线程空闲时间达到keepAliveSeconds(秒)时,线程会退出,直到线程数量等于corePoolSize,如果allowCoreThreadTimeout=true,则会直到线程数量等于0
executor.setAllowCoreThreadTimeOut(false);
executor.initialize();
return executor;
}
}
@EnableAsync开启初步
@EnableAsync表示开启初始,可以放在@Controller层上方,也可以放在Applicationclass的上方,也可以直接放在业务类上例:
AsyncTaskManager
@Controller
@EnableAsync
public class XXXController {
@Autowired
private
AsyncTaskManager
asyncTaskManager;
@GetMapping("/user/getList")
@ResponseBody
public String getUserData(){
return
asyncTaskManager
.getAsyncResult();
}
}
Junint 4单元测试类如下
AsyncTaskTest:
1 public class AsyncTaskTest extends BaseTest {
2
3 @Autowired
4 private AsyncTaskManager asyncTaskManager;
5
6 @Autowired
7 private MessageDao messageDao;
8
9 /**
10 * 单无测试方法,没有办法测试多线程池郊果,因为单测试方法运行完后,整个JVM进程会水销毁,所有测试只能启动tomcat进行测试。
11 *
12 * @throws Exception
13 */
14 @Test
15 public void testAsyncTask() throws Exception {
16 for (int i = 1; i <= 10; i++) {
17 asyncTaskManager.transTask("2222");
18 }
19 }
20
21 /**
22 * 主线等待子线完成后,获取返回结果
23 *
24 * @throws Exception
25 */
26 @Test
27 public void testAsyncTaskForFuture() throws Exception {
28 Future<String> future = asyncTaskManager.transTaskForFuture("AAA---BBB");
29 while (true) {
30 if (future.isDone() && !future.isCancelled()) {
31 System.out.println(Thread.currentThread().getName() + "子线程执行完毕");
32 break;
33 } else {
34 Thread.sleep(2000);
35 System.out.println("主线程" + Thread.currentThread().getName() + "待子线程执行完毕");
36 }
37 }
38 }
39
40 /**
41 * 在调用时候,主线不用等待,可以注册回调类和方法进行
42 *
43 * @throws Exception
44 */
45 @Test
46 public void testAsyncTaskForCallback() throws Exception {
47 // 在主要线程设置 独有上下文变量
48 ThreadContext.setUserId(222222222222L);
49 ListenableFuture<String> future = asyncTaskManager.transTaskForCallback("AAA---BBB");
50 future.addCallback(
51 successCallback -> {
52 try {
53 String s = future.get(2L, TimeUnit.SECONDS);
54 String result = messageDao.callBackMessage(s);
55 //在线程池中子线程获取父线程设置变量
56 System.out.println("回调结果:" + result + ";parent thread value:" + ThreadContext.getUserId());
57 } catch (Exception e) {
58 e.printStackTrace();
59 }
60 },
61 FailureCallback -> {
62 System.out.println("子线程执行失败.");
63 }
64 );
65 Thread.sleep(20000);
66 }
67
68 /**
69 * 验证多线程常用的场景比如有: 4个任务需要4个线程去执行,同时成功后才执行相应操作
70 * A,B,C,D 4 个任务
71 * CompletableFuture.allOf()方法
72 * 由于 allOf 聚合了多个 CompletableFuture 实例,所以它是没有返回值的。这也是它的一个缺点
73 * @throws Exception
74 */
75 @Test
76 public void testAsyncTaskForAllOf() throws Exception {
77 CompletableFuture<Object> completableFuture_1 = asyncTaskManager.task1("task-1");
78 CompletableFuture<Object> completableFuture_2 = asyncTaskManager.task2("task-2");
79 CompletableFuture<Object> completableFuture_3 = asyncTaskManager.task3("task-3");
80 CompletableFuture<Object> completableFuture_4 = asyncTaskManager.task4("task-4");
81 // 1: 把4个线程返回 completableFuture_3 组合成一个
82 CompletableFuture alloff=CompletableFuture.allOf(completableFuture_1,completableFuture_2,completableFuture_3,completableFuture_4);
83 // 2:如果没有后续的动作,可以直接 join()和get() 执行结果,主线程一直被阻塞,一直等到用户线程返回,如果不使用join 和get 主线程不会被阻塞
84 // CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一样的,都是阻塞获取值,它们的区别在于 join() 抛出的是 unchecked Exception。
85 String result= (String)alloff.join();
86 System.out.println("所有任务同时完成"+result);
87 Thread.sleep(20000);
88 }
89
90 /**
91 * 验证多线程常用的场景比如有: 4个任务需要4个线程去执行,同时成功后才执行相应操作
92 * A,B,C,D 4 个任务
93 * CompletableFuture.anyOf()方法 其中有一个执行成功,就算完成
94 *
95 * @throws Exception
96 */
97 @Test
98 public void testAsyncTaskForAnyOf() throws Exception {
99 CompletableFuture<Object> completableFuture_1 = asyncTaskManager.task1("task-1");
100 CompletableFuture<Object> completableFuture_2 = asyncTaskManager.task2("task-2");
101 CompletableFuture<Object> completableFuture_3 = asyncTaskManager.task3("task-3");
102 CompletableFuture<Object> completableFuture_4 = asyncTaskManager.task4("task-4");
103 CompletableFuture anyOf=CompletableFuture.anyOf(completableFuture_1,completableFuture_2,completableFuture_3,completableFuture_4);
104 //这里利用Jdk8函数式接口lambda表达式来实现匿名内部类,?是泛型通配符
105 Object s=anyOf.get(1500,TimeUnit.MILLISECONDS);
106 System.out.println(" anyof 输出结果 s="+s);
107 Thread.sleep(20000);
108 }
109
110 /**
111 * 验证多线程常用的场景比如有: 3个任务需要3个线程去执行
112 * 根据 A 方法 异步返回结果,分别去异步执行 查询员工名称和部门,然后返回结果
113 * @throws Exception
114 */
115 @Test
116 public void testAsyncTaskForCompletableFuture2() throws Exception {
117 CompletableFuture<Object> completableFuture_A = asyncTaskManager.task1("task-1");
118 // 1: 如果A成功后返回结果,作为B的入参去执行(thenApply 方法 都是在自己当前线程中执行)
119 CompletableFuture<Object> fetchNameFuture_B = completableFuture_A.thenApplyAsync((result) ->{
120 return messageDao.getUserName((String)result);
121 }
122 );
123 //2:B 执行成功后结果作为入参,执行C,然后返回
124 CompletableFuture<Object> fetchNameFuture_C=fetchNameFuture_B.thenApplyAsync((result)->{
125 return messageDao.getUserDepartment((String)result);
126 });
127 // join()会一直程序会一直block
128 System.out.println(fetchNameFuture_C.join());
129 // 手动完成一个complete,会立即执行,可以看到future调用complete(T t)会立即执行。但是complete(T t)只能调用一次,后续的重复调用会失效
130 //future已经执行完毕能够返回结果,此时再调用complete(T t)则会无效
131 System.out.println(fetchNameFuture_B.complete("complete"));
132 Thread.sleep(90000);
133 }
134
135 /**
136 * 这个方法验证把两个异步线程的结果聚合起来返回
137 * @throws Exception
138 */
139 @Test
140 public void testAsyncTaskForThenCombine() throws Exception {
141 //1: 第一个查询查询员工消息,
142 CompletableFuture<Object> futureA = asyncTaskManager.task1("task-1");
143 CompletableFuture<Object> futureB = asyncTaskManager.task2("task-2");
144
145 CompletableFuture<Object> future=futureA.thenCombine(futureB,(resultA,resultB)->{
146 return resultA+";"+resultB;
147 });
148 Object s=future.join();
149 System.out.println(" result:"+ s);
150 // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
151 Thread.sleep(20000);
152 }
153 /**
154 * 这个方法验证thenAcceptBoth接口是指,接受两个异步线程,等待两个完成后,做下一步动作,它的第二个参数是一个消费型的函数接口
155 * BiConsumer 这就标明它可以对上边传入的异步线程的结果做处理(改变传入线程结果的值),并且没有返回值
156 * @throws Exception
157 */
158 @Test
159 public void testAsyncTaskForThenAcceptBoth() throws Exception {
160 //1: 第一个查询查询员工消息
161 CompletableFuture<Object> futureA = asyncTaskManager.task1("task-1");
162 CompletableFuture<Object> futureB = asyncTaskManager.task2("task-2");
163
164 CompletableFuture<Void> allResult=futureA.thenAcceptBoth(futureB,(resultA,resultB)->{
165 String result=messageDao.getUserDepartment(resultA+";"+resultB);
166 System.out.println("======result="+result);
167 });
168 // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
169 Thread.sleep(20000);
170 }
171
172 /**
173 * 验证futureA,futureB 两个异步线程,其中一个返回,就返回。
174 * @throws Exception
175 */
176
177 @Test
178 public void testAsyncTaskForAcceptEither() throws Exception {
179 //1: 第一个查询查询员工消息
180 CompletableFuture <Object> futureA = asyncTaskManager.task1(“ task-1”);
181 CompletableFuture <Object> futureB = asyncTaskManager.task2(“ task-2”);
182 futureA.acceptEither(futureB,(result)-> {
183 字符串s = messageDao.getUserName(result +“”);
184 System.out.println(“它的一个串行返回返回的结果:” + s);
185 });
186 // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:187
线程。睡眠(20000);
188 }
189 }
190