近期热推文章:
1、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据;
2、SpringBoot整合多数据源,并支持动态新增与切换(详细教程)
一、简介
我们在开发时常常会遇到多线程事务的相关问题,以为添加了@Transactional
注解就行了,其实加了注解之后会发现事务失效。这是为什么呢?原因就是:数据库连接Spring是放在threadLocal里面,多线程场景下,拿到的数据库连接是不一样的,也就是属于不同事务。那要怎么处理呢?
要实现多线程事务回滚,我们可以采用以下两种方式:
1、基于注解式事务管理:在方法上加上@Async注解,并结合@Transactional注解来启用异步事务管理。首先,我们需要在@Configuration类中使用@EnableAsync注解启用异步支持。然后,在方法上加上@Async注解来开启异步执行。接着,我们需要在方法上同时添加@Transactional注解来启用事务管理。这样,当方法执行时,Spring会将其放入一个独立的线程中并异步执行,同时也会在子线程中开启一个新的事务。当发生异常时,整个子线程中的事务都会进行回滚。具体实现方式参考:SpringBoot使用@Async实现多线程异步
2、基于编程式事务管理:在方法中使用编程式事务管理器,并手动在异常发生时进行事务回滚。首先,我们需要在方法上加上@Transactional注解来启用事务管理。然后,在方法体中,我们可以通过获取当前事务管理器的实例,并通过它来进行事务管理。当出现异常时,我们可以使用TransactionAspectSupport.currentTransactionStatus().setRollbackOnly()方法来进行事务回滚。这样,当任何一个线程中的事务发生异常时,整个事务都会回滚。
二、具体实现
2.1、自定义线程池
/**
* @Description: TODO:利用ThreadPoolTaskExecutor多线程批量执行相关配置
* 自定义线程池
* 发现不是线程数越多越好,具体多少合适,网上有一个不成文的算法:CPU核心数量*2 +2 个线程。
* @Author: yyalin
* @CreateDate: 2022/11/6 11:56
* @Version: V1.0
*/
public class ThreadPoolConfig {
//获取cpu核心数
private final static int processNum = Runtime.getRuntime().availableProcessors();
//自定义使用参数
private int corePoolSize; //配置核心线程数
private int maxPoolSize; //配置最大线程数
private int queueCapacity;
private String namePrefix;
private int keepAliveSeconds;
//1、自定义asyncServiceExecutor线程池
public ThreadPoolTaskExecutor asyncServiceExecutor() {
log.info("start asyncServiceExecutor......");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
//线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
executor.setCorePoolSize(corePoolSize);
//配置队列大小 设置任务等待队列的大小
//阻塞队列 当核心线程数达到最大时,新任务会放在队列中排队等待执行
executor.setQueueCapacity(queueCapacity);
//配置最大线程数
//最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
//任务队列已满时, 且当线程数=maxPoolSize,,线程池会拒绝处理任务而抛出异常
executor.setMaxPoolSize(maxPoolSize);
//设置线程空闲等待时间 s
//当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
//允许线程空闲时间30秒,当maxPoolSize的线程在空闲时间到达的时候销毁
//如果allowCoreThreadTimeout=true,则会直到线程数量=0
executor.setKeepAliveSeconds(keepAliveSeconds);
//配置线程池中的线程的名称前缀
//设置线程池内线程名称的前缀-------阿里编码规约推荐--方便出错后进行调试
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:拒绝策略:当线程数已经达到maxSize的时候,如何处理新任务
// CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行, (个人推荐)
// AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
// DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
// DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁
executor.setWaitForTasksToCompleteOnShutdown(true);
//设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
executor.setAwaitTerminationSeconds(60);
//执行初始化
executor.initialize();
return executor;
}
2.2、主事务注解
/**
* @Description: TODO:多线程事务注解: 主事务
* @Author: yyalin
* @CreateDate: 2023/12/9 14:48
* @Version: V1.0
*/
({ElementType.METHOD})
(RetentionPolicy.RUNTIME)
public MainTransaction {
int value();//子线程数量
}
@MainTransaction注解:用在调用方,其参数为必填,参数值为本方法中调用的方法开启的线程数,如:在这个方法中调用的方法中有2个方法用@Async注解开启了子线程,则参数为@MainTransaction(2),另外如果未使用@MainTransaction注解,则直接已无多线程事务执行(不影响方法的单线程事务)
2.3、子事务注解
/**
* @Description: TODO:多线程事务注解: 子事务
* @Author: yyalin
* @CreateDate: 2023/12/9 14:51
* @Version: V1.0
*/
public ChildTransaction {
String value() default "";
}
用在被调用方(开启线程的方法),无需传入参数。注意的是:两个注解都是用在方法上的,须配合@Transactional(rollbackFor = Exception.class)一起使用
2.4、多线程事务AOP
/**
* @Description: TODO:多线程事务AOP
* @Author: yyalin
* @CreateDate: 2023/12/9 15:06
* @Version: V1.0
*/
@Aspect
@Component
public class TransactionAop {
//用来存储各线程计数器数据(每次执行后会从map中删除)
private static final Map<String, Object> map = new HashMap<>();
@Resource
private PlatformTransactionManager transactionManager;
/**
* 功能描述:主事务
* @MethodName: mainIntercept
* @MethodParam: [joinPoint, mainTransaction]
* @Return: void
* @Author: yyalin
* @CreateDate: 2023/12/9 15:10
*/
@Around("@annotation(mainTransaction)")
public void mainIntercept(ProceedingJoinPoint joinPoint, MainTransaction mainTransaction) throws Throwable {
//当前线程名称
String threadName = Thread.currentThread().getName();
//初始化计数器
CountDownLatch mainDownLatch = new CountDownLatch(1);
//@MainTransaction注解中的参数, 为子线程的数量
CountDownLatch childDownLatch = new CountDownLatch(mainTransaction.value());
// 用来记录子线程的运行状态,只要有一个失败就变为true
AtomicBoolean rollBackFlag = new AtomicBoolean(false);
// 用来存每个子线程的异常,把每个线程的自定义异常向vector的首位置插入,其余异常向末位置插入,避免线程不安全,所以使用vector代替list
Vector<Throwable> exceptionVector = new Vector<>();
map.put(threadName + "mainDownLatch", mainDownLatch);
map.put(threadName + "childDownLatch", childDownLatch);
map.put(threadName + "rollBackFlag", rollBackFlag);
map.put(threadName + "exceptionVector", exceptionVector);
try {
joinPoint.proceed();//执行方法
} catch (Throwable e) {
exceptionVector.add(0, e);
rollBackFlag.set(true);//子线程回滚
mainDownLatch.countDown();//放行所有子线程
}
if (!rollBackFlag.get()) {
try {
// childDownLatch等待,直到所有子线程执行完插入操作,但此时还没有提交事务
childDownLatch.await();
// 根据rollBackFlag状态放行子线程的await处,告知是回滚还是提交
mainDownLatch.countDown();
} catch (Exception e) {
rollBackFlag.set(true);
exceptionVector.add(0, e);
}
}
if (CollectionUtils.isNotEmpty(exceptionVector)) {
map.remove(threadName + "mainDownLatch");
map.remove(threadName + "childDownLatch");
map.remove(threadName + "rollBackFlag");
map.remove(threadName + "exceptionVector");
throw exceptionVector.get(0);
}
}
/**
* 功能描述:子事务
* @MethodName: sonIntercept
* @MethodParam: [joinPoint]
* @Return: void
* @Author: yyalin
* @CreateDate: 2023/12/9 15:12
*/
@Around("@annotation(ChildTransaction)")
public void childIntercept(ProceedingJoinPoint joinPoint) throws Throwable {
Object[] args = joinPoint.getArgs();
Thread thread = (Thread) args[args.length - 1];
String threadName = thread.getName();
CountDownLatch mainDownLatch = (CountDownLatch) map.get(threadName + "mainDownLatch");
if (mainDownLatch == null) {
//主事务未加注解时, 直接执行子事务
joinPoint.proceed();//这里最好的方式是:交由上面的thread来调用此方法,但我没有找寻到对应api,只能直接放弃事务, 欢迎大神来优化, 留言分享
return;
}
CountDownLatch childDownLatch = (CountDownLatch) map.get(threadName + "childDownLatch");
AtomicBoolean rollBackFlag = (AtomicBoolean) map.get(threadName + "rollBackFlag");
Vector<Throwable> exceptionVector = (Vector<Throwable>) map.get(threadName + "exceptionVector");
//如果这时有一个子线程已经出错,那当前线程不需要执行
if (rollBackFlag.get()) {
childDownLatch.countDown();
return;
}
DefaultTransactionDefinition def = new DefaultTransactionDefinition();// 开启事务
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 设置事务隔离级别
TransactionStatus status = transactionManager.getTransaction(def);
try {
joinPoint.proceed();//执行方法
childDownLatch.countDown();// 对sonDownLatch-1
mainDownLatch.await();// 如果mainDownLatch不是0,线程会在此阻塞,直到mainDownLatch变为0
// 如果能执行到这一步说明所有子线程都已经执行完毕判断如果atomicBoolean是true就回滚false就提交
if (rollBackFlag.get()) {
transactionManager.rollback(status);
} else {
transactionManager.commit(status);
}
} catch (Throwable e) {
exceptionVector.add(0, e);
// 回滚
transactionManager.rollback(status);
// 并把状态设置为true
rollBackFlag.set(true);
mainDownLatch.countDown();
childDownLatch.countDown();
}
}
说明:
用计数 1 初始化的 mainDownLatch
当作一个简单的开/关锁存器,或入口:在通过调用 countDown()
的线程打开入口前,所有调用 await 的线程都一直在入口处等待。
用子线程数量 初始化的 childDownLatch
可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。
2.5、子服务类
/**
* @Description: TODO:定义子服务
* @Author: yyalin
* @CreateDate: 2023/12/9 15:24
* @Version: V1.0
*/
4j
public class ChildService {
/**
* 参数说明: 以下4个方法参数和此相同
*
* @param args
* @param thread ,
*
*
*/
/**
* 功能描述:
* @MethodName: childMethod1
* @MethodParam: [args:业务中需要传递的参数, thread:调用者的线程, 用于aop获取参数, 不建议以方法重写的方式简略此参数
* 在调用者方法中可以以此参数为标识计算子线程的个数作为注解参数,避免线程参数计算错误导致锁表
* 传参时参数固定为: Thread.currentThread()]
* @Return: void
* @Author: yyalin
* @CreateDate: 2023/12/9 15:30
*/
(rollbackFor = Exception.class)
"asyncServiceExecutor") (
public void childMethod1(String args, Thread thread) {
log.info("childMethod1:"+args + "开启了线程");
log.info("开始处理业务...");
//处理具体的业务
//todo:插入业务表数据
}
(rollbackFor = Exception.class)
"asyncServiceExecutor") (
public void childMethod2(String args1, String args2, Thread thread) {
log.info("childMethod2:"+args1 + "和" + args2 + "开启了线程");
log.info("开始处理业务...");
//模拟发送异常
int num=1/0;
log.info("发生异常...");
//todo:插入业务表数据
}
(rollbackFor = Exception.class)
"asyncServiceExecutor") (
public void childMethod3(String args, Thread thread) {
log.info("childMethod3:"+args + "开启了线程");
log.info("开始处理业务...");
//todo:插入业务表数据
}
//childMethod4方法没有使用线程池
(rollbackFor = Exception.class)
public void childMethod4(String args) {
log.info("childMethod4:"+args + "没有开启线程");
log.info("开始处理业务...");
//todo:插入业务表数据
}
}
2.6、主服务类
/**
* @Description: TODO:定义事务
* @Author: yyalin
* @CreateDate: 2023/12/9 15:40
* @Version: V1.0
*/
4j
public class MainService {
private ChildService childService;
/**
* 功能描述:调用的方法中childMethod1/childMethod2/childMethod3使用@Async开启了线程, 所以参数为: 3
* @MethodName: test1
* @MethodParam: []
* @Return: void
* @Author: yyalin
* @CreateDate: 2023/12/9 15:42
*/
3) (
(rollbackFor = Exception.class)
public void test1() {
//主线程中调用四个子线程事务
childService.childMethod1("小明", Thread.currentThread());
childService.childMethod2("小明02", "小明03", Thread.currentThread());
childService.childMethod3("小明04", Thread.currentThread());
childService.childMethod4("小明05");
log.info("结束处理业务...");
}
/*
* 有的业务中存在if的多种可能, 每一种走向调用的方法(开启线程的方法)数量如果不同, 这时可以选择放弃使用@MainTransaction注解避免锁表
* 这时候如果发生异常会导致多线程不能同时回滚, 可根据业务自己权衡是否使用
*/
(rollbackFor = Exception.class)
public void test2() {
childService.childMethod1("小李01", Thread.currentThread());
childService.childMethod2("小李02", "小李03", Thread.currentThread());
childService.childMethod3("小李04", Thread.currentThread());
childService.childMethod4("小李05");
}
最后:
有的业务中存在比较复杂的分支, 不同情况都会调用不同的方法,开启不同数量的线程,这时可以选择放弃使用@MainTransaction注解避免锁表,因此在使用过程中,需要根据自己的权衡。
三、源码获取方式
更多优秀文章,请关注个人微信公众号或搜索“程序猿小杨”查阅。然后回复:源码,可以获取对应的源码,开箱即可使用。
参考:https://www.jianshu.com/p/742e76585947
如果大家对相关文章感兴趣,可以关注微信公众号"程序猿小杨",会持续更新优秀文章!欢迎大家 分享、收藏、点赞、在看,您的支持就是我坚持下去的最大动力!谢谢!
转载于:https://mp.weixin.qq.com/s/eSfpezungihzUjHdtu-V8g