如何在Spring管理的事务中使用多线程

keywords:Java, Spring, Transactional, Multi-threading, ExecutorService

为什么要用多线程?

1
2
3
4
5
6
7
8
9
10
11
12
public class BigService {

    public void execute() {
        doSomething();

	List entities = buildManyManyEntities();
	save(entities);

	doOthers();
    }

}

如上的业务代码,经测试大部分时间花在了 save 方法上(耗时:15/22,数据量:5000)。

于是想到了用多线程(entities 数据之间互不影响)。

于是改成了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class BigService {

    public void execute() {
	doSomething();

	List entities = buildManyManyEntities();
	// 使用多线程存储
	saveInParallel(entities);

	doOthers();
    }

    void save(Object entities) {
	// 直接存入数据库
	// 单个对象和集合都可以
    }

    void saveInParallel(List entities) {
	int cpus = RunTime.getRunTime().availableProcessors();
	ExecuteService exec = Executors.newFixedThreadPool(cpus);

	try {
	    for (final Object each : entities) {   
		exec.execute(new Runnable() {
			public void run() {
			    save(each);
			}
		});
	    }
	} finally {
	    exec.shutdown(); // 关闭资源
	}
    }
}

自我感觉良好。结果一测试发现表里没数据。

网上查了一下,找到了原因:新的线程没受Spring控制(项目中事务是由Spring控制的)。

知道了问题,就有了方向。意料之外,情理之中地在Stack Overflow(链接见文末)找到了答案。关键点在于Spring中的Lookup,详述如下。

关键步骤在于由Spring来控制线程类的实例化。

新建多线程工具类:MultiThreadHelper.java 。

1
2
3
4
5
public abstract class MultiThreadHelper {
    // 必须是抽象方法且无参
    // 详见文末Spring官方文档Lookup部分
    public abstract DAOTask createDAOTask();
}

新建线程类:DAOTask.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class DAOTask implements Runnable {
    private DAO dao;
    private Object entity;

    public DAOTask(DAO dao) {
	this.dao = dao;
    }

    public void addEntity(Object entity) {
	this.entity = entity;
    }

    public void run() {
	dao.save(entity);
    }
}

新建Spring的配置文件: multi.xml 。

1
2
3
4
5
6
7
8
<bean id="multiThreadHelper" class="MultiThreadHelper">
    <lookup-method name="createDAOTask" bean="daoTask"/>
</bean>

<!-- 注意要用prototype,每次都生成一个新的 -->
<bean id="daoTask" class="DAOTask" scope="prototype">
    <constructor-arg ref="dao"/>
</bean>

新的BigService类(省略了Spring配置文件)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class BigService {

    private MultiThreadHelper multiThreadHelper; // 通过Spring注入

    public void execute() {
	doSomething();

	List entities = buildManyManyEntities();
	// 使用多线程存储
	saveInParallel(entities);

	doOthers();
    }

    void save(Object entities) {
	// 直接存入数据库
	// 单个对象和集合都可以
    }

    void saveInParallel(List entities) {
	int cpus = RunTime.getRunTime().availableProcessors();
	ExecuteService exec = Executors.newFixedThreadPool(cpus);

	try {
	    for (final Object each : entities) {   
		// 一定要调用这个Lookup方法生成线程
		DAOTask task = getMultiThreadHelper().createDAOTask();
		task.addEntity(each);
		exec.execute(task);
	    }
	} finally {
	    exec.shutdown(); // 关闭资源
	}
    }
}

最后需要注意的是,要把 run 方法纳入事务范围。传播类型REQUIREDREQUIRES_NEW 没看出明显差别。

大功告成!

时间从本来的22s左右,减少为4s左右。当然,这仅仅是响应时间(从调用开始到调用结束),不代表数据都已经存入库中了(通过日志可以看出来)。

线程数量的选择

之前看过的一本书(《Java并发编程实战》)里说:线程数量等于服务器的CPU数量+1时能实现最优的利用率。

于是分别测试了N+1,N,N-1三种情况,差别不大,N-1的略胜一筹。不知道是不是仅仅因为需要创建的线程数少了,导致响应时间变短。

说明:这里的N指CPU的数量,测试中是4 。

问题

这里存在一个问题:事务不能跟主线程一起统一管理,即如果主线程中发生了错误多线程写入的数据不会回滚。

不过有个小技巧——先删后插,每次调这个方法前都先删一下。

如有更好的方法,欢迎留言讨论。

参考链接

 

加入讨论

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据