下面列出了org.springframework.boot.test.context.FilteredClassLoader#org.springframework.transaction.ReactiveTransactionManager 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private ReactiveTransactionInfo prepareTransactionInfo(@Nullable ReactiveTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable ReactiveTransaction transaction) {
ReactiveTransactionInfo txInfo = new ReactiveTransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// The transaction manager will flag an error if an incompatible tx already exists.
txInfo.newReactiveTransaction(transaction);
}
else {
// The TransactionInfo.hasTransaction() method will return false. We created it only
// to preserve the integrity of the ThreadLocal stack maintained in this class.
if (logger.isTraceEnabled()) {
logger.trace("Don't need to create transaction for [" + joinpointIdentification +
"]: This method isn't transactional.");
}
}
return txInfo;
}
@Test
public void noTransaction() throws Exception {
ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class);
DefaultTestBean tb = new DefaultTestBean();
TransactionAttributeSource tas = new MapTransactionAttributeSource();
// All the methods in this class use the advised() template method
// to obtain a transaction object, configured with the when PlatformTransactionManager
// and transaction attribute source
TestBean itb = (TestBean) advised(tb, rtm, tas);
checkReactiveTransaction(false);
itb.getName();
checkReactiveTransaction(false);
// expect no calls
verifyZeroInteractions(rtm);
}
/**
* Check that a transaction is created and committed.
*/
@Test
public void transactionShouldSucceed() throws Exception {
TransactionAttribute txatt = new DefaultTransactionAttribute();
MapTransactionAttributeSource tas = new MapTransactionAttributeSource();
tas.register(getNameMethod, txatt);
ReactiveTransaction status = mock(ReactiveTransaction.class);
ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class);
// expect a transaction
given(rtm.getReactiveTransaction(txatt)).willReturn(Mono.just(status));
given(rtm.commit(status)).willReturn(Mono.empty());
DefaultTestBean tb = new DefaultTestBean();
TestBean itb = (TestBean) advised(tb, rtm, tas);
itb.getName()
.as(StepVerifier::create)
.verifyComplete();
verify(rtm).commit(status);
}
/**
* Check that a transaction is created and committed.
*/
@Test
public void transactionShouldSucceedWithNotNew() throws Exception {
TransactionAttribute txatt = new DefaultTransactionAttribute();
MapTransactionAttributeSource tas = new MapTransactionAttributeSource();
tas.register(getNameMethod, txatt);
ReactiveTransaction status = mock(ReactiveTransaction.class);
ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class);
// expect a transaction
given(rtm.getReactiveTransaction(txatt)).willReturn(Mono.just(status));
given(rtm.commit(status)).willReturn(Mono.empty());
DefaultTestBean tb = new DefaultTestBean();
TestBean itb = (TestBean) advised(tb, rtm, tas);
itb.getName()
.as(StepVerifier::create)
.verifyComplete();
verify(rtm).commit(status);
}
@Test
public void noExistingTransaction() {
ReactiveTransactionManager tm = new ReactiveTestTransactionManager(false, true);
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
.as(StepVerifier::create).consumeNextWith(actual ->
assertFalse(actual.hasTransaction())
).verifyComplete();
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED))
.cast(GenericReactiveTransaction.class).subscriberContext(TransactionContextManager.createTransactionContext())
.as(StepVerifier::create).consumeNextWith(actual -> {
assertTrue(actual.hasTransaction());
assertTrue(actual.isNewTransaction());
}).verifyComplete();
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
.as(StepVerifier::create).expectError(IllegalTransactionStateException.class).verify();
}
@Test
public void existingTransaction() {
ReactiveTransactionManager tm = new ReactiveTestTransactionManager(true, true);
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
.as(StepVerifier::create).consumeNextWith(actual -> {
assertNotNull(actual.getTransaction());
assertFalse(actual.isNewTransaction());
}).verifyComplete();
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
.as(StepVerifier::create).consumeNextWith(actual -> {
assertNotNull(actual.getTransaction());
assertFalse(actual.isNewTransaction());
}).verifyComplete();
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
.as(StepVerifier::create).consumeNextWith(actual -> {
assertNotNull(actual.getTransaction());
assertFalse(actual.isNewTransaction());
}).verifyComplete();
}
@Test
void didProvideOnlyReactiveTransactionManager() {
assertThat(this.applicationContext.getBean(ReactiveTransactionManager.class)).isInstanceOf(
ReactiveNeo4jTransactionManager.class);
assertThatExceptionOfType(NoSuchBeanDefinitionException.class)
.isThrownBy(() -> this.applicationContext.getBean(PlatformTransactionManager.class));
}
@Bean(ReactiveNeo4jRepositoryConfigurationExtension.DEFAULT_TRANSACTION_MANAGER_BEAN_NAME)
@ConditionalOnMissingBean(ReactiveTransactionManager.class)
public ReactiveTransactionManager transactionManager(Driver driver,
ReactiveDatabaseSelectionProvider databaseNameProvider) {
return new ReactiveNeo4jTransactionManager(driver, databaseNameProvider);
}
@Test
@DisplayName("Should require all needed classes")
void shouldRequireAllNeededClasses() {
contextRunner
.withClassLoader(
new FilteredClassLoader(ReactiveNeo4jTransactionManager.class, ReactiveTransactionManager.class, Flux.class))
.run(ctx -> assertThat(ctx)
.doesNotHaveBean(ReactiveNeo4jClient.class)
.doesNotHaveBean(ReactiveNeo4jTemplate.class)
.doesNotHaveBean(ReactiveNeo4jTransactionManager.class)
);
}
@Test
@DisplayName("…should honour existing transaction manager")
void shouldHonourExisting() {
contextRunner
.withUserConfiguration(ConfigurationWithExistingReactiveTransactionManager.class)
.run(ctx -> assertThat(ctx)
.hasSingleBean(ReactiveTransactionManager.class)
.hasBean("myCustomReactiveTransactionManager")
);
}
@Nullable
private ReactiveTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
// Do not attempt to lookup tx manager if no tx attributes are set
if (txAttr == null || beanFactory == null) {
return asReactiveTransactionManager(getTransactionManager());
}
String qualifier = txAttr.getQualifier();
if (StringUtils.hasText(qualifier)) {
return determineQualifiedTransactionManager(beanFactory, qualifier);
}
else if (StringUtils.hasText(transactionManagerBeanName)) {
return determineQualifiedTransactionManager(beanFactory, transactionManagerBeanName);
}
else {
ReactiveTransactionManager defaultTransactionManager = asReactiveTransactionManager(getTransactionManager());
if (defaultTransactionManager == null) {
defaultTransactionManager = asReactiveTransactionManager(
transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY));
if (defaultTransactionManager == null) {
defaultTransactionManager = beanFactory.getBean(ReactiveTransactionManager.class);
transactionManagerCache.putIfAbsent(
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
}
}
return defaultTransactionManager;
}
}
private ReactiveTransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) {
ReactiveTransactionManager txManager = asReactiveTransactionManager(transactionManagerCache.get(qualifier));
if (txManager == null) {
txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
beanFactory, ReactiveTransactionManager.class, qualifier);
transactionManagerCache.putIfAbsent(qualifier, txManager);
}
return txManager;
}
@Nullable
private ReactiveTransactionManager asReactiveTransactionManager(@Nullable Object transactionManager) {
if (transactionManager == null || transactionManager instanceof ReactiveTransactionManager) {
return (ReactiveTransactionManager) transactionManager;
}
else {
throw new IllegalStateException(
"Specified transaction manager is not a ReactiveTransactionManager: " + transactionManager);
}
}
@SuppressWarnings("serial")
private Mono<ReactiveTransactionInfo> createTransactionIfNecessary(@Nullable ReactiveTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionAttribute attrToUse = txAttr;
Mono<ReactiveTransaction> tx = Mono.empty();
if (txAttr != null) {
if (tm != null) {
tx = tm.getReactiveTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
return tx.map(it -> prepareTransactionInfo(tm, attrToUse, joinpointIdentification, it)).switchIfEmpty(
Mono.defer(() -> Mono.just(prepareTransactionInfo(tm, attrToUse, joinpointIdentification, null))));
}
public ReactiveTransactionInfo(@Nullable ReactiveTransactionManager transactionManager,
@Nullable TransactionAttribute transactionAttribute, String joinpointIdentification) {
this.transactionManager = transactionManager;
this.transactionAttribute = transactionAttribute;
this.joinpointIdentification = joinpointIdentification;
}
@Override
protected Object advised(Object target, ReactiveTransactionManager ptm, TransactionAttributeSource[] tas) {
TransactionInterceptor ti = new TransactionInterceptor();
ti.setTransactionManager(ptm);
ti.setTransactionAttributeSources(tas);
ProxyFactory pf = new ProxyFactory(target);
pf.addAdvice(0, ti);
return pf.getProxy();
}
/**
* Template method to create an advised object given the
* target object and transaction setup.
* Creates a TransactionInterceptor and applies it.
*/
@Override
protected Object advised(Object target, ReactiveTransactionManager ptm, TransactionAttributeSource tas) {
TransactionInterceptor ti = new TransactionInterceptor();
ti.setTransactionManager(ptm);
assertThat(ti.getTransactionManager()).isEqualTo(ptm);
ti.setTransactionAttributeSource(tas);
assertThat(ti.getTransactionAttributeSource()).isEqualTo(tas);
ProxyFactory pf = new ProxyFactory(target);
pf.addAdvice(0, ti);
return pf.getProxy();
}
/**
* Check that two transactions are created and committed.
*/
@Test
public void twoTransactionsShouldSucceed() throws Exception {
TransactionAttribute txatt = new DefaultTransactionAttribute();
MapTransactionAttributeSource tas1 = new MapTransactionAttributeSource();
tas1.register(getNameMethod, txatt);
MapTransactionAttributeSource tas2 = new MapTransactionAttributeSource();
tas2.register(setNameMethod, txatt);
ReactiveTransaction status = mock(ReactiveTransaction.class);
ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class);
// expect a transaction
given(rtm.getReactiveTransaction(txatt)).willReturn(Mono.just(status));
given(rtm.commit(status)).willReturn(Mono.empty());
DefaultTestBean tb = new DefaultTestBean();
TestBean itb = (TestBean) advised(tb, rtm, new TransactionAttributeSource[] {tas1, tas2});
itb.getName()
.as(StepVerifier::create)
.verifyComplete();
Mono.from(itb.setName("myName"))
.as(StepVerifier::create)
.verifyComplete();
verify(rtm, times(2)).commit(status);
}
/**
* Simulate a transaction infrastructure failure.
* Shouldn't invoke target method.
*/
@Test
public void cannotCreateTransaction() throws Exception {
TransactionAttribute txatt = new DefaultTransactionAttribute();
Method m = getNameMethod;
MapTransactionAttributeSource tas = new MapTransactionAttributeSource();
tas.register(m, txatt);
ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class);
// Expect a transaction
CannotCreateTransactionException ex = new CannotCreateTransactionException("foobar", null);
given(rtm.getReactiveTransaction(txatt)).willThrow(ex);
DefaultTestBean tb = new DefaultTestBean() {
@Override
public Mono<String> getName() {
throw new UnsupportedOperationException(
"Shouldn't have invoked target method when couldn't create transaction for transactional method");
}
};
TestBean itb = (TestBean) advised(tb, rtm, tas);
itb.getName()
.as(StepVerifier::create)
.expectError(CannotCreateTransactionException.class)
.verify();
}
/**
* Simulate failure of the underlying transaction infrastructure to commit.
* Check that the target method was invoked, but that the transaction
* infrastructure exception was thrown to the client
*/
@Test
public void cannotCommitTransaction() throws Exception {
TransactionAttribute txatt = new DefaultTransactionAttribute();
Method m = setNameMethod;
MapTransactionAttributeSource tas = new MapTransactionAttributeSource();
tas.register(m, txatt);
// Method m2 = getNameMethod;
// No attributes for m2
ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class);
ReactiveTransaction status = mock(ReactiveTransaction.class);
given(rtm.getReactiveTransaction(txatt)).willReturn(Mono.just(status));
UnexpectedRollbackException ex = new UnexpectedRollbackException("foobar", null);
given(rtm.commit(status)).willReturn(Mono.error(ex));
given(rtm.rollback(status)).willReturn(Mono.empty());
DefaultTestBean tb = new DefaultTestBean();
TestBean itb = (TestBean) advised(tb, rtm, tas);
String name = "new name";
Mono.from(itb.setName(name))
.as(StepVerifier::create)
.consumeErrorWith(throwable -> {
assertEquals(RuntimeException.class, throwable.getClass());
assertEquals(ex, throwable.getCause());
})
.verify();
// Should have invoked target and changed name
itb.getName()
.as(StepVerifier::create)
.expectNext(name)
.verifyComplete();
}
@Bean("myCustomReactiveTransactionManager")
ReactiveTransactionManager transactionManager() {
return mock(ReactiveTransactionManager.class);
}
@Autowired ReactiveIdGeneratorsIT(Driver driver, ReactiveTransactionManager transactionManager) {
super(driver);
this.transactionManager = transactionManager;
}
@Autowired ReactiveAuditingIT(Driver driver, ReactiveTransactionManager transactionManager) {
super(driver);
this.transactionManager = transactionManager;
}
@Autowired ReactiveCallbacksIT(Driver driver, ReactiveTransactionManager transactionManager) {
super(driver);
this.transactionManager = transactionManager;
}
ReactiveTransactionManager getTransactionManager() {
return transactionManager;
}
public ReactiveTransactionManager getTransactionManager() {
Assert.state(this.transactionManager != null, "No ReactiveTransactionManager set");
return this.transactionManager;
}
/**
* Return the transaction management strategy to be used.
*/
public ReactiveTransactionManager getTransactionManager() {
return this.transactionManager;
}
/**
* Check that the when exception thrown by the target can produce the
* desired behavior with the appropriate transaction attribute.
* @param ex exception to be thrown by the target
* @param shouldRollback whether this should cause a transaction rollback
*/
@SuppressWarnings("serial")
protected void doTestRollbackOnException(
final Exception ex, final boolean shouldRollback, boolean rollbackException) throws Exception {
TransactionAttribute txatt = new DefaultTransactionAttribute() {
@Override
public boolean rollbackOn(Throwable t) {
assertThat(t).isSameAs(ex);
return shouldRollback;
}
};
Method m = exceptionalMethod;
MapTransactionAttributeSource tas = new MapTransactionAttributeSource();
tas.register(m, txatt);
ReactiveTransaction status = mock(ReactiveTransaction.class);
ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class);
// Gets additional call(s) from TransactionControl
given(rtm.getReactiveTransaction(txatt)).willReturn(Mono.just(status));
TransactionSystemException tex = new TransactionSystemException("system exception");
if (rollbackException) {
if (shouldRollback) {
given(rtm.rollback(status)).willReturn(Mono.error(tex));
}
else {
given(rtm.commit(status)).willReturn(Mono.error(tex));
}
}
else {
given(rtm.commit(status)).willReturn(Mono.empty());
given(rtm.rollback(status)).willReturn(Mono.empty());
}
DefaultTestBean tb = new DefaultTestBean();
TestBean itb = (TestBean) advised(tb, rtm, tas);
itb.exceptional(ex)
.as(StepVerifier::create)
.expectErrorSatisfies(actual -> {
if (rollbackException) {
assertThat(actual).isEqualTo(tex);
}
else {
assertThat(actual).isEqualTo(ex);
}
}).verify();
if (!rollbackException) {
if (shouldRollback) {
verify(rtm).rollback(status);
}
else {
verify(rtm).commit(status);
}
}
}
protected Object advised(
Object target, ReactiveTransactionManager rtm, TransactionAttributeSource[] tas) throws Exception {
return advised(target, rtm, new CompositeTransactionAttributeSource(tas));
}
@Bean
ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new R2dbcTransactionManager(connectionFactory);
}