文章标签 » Spring

如何在Spring管理的事务中使用多线程

keywords:Java, Spring, Transactional, Multi-threading, ExecutorService

为什么要用多线程?

1
2
3
4
5
6
7
8
9
10
11
12
public class BigService {

    public void execute() {
        doSomething();

	List entities = buildManyManyEntities();
	save(entities);

	doOthers();
    }

}

如上的业务代码,经测试大部分时间花在了 save 方法上(耗时:15/22,数据量:5000)。

于是想到了用多线程(entities 数据之间互不影响)。

于是改成了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class BigService {

    public void execute() {
	doSomething();

	List entities = buildManyManyEntities();
	// 使用多线程存储
	saveInParallel(entities);

	doOthers();
    }

    void save(Object entities) {
	// 直接存入数据库
	// 单个对象和集合都可以
    }

    void saveInParallel(List entities) {
	int cpus = RunTime.getRunTime().availableProcessors();
	ExecuteService exec = Executors.newFixedThreadPool(cpus);

	try {
	    for (final Object each : entities) {   
		exec.execute(new Runnable() {
			public void run() {
			    save(each);
			}
		});
	    }
	} finally {
	    exec.shutdown(); // 关闭资源
	}
    }
}

自我感觉良好。结果一测试发现表里没数据。

网上查了一下,找到了原因:新的线程没受Spring控制(项目中事务是由Spring控制的)。

知道了问题,就有了方向。意料之外,情理之中地在Stack Overflow(链接见文末)找到了答案。关键点在于Spring中的Lookup,详述如下。

关键步骤在于由Spring来控制线程类的实例化。

新建多线程工具类:MultiThreadHelper.java 。

1
2
3
4
5
public abstract class MultiThreadHelper {
    // 必须是抽象方法且无参
    // 详见文末Spring官方文档Lookup部分
    public abstract DAOTask createDAOTask();
}

新建线程类:DAOTask.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class DAOTask implements Runnable {
    private DAO dao;
    private Object entity;

    public DAOTask(DAO dao) {
	this.dao = dao;
    }

    public void addEntity(Object entity) {
	this.entity = entity;
    }

    public void run() {
	dao.save(entity);
    }
}

新建Spring的配置文件: multi.xml 。

1
2
3
4
5
6
7
8
<bean id="multiThreadHelper" class="MultiThreadHelper">
    <lookup-method name="createDAOTask" bean="daoTask"/>
</bean>

<!-- 注意要用prototype,每次都生成一个新的 -->
<bean id="daoTask" class="DAOTask" scope="prototype">
    <constructor-arg ref="dao"/>
</bean>

新的BigService类(省略了Spring配置文件)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class BigService {

    private MultiThreadHelper multiThreadHelper; // 通过Spring注入

    public void execute() {
	doSomething();

	List entities = buildManyManyEntities();
	// 使用多线程存储
	saveInParallel(entities);

	doOthers();
    }

    void save(Object entities) {
	// 直接存入数据库
	// 单个对象和集合都可以
    }

    void saveInParallel(List entities) {
	int cpus = RunTime.getRunTime().availableProcessors();
	ExecuteService exec = Executors.newFixedThreadPool(cpus);

	try {
	    for (final Object each : entities) {   
		// 一定要调用这个Lookup方法生成线程
		DAOTask task = getMultiThreadHelper().createDAOTask();
		task.addEntity(each);
		exec.execute(task);
	    }
	} finally {
	    exec.shutdown(); // 关闭资源
	}
    }
}

最后需要注意的是,要把 run 方法纳入事务范围。传播类型REQUIREDREQUIRES_NEW 没看出明显差别。

大功告成!

时间从本来的22s左右,减少为4s左右。当然,这仅仅是响应时间(从调用开始到调用结束),不代表数据都已经存入库中了(通过日志可以看出来)。

线程数量的选择

之前看过的一本书(《Java并发编程实战》)里说:线程数量等于服务器的CPU数量+1时能实现最优的利用率。

于是分别测试了N+1,N,N-1三种情况,差别不大,N-1的略胜一筹。不知道是不是仅仅因为需要创建的线程数少了,导致响应时间变短。

说明:这里的N指CPU的数量,测试中是4 。

问题

这里存在一个问题:事务不能跟主线程一起统一管理,即如果主线程中发生了错误多线程写入的数据不会回滚。

不过有个小技巧——先删后插,每次调这个方法前都先删一下。

如有更好的方法,欢迎留言讨论。

参考链接

 

Spring | UnexpectedRollbackException 异常(已解决)

项目中遇到了这样的异常:

1
org.springframework.transaction.UnexpectedRollbackException: Transaction rolled back because it has been marked as rollback-only

项目概要

调用逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 最外层 Service
public class MainService {
	public void executeAAA() {
		try {
			...
			nestedService.executeBBB();
			...
		} catch (Exception e) {
			// 更新操作日志表
			updateTblOperationLog(e);
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
pulblic class NestedService {
	public void executeBBB() {
		try {
			...
			nestedNestedService.sendMsg();
			...
		
		} catch (Exception e) {
			throws new BusinessException(e);
		}
	}
}
1
2
3
4
5
6
7
8
9
pulblic class NestedNestedService {
	public void sendMsg() throws BusinessException {
		try {
			...
		} catch (Exception e) {
			throws new BusinessException(e);
		}
	}
}

事务配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<aop:config proxy-target-class="true">
	<aop:pointcut id="servicePointcut" 
		expression="execution(* com.XXX..*service..*(..))" />
	<aop:advisor pointcut-ref="servicePointcut" advice-ref="txAdvice" order="1" />
</aop:config>
<tx:annotation-driven transaction-manager = "transactionManager" />
<tx:advice id="txAdvice" transaction-manager="transactionManager">
	<tx:attributes>
		<tx:method name="save*" propagation="REQUIRED" rollback-for="BusinessException"/>
		<tx:method name="add*" propagation="REQUIRED" rollback-for="BusinessException"/>
		<tx:method name="insert*" propagation="REQUIRED" rollback-for="BusinessException"/>
		<tx:method name="update*" propagation="REQUIRED" rollback-for="BusinessException"/>
		<tx:method name="delete*" propagation="REQUIRED" rollback-for="BusinessException"/>
		<tx:method name="validate*" propagation="REQUIRED" rollback-for="BusinessException"/>
		<tx:method name="calculate*" propagation="REQUIRED" rollback-for="BusinessException"/>
		<tx:method name="execute*" propagation="REQUIRED" rollback-for="BusinessException"/>
		<tx:method name="find*" read-only="true" />
		<tx:method name="get*" read-only="true" />
		<tx:method name="load*" read-only="true" />
		<tx:method name="*" read-only="true"/>
	</tx:attributes>
</tx:advice>

出现这种没碰过的问题,第一反应当然就是 Google 了。
翻了很多 Blog 和 论坛,也尝试了很多办法,但都失败了。

尝试

原因已经找到:通过查看日志,知道问题是出现在调用 nestedNestedService.sendMsg() 时。由于对方服务不通,导致这个方法出现异常。

尝试1

总的来说就是事务嵌套的问题。网上说可以为每一个方法都起一个事务,修改事务配置文件:

1
<tx:method name="execute*" propagation="REQUIRED" rollback-for="BusinessException"/>

改为

1
<tx:method name="execute*" propagation="REQUIRES_NEW" rollback-for="BusinessException"/>

这个不行,还是会报一样的错误。

尝试2

考虑到出错的那个方法 sendMsg() 不在事务的预定义里面,属于 * 的范畴。

1
<tx:method name="*" read-only="true"/>

改成开启独立事务的模式(挂起前一个,当前方法新起一个事务):

1
<tx:method name="*" propagation="REQUIRES_NEW"/>

不行。

改成不用事务的,因为该用事务的前面定义的差不多了:

1
<tx:method name="*" propagation="NOT_SUPPORTED"/>

还是不行。报错说“事务是必须的”……

那就改成 SUPPORTS 的吧,意思是有事务就用,没有也没关系

1
<tx:method name="*" propagation="SUPPORTS"/>

还是不行。跟一开始一样的错。

* 不行,那就专门给以 send 开头的方法设置一个。
跟上面一样,尝试了 REQUIRES_NEWNOT_SUPPORTED ,都以失败告终。

解决办法

修改配置不行。那就只能从代码入手了。
看了这么多资料,做了这么多尝试,问题点差不多找到了:方法 nestedService.executeBBB() 抛出了 BusinessException 异常导致的。即使最外层的 Service 的 mainService.executeAAA() 做了 try-catch 也不行。

由于项目中的 Service 类里有很多 execute 开头的方法,所以不能随意的更改 rollback-for 配置。
事务里定义了如果遇到 BusinessException 就回滚。
怎样让它不回滚呢?
那就新增一个异常类吧。

  1. 新增异常类:
    1
    public class ServiceException extends Exception {}
  2. 修改 NestedNestedService 类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    pulblic class NestedNestedService {
    	public void sendMsg() throws Exception {
    		try {
    			...
    		} catch (Exception e) {
    			//throws new BusinessException(e);
    			// 直接 throw ,就不再封装了
    			throws e;
    		}
    	}
    }
  3. 修改 NestedService 类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    pulblic class NestedService {
    	public void executeBBB() {
    		try {
    			...
    			nestedNestedService.sendMsg();
    			...
    		
    		} catch (Exception e) {
    			// throws new BusinessException(e);
    			
    			// 换成 ServiceException 异常
    			throws new ServiceException(e);
    		}
    	}
    }

其实之前都是抱着试试看的态度。
问题就这样解决了。