下面列出了org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate#org.springframework.amqp.rabbit.connection.ConnectionFactory 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testCloudProfile() {
this.context = new SpringApplicationBuilder(SimpleProcessor.class,
MockCloudConfiguration.class).web(WebApplicationType.NONE)
.profiles("cloud").run();
BinderFactory binderFactory = this.context.getBean(BinderFactory.class);
Binder<?, ?, ?> binder = binderFactory.getBinder(null, MessageChannel.class);
assertThat(binder).isInstanceOf(RabbitMessageChannelBinder.class);
DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
ConnectionFactory binderConnectionFactory = (ConnectionFactory) binderFieldAccessor
.getPropertyValue("connectionFactory");
ConnectionFactory connectionFactory = this.context
.getBean(ConnectionFactory.class);
assertThat(binderConnectionFactory).isNotSameAs(connectionFactory);
assertThat(TestUtils.getPropertyValue(connectionFactory, "addresses"))
.isNotNull();
assertThat(TestUtils.getPropertyValue(binderConnectionFactory, "addresses"))
.isNull();
Cloud cloud = this.context.getBean(Cloud.class);
verify(cloud).getSingletonServiceConnector(ConnectionFactory.class, null);
}
public static void assertConfigProperties(ConnectionFactory connector, Integer channelCacheSize,
Integer requestedHeartbeat, Integer connectionTimeout,
int connectionLimit, boolean publisherConfirms) {
Integer timeoutToTest = connectionTimeout < 0 ? DEFAULT_FACTORY_TIMEOUT : connectionTimeout;
Integer heartBeatToTest = requestedHeartbeat < 0 ? DEFAULT_FACTORY_HEARTBEAT : requestedHeartbeat;
assertNotNull(connector);
assertEquals(channelCacheSize, ReflectionTestUtils.getField(connector, "channelCacheSize"));
assertEquals(connectionLimit, ReflectionTestUtils.getField(connector, "connectionLimit"));
assertEquals(publisherConfirms, ReflectionTestUtils.getField(connector, "publisherConfirms"));
Object rabbitConnectionFactory = ReflectionTestUtils.getField(connector, "rabbitConnectionFactory");
assertNotNull(rabbitConnectionFactory);
assertEquals(heartBeatToTest, ReflectionTestUtils.getField(rabbitConnectionFactory, "requestedHeartbeat"));
assertEquals(timeoutToTest, ReflectionTestUtils.getField(rabbitConnectionFactory, "connectionTimeout"));
}
@Test
public void cloudScanWithAllTypesOfServices() {
ApplicationContext testContext = getTestApplicationContext(createMysqlService("mysqlDb"),
createPostgresqlService("postDb"),
createMongoService("mongoDb"),
createRedisService("redisDb"),
createRabbitService("rabbit"));
assertNotNull("Getting service by id", testContext.getBean("mysqlDb"));
assertNotNull("Getting service by id and type", testContext.getBean("mysqlDb", DataSource.class));
assertNotNull("Getting service by id", testContext.getBean("postDb"));
assertNotNull("Getting service by id and type", testContext.getBean("postDb", DataSource.class));
assertNotNull("Getting service by id", testContext.getBean("mongoDb"));
assertNotNull("Getting service by id and type", testContext.getBean("mongoDb", MongoDbFactory.class));
assertNotNull("Getting service by id", testContext.getBean("redisDb"));
assertNotNull("Getting service by id and type", testContext.getBean("redisDb", RedisConnectionFactory.class));
assertNotNull("Getting service by id", testContext.getBean("rabbit"));
assertNotNull("Getting service by id and type", testContext.getBean("rabbit", ConnectionFactory.class));
}
/**
* Creates a {@link ConnectionFactory} using the singleton service
* connector.
* @param cloud {@link Cloud} instance to be used for accessing services.
* @param connectorConfigObjectProvider the {@link ObjectProvider} for the
* {@link RabbitConnectionFactoryConfig}.
* @param applicationContext application context instance
* @param rabbitProperties rabbit properties
* @return the {@link ConnectionFactory} used by the binder.
* @throws Exception if configuration of connection factory fails
*/
@Bean
@Primary
ConnectionFactory rabbitConnectionFactory(Cloud cloud,
ObjectProvider<RabbitConnectionFactoryConfig> connectorConfigObjectProvider,
ConfigurableApplicationContext applicationContext,
RabbitProperties rabbitProperties) throws Exception {
ConnectionFactory connectionFactory = cloud
.getSingletonServiceConnector(ConnectionFactory.class,
connectorConfigObjectProvider.getIfUnique());
configureCachingConnectionFactory(
(CachingConnectionFactory) connectionFactory,
applicationContext, rabbitProperties);
return connectionFactory;
}
@Test
public void testParentConnectionFactoryInheritedIfOverridden() {
context = new SpringApplicationBuilder(SimpleProcessor.class,
ConnectionFactoryConfiguration.class).web(WebApplicationType.NONE)
.run("--server.port=0");
BinderFactory binderFactory = context.getBean(BinderFactory.class);
Binder<?, ?, ?> binder = binderFactory.getBinder(null, MessageChannel.class);
assertThat(binder).isInstanceOf(RabbitMessageChannelBinder.class);
DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
ConnectionFactory binderConnectionFactory = (ConnectionFactory) binderFieldAccessor
.getPropertyValue("connectionFactory");
assertThat(binderConnectionFactory).isSameAs(MOCK_CONNECTION_FACTORY);
ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class);
assertThat(binderConnectionFactory).isSameAs(connectionFactory);
CompositeHealthContributor bindersHealthIndicator = context
.getBean("bindersHealthContributor", CompositeHealthContributor.class);
assertThat(bindersHealthIndicator).isNotNull();
RabbitHealthIndicator indicator = (RabbitHealthIndicator) bindersHealthIndicator.getContributor("rabbit");
assertThat(indicator).isNotNull();
// mock connection factory behaves as if down
assertThat(indicator.health().getStatus())
.isEqualTo(Status.DOWN);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
template.setEncoding(SeedConstants.DEFAULT_CHARSET);
//消息发送失败时,返回到队列中(需要spring.rabbitmq.publisherReturns=true)
template.setMandatory(true);
//消息成功到达exchange,但没有queue与之绑定时触发的回调(即消息发送不到任何一个队列中)
//也可以在生产者发送消息的类上实现org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback和RabbitTemplate.ReturnCallback两个接口(本例中即为SendController.java)
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> LogUtil.getLogger().error("消息发送失败,replyCode={},replyText={},exchange={},routingKey={},消息体=[{}]", replyCode, replyText, exchange, routingKey, JSON.toJSONString(message.getBody())));
//消息成功到达exchange后触发的ack回调(需要spring.rabbitmq.publisherConfirms=true)
template.setConfirmCallback((correlationData, ack, cause) -> {
if(ack){
LogUtil.getLogger().info("消息发送成功,消息ID={}", correlationData.getId());
}else{
LogUtil.getLogger().error("消息发送失败,消息ID={},cause={}", correlationData.getId(), cause);
}
});
return template;
}
@Bean
ConnectionFactory rabbitConnectionFactory(final RabbitMqSetupService rabbitmqSetupService) {
final CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost(rabbitmqSetupService.getHostname());
factory.setPort(5672);
factory.setUsername(rabbitmqSetupService.getUsername());
factory.setPassword(rabbitmqSetupService.getPassword());
try {
factory.setVirtualHost(rabbitmqSetupService.createVirtualHost());
// All exception are catched. The BrokerRunning decide if the
// test should break or not
} catch (@SuppressWarnings("squid:S2221") final Exception e) {
Throwables.propagateIfInstanceOf(e, AlivenessException.class);
LOG.error("Cannot create virtual host.", e);
}
return factory;
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(MessageConverter converter,
ConnectionFactory connectionFactory, @Autowired(required = false) List<Consumer<?>> list) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
init(list);
Receiver receiver = new Receiver(converter);
container.setMessageListener(new MessageListenerAdapter(receiver, converter));
container.setQueueNames(queueNames.toArray(new String[0]));
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setPrefetchCount(prefetchCount);
container.setTxSize(txSize);
return container;
}
@Bean
SimpleMessageListenerContainer containerAnaylsis(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueNames(queueNameSzz);
container.setMessageListener(new MessageReceivedComponent(rabbitTemplate(connectionFactory()), dbEntryDao));
return container;
}
@Bean
public ConnectionFactory connectionFactory() {
CommonProperties.RabbitMQ rabbitMQ = commonProperties.getRabbitMQ();
CachingConnectionFactory factory = new CachingConnectionFactory(rabbitMQ.getHostname(), rabbitMQ.getPort());
if (rabbitMQ.getUsername() != null && rabbitMQ.getPassword() != null) {
factory.setUsername(rabbitMQ.getUsername());
factory.setPassword(rabbitMQ.getPassword());
}
return factory;
}
private void assertConnectorPropertiesMatchUri(ConnectionFactory connector, String uriString) throws Exception {
assertNotNull(connector);
URI uri = new URI(uriString);
assertEquals(uri.getHost(), connector.getHost());
assertEquals(uri.getPort(), connector.getPort());
com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory =
(com.rabbitmq.client.ConnectionFactory) ReflectionTestUtils.getField(connector, "rabbitConnectionFactory");
String[] userInfo = uri.getRawUserInfo().split(":");
assertEquals(userInfo[0], rabbitConnectionFactory.getUsername());
assertEquals(userInfo[1], rabbitConnectionFactory.getPassword());
assertTrue(uri.getPath().endsWith(connector.getVirtualHost()));
}
@Bean
public RabbitAdmin rabbitAdmin(Queue queue, ConnectionFactory connectionFactory) {
final TopicExchange exchange = new TopicExchange("myExchange", true, false);
final RabbitAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(queue);
admin.declareExchange(exchange);
admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("#"));
return admin;
}
private void assertConnectorPropertiesMatchHosts(ConnectionFactory connector, List<String> uriStrings) throws Exception {
Address[] addresses = (Address[]) ReflectionTestUtils.getField(connector, "addresses");
assertNotNull(addresses);
assertEquals(uriStrings.size(), addresses.length);
for (int i = 0; i < uriStrings.size(); i++) {
URI uri = new URI(uriStrings.get(i));
assertEquals(uri.getHost(), addresses[i].getHost());
assertEquals(uri.getPort(), addresses[i].getPort());
}
}
/**
* decalre specified excahnge
* @param connectionFactory
* @param exchangeType
* @param exchangeName
* @param durable
* @param autoDelete
*/
public static void declareExchange(ConnectionFactory connectionFactory,
String exchangeType,
String exchangeName,
boolean durable,
boolean autoDelete) {
Exchange x = createExchange(exchangeType, exchangeName, durable, autoDelete);
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareExchange(x);
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
@Bean
public ConnectionFactory connectionFactory() {
//LOG.info("Creating RabbitMQHost ConnectionFactory for host: {}", rabbitMQHost);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(rabbitMQHost);
return cachingConnectionFactory;
}
private void setConnectionFactoryUri(com.rabbitmq.client.ConnectionFactory connectionFactory, String uri) {
try {
connectionFactory.setUri(uri);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid AMQP URI", e);
}
}
@Bean("springConnectionFactory")
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUsername(this.user);
factory.setPassword(this.pass);
factory.setHost(this.host);
factory.setPort(this.port);
return factory;
}
@Bean
public ConnectionFactory connectionFactory() {
//LOG.info("Creating RabbitMQHost ConnectionFactory for host: {}", rabbitMQHost);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(rabbitMQHost);
return cachingConnectionFactory;
}
@Test
public void cloudRabbitConnectionFactoryWithProperties() {
ApplicationContext testContext = getTestApplicationContext("cloud-rabbit-with-config.xml", createService("my-service"));
ConnectionFactory connector = testContext.getBean("service-properties", getConnectorType());
RabbitConnectionFactoryCloudConfigTestHelper.assertConfigProperties(connector, DEFAULT_CHANNEL_CACHE_SIZE, 5, 10, 15, true);
}
@Test
public void 値がオーバライドされて指定のインスタンスがDIコンテナに登録される() {
assertThat(config.host, is("192.168.10.10"));
assertThat(config.port, is("5673"));
assertThat(config.username, is("jfw"));
assertThat(config.password, is("jfw"));
assertThat(config.channelCacheSize, is(100));
assertThat(context.getBean(ConnectionFactory.class), is(notNullValue()));
assertThat(context.getBean(RabbitTemplate.class), is(notNullValue()));
assertThat(context.getBean(MessageConverter.class), is(notNullValue()));
}
@Bean("springConnectionFactory")
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUsername(this.user);
factory.setPassword(this.pass);
factory.setHost(this.host);
factory.setPort(this.port);
return factory;
}
@Bean RabbitTemplate newRabbitTemplate(
ConnectionFactory connectionFactory,
Binding binding,
SpringRabbitTracing springRabbitTracing
) {
RabbitTemplate result = springRabbitTracing.newRabbitTemplate(connectionFactory);
result.setExchange(binding.getExchange());
return result;
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses(host + ":" + port);
factory.setVirtualHost(virtualHost);
factory.setUsername(username);
factory.setPassword(password);
return factory;
}
private String toString(ConnectionFactory rabbitConnectionFactory) {
if (rabbitConnectionFactory == null) {
return "<none>";
} else {
return rabbitConnectionFactory.getHost() + ":"
+ rabbitConnectionFactory.getPort();
}
}
@Override
public void subscribe(String clientId, BinLogDistributorClient binLogDistributorClient) {
List<QueueInfo> queueList = rabbitHttpClient.getQueues(vhost);
ConnectionFactory connectionFactory = rabbitMQClient.getConnectionFactory();
//处理历史数据
queueList.stream().filter(queueInfo -> queueInfo.getName().startsWith(DATA + clientId) && !queueInfo.getName().endsWith("-Lock"))
.forEach(queueInfo -> executors.submit(new DataHandler(queueInfo.getName(), clientId, binLogDistributorClient, redissonClient, connectionFactory)));
try {
Channel channel = connectionFactory.createConnection().createChannel(false);
channel.queueDeclare(NOTIFIER + clientId, true, false, true, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
//每次推送都会执行这个方法,每次开线程,使用线程里面redis锁判断开销太大,先在外面判断一次
if (!DataHandler.DATA_KEY_IN_PROCESS.contains(msg)) {
//如果没在处理再进入
executors.submit(new DataHandler(msg, clientId, binLogDistributorClient, redissonClient, connectionFactory));
}
}
};
channel.basicConsume(NOTIFIER + clientId, true, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
@Bean
public SimpleRabbitListenerContainerFactory retryQueuesContainerFactory(ConnectionFactory connectionFactory, RetryQueuesInterceptor retryInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
Advice[] adviceChain = { retryInterceptor };
factory.setAdviceChain(adviceChain);
return factory;
}
@Before
public void setup() {
ConnectionFactory defaultConnectionFactory = rabbitAvailableRule.getResource();
String[] addresses = new String[] { "localhost:9999", "localhost:5672" };
String[] adminAddresses = new String[] { "http://localhost:15672",
"http://localhost:15672" };
String[] nodes = new String[] { "[email protected]", "[email protected]" };
String vhost = "/";
String username = "guest";
String password = "guest";
this.lqcf = new LocalizedQueueConnectionFactory(defaultConnectionFactory,
addresses, adminAddresses, nodes, vhost, username, password, false, null,
null, null, null);
}
public RabbitExchangeQueueProvisioner(ConnectionFactory connectionFactory,
List<DeclarableCustomizer> customizers) {
this.rabbitAdmin = new RabbitAdmin(connectionFactory);
this.autoDeclareContext.refresh();
this.rabbitAdmin.setApplicationContext(this.autoDeclareContext);
this.rabbitAdmin.afterPropertiesSet();
this.customizers = customizers;
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses(host + ":" + port);
factory.setVirtualHost(virtualHost);
factory.setUsername(username);
factory.setPassword(password);
return factory;
}