下面列出了怎么用com.rabbitmq.client.Envelope的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Handles one delivery: prints the body, pauses for two seconds, then either
 * requeues or acknowledges the message depending on its {@code num} header.
 *
 * <p>A delivery whose {@code num} header is numerically 0 is negatively
 * acknowledged with requeue. Every other delivery, including one that carries
 * no headers or no {@code num} header, is acknowledged.
 *
 * @param consumerTag the consumer tag associated with the consumer
 * @param envelope    packaging data for the message (supplies the delivery tag)
 * @param properties  content header data for the message
 * @param body        the message body
 * @throws IOException if the ack/nack cannot be sent on the channel
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    // Decode with an explicit charset so the output does not depend on the platform default.
    System.err.println("body: " + new String(body, java.nio.charset.StandardCharsets.UTF_8));
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the thread running this callback can still observe it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    // Headers are optional, so guard against a missing map or a missing/non-numeric "num" entry
    // instead of failing with a NullPointerException or ClassCastException.
    Object num = properties.getHeaders() == null ? null : properties.getHeaders().get("num");
    if (num instanceof Number && ((Number) num).intValue() == 0) {
        // nack and put the message back on the queue
        channel.basicNack(envelope.getDeliveryTag(), false, true);
    } else {
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
/**
 * Publishes one message to a queue declared with a message TTL and a dead-letter
 * exchange, then checks that exactly one message reaches the consumer of the queue
 * bound to that dead-letter exchange.
 */
@Test
void expired_message_should_be_consumable_after_being_dead_lettered() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        // Dead-letter side: a fanout exchange with a single queue bound to it.
        ch.exchangeDeclare("rejected-ex", BuiltinExchangeType.FANOUT);
        ch.queueDeclare("rejected", true, false, false, null);
        ch.queueBindNoWait("rejected", "rejected-ex", "unused", null);

        // Source side: the queue whose messages are forwarded to "rejected-ex".
        queue("fruits").withMessageTtl(10L).withDeadLetterExchange("rejected-ex").declare(ch);

        List<String> deadLettered = new ArrayList<>();
        DefaultConsumer collector = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) {
                deadLettered.add(new String(body));
            }
        };
        ch.basicConsume("rejected", collector);

        ch.basicPublish("", "fruits", null, "banana".getBytes());

        // Give the message time to pass through the dead-letter route before asserting.
        TimeUnit.MILLISECONDS.sleep(100L);

        assertThat(deadLettered).hasSize(1);
    }
}
/**
 * Publishes two messages to a queue declared with a message TTL of -1 and checks
 * that the consumer registered on that queue receives none of them.
 */
@Test
void multiple_expired_messages_are_not_delivered_to_consumer() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        queue("fruits").withMessageTtl(-1L).declare(ch);

        List<String> delivered = new ArrayList<>();
        DefaultConsumer collector = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) {
                delivered.add(new String(body));
            }
        };
        ch.basicConsume("fruits", collector);

        ch.basicPublish("", "fruits", null, "banana".getBytes());
        ch.basicPublish("", "fruits", null, "orange".getBytes());

        // Leave time for any (unexpected) delivery to happen before asserting.
        TimeUnit.MILLISECONDS.sleep(100L);

        assertThat(delivered).hasSize(0);
    }
}
/**
 * Connects to the broker at 127.0.0.1, declares {@code QUEUE_NAME} and prints every
 * message delivered from it. The consumer is registered with automatic acknowledgement.
 *
 * @param argv unused
 * @throws Exception if the connection, channel or consumer cannot be set up
 */
public static void main(String[] argv) throws Exception {
    // Connection factory pointed at the local RabbitMQ broker.
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");

    // Open a connection and a channel on it.
    final Channel channel = factory.newConnection().createChannel();

    // Declare the queue to consume from. Queue declaration is idempotent: the queue is
    // created if it does not exist and an existing queue is left unchanged.
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println("C [*] Waiting for messages. To exit press CTRL+C");

    // DefaultConsumer implements Consumer; handleDelivery is the callback invoked for
    // each message. The second argument (true) enables automatic acknowledgement.
    channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("C [x] Received '" + new String(body, "UTF-8") + "'");
        }
    });
}
/**
 * Starts consumer no. 1: declares {@code QUEUE_NAME1}, binds it to {@code EXCHANGE_NAME}
 * with {@code ROUTER_KEY_1}, and prints each received message together with its headers.
 *
 * <p>The consumer is registered with the {@code autoAck} setting of the enclosing class.
 * A manual ack is only sent when {@code autoAck} is off, because acknowledging a delivery
 * that the broker already auto-acknowledged is a channel-level protocol error.
 *
 * @throws Exception if the connection, channel, queue or consumer cannot be set up
 */
public void consumer1() throws Exception {
    // 1. Obtain a connection
    Connection connection = RabbitMqConnectionFactoy.getConnection();
    // 2. Open a channel
    Channel channel = connection.createChannel();
    // 3. Declare the queue
    channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
    // Bind the queue to the exchange
    channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, ROUTER_KEY_1);
    // With a prefetch of 1 the server would push only one message at a time to this consumer
    // (with N, the server stops pushing once N messages are outstanding on the client).
    //channel.basicQos(1); // process one message at a time
    // 4. Define the consumer for the queue
    DefaultConsumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                throws IOException {
            // Decode the payload as text
            String message = new String(body, "UTF-8");
            // String concatenation renders an absent header map as "null" instead of
            // throwing a NullPointerException the way getHeaders().toString() would.
            System.out.println("-->消费者1号,收到消息,msg :" + message + ",header:" + properties.getHeaders());
            if (!autoAck) {
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    };
    channel.basicConsume(QUEUE_NAME1, autoAck, consumer);
}
/**
 * Opens the connection and channel, declares a server-named reply queue and starts a
 * consumer on it that completes the pending future registered in {@code call} under
 * the reply's correlation id.
 *
 * @throws Exception if the connection, channel, queue or consumer cannot be set up
 */
public void init() throws Exception {
    // 1. Obtain a connection
    connection = RabbitMqConnectionFactoy.getConnection();
    // 2. Open a channel
    channel = connection.createChannel();
    // Declare a queue with a server-generated name and remember that name for replies
    replyQueueName = channel.queueDeclare().getQueue();

    channel.basicConsume(replyQueueName, autoAck, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                                   byte[] body) throws IOException {
            // Only replies whose correlation id matches a registered call are of interest
            CompletableFuture<String> pending = call.get(properties.getCorrelationId());
            if (pending == null) {
                return;
            }
            pending.complete(new String(body, "UTF-8"));
        }
    });
}
/**
 * Starts consumer no. 1A: declares {@code QUEUE_NAME1}, binds it to {@code EXCHANGE_NAME}
 * with the binding key {@code "test.a"}, and prints each received message together with
 * its headers.
 *
 * <p>The consumer is registered with the {@code autoAck} setting of the enclosing class.
 * A manual ack is only sent when {@code autoAck} is off, because acknowledging a delivery
 * that the broker already auto-acknowledged is a channel-level protocol error.
 *
 * @throws Exception if the connection, channel, queue or consumer cannot be set up
 */
public void consumer1A() throws Exception {
    // 1. Obtain a connection
    Connection connection = RabbitMqConnectionFactoy.getConnection();
    // 2. Open a channel
    Channel channel = connection.createChannel();
    // 3. Declare the queue
    channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
    // Bind the queue to the exchange.
    // NOTE(review): the original comment says this binding only receives the
    // odd-numbered messages - confirm against the producer.
    channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "test.a");
    // With a prefetch of 1 the server would push only one message at a time to this consumer
    // (with N, the server stops pushing once N messages are outstanding on the client).
    //channel.basicQos(1); // process one message at a time
    // 4. Define the consumer for the queue
    DefaultConsumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                throws IOException {
            // Decode the payload as text
            String message = new String(body, "UTF-8");
            // String concatenation renders an absent header map as "null" instead of
            // throwing a NullPointerException the way getHeaders().toString() would.
            System.out.println("-->消费者1A号,收到消息,msg :" + message + ",header:" + properties.getHeaders());
            if (!autoAck) {
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    };
    channel.basicConsume(QUEUE_NAME1, autoAck, consumer);
}
/**
 * Registers a subscriber for {@code routingKey}: declares a durable queue named after the
 * routing key (dots replaced by underscores), binds it to
 * {@code MessageConstants.EXCHANGE_NAME}, applies the configured prefetch count and starts
 * a manually-acknowledged consumer that forwards every delivery to {@code responseHandler}.
 *
 * @param routingKey      binding key; also used as the consumer tag
 * @param responseHandler receives the channel and every delivery
 * @throws Exception if the channel, queue, binding or consumer cannot be set up
 */
public void registerSubscriber(String routingKey, final EngineSubscriberResponseHandler responseHandler) throws Exception {
    final Channel subscriberChannel = connection.createChannel();

    // Queue name is derived from the routing key.
    final String derivedQueue = routingKey.replace(".", "_");
    subscriberChannel.queueDeclare(derivedQueue, true, false, false, null);
    subscriberChannel.queueBind(derivedQueue, MessageConstants.EXCHANGE_NAME, routingKey);

    subscriberChannel.basicQos(ApplicationConfigProvider.getInstance().getMessageQueue().getPrefetchCount());
    // The handler gets the channel before consumption starts.
    responseHandler.setChannel(subscriberChannel);
    log.debug("prefetchCount " + ApplicationConfigProvider.getInstance().getMessageQueue().getPrefetchCount());

    // autoAck is false: acknowledgement is not done by this method.
    subscriberChannel.basicConsume(derivedQueue, false, routingKey, new DefaultConsumer(subscriberChannel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            responseHandler.handleDelivery(consumerTag, envelope, properties, body);
        }
    });
}
/**
 * Builds a list of {@code bound} mocked {@link GetResponse} objects.
 *
 * <p>Each response carries mocked properties with a random UUID message id, a mocked
 * envelope whose delivery tag is the next long from {@code random}, the body
 * {@code "{}"}, and a message count of {@code bound - i} for the i-th element.
 *
 * @param random source of the delivery tags
 * @param bound  number of responses to create
 * @return the mocked responses, in creation order
 */
List<GetResponse> buildQueue(final Random random, final int bound) {
    // Explicit type argument instead of the raw LinkedList, which caused an unchecked conversion.
    final List<GetResponse> queue = new LinkedList<GetResponse>();
    for (int i = 0; i < bound; i++) {
        AMQP.BasicProperties props = Mockito.mock(AMQP.BasicProperties.class);
        Mockito.when(props.getMessageId()).thenReturn(UUID.randomUUID().toString());

        Envelope envelope = Mockito.mock(Envelope.class);
        Mockito.when(envelope.getDeliveryTag()).thenReturn(random.nextLong());

        GetResponse response = Mockito.mock(GetResponse.class);
        Mockito.when(response.getProps()).thenReturn(props);
        Mockito.when(response.getEnvelope()).thenReturn(envelope);
        Mockito.when(response.getBody()).thenReturn("{}".getBytes());
        // Counts down from bound to 1 across the list.
        Mockito.when(response.getMessageCount()).thenReturn(bound - i);

        queue.add(response);
    }
    return queue;
}
/**
 * Checks that {@code differPublisher.negativeDeliveryAck} nacks the delivery without
 * requeue, republishes the body to the failed-routing key on {@code EDDI_EXCHANGE}
 * and waits for publisher confirms.
 */
@Test
public void negativeDeliveryAck() throws IOException, TimeoutException, InterruptedException {
    // setup
    final long tag = 1L;
    final byte[] emptyBody = new byte[0];
    final Envelope envelope = new Envelope(tag, false, "someExchange", "someRoutingKey");
    final Delivery delivery = new Delivery(envelope, new AMQP.BasicProperties(), emptyBody);

    // test
    differPublisher.negativeDeliveryAck(delivery);

    // assert
    Mockito.verify(channel).basicNack(eq(tag), eq(false), eq(false));
    Mockito.verify(channel).basicPublish(
            eq(EDDI_EXCHANGE), eq(MESSAGE_CREATED_EDDI_FAILED_ROUTING_KEY), eq(null), eq(emptyBody));
    Mockito.verify(channel).waitForConfirmsOrDie(eq(TIMEOUT_CONFIRMS_IN_MILLIS));
}
/**
 * Deserializes the delivered body into a {@code Message} and adds it to the message
 * queue. When deserialization fails, the error is logged and the queue is notified
 * through {@code errorWhileReceivingMessage()}.
 */
@Override
public void handleDelivery( String consumerTag, Envelope envelope, BasicProperties properties, byte[] body )
throws IOException {

    try {
        Message received = SerializationUtils.deserializeObject( body );

        String trace = this.sourceName + " received a message " + received.getClass().getSimpleName()
                + " on routing key '" + envelope.getRoutingKey() + "'.";
        this.logger.finer( trace );

        this.messageQueue.add( received );

    } catch( ClassNotFoundException | IOException e ) {
        String failure = this.sourceName + ": a message could not be deserialized. => " + e.getClass().getSimpleName();
        this.logger.severe( failure );
        Utils.logException( this.logger, e );
        this.messageQueue.errorWhileReceivingMessage();
    }
}
/**
 * Connects to RabbitMQ on localhost, declares the durable queue
 * {@code "flowing-retail-" + name}, binds it to the durable fanout exchange
 * {@code EXCHANGE_NAME} and starts an auto-acknowledging consumer that hands every
 * message (decoded as UTF-8) to {@code eventHandler}.
 *
 * @throws Exception if the connection, channel, queue, exchange or consumer cannot be set up
 */
protected void connect() throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    channel = connectionFactory.newConnection().createChannel();

    final String consumerQueue = "flowing-retail-" + name;
    channel.queueDeclare(consumerQueue, true, false, false, null);
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true); // publish/subscribe model
    channel.queueBind(consumerQueue, EXCHANGE_NAME, "*");

    System.out.println(" [*] Waiting for messages.");

    channel.basicConsume(consumerQueue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            final String text = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + text + "'");
            eventHandler.handleEvent(text);
        }
    });
}
/**
 * Consumes a single message from the sender's queue and returns its body.
 *
 * <p>Fails the calling test when no message arrives within 10 seconds or when the
 * delivered body is {@code null}.
 *
 * @return the body of the first delivered message
 * @throws Exception if the consumer cannot be registered or the wait is interrupted
 */
byte[] readMessage() throws Exception {
    final CountDownLatch countDown = new CountDownLatch(1);
    final AtomicReference<byte[]> result = new AtomicReference<>();

    // Don't close this as it invalidates the sender's connection!
    Channel channel = sender.localChannel();
    channel.basicConsume(sender.queue, true, new DefaultConsumer(channel) {
        @Override public void handleDelivery(String consumerTag, Envelope envelope,
            AMQP.BasicProperties properties, byte[] body) {
            result.set(body);
            countDown.countDown();
        }
    });

    assertThat(countDown.await(10, TimeUnit.SECONDS))
        .withFailMessage("Timed out waiting to read message")
        .isTrue();
    // Check the delivered body, not the AtomicReference holder: the holder is never null,
    // so asserting on it could not detect a null body.
    assertThat(result.get())
        .withFailMessage("handleDelivery set null body")
        .isNotNull();
    return result.get();
}
/**
 * Passes the delivery to the consumer and acknowledges it according to the result:
 * a {@code true} result sends {@code basicAck}; a {@code false} result sends
 * {@code basicNack} with {@code multiple=false} and {@code requeue=false}.
 *
 * @param consumerTag the consumer tag the delivery was received for
 * @param message     the delivery to consume
 * @throws IOException declared for the consume call and the ack/nack action
 */
void deliverWithAck(String consumerTag, Delivery message) throws IOException {
    Envelope envelope = message.getEnvelope();
    long deliveryTag = envelope.getDeliveryTag();
    LOGGER.debug("Consuming message {} for consumer tag {}", envelope, consumerTag);
    if (consumer.consume(consumerTag, envelope, message.getProperties(), message.getBody())) {
        invokeAckAction(ch -> {
            ch.basicAck(deliveryTag, false);
            // Log the envelope, consistent with the other log statements in this method.
            LOGGER.debug("Acknowledged {}", envelope);
        });
    } else {
        invokeAckAction(ch -> {
            ch.basicNack(deliveryTag, false, false);
            LOGGER.debug("Not acknowledged {}", envelope);
        });
    }
}
/**
 * Connects to a local broker as guest/guest, consumes the queue
 * {@code "product.catalog_item.sync"} with automatic acknowledgement and prints a
 * progress line every 1000 received messages.
 *
 * <p>The latch awaited at the end is never counted down in this method, so the
 * program keeps consuming until it is terminated externally or the wait throws.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final CountDownLatch keepAlive = new CountDownLatch(1);

    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("guest");
    factory.setPassword("guest");
    factory.setHost("127.0.0.1");

    try (Connection con = factory.newConnection(); Channel chn = con.createChannel()) {
        final AtomicLong counter = new AtomicLong();

        DefaultConsumer countingConsumer = new DefaultConsumer(chn) {
            @Override
            public void handleDelivery(String tag, Envelope envelope, BasicProperties properties,
                                       byte[] body) throws IOException {
                long seen = counter.incrementAndGet();
                if (seen % 1000 == 0) {
                    System.out.println("Received " + seen + " messages so far.");
                }
                // The latch is intentionally not released here: keepAlive.countDown();
            }
        };
        String consumerTag = chn.basicConsume("product.catalog_item.sync", true, countingConsumer);
        System.out.println(consumerTag);

        keepAlive.await();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Checks that a body the decoder accepts is decoded and the resulting event is fired
 * on the sink selected for its type, and that {@code consume} reports success.
 */
@SuppressWarnings("boxing")
@Test
public void testHandleDelivery() throws Exception {
    final TestEvent decodedEvent = new TestEvent();
    final byte[] payload = "the message".getBytes();
    final Envelope envelope = new Envelope(123L, false, null, null);
    final BasicProperties props = new BasicProperties();

    // Stub the decoder and the event sink lookup.
    when(decoder.willDecode(null)).thenReturn(true);
    when(decoder.decode(payload)).thenReturn(decodedEvent);
    when(eventSink.select(TestEvent.class)).thenReturn(testEventSink);

    boolean consumed = consumer.consume("consumerTag", envelope, props, payload);

    assertTrue(consumed);
    verify(testEventSink).fire(decodedEvent);
}
/**
 * When the event consumer reports success, {@code deliverWithAck} must send
 * {@code basicAck} for the delivery tag on the consumer channel.
 */
@Test
void deliverWithAckSuccess() throws IOException {
    final byte[] payload = "some body".getBytes();
    final BasicProperties props = MessageProperties.BASIC;
    final Envelope envelope = new Envelope(123L, false, "exchange", "routingKey");
    final Delivery delivery = new Delivery(envelope, props, payload);

    sut = new ConsumerHolder(eventConsumerMock, "queue", true, PREFETCH_COUNT,
            consumerChannelFactoryMock, declarationsListMock, declarerRepositoryMock);

    when(consumerChannelFactoryMock.createChannel()).thenReturn(channelMock);
    when(eventConsumerMock.consume("consumerTag", envelope, props, payload)).thenReturn(true);

    sut.activate();
    sut.deliverWithAck("consumerTag", delivery);

    verify(channelMock).basicAck(123L, false);
}
/**
 * When the event consumer reports failure, {@code deliverWithAck} must send
 * {@code basicNack} (single message, no requeue) for the delivery tag.
 */
@Test
void deliverWithAckSendFailed() throws IOException {
    final byte[] payload = "some body".getBytes();
    final BasicProperties props = MessageProperties.BASIC;
    final Envelope envelope = new Envelope(123L, false, "exchange", "routingKey");
    final Delivery delivery = new Delivery(envelope, props, payload);

    sut = new ConsumerHolder(eventConsumerMock, "queue", true, PREFETCH_COUNT,
            consumerChannelFactoryMock, declarationsListMock, declarerRepositoryMock);

    when(consumerChannelFactoryMock.createChannel()).thenReturn(channelMock);
    when(eventConsumerMock.consume("consumerTag", envelope, props, payload)).thenReturn(false);

    sut.activate();
    sut.deliverWithAck("consumerTag", delivery);

    verify(channelMock).basicNack(123L, false, false);
}
/**
 * When sending the nack itself fails with an {@link IOException},
 * {@code deliverWithAck} must propagate that exception to the caller.
 */
@Test
void deliverWithAckFailedAck() throws IOException {
    final byte[] payload = "some body".getBytes();
    final BasicProperties props = MessageProperties.BASIC;
    final Envelope envelope = new Envelope(123L, false, "exchange", "routingKey");
    final Delivery delivery = new Delivery(envelope, props, payload);

    sut = new ConsumerHolder(eventConsumerMock, "queue", true, PREFETCH_COUNT,
            consumerChannelFactoryMock, declarationsListMock, declarerRepositoryMock);

    when(consumerChannelFactoryMock.createChannel()).thenReturn(channelMock);
    when(eventConsumerMock.consume("consumerTag", envelope, props, payload)).thenReturn(false);
    // The nack triggered by the failed consume blows up on the channel.
    doThrow(new IOException("some error")).when(channelMock).basicNack(123L, false, false);

    sut.activate();

    assertThrows(IOException.class, () -> sut.deliverWithAck("consumerTag", delivery));
}
/**
 * Buffers an incoming delivery for later processing and records its tag as pending
 * acknowledgement.
 *
 * <p>A redelivered message whose tag is already in {@code recoveredTags} or
 * {@code pendingAck} is not buffered again; if the tag is in {@code recoveredTags}
 * it is (re-)added to {@code pendingAck}.
 */
@Override
public void handleDelivery(String consumer_Tag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
    final long tag = envelope.getDeliveryTag();

    if (envelope.isRedeliver()) {
        final boolean alreadyRecovered = recoveredTags.contains(tag);
        if (alreadyRecovered || pendingAck.contains(tag)) {
            if (alreadyRecovered) {
                pendingAck.add(tag);
            }
            // Known redelivery: nothing new to buffer.
            return;
        }
    }

    // Acknowledgements are sent at the end of the window after adding to idempotency manager
    pendingAck.add(tag);
    holdingBuffer.add(new KeyValPair<Long, byte[]>(tag, body));
    logger.debug("Received Async message: {} buffersize: {} ", new String(body), holdingBuffer.size());
}
/**
 * Runs before the intercepted consumer callback: creates an entry span for the delivery,
 * tags it with broker, exchange and routing key, and extracts the propagated trace
 * context from the message headers.
 *
 * <p>{@code allArguments[1]} is read as the {@link Envelope} and {@code allArguments[2]}
 * as the {@link AMQP.BasicProperties} of the delivery.
 */
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
    MethodInterceptResult result) throws Throwable {
    final ContextCarrier carrier = new ContextCarrier();
    final String brokerUrl = (String) objInst.getSkyWalkingDynamicField();
    final Envelope envelope = (Envelope) allArguments[1];
    final AMQP.BasicProperties properties = (AMQP.BasicProperties) allArguments[2];

    final String operationName = OPERATE_NAME_PREFIX + "Topic/" + envelope.getExchange() + "Queue/"
        + envelope.getRoutingKey() + CONSUMER_OPERATE_NAME_SUFFIX;
    final AbstractSpan span = ContextManager.createEntrySpan(operationName, null).start(System.currentTimeMillis());

    Tags.MQ_BROKER.set(span, brokerUrl);
    Tags.MQ_TOPIC.set(span, envelope.getExchange());
    Tags.MQ_QUEUE.set(span, envelope.getRoutingKey());
    span.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
    SpanLayer.asMQ(span);

    // Copy every carrier item that has a matching, non-null message header.
    for (CarrierItem item = carrier.items(); item.hasNext(); ) {
        item = item.next();
        Object headerValue = properties.getHeaders() == null
            ? null
            : properties.getHeaders().get(item.getHeadKey());
        if (headerValue != null) {
            item.setHeadValue(headerValue.toString());
        }
    }
    ContextManager.extract(carrier);
}
/**
 * Runs the interceptor's before/after hooks for a delivery that carries an SW8 trace
 * header and checks that exactly one trace segment was recorded.
 */
@Test
public void TestRabbitMQConsumerInterceptor() throws Throwable {
    Map<String, Object> headers = new HashMap<String, Object>();
    headers.put(SW8CarrierItem.HEADER_NAME, "1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=");
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();
    Envelope envelope = new Envelope(1111, false, "", "rabbitmq-test");

    // Argument layout expected by the interceptor: index 1 = envelope, index 2 = properties.
    Object[] arguments = {0, envelope, props};

    rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null, arguments, null, null);
    rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null, arguments, null, null);

    List<TraceSegment> recorded = segmentStorage.getTraceSegments();
    Assert.assertThat(recorded.size(), is(1));
}
/**
 * Incoming messages: converts the AMQP properties via the superclass and, when the
 * filter configuration allows processing for {@code AsyncProcess}, copies the filtered
 * context found under the {@code TPIC_HEADER} header into the backend. An invocation id
 * is generated afterwards if necessary.
 */
@Override
public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelope envelope, String charset) {
    final MessageProperties converted = super.toMessageProperties(source, envelope, charset);

    final TraceeFilterConfiguration config = backend.getConfiguration(profile);
    if (config.shouldProcessContext(AsyncProcess)) {
        // Values are stored as type of LongStringHelper.ByteArrayLongString - but it's private
        final Object rawContext = converted.getHeaders().get(TPIC_HEADER);
        final Map<String, String> context = transformToTraceeContextMap((Map<String, ?>) rawContext);

        final boolean hasContext = context != null && !context.isEmpty();
        if (hasContext) {
            backend.putAll(config.filterDeniedParams(context, AsyncProcess));
        }
    }

    Utilities.generateInvocationIdIfNecessary(backend);
    return converted;
}
/**
 * Runs the interceptor's before/after hooks for a delivery whose header map is empty
 * and checks that exactly one trace segment was still recorded.
 */
@Test
public void testRabbitMQConsumerInterceptorWithEmptyHeaders() throws Throwable {
    Map<String, Object> emptyHeaders = new HashMap<String, Object>();
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(emptyHeaders).build();
    Envelope envelope = new Envelope(1111, false, "", "rabbitmq-test");

    // Argument layout expected by the interceptor: index 1 = envelope, index 2 = properties.
    Object[] arguments = {0, envelope, props};

    rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null, arguments, null, null);
    rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null, arguments, null, null);

    List<TraceSegment> recorded = segmentStorage.getTraceSegments();
    Assert.assertThat(recorded.size(), is(1));
}
/**
 * Delivers one message to a {@code StreamSetsMessageConsumer} from an executor thread
 * and checks that the same body becomes available on the transfer queue.
 */
@Test
public void testConsumerSingleMessage() throws Exception {
    final TransferQueue<RabbitMessage> received = new LinkedTransferQueue<>();
    final Channel mockedChannel = mock(Channel.class);
    final Consumer consumer = new StreamSetsMessageConsumer(mockedChannel, received);
    final Envelope envelope = new Envelope(1L, false, EXCHANGE_NAME, QUEUE_NAME);

    // The delivery runs on another thread while this thread waits in take() below.
    Runnable deliverOnce = new Runnable() {
        @Override
        public void run() {
            try {
                consumer.handleDelivery("consumerTag", envelope, null, TEST_MESSAGE_1.getBytes());
            } catch (IOException ignored) {
                // no op
            }
        }
    };
    executor.submit(deliverOnce);

    RabbitMessage first = received.take();
    String bodyText = new String(first.getBody(), StandardCharsets.UTF_8);
    assertEquals(TEST_MESSAGE_1, bodyText);
}
/**
 * Called when a basic.deliver is received for this consumer.
 *
 * <p>The message is handed to the inject handler. On success it is acknowledged (unless
 * auto-ack is enabled). On failure it is rejected without requeue, except when its
 * {@code x-death} count already exceeds {@code maxDeadLetteredCount}; in that case
 * {@link #proceedAfterMaxDeadLetteredCount} decides what happens to it. A
 * {@code maxDeadLetteredCount} of -1 disables the count check.
 *
 * @param consumerTag the consumer tag associated with the consumer
 * @param envelope    packaging data for the message
 * @param properties  content header data for the message
 * @param body        the message body (opaque, client-specific byte array)
 * @throws IOException if the consumer encounters an I/O error while processing the message
 * @see Envelope
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
        throws IOException {
    boolean successful = injectHandler.onMessage(properties, body, inboundName);
    if (successful) {
        if (!autoAck) {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
        return;
    }

    // NOTE(review): the failure path rejects regardless of autoAck - confirm this is intended.
    // Headers are optional: a message that fails on its first delivery may carry none at all,
    // so the map must be null-checked before looking up "x-death".
    Object rawXDeath = properties.getHeaders() == null ? null : properties.getHeaders().get("x-death");
    @SuppressWarnings("unchecked")
    List<HashMap<String, Object>> xDeathHeader = (List<HashMap<String, Object>>) rawXDeath;

    // check if message has been already dead-lettered
    if (xDeathHeader != null && xDeathHeader.size() > 0 && maxDeadLetteredCount != -1) {
        Long count = (Long) xDeathHeader.get(0).get("count");
        if (count <= maxDeadLetteredCount) {
            channel.basicReject(envelope.getDeliveryTag(), false);
            log.info("The rejected message with message id: " + properties.getMessageId() + " and " +
                     "delivery tag: " + envelope.getDeliveryTag() + " on the queue: " + queueName + " is " +
                     "dead-lettered " + count + " time(s).");
        } else {
            // handle the message after exceeding the max dead-lettered count
            proceedAfterMaxDeadLetteredCount(envelope, properties, body);
        }
    } else {
        // the message might be dead-lettered or discard if an error occurred in the mediation flow
        channel.basicReject(envelope.getDeliveryTag(), false);
        log.info("The rejected message with message id: " + properties.getMessageId() + " and " +
                 "delivery tag: " + envelope.getDeliveryTag() + " on the queue: " + queueName + " will " +
                 "discard or dead-lettered.");
    }
}
/**
 * Disposes of a message whose dead-letter count exceeded the configured maximum.
 *
 * <p>If an error-queue routing key is configured, the message is republished (to the
 * configured error exchange, or to the default exchange when none is configured) and
 * then acknowledged. Without a routing key the message is only acknowledged, i.e.
 * discarded.
 *
 * @param envelope   packaging data for the message
 * @param properties content header data for the message
 * @param body       the message body
 * @throws IOException if publishing or acknowledging fails
 */
private void proceedAfterMaxDeadLetteredCount(Envelope envelope, AMQP.BasicProperties properties, byte[] body)
        throws IOException {
    final String routingKey = rabbitMQProperties.get(RabbitMQConstants.MESSAGE_ERROR_QUEUE_ROUTING_KEY);
    final String exchangeName = rabbitMQProperties.get(RabbitMQConstants.MESSAGE_ERROR_EXCHANGE_NAME);
    final long deliveryTag = envelope.getDeliveryTag();

    if (StringUtils.isEmpty(routingKey)) {
        // discard the message
        channel.basicAck(deliveryTag, false);
        log.info("The max dead lettered count exceeded. "
                 + "No 'rabbitmq.message.error.queue.routing.key' specified for publishing the message. "
                 + "Hence the message with message id: " + properties.getMessageId() + " and delivery tag: "
                 + deliveryTag + " on the queue: " + queueName + " will discard.");
        return;
    }

    if (StringUtils.isNotEmpty(exchangeName)) {
        // publish message to the given exchange with the routing key
        channel.basicPublish(exchangeName, routingKey, properties, body);
        channel.basicAck(deliveryTag, false);
        log.info("The max dead lettered count exceeded. Hence message with message id: "
                 + properties.getMessageId() + " and delivery tag: " + deliveryTag
                 + " publish to the exchange: " + exchangeName + " with the routing key: " + routingKey + ".");
        return;
    }

    // publish message to the default exchange with the routing key
    channel.basicPublish("", routingKey, properties, body);
    channel.basicAck(deliveryTag, false);
    log.info("The max dead lettered count exceeded. Hence message with message id: "
             + properties.getMessageId() + " and delivery tag: " + deliveryTag
             + " publish to the default exchange with the routing key: " + routingKey + ".");
}
/**
 * Prints the consumer tag, envelope, properties and body of each delivery to
 * standard error. The delivery is not acknowledged here.
 *
 * @param consumerTag the consumer tag associated with the consumer
 * @param envelope    packaging data for the message
 * @param properties  content header data for the message
 * @param body        the message body, decoded as UTF-8 for printing
 * @throws IOException declared by the overridden callback
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    System.err.println("consumerTag: " + consumerTag);
    System.err.println("envelope: " + envelope);
    System.err.println("properties: " + properties);
    // Explicit charset: new String(byte[]) alone would use the platform default encoding.
    System.err.println("body: " + new String(body, java.nio.charset.StandardCharsets.UTF_8));
}
/**
 * Dumps each delivery (consumer tag, envelope, properties and body) to standard
 * error. No acknowledgement is sent from this callback.
 *
 * @param consumerTag the consumer tag associated with the consumer
 * @param envelope    packaging data for the message
 * @param properties  content header data for the message
 * @param body        the message body, decoded as UTF-8 for printing
 * @throws IOException declared by the overridden callback
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    System.err.println("consumerTag: " + consumerTag);
    System.err.println("envelope: " + envelope);
    System.err.println("properties: " + properties);
    // Decode with a fixed charset so the printed text does not vary with the JVM's default encoding.
    System.err.println("body: " + new String(body, java.nio.charset.StandardCharsets.UTF_8));
}
/**
 * Publishes three messages, runs the consumer processor once with a batch size of 1 and
 * closes it. Expects one flow file ("hello") on success, an ack for delivery tag 0, a
 * nack for delivery tag 2, and an immediate nack for a delivery that arrives after close.
 */
@Test
public void testMessagesRejectedOnStop() throws TimeoutException, IOException {
    final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
    final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1"));
    final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);

    try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
        for (String payload : new String[] {"hello", "world", "good-bye"}) {
            sender.publish(payload.getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
        }

        final LocalConsumeAMQP processor = new LocalConsumeAMQP(connection);
        final TestRunner runner = TestRunners.newTestRunner(processor);
        runner.setProperty(ConsumeAMQP.HOST, "injvm");
        runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
        runner.setProperty(ConsumeAMQP.BATCH_SIZE, "1");

        runner.run();
        processor.close();

        runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 1);
        final MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
        firstFlowFile.assertContentEquals("hello");

        // A single cumulative ack should be used
        assertTrue(((TestChannel) connection.createChannel()).isAck(0));

        // Messages 1 and 2 will have been delivered but on stop should be rejected. They will be rejected
        // cumulatively, though, so only delivery Tag 2 will be nack'ed explicitly
        assertTrue(((TestChannel) connection.createChannel()).isNack(2));

        // Any newly delivered messages should also be immediately nack'ed.
        final Envelope lateEnvelope = new Envelope(3, false, "myExchange", "key1");
        processor.getAMQPWorker().getConsumer().handleDelivery("123", lateEnvelope, new BasicProperties(), new byte[0]);
        assertTrue(((TestChannel) connection.createChannel()).isNack(3));
    }
}