下面列出了org.mockito.internal.stubbing.answers.ReturnsElementsOf#com.rabbitmq.client.GetResponse 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Helper method to retrieve a single queue message from RabbitMQ.
 *
 * <p>Connects to {@code localhost:5671} with {@code useSslProtocol()} enabled on the factory,
 * fetches at most one message from {@code WithoutClientCertQueue} with auto-ack, and releases
 * the connection even when creating the channel or fetching the message fails.
 *
 * @return the message body decoded with the platform default charset, or an empty string when
 *         no message was available
 * @throws Exception if connecting, opening the channel, fetching or closing fails
 */
private static String consumeWithoutCertificate() throws Exception {
    String result = "";
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setPort(5671);
    factory.useSslProtocol();
    Connection conn = factory.newConnection();
    try {
        Channel channel = conn.createChannel();
        GetResponse chResponse = channel.basicGet("WithoutClientCertQueue", true);
        if (chResponse != null) {
            byte[] body = chResponse.getBody();
            result = new String(body);
        }
        channel.close();
    } finally {
        // Closing the connection also disposes of a channel left open by a failure above.
        // The isOpen() guard avoids masking the original exception with an
        // "already closed" error when the failure itself shut the connection down.
        if (conn.isOpen()) {
            conn.close();
        }
    }
    return result;
}
/**
 * Fetches one message from {@code queue}, reporting it to the metrics collector when one is
 * returned.
 *
 * <p>When {@code queue} equals {@code DIRECT_REPLY_TO_QUEUE}, the channel's
 * {@code directReplyToQueue} is read instead, and {@code autoAck} must be {@code true}.
 *
 * @param queue   name of the queue to read from
 * @param autoAck whether the message is acknowledged automatically
 * @return the fetched message, or {@code null} when the node returned none
 * @throws IllegalStateException if the direct reply-to queue is read with {@code autoAck == false}
 */
@Override
public GetResponse basicGet(String queue, boolean autoAck) {
    final boolean directReplyTo = DIRECT_REPLY_TO_QUEUE.equals(queue);
    if (directReplyTo && !autoAck) {
        throw new IllegalStateException("direct reply-to requires autoAck");
    }
    final String effectiveQueue = directReplyTo ? directReplyToQueue : queue;
    final GetResponse fetched =
        node.basicGet(lastGeneratedIfEmpty(effectiveQueue), autoAck, this::nextDeliveryTag);
    if (fetched == null) {
        return null;
    }
    metricsCollectorWrapper.consumedMessage(this, fetched.getEnvelope().getDeliveryTag(), autoAck);
    return fetched;
}
/**
 * Declares a fanout exchange, binds a queue to it, declares the same exchange again and checks
 * that a message published afterwards can still be fetched from the bound queue.
 */
@Test
void exchangeDeclare_twice_keeps_existing_bindings() throws IOException, TimeoutException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        final String exchange = "test1";
        ch.exchangeDeclare(exchange, "fanout");
        final String boundQueue = ch.queueDeclare().getQueue();
        ch.queueBind(boundQueue, exchange, "unused");

        // Second declaration of the very same exchange.
        ch.exchangeDeclare(exchange, "fanout");

        ch.basicPublish("test1", "unused", null, "test".getBytes());
        assertThat(ch.basicGet(boundQueue, true)).isNotNull();
    }
}
/**
 * Binds exchange {@code ex-to} to {@code ex-from} with {@code exchangeBind} and checks that a
 * message published to {@code ex-from} can be fetched from the queue bound to {@code ex-to}.
 * The queue is addressed by the empty name; presumably the mock resolves that to the queue
 * declared just before (confirm against the mock implementation).
 */
@Test
void exchangeBind_binds_two_exchanges() throws IOException, TimeoutException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        assertThat(ch.exchangeDeclare("ex-from", BuiltinExchangeType.FANOUT)).isNotNull();
        assertThat(ch.exchangeDeclare("ex-to", BuiltinExchangeType.FANOUT)).isNotNull();
        assertThat(ch.queueDeclare()).isNotNull();

        assertThat(ch.exchangeBind("ex-to", "ex-from", "test.key")).isNotNull();
        assertThat(ch.queueBind("", "ex-to", "queue.used")).isNotNull();

        ch.basicPublish("ex-from", "unused", null, "test message".getBytes());

        final GetResponse delivered = ch.basicGet("", false);
        assertThat(delivered).isNotNull();
        final String text = new String(delivered.getBody());
        assertThat(text).isEqualTo("test message");
    }
}
/**
 * Same scenario as the {@code exchangeBind} test, but the exchange-to-exchange binding is
 * created with {@code exchangeBindNoWait} and the message is fetched with auto-ack.
 */
@Test
void exchangeBindNoWait_binds_two_exchanges() throws IOException, TimeoutException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        assertThat(ch.exchangeDeclare("ex-from", BuiltinExchangeType.FANOUT)).isNotNull();
        assertThat(ch.exchangeDeclare("ex-to", BuiltinExchangeType.FANOUT)).isNotNull();
        assertThat(ch.queueDeclare()).isNotNull();

        ch.exchangeBindNoWait("ex-to", "ex-from", "test.key", null);
        assertThat(ch.queueBind("", "ex-to", "queue.used")).isNotNull();

        ch.basicPublish("ex-from", "unused", null, "test message".getBytes());

        final GetResponse delivered = ch.basicGet("", true);
        assertThat(delivered).isNotNull();
        final String text = new String(delivered.getBody());
        assertThat(text).isEqualTo("test message");
    }
}
/**
 * Creates an exchange-to-exchange binding, removes it with {@code exchangeUnbind} and checks
 * that a message published to the source exchange is not available on the queue.
 */
@Test
void exchangeUnbind_removes_binding() throws IOException, TimeoutException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        assertThat(ch.exchangeDeclare("ex-from", BuiltinExchangeType.FANOUT)).isNotNull();
        assertThat(ch.exchangeDeclare("ex-to", BuiltinExchangeType.FANOUT)).isNotNull();
        assertThat(ch.queueDeclare()).isNotNull();
        ch.exchangeBindNoWait("ex-to", "ex-from", "test.key", null);
        assertThat(ch.queueBind("", "ex-to", "queue.used")).isNotNull();

        assertThat(ch.exchangeUnbind("ex-to", "ex-from", "test.key")).isNotNull();

        ch.basicPublish("ex-from", "unused", null, "test message".getBytes());
        assertThat(ch.basicGet("", true)).isNull();
    }
}
/**
 * Creates an exchange-to-exchange binding, removes it with {@code exchangeUnbindNoWait} and
 * checks that a message published to the source exchange is not available on the queue.
 */
@Test
void exchangeUnbindNoWait_removes_binding() throws IOException, TimeoutException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        assertThat(ch.exchangeDeclare("ex-from", BuiltinExchangeType.FANOUT)).isNotNull();
        assertThat(ch.exchangeDeclare("ex-to", BuiltinExchangeType.FANOUT)).isNotNull();
        assertThat(ch.queueDeclare()).isNotNull();
        ch.exchangeBindNoWait("ex-to", "ex-from", "test.key", null);
        assertThat(ch.queueBind("", "ex-to", "queue.used")).isNotNull();

        ch.exchangeUnbindNoWait("ex-to", "ex-from", "test.key", null);

        ch.basicPublish("ex-from", "unused", null, "test message".getBytes());
        assertThat(ch.basicGet("", true)).isNull();
    }
}
/**
 * Publishes one message, nacks it with requeue, fetches it a second time, rejects it without
 * requeue and finally checks that nothing is left to fetch.
 */
@Test
void basicNack_with_requeue_replaces_message_in_queue() throws IOException, TimeoutException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        final String declaredQueue = ch.queueDeclare().getQueue();
        ch.basicPublish("", declaredQueue, null, "test message".getBytes());

        final GetResponse firstFetch = ch.basicGet("", false);
        ch.basicNack(firstFetch.getEnvelope().getDeliveryTag(), true, true);

        final GetResponse secondFetch = ch.basicGet("", false);
        ch.basicReject(secondFetch.getEnvelope().getDeliveryTag(), false);

        assertThat(ch.basicGet("", false)).isNull();
    }
}
/**
 * Declares a {@code fruits} queue whose dead-letter exchange is the default exchange and whose
 * dead-letter routing key is {@code rejected}. A nack with requeue must leave the message in
 * {@code fruits}; a reject without requeue must move it to the {@code rejected} queue.
 */
@Test
void dead_letter_routing_key_is_used_when_a_message_is_rejected_without_requeue() throws IOException, TimeoutException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        ch.queueDeclare("rejected", true, false, false, null);
        ch.queueBindNoWait("rejected", "", "rejected", null);
        queue("fruits")
            .withDeadLetterExchange("")
            .withDeadLetterRoutingKey("rejected")
            .declare(ch);

        ch.basicPublish("", "fruits", null, "banana".getBytes());

        // Nack with requeue: the message goes back to its original queue.
        final GetResponse requeued = ch.basicGet("fruits", false);
        ch.basicNack(requeued.getEnvelope().getDeliveryTag(), false, true);
        assertThat(ch.messageCount("rejected")).isEqualTo(0);
        assertThat(ch.messageCount("fruits")).isEqualTo(1);

        // Reject without requeue: the message is dead-lettered.
        final GetResponse deadLettered = ch.basicGet("fruits", false);
        ch.basicReject(deadLettered.getEnvelope().getDeliveryTag(), false);
        assertThat(ch.messageCount("rejected")).isEqualTo(1);
        assertThat(ch.messageCount("fruits")).isEqualTo(0);
    }
}
/**
 * Checks that the {@code rabbitmq.acknowledged} counter of a Micrometer-backed metrics collector
 * goes from 0 to 1 when a fetched message is acknowledged.
 */
@Test
void metrics_collector_is_invoked_on_basic_ack() throws IOException, TimeoutException {
    final SimpleMeterRegistry meters = new SimpleMeterRegistry();
    final MockConnectionFactory factory = new MockConnectionFactory();
    factory.setMetricsCollector(new MicrometerMetricsCollector(meters));

    try (MockConnection connection = factory.newConnection();
         Channel ch = connection.createChannel(42)) {
        final String declaredQueue = ch.queueDeclare().getQueue();
        ch.basicPublish("", declaredQueue, null, "".getBytes());
        final GetResponse fetched = ch.basicGet(declaredQueue, false);

        assertThat(meters.get("rabbitmq.acknowledged").counter().count()).isEqualTo(0);

        final long tag = fetched.getEnvelope().getDeliveryTag();
        ch.basicAck(tag, false);

        assertThat(meters.get("rabbitmq.acknowledged").counter().count()).isEqualTo(1);
    }
}
/**
 * Checks that the {@code rabbitmq.rejected} counter of a Micrometer-backed metrics collector
 * goes from 0 to 1 when a fetched message is nacked without requeue.
 */
@Test
void metrics_collector_is_invoked_on_basic_nack() throws IOException, TimeoutException {
    final SimpleMeterRegistry meters = new SimpleMeterRegistry();
    final MockConnectionFactory factory = new MockConnectionFactory();
    factory.setMetricsCollector(new MicrometerMetricsCollector(meters));

    try (MockConnection connection = factory.newConnection();
         Channel ch = connection.createChannel(42)) {
        final String declaredQueue = ch.queueDeclare().getQueue();
        ch.basicPublish("", declaredQueue, null, "".getBytes());
        final GetResponse fetched = ch.basicGet(declaredQueue, false);

        assertThat(meters.get("rabbitmq.rejected").counter().count()).isEqualTo(0);

        final long tag = fetched.getEnvelope().getDeliveryTag();
        ch.basicNack(tag, false, false);

        assertThat(meters.get("rabbitmq.rejected").counter().count()).isEqualTo(1);
    }
}
/**
 * Checks that the {@code rabbitmq.rejected} counter of a Micrometer-backed metrics collector
 * goes from 0 to 1 when a fetched message is rejected without requeue.
 */
@Test
void metrics_collector_is_invoked_on_basic_reject() throws IOException, TimeoutException {
    final SimpleMeterRegistry meters = new SimpleMeterRegistry();
    final MockConnectionFactory factory = new MockConnectionFactory();
    factory.setMetricsCollector(new MicrometerMetricsCollector(meters));

    try (MockConnection connection = factory.newConnection();
         Channel ch = connection.createChannel(42)) {
        final String declaredQueue = ch.queueDeclare().getQueue();
        ch.basicPublish("", declaredQueue, null, "".getBytes());
        final GetResponse fetched = ch.basicGet(declaredQueue, false);

        assertThat(meters.get("rabbitmq.rejected").counter().count()).isEqualTo(0);

        final long tag = fetched.getEnvelope().getDeliveryTag();
        ch.basicReject(tag, false);

        assertThat(meters.get("rabbitmq.rejected").counter().count()).isEqualTo(1);
    }
}
/**
 * Publishes one message through a durable direct exchange, fetches it with {@code basicGet} and
 * checks that the tracer recorded exactly two finished spans and has no active span left.
 *
 * @param tracer mock tracer supplied to the test
 */
@Test
public void basicGet(final MockTracer tracer) throws IOException {
    String exchange = "basicGetExchange";
    String queue = "basicGetQueue";
    String bindingKey = "#";

    channel.exchangeDeclare(exchange, "direct", true);
    channel.queueDeclare(queue, true, false, false, null);
    channel.queueBind(queue, exchange, bindingKey);

    byte[] payload = "Hello, world!".getBytes();
    channel.basicPublish(exchange, bindingKey, null, payload);

    GetResponse fetched = channel.basicGet(queue, false);
    assertNotNull(fetched.getBody());

    List<MockSpan> spans = tracer.finishedSpans();
    assertEquals(2, spans.size());
    assertNull(tracer.activeSpan());
}
/**
 * Drains the queue named by {@code dataKey}: each fetched message body is deserialized into an
 * {@link EventBaseDTO}, passed to {@code doHandleWithoutLock} and then acknowledged.
 *
 * <p>The loop stops when {@code basicGet} returns {@code null}. Any exception ends the loop and
 * is printed and logged; the message being processed at that point is not acknowledged here.
 */
private void doRunWithoutLock() {
    try {
        EventBaseDTO dto;
        GetResponse getResponse;
        while ((getResponse = channel.basicGet(dataKey, false)) != null) {
            // Deserialize the event object; the stream is scoped so it is always closed.
            // NOTE(review): native Java deserialization of broker-supplied bytes is unsafe if
            // the queue can receive untrusted data; consider an ObjectInputFilter or a
            // different wire format.
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(getResponse.getBody()))) {
                dto = (EventBaseDTO) ois.readObject();
            }
            doHandleWithoutLock(dto, retryTimes);
            channel.basicAck(getResponse.getEnvelope().getDeliveryTag(), false);
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.severe("接收处理数据失败:" + e.toString());
    }
}
/**
 * Consumes one AMQP message from the target resource. When a {@link GetResponse} is available,
 * a new {@link FlowFile} is created with the message body as content and the AMQP properties
 * applied through {@link AMQPUtils}; a provenance receive event is reported and the
 * {@link FlowFile} is transferred to the 'success' {@link Relationship}. When nothing was
 * consumed, the context yields.
 */
@Override
protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession) throws ProcessException {
    final GetResponse response = this.targetResource.consume();
    if (response == null) {
        // Nothing to consume right now.
        context.yield();
        return;
    }

    final FlowFile emptyFlowFile = processSession.create();
    final FlowFile contentFlowFile = processSession.write(emptyFlowFile, new OutputStreamCallback() {
        @Override
        public void process(final OutputStream out) throws IOException {
            out.write(response.getBody());
        }
    });

    BasicProperties amqpProperties = response.getProps();
    final FlowFile attributedFlowFile =
        AMQPUtils.updateFlowFileAttributesWithAmqpProperties(amqpProperties, contentFlowFile, processSession);

    processSession.getProvenanceReporter().receive(attributedFlowFile,
        this.amqpConnection.toString() + "/" + context.getProperty(QUEUE).getValue());
    processSession.transfer(attributedFlowFile, REL_SUCCESS);
}
/**
 * Creates an open test channel backed by in-memory queues.
 *
 * <p>For every queue name listed in {@code routingKeyToQueueMappings} a bounded message buffer
 * with capacity 100 is registered.
 *
 * @param exchangeToRoutingKeyMappings exchange name to routing key
 * @param routingKeyToQueueMappings    routing key to queue names; may be {@code null}, in which
 *                                     case no queue is registered
 */
public TestChannel(Map<String, String> exchangeToRoutingKeyMappings,
        Map<String, List<String>> routingKeyToQueueMappings) {
    this.enqueuedMessages = new HashMap<>();
    this.returnListeners = new ArrayList<>();
    this.exchangeToRoutingKeyMappings = exchangeToRoutingKeyMappings;
    this.routingKeyToQueueMappings = routingKeyToQueueMappings;

    if (routingKeyToQueueMappings != null) {
        for (Map.Entry<String, List<String>> mapping : routingKeyToQueueMappings.entrySet()) {
            for (String queueName : mapping.getValue()) {
                this.enqueuedMessages.put(queueName, new ArrayBlockingQueue<GetResponse>(100));
            }
        }
    }

    this.executorService = Executors.newCachedThreadPool();
    this.open = true;
}
/**
 * Polling loop: until the client is stopped, takes a channel from the client and tries to fetch
 * one message from {@code queue} without auto-ack.
 *
 * <p>A fetched message is handed to the worker pool together with its channel. When the queue
 * is empty the channel is closed and the loop sleeps one second. On any failure the channel is
 * marked invalid, closed, and the loop sleeps ten seconds.
 */
@Override
public void run() {
    while (!client.isStopped()) {
        final PoolableChannel pooled = client.channel();
        try {
            final GetResponse pulled = pooled.basicGet(queue, false);
            if (pulled == null) {
                pooled.close();
                sleepSeconds(1);
            } else {
                workers.submit(doHandle(pulled, pooled));
            }
        } catch (Throwable e) {
            logger.error("failed to pull message", e);
            pooled.setValid(false);
            pooled.close();
            sleepSeconds(10);
        }
    }
}
/**
 * Builds {@code bound} mocked {@link GetResponse} instances for use as a fake queue.
 *
 * <p>Each mock carries a random UUID as message id, a random delivery tag, the body
 * {@code "{}"} and a message count that decreases from {@code bound} down to 1.
 *
 * @param random source of the delivery tags
 * @param bound  number of responses to create
 * @return the mocked responses in creation order
 */
List<GetResponse> buildQueue(final Random random, final int bound) {
    // Diamond instead of the raw type, so no unchecked conversion is needed.
    final LinkedList<GetResponse> queue = new LinkedList<>();
    for (int i = 0; i < bound; i++) {
        AMQP.BasicProperties props = Mockito.mock(AMQP.BasicProperties.class);
        Mockito.when(props.getMessageId()).thenReturn(UUID.randomUUID().toString());
        Envelope envelope = Mockito.mock(Envelope.class);
        Mockito.when(envelope.getDeliveryTag()).thenReturn(random.nextLong());
        GetResponse response = Mockito.mock(GetResponse.class);
        Mockito.when(response.getProps()).thenReturn(props);
        Mockito.when(response.getEnvelope()).thenReturn(envelope);
        Mockito.when(response.getBody()).thenReturn("{}".getBytes());
        Mockito.when(response.getMessageCount()).thenReturn(bound - i);
        queue.add(response);
    }
    return queue;
}
/**
 * Shared scenario: builds an {@link AMQPObservableQueue} for a randomly named
 * {@code amqp_queue} over mocked channel and connection, verifies its accessors against the
 * inputs, and then runs the observe check.
 *
 * @param channel           channel mock to configure
 * @param connection        connection mock handed to the mocked connection factory
 * @param queueExists       forwarded to {@code mockChannelForQueue}
 * @param useWorkingChannel forwarded to {@code mockChannelForQueue} and {@code runObserve}
 */
private void testGetMessagesFromQueueAndDefaultConfiguration(Channel channel, Connection connection,
        boolean queueExists, boolean useWorkingChannel)
        throws IOException, TimeoutException {
    final Random rnd = new Random();
    final String name = RandomStringUtils.randomAlphabetic(30);
    final AMQPSettings amqpSettings = new AMQPSettings(configuration).fromURI("amqp_queue:" + name);

    final List<GetResponse> messages = buildQueue(rnd, batchSize);
    final Channel preparedChannel =
        mockChannelForQueue(channel, useWorkingChannel, queueExists, name, messages);

    final AMQPObservableQueue underTest = new AMQPObservableQueue(
        mockConnectionFactory(connection),
        addresses, false, amqpSettings, batchSize, pollTimeMs);

    assertArrayEquals(addresses, underTest.getAddresses());
    assertEquals(AMQPConstants.AMQP_QUEUE_TYPE, underTest.getType());
    assertEquals(AMQPConstants.AMQP_QUEUE_TYPE+":"+name, underTest.getName());
    assertEquals(name, underTest.getURI());
    assertEquals(batchSize, underTest.getBatchSize());
    assertEquals(pollTimeMs, underTest.getPollTimeInMS());
    assertEquals(messages.size(), underTest.size());

    runObserve(preparedChannel, underTest, name, useWorkingChannel, batchSize);
}
/**
 * Shared scenario: builds an {@link AMQPObservableQueue} for a randomly named
 * {@code amqp_queue} over mocked channel and connection, closes it, and verifies that its
 * accessors still match the inputs afterwards.
 *
 * @param channel           channel mock to configure
 * @param connection        connection mock handed to the mocked connection factory
 * @param queueExists       forwarded to {@code mockChannelForQueue}
 * @param useWorkingChannel forwarded to {@code mockChannelForQueue}
 */
private void testGetMessagesFromQueueAndDefaultConfiguration_close(Channel channel, Connection connection,
        boolean queueExists, boolean useWorkingChannel)
        throws IOException, TimeoutException {
    final Random rnd = new Random();
    final String name = RandomStringUtils.randomAlphabetic(30);
    final AMQPSettings amqpSettings = new AMQPSettings(configuration).fromURI("amqp_queue:" + name);

    final List<GetResponse> messages = buildQueue(rnd, batchSize);
    // The returned channel is not used afterwards; the call is kept for its mock set-up.
    mockChannelForQueue(channel, useWorkingChannel, queueExists, name, messages);

    final AMQPObservableQueue underTest = new AMQPObservableQueue(
        mockConnectionFactory(connection),
        addresses, false, amqpSettings, batchSize, pollTimeMs);
    underTest.close();

    assertArrayEquals(addresses, underTest.getAddresses());
    assertEquals(AMQPConstants.AMQP_QUEUE_TYPE, underTest.getType());
    assertEquals(AMQPConstants.AMQP_QUEUE_TYPE+":"+name, underTest.getName());
    assertEquals(name, underTest.getURI());
    assertEquals(batchSize, underTest.getBatchSize());
    assertEquals(pollTimeMs, underTest.getPollTimeInMS());
    assertEquals(messages.size(), underTest.size());
}
/**
 * Returns a copy of the given delivery whose properties have been rebuilt field by field, with
 * the headers passed through {@code serializableHeaders}. Envelope, body and message count are
 * reused as they are.
 *
 * @param processed the delivery to copy
 * @return a new {@link GetResponse} carrying the rebuilt properties
 */
private static GetResponse serializableDeliveryOf(GetResponse processed) {
    // The envelope is reused unchanged.
    final Envelope envelope = processed.getEnvelope();
    // Properties are rebuilt because their values (e.g. LongString) may not be serializable.
    final BasicProperties original = processed.getProps();

    final BasicProperties.Builder copy = new BasicProperties.Builder();
    copy.appId(original.getAppId());
    copy.clusterId(original.getClusterId());
    copy.contentEncoding(original.getContentEncoding());
    copy.contentType(original.getContentType());
    copy.correlationId(original.getCorrelationId());
    copy.deliveryMode(original.getDeliveryMode());
    copy.expiration(original.getExpiration());
    copy.headers(serializableHeaders(original.getHeaders()));
    copy.messageId(original.getMessageId());
    copy.priority(original.getPriority());
    copy.replyTo(original.getReplyTo());
    copy.timestamp(original.getTimestamp());
    copy.type(original.getType());
    copy.userId(original.getUserId());
    final BasicProperties rebuilt = copy.build();

    return new GetResponse(envelope, rebuilt, processed.getBody(), processed.getMessageCount());
}
public RabbitMqMessage(String routingKey, GetResponse delivery) {
this.routingKey = routingKey;
delivery = serializableDeliveryOf(delivery);
body = delivery.getBody();
contentType = delivery.getProps().getContentType();
contentEncoding = delivery.getProps().getContentEncoding();
headers = delivery.getProps().getHeaders();
deliveryMode = delivery.getProps().getDeliveryMode();
priority = delivery.getProps().getPriority();
correlationId = delivery.getProps().getCorrelationId();
replyTo = delivery.getProps().getReplyTo();
expiration = delivery.getProps().getExpiration();
messageId = delivery.getProps().getMessageId();
timestamp = delivery.getProps().getTimestamp();
type = delivery.getProps().getType();
userId = delivery.getProps().getUserId();
appId = delivery.getProps().getAppId();
clusterId = delivery.getProps().getClusterId();
}
/**
 * Creates an open test channel backed by in-memory queues.
 *
 * <p>Every queue name found in {@code routingKeyToQueueMappings} gets its own bounded message
 * buffer with capacity 100.
 *
 * @param exchangeToRoutingKeyMappings exchange name to routing key
 * @param routingKeyToQueueMappings    routing key to queue names; may be {@code null}, in which
 *                                     case no queue is registered
 */
public TestChannel(Map<String, String> exchangeToRoutingKeyMappings,
        Map<String, List<String>> routingKeyToQueueMappings) {
    this.exchangeToRoutingKeyMappings = exchangeToRoutingKeyMappings;
    this.routingKeyToQueueMappings = routingKeyToQueueMappings;
    this.enqueuedMessages = new HashMap<>();
    this.returnListeners = new ArrayList<>();

    if (routingKeyToQueueMappings != null) {
        routingKeyToQueueMappings.values().forEach(queueNames ->
            queueNames.forEach(queueName ->
                this.enqueuedMessages.put(queueName, new ArrayBlockingQueue<GetResponse>(100))));
    }

    this.executorService = Executors.newCachedThreadPool();
    this.open = true;
}
/**
 * Registers {@code callback} as a consumer of {@code queue} and immediately delivers every
 * message currently buffered for that queue to it.
 *
 * @param queue    name of the queue to consume from
 * @param autoAck  not read by this implementation
 * @param callback consumer receiving the buffered messages
 * @return a freshly generated random consumer tag
 * @throws IOException if no buffer exists for {@code queue}, or if the callback throws it
 */
@Override
public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
    final BlockingQueue<GetResponse> pending = enqueuedMessages.get(queue);
    if (pending == null) {
        throw new IOException("Queue is not defined");
    }
    consumerMap.computeIfAbsent(queue, q -> new ArrayList<>()).add(callback);

    final String consumerTag = UUID.randomUUID().toString();
    // Drain whatever is buffered right now; poll() returns null once the buffer is empty.
    for (GetResponse next = pending.poll(); next != null; next = pending.poll()) {
        callback.handleDelivery(consumerTag, next.getEnvelope(), next.getProps(), next.getBody());
    }
    return consumerTag;
}
/**
 * Fetches, with auto-ack, every message currently available on the queue named by
 * {@code routeKey} and returns the bodies as strings decoded with the platform default charset.
 *
 * @return the message bodies in fetch order; empty when the queue had no message
 * @throws IOException          if fetching from the channel fails
 * @throws InterruptedException declared by the signature; not thrown by the visible code
 */
public List<String> popAllMessages() throws IOException, InterruptedException {
    final List<String> bodies = new ArrayList<>();
    for (GetResponse fetched = channel.basicGet(routeKey, true);
         fetched != null;
         fetched = channel.basicGet(routeKey, true)) {
        bodies.add(new String(fetched.getBody()));
    }
    return bodies;
}
/**
 * Polls the head message of this queue and wraps it in a {@link GetResponse}.
 *
 * <p>A delivery tag is drawn from the supplier on every call, even when no message is returned.
 * An expired head message is dead-lettered and {@code null} is returned for this call. Unless
 * {@code autoAck} is set, the message is remembered under its delivery tag as unacknowledged.
 *
 * @param autoAck             whether the message counts as acknowledged on delivery
 * @param deliveryTagSupplier source of the delivery tag for this call
 * @return the polled message with the remaining queue size as message count, or {@code null}
 *         when the queue is empty or the head message had expired
 */
public GetResponse basicGet(boolean autoAck, Supplier<Long> deliveryTagSupplier) {
    final long tag = deliveryTagSupplier.get();
    final Message head = messages.poll();

    if (head == null) {
        LOGGER.debug(localized("basic_get no message available"));
        return null;
    }
    if (head.isExpired()) {
        deadLetterWithReason(head, DeadLettering.ReasonType.EXPIRED);
        return null;
    }

    if (!autoAck) {
        unackedMessagesByDeliveryTag.put(tag, head);
    }
    final Envelope envelope = new Envelope(tag, false, head.exchangeName, head.routingKey);
    LOGGER.debug(localized("basic_get a message"));
    return new GetResponse(envelope, head.props, head.body, messages.size());
}
/**
 * End-to-end {@code basicGet} scenario on the mock: checks that connection and channel are the
 * mock implementations, publishes one message through a durable direct exchange, fetches it
 * without auto-ack, verifies the body and acknowledges it.
 */
@Test
void basic_get_case() throws IOException, TimeoutException {
    final String exchange = "test-exchange";
    final String bindingKey = "test.key";

    try (Connection connection = new MockConnectionFactory().newConnection()) {
        assertThat(connection).isInstanceOf(MockConnection.class);

        try (Channel ch = connection.createChannel()) {
            assertThat(ch).isInstanceOf(MockChannel.class);

            ch.exchangeDeclare(exchange, "direct", true);
            final String declaredQueue = ch.queueDeclare().getQueue();
            ch.queueBind(declaredQueue, exchange, bindingKey);

            final byte[] payload = "Hello, world!".getBytes();
            ch.basicPublish(exchange, bindingKey, null, payload);

            final GetResponse fetched = ch.basicGet(declaredQueue, false);
            if (fetched != null) {
                assertThat(new String(fetched.getBody())).isEqualTo("Hello, world!");
                ch.basicAck(fetched.getEnvelope().getDeliveryTag(), false);
            } else {
                fail("AMQP GetReponse must not be null");
            }
        }
    }
}
/**
 * Inside a transaction, publishes to one queue and acks, rejects and nacks the three messages
 * of another queue. The publish must stay invisible until {@code txCommit}; after the commit
 * the first queue holds one message and the second queue is empty.
 */
@Test
void transaction_commit_propagate_publish_ack_and_reject() throws IOException, TimeoutException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        final String firstQueue = ch.queueDeclare().getQueue();
        final String secondQueue = ch.queueDeclare().getQueue();

        ch.basicPublish("", secondQueue, null, "to_ack".getBytes());
        ch.basicPublish("", secondQueue, null, "to_reject".getBytes());
        ch.basicPublish("", secondQueue, null, "to_nack".getBytes());
        assertThat(ch.messageCount(secondQueue)).isEqualTo(3);

        ch.txSelect();

        ch.basicPublish("", firstQueue, null, "test message".getBytes());
        assertThat(ch.messageCount(firstQueue)).isEqualTo(0L);

        // Settle each pending message according to the keyword in its body.
        for (GetResponse polled = ch.basicGet(secondQueue, false);
             polled != null;
             polled = ch.basicGet(secondQueue, false)) {
            final String text = new String(polled.getBody());
            if (text.contains("reject")) {
                ch.basicReject(polled.getEnvelope().getDeliveryTag(), false);
            } else if (text.contains("nack")) {
                ch.basicNack(polled.getEnvelope().getDeliveryTag(), false, false);
            } else {
                ch.basicAck(polled.getEnvelope().getDeliveryTag(), false);
            }
        }
        assertThat(ch.messageCount(secondQueue)).isEqualTo(0L);

        assertThat(ch.txCommit()).isNotNull();

        assertThat(ch.messageCount(firstQueue)).isEqualTo(1L);
        assertThat(ch.messageCount(secondQueue)).isEqualTo(0L);
    }
}
/**
 * Inside a transaction, publishes to one queue and acks, rejects and nacks the three messages
 * of another queue, then rolls back. After {@code txRollback} the test expects both queues to
 * report a message count of zero.
 */
@Test
void transaction_rollback_propagate_publish_ack_and_reject() throws IOException, TimeoutException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        final String firstQueue = ch.queueDeclare().getQueue();
        final String secondQueue = ch.queueDeclare().getQueue();

        ch.basicPublish("", secondQueue, null, "to_ack".getBytes());
        ch.basicPublish("", secondQueue, null, "to_reject".getBytes());
        ch.basicPublish("", secondQueue, null, "to_nack".getBytes());
        assertThat(ch.messageCount(secondQueue)).isEqualTo(3);

        ch.txSelect();

        ch.basicPublish("", firstQueue, null, "test message".getBytes());
        assertThat(ch.messageCount(firstQueue)).isEqualTo(0L);

        // Settle each pending message according to the keyword in its body.
        for (GetResponse polled = ch.basicGet(secondQueue, false);
             polled != null;
             polled = ch.basicGet(secondQueue, false)) {
            final String text = new String(polled.getBody());
            if (text.contains("reject")) {
                ch.basicReject(polled.getEnvelope().getDeliveryTag(), false);
            } else if (text.contains("nack")) {
                ch.basicNack(polled.getEnvelope().getDeliveryTag(), false, false);
            } else {
                ch.basicAck(polled.getEnvelope().getDeliveryTag(), false);
            }
        }
        assertThat(ch.messageCount(secondQueue)).isEqualTo(0L);

        assertThat(ch.txRollback()).isNotNull();

        assertThat(ch.messageCount(firstQueue)).isEqualTo(0L);
        assertThat(ch.messageCount(secondQueue)).isEqualTo(0L);
    }
}
/**
 * Declares topic exchange {@code ex-1} with {@code ex-2} as its alternate exchange. A message
 * whose routing key matches the {@code fruit.*} binding must land in {@code fruits}; a message
 * that matches no binding must land in {@code unrouted}, the queue bound to {@code ex-2}.
 */
@Test
void alternate_exchange_is_used_when_routing_fails() throws IOException, TimeoutException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        ch.exchangeDeclare("ex-1", BuiltinExchangeType.TOPIC, true, false, Collections.singletonMap("alternate-exchange", "ex-2"));
        ch.exchangeDeclare("ex-2", BuiltinExchangeType.FANOUT);
        ch.queueDeclare("fruits", true, false, false, Collections.emptyMap());
        ch.queueDeclare("unrouted", true, false, false, Collections.emptyMap());
        assertThat(ch.queueBind("fruits", "ex-1", "fruit.*")).isNotNull();
        assertThat(ch.queueBind("unrouted", "ex-2", "")).isNotNull();

        ch.basicPublish("ex-1", "vegetable.carrot", null, "carrot".getBytes());
        ch.basicPublish("ex-1", "fruit.orange", null, "orange".getBytes());

        // Routed message: exactly one delivery on "fruits".
        final GetResponse routed = ch.basicGet("fruits", true);
        assertThat(routed).isNotNull();
        assertThat(new String(routed.getBody())).isEqualTo("orange");
        assertThat(ch.basicGet("fruits", true)).isNull();

        // Unroutable message: exactly one delivery on "unrouted".
        final GetResponse fallback = ch.basicGet("unrouted", true);
        assertThat(fallback).isNotNull();
        assertThat(new String(fallback.getBody())).isEqualTo("carrot");
        assertThat(ch.basicGet("unrouted", true)).isNull();
    }
}