keywords:Java, Spring, Transactional, Multi-threading, ExecutorService
为什么要用多线程?
1
2
3
4
5
6
7
8
9
10
11
12
public class BigService {
public void execute () {
doSomething();
List entities = buildManyManyEntities();
save(entities);
doOthers();
}
}
如上的业务代码,经测试大部分时间花在了 save
方法上(耗时:15/22,数据量:5000)。
于是想到了用多线程(entities 数据之间互不影响)。
于是改成了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class BigService {
public void execute () {
doSomething();
List entities = buildManyManyEntities();
saveInParallel(entities);
doOthers();
}
void save (Object entities) {
}
void saveInParallel (List entities) {
int cpus = RunTime.getRunTime().availableProcessors();
ExecuteService exec = Executors.newFixedThreadPool(cpus);
try {
for (final Object each : entities) {
exec.execute(new Runnable() {
public void run () {
save(each);
}
});
}
} finally {
exec.shutdown();
}
}
}
自我感觉良好。结果一测试发现表里没数据。
网上查了一下,找到了原因:新的线程没受Spring控制(项目中事务是由Spring控制的)。
知道了问题,就有了方向。意料之外,情理之中地在Stack Overflow(链接见文末)找到了答案。关键点在于Spring 中的Lookup ,详述如下。
关键步骤在于由Spring来控制线程类的实例化。
新建多线程工具类:MultiThreadHelper.java 。
1
2
3
4
5
public abstract class MultiThreadHelper {
public abstract DAOTask createDAOTask () ;
}
新建线程类:DAOTask.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class DAOTask implements Runnable {
private DAO dao;
private Object entity;
public DAOTask (DAO dao) {
this .dao = dao;
}
public void addEntity (Object entity) {
this .entity = entity;
}
public void run () {
dao.save(entity);
}
}
新建Spring的配置文件: multi.xml 。
1
2
3
4
5
6
7
8
<bean id ="multiThreadHelper" class ="MultiThreadHelper" >
<lookup-method name ="createDAOTask" bean ="daoTask" />
</bean >
<bean id ="daoTask" class ="DAOTask" scope ="prototype" >
<constructor-arg ref ="dao" />
</bean >
新的BigService类(省略了Spring配置文件)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class BigService {
private MultiThreadHelper multiThreadHelper;
public void execute () {
doSomething();
List entities = buildManyManyEntities();
saveInParallel(entities);
doOthers();
}
void save (Object entities) {
}
void saveInParallel (List entities) {
int cpus = RunTime.getRunTime().availableProcessors();
ExecuteService exec = Executors.newFixedThreadPool(cpus);
try {
for (final Object each : entities) {
DAOTask task = getMultiThreadHelper().createDAOTask();
task.addEntity(each);
exec.execute(task);
}
} finally {
exec.shutdown();
}
}
}
最后需要注意的是,要把 run
方法纳入事务范围。传播类型REQUIRED
,REQUIRES_NEW
没看出明显差别。
大功告成!
时间从本来的22s左右,减少为4s左右。当然,这仅仅是响应时间(从调用开始到调用结束),不代表数据都已经存入库中了(通过日志可以看出来)。
线程数量的选择
之前看过的一本书(《Java并发编程实战》)里说:线程数量等于服务器的CPU数量+1时能实现最优的利用率。
于是分别测试了N+1,N,N-1三种情况,差别不大,N-1的略胜一筹。不知道是不是仅仅因为需要创建的线程数少了,导致响应时间变短。
说明:这里的N指CPU的数量,测试中是4 。
问题
这里存在一个问题:事务不能跟主线程一起统一管理,即如果主线程中发生了错误多线程写入的数据不会回滚。
不过有个小技巧——先删后插,每次调这个方法前都先删一下。
如有更好的方法,欢迎留言讨论。
参考链接
近期评论