本文首发于 http://www.YoungZY.com/
keywords:Java, Spring, Transactional, Multi-threading, ExecutorService
为什么要用多线程?
1 2 3 4 5 6 7 8 9 10 11 12 |
public class BigService { public void execute() { doSomething(); List entities = buildManyManyEntities(); save(entities); doOthers(); } } |
如上的业务代码,经测试大部分时间花在了 save
方法上(耗时:15/22,数据量:5000)。
于是想到了用多线程(entities 数据之间互不影响)。
于是改成了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
public class BigService { public void execute() { doSomething(); List entities = buildManyManyEntities(); // 使用多线程存储 saveInParallel(entities); doOthers(); } void save(Object entities) { // 直接存入数据库 // 单个对象和集合都可以 } void saveInParallel(List entities) { int cpus = RunTime.getRunTime().availableProcessors(); ExecuteService exec = Executors.newFixedThreadPool(cpus); try { for (final Object each : entities) { exec.execute(new Runnable() { public void run() { save(each); } }); } } finally { exec.shutdown(); // 关闭资源 } } } |
自我感觉良好。结果一测试发现表里没数据。
网上查了一下,找到了原因:新的线程没受Spring控制(项目中事务是由Spring控制的)。
知道了问题,就有了方向。意料之外,情理之中地在Stack Overflow(链接见文末)找到了答案。关键点在于Spring中的Lookup,详述如下。
关键步骤在于由Spring来控制线程类的实例化。
新建多线程工具类:MultiThreadHelper.java 。
1 2 3 4 5 |
public abstract class MultiThreadHelper { // 必须是抽象方法且无参 // 详见文末Spring官方文档Lookup部分 public abstract DAOTask createDAOTask(); } |
新建线程类:DAOTask.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public class DAOTask implements Runnable { private DAO dao; private Object entity; public DAOTask(DAO dao) { this.dao = dao; } public void addEntity(Object entity) { this.entity = entity; } public void run() { dao.save(entity); } } |
新建Spring的配置文件: multi.xml 。
1 2 3 4 5 6 7 8 |
<bean id="multiThreadHelper" class="MultiThreadHelper"> <lookup-method name="createDAOTask" bean="daoTask"/> </bean> <!-- 注意要用prototype,每次都生成一个新的 --> <bean id="daoTask" class="DAOTask" scope="prototype"> <constructor-arg ref="dao"/> </bean> |
新的BigService类(省略了Spring配置文件)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
public class BigService { private MultiThreadHelper multiThreadHelper; // 通过Spring注入 public void execute() { doSomething(); List entities = buildManyManyEntities(); // 使用多线程存储 saveInParallel(entities); doOthers(); } void save(Object entities) { // 直接存入数据库 // 单个对象和集合都可以 } void saveInParallel(List entities) { int cpus = RunTime.getRunTime().availableProcessors(); ExecuteService exec = Executors.newFixedThreadPool(cpus); try { for (final Object each : entities) { // 一定要调用这个Lookup方法生成线程 DAOTask task = getMultiThreadHelper().createDAOTask(); task.addEntity(each); exec.execute(task); } } finally { exec.shutdown(); // 关闭资源 } } } |
最后需要注意的是,要把 run
方法纳入事务范围。传播类型REQUIRED
,REQUIRES_NEW
没看出明显差别。
大功告成!
时间从本来的22s左右,减少为4s左右。当然,这仅仅是响应时间(从调用开始到调用结束),不代表数据都已经存入库中了(通过日志可以看出来)。
线程数量的选择
之前看过的一本书(《Java并发编程实战》)里说:线程数量等于服务器的CPU数量+1时能实现最优的利用率。
于是分别测试了N+1,N,N-1三种情况,差别不大,N-1的略胜一筹。不知道是不是仅仅因为需要创建的线程数少了,导致响应时间变短。
说明:这里的N指CPU的数量,测试中是4 。
问题
这里存在一个问题:事务不能跟主线程一起统一管理,即如果主线程中发生了错误多线程写入的数据不会回滚。
不过有个小技巧——先删后插,每次调这个方法前都先删一下。
如有更好的方法,欢迎留言讨论。
参考链接
近期评论