下面列出了org.mockito.internal.stubbing.answers.ReturnsElementsOf#com.rabbitmq.client.AMQP 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Handles one delivery for "consumer B": prints the payload, pauses for two seconds and then
 * acknowledges exactly this delivery.
 *
 * @param consumerTag the consumer tag passed in by the client library
 * @param envelope    packaging data for the message; supplies the delivery tag
 * @param properties  content header data for the message (only forwarded to {@code super})
 * @param body        the raw message bytes, decoded as UTF-8
 * @throws IOException propagated from the superclass handler, the UTF-8 decoding or the ack
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    super.handleDelivery(consumerTag, envelope, properties, body);
    String message = new String(body, "UTF-8");
    System.out.printf("in consumer B (delivery tag is %d): %s\n", envelope.getDeliveryTag(), message);
    try {
        TimeUnit.MILLISECONDS.sleep(2000);
    } catch (InterruptedException e) {
        // Do not swallow the interruption: restore the flag so the calling thread can still
        // observe that it was asked to stop.
        Thread.currentThread().interrupt();
    }
    // The second argument (false) means only this delivery is acknowledged, not a batch.
    channel.basicAck(envelope.getDeliveryTag(), false);
}
/**
 * Prints the delivered message, pauses for two seconds, and then either rejects the message
 * back onto the queue (when its {@code "num"} header equals 0) or acknowledges it.
 *
 * <p>A delivery without headers, without a {@code "num"} header, or with a non-Integer
 * {@code "num"} value is acknowledged rather than failing with an exception.
 *
 * @param consumerTag the consumer tag passed in by the client library
 * @param envelope    packaging data for the message; supplies the delivery tag
 * @param properties  content header data; its headers are inspected for {@code "num"}
 * @param body        the raw message bytes, decoded as UTF-8 for printing
 * @throws IOException propagated from decoding or from the ack/nack call
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    System.err.println("body: " + new String(body, "UTF-8"));
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Keep the interrupt visible to the thread that invoked this callback.
        Thread.currentThread().interrupt();
    }
    // Headers are optional in AMQP, so guard against a missing map or a missing entry.
    Object num = properties.getHeaders() == null ? null : properties.getHeaders().get("num");
    if (Integer.valueOf(0).equals(num)) {
        // Negative acknowledgement with requeue=true: the message goes back to the queue.
        channel.basicNack(envelope.getDeliveryTag(), false, true);
    } else {
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
/**
 * Enqueues a message built from the given parts, taking the queue length limits into account.
 *
 * <p>When a length limit is already reached and the overflow policy is
 * {@code REJECT_PUBLISH}, nothing is enqueued. Otherwise the message is offered to the queue
 * and, if a limit had been reached, the element returned by {@code messages.poll()} is
 * dead-lettered with reason {@code MAX_LEN}.
 *
 * @param exchangeName name of the exchange recorded on the message
 * @param routingKey   routing key recorded on the message
 * @param props        message properties; also used to compute the expiry time
 * @param body         message payload
 * @return always {@code true}
 */
public boolean publish(String exchangeName, String routingKey, AMQP.BasicProperties props, byte[] body) {
    final boolean overLimit = queueLengthLimitReached() || queueLengthBytesLimitReached();
    final boolean rejected = overLimit && arguments.overflow() == AmqArguments.Overflow.REJECT_PUBLISH;
    if (rejected) {
        return true;
    }

    final Message incoming = new Message(
        messageSequence.incrementAndGet(), exchangeName, routingKey, props, body, computeExpiryTime(props));

    // An expiry time of -1 is logged without the "expiring at" detail.
    if (incoming.expiryTime == -1) {
        LOGGER.debug(localized("Message published" + ": " + incoming));
    } else {
        LOGGER.debug(localized("Message published expiring at " + Instant.ofEpochMilli(incoming.expiryTime)) + ": " + incoming);
    }

    messages.offer(incoming);
    if (overLimit) {
        deadLetterWithReason(messages.poll(), DeadLettering.ReasonType.MAX_LEN);
    }
    return true;
}
/**
 * Returns a copy of {@code props} whose x-death header list starts with the entry for this
 * dead-lettering event.
 *
 * <p>If the list already holds an entry accepted by {@code sameQueueAndReason}, that entry is
 * removed and replaced (at the front) by the result of {@code incrementCount}; otherwise a
 * fresh entry from {@code asHeaderEntry()} is put at the front.
 *
 * <p>Both the header map and the x-death list are copied before being modified, so the
 * collections held by the incoming {@code props} are never mutated (they may be shared with
 * other messages or be unmodifiable).
 *
 * @param props the properties of the message being dead-lettered
 * @return new properties carrying an unmodifiable header map with the updated x-death list
 */
@SuppressWarnings("unchecked")
public AMQP.BasicProperties prependOn(AMQP.BasicProperties props) {
    Map<String, Object> headers = new HashMap<>();
    if (props.getHeaders() != null) {
        headers.putAll(props.getHeaders());
    }

    // Copy the existing list instead of editing it in place: the map copy above is shallow.
    List<Map<String, Object>> xDeathHeader = new ArrayList<>();
    Object existing = headers.get(X_DEATH_HEADER);
    if (existing != null) {
        xDeathHeader.addAll((List<Map<String, Object>>) existing);
    }
    headers.put(X_DEATH_HEADER, xDeathHeader);

    Optional<Map<String, Object>> previousEvent = xDeathHeader.stream()
        .filter(this::sameQueueAndReason)
        .findFirst();
    final Map<String, Object> currentEvent;
    if (previousEvent.isPresent()) {
        xDeathHeader.remove(previousEvent.get());
        currentEvent = incrementCount(previousEvent.get());
    } else {
        currentEvent = asHeaderEntry();
    }
    xDeathHeader.add(0, currentEvent);
    return props.builder().headers(Collections.unmodifiableMap(headers)).build();
}
/**
 * Declares the "fruits" queue with a message TTL of -1, publishes two messages and checks
 * that the registered consumer has received none of them after a short wait.
 */
@Test
void multiple_expired_messages_are_not_delivered_to_consumer() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        queue("fruits").withMessageTtl(-1L).declare(ch);

        List<String> received = new ArrayList<>();
        DefaultConsumer collector = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) {
                received.add(new String(body));
            }
        };
        ch.basicConsume("fruits", collector);

        ch.basicPublish("", "fruits", null, "banana".getBytes());
        ch.basicPublish("", "fruits", null, "orange".getBytes());

        // Give any delivery a chance to happen before asserting that none did.
        TimeUnit.MILLISECONDS.sleep(100L);
        assertThat(received).hasSize(0);
    }
}
/**
 * Checks that {@code AMQPUtils.updateFlowFileAttributesWithAmqpProperties} copies the
 * contentType, userId and deliveryMode AMQP properties into prefixed flow file attributes.
 */
@Test
public void validateUpdateFlowFileAttributesWithAmqpProperties() {
    final PublishAMQP publisher = new PublishAMQP();
    final SharedSessionState sessionState = new SharedSessionState(publisher, new AtomicLong());
    final ProcessSession session = new MockProcessSession(sessionState, publisher);
    final FlowFile original = session.create();

    final BasicProperties props = new AMQP.BasicProperties.Builder()
        .contentType("text/plain")
        .deliveryMode(2)
        .priority(1)
        .userId("joe")
        .build();

    final FlowFile updated = AMQPUtils.updateFlowFileAttributesWithAmqpProperties(props, original, session);

    final String prefix = AMQPUtils.AMQP_PROP_PREFIX;
    assertEquals("text/plain", updated.getAttributes().get(prefix + "contentType"));
    assertEquals("joe", updated.getAttributes().get(prefix + "userId"));
    assertEquals("2", updated.getAttributes().get(prefix + "deliveryMode"));
}
/**
 * Forwards the message to every receiver matching the routing key and properties, or to the
 * alternate exchange when no receiver matches.
 *
 * <p>Downstream receivers are given this exchange's {@code name}; the
 * {@code previousExchangeName} argument is not used by this implementation.
 *
 * @param previousExchangeName unused here
 * @param routingKey           routing key used for matching and forwarded downstream
 * @param props                message properties used for matching and forwarded downstream
 * @param body                 message payload
 * @return {@code true} if at least one receiver matched; otherwise the alternate exchange's
 *         result, or {@code false} when there is no alternate exchange
 */
@Override
public boolean publish(String previousExchangeName, String routingKey, AMQP.BasicProperties props, byte[] body) {
    Set<Receiver> targets = matchingReceivers(routingKey, props)
        .map(receiverRegistry::getReceiver)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toSet());

    if (!targets.isEmpty()) {
        for (Receiver target : targets) {
            LOGGER.debug(localized("message to " + target));
            target.publish(name, routingKey, props, body);
        }
        return true;
    }

    // Nobody matched: fall back to the alternate exchange, if one is configured.
    return getAlternateExchange().map(alternate -> {
        LOGGER.debug(localized("message to alternate " + alternate));
        return alternate.publish(name, routingKey, props, body);
    }).orElse(false);
}
/**
 * Publishes three messages with priorities 2, 6 and 4 to a queue declared with a max priority
 * of 10 and expects them to be fetched in the order "second", "third", "first".
 */
@Test
void nominal_use() {
    try (MockConnection connection = new MockConnectionFactory().newConnection();
         MockChannel ch = connection.createChannel()) {
        String priorityQueue = dynamicQueue().withMaxPriority(10).declare(ch).getQueue();

        ch.basicPublish("", priorityQueue, new AMQP.BasicProperties.Builder().priority(2).build(), "first".getBytes());
        ch.basicPublish("", priorityQueue, new AMQP.BasicProperties.Builder().priority(6).build(), "second".getBytes());
        ch.basicPublish("", priorityQueue, new AMQP.BasicProperties.Builder().priority(4).build(), "third".getBytes());

        // The fetches use the empty queue name, exactly as the publishes above use the declared one.
        for (String expected : new String[] {"second", "third", "first"}) {
            assertThat(new String(ch.basicGet("", true).getBody())).isEqualTo(expected);
        }
    }
}
/**
 * Publishes a message through the current transaction or node, notifies return listeners
 * when a mandatory message was not delivered, and updates metrics and confirm bookkeeping.
 *
 * @param exchange   target exchange name
 * @param routingKey routing key
 * @param mandatory  when {@code true}, an undelivered message is reported to the return listeners
 * @param immediate  forwarded unchanged to the underlying publish call
 * @param props      message properties, may be {@code null}; a reply-to equal to
 *                   {@code DIRECT_REPLY_TO_QUEUE} is rewritten to {@code directReplyToQueue}
 * @param body       message payload
 */
@Override
public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) {
    if (props != null && DIRECT_REPLY_TO_QUEUE.equals(props.getReplyTo())) {
        props = props.builder().replyTo(directReplyToQueue).build();
    }
    boolean delivered = getTransactionOrNode().basicPublish(exchange, routingKey, mandatory, immediate, nullToEmpty(props), body);
    if (!delivered && mandatory) {
        for (ReturnListener returnListener : returnListeners) {
            try {
                // Reply code 312 with text "No route" is reported to each listener.
                returnListener.handleReturn(312, "No route", exchange, routingKey, props, body);
            } catch (IOException | RuntimeException e) {
                // A failing listener must not prevent the remaining listeners from being called.
                LOGGER.warn("ReturnListener threw an exception " + returnListener, e);
            }
        }
    }
    metricsCollectorWrapper.basicPublish(this);
    if (confirmMode) {
        safelyInvokeConfirmListeners();
        nextPublishSeqNo++;
    }
}
/**
 * Connects to the RabbitMQ broker at 127.0.0.1, declares {@code QUEUE_NAME} and prints every
 * message received on it. The connection is left open so the consumer keeps running.
 *
 * @param argv unused
 * @throws Exception if connecting, declaring the queue or registering the consumer fails
 */
public static void main(String[] argv) throws Exception {
    // Create the connection factory and set the RabbitMQ address.
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("127.0.0.1");

    // Open a new connection and create a channel on it.
    Channel ch = connectionFactory.newConnection().createChannel();

    // Declare the queue to listen on. Queue declaration in RabbitMQ is idempotent: the queue
    // is created if it does not exist, and an existing queue is left untouched.
    ch.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println("C [*] Waiting for messages. To exit press CTRL+C");

    // DefaultConsumer implements Consumer; it is bound to the channel passed in, and
    // handleDelivery is the callback executed for each message that arrives.
    Consumer printer = new DefaultConsumer(ch) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String text = new String(body, "UTF-8");
            System.out.println("C [x] Received '" + text + "'");
        }
    };

    // Second argument true = automatic acknowledgement (RabbitMQ's message confirmation mechanism).
    ch.basicConsume(QUEUE_NAME, true, printer);
}
/**
 * Publishes a message whose expiration property is the non-numeric string "test" and checks
 * that the message can still be fetched after a short wait.
 */
@Test
void non_long_message_ttl_in_publishers_is_not_used() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        ch.queueDeclare("fruits", true, false, false, null);

        AMQP.BasicProperties nonNumericExpiration = new AMQP.BasicProperties.Builder().expiration("test").build();
        ch.basicPublish("", "fruits", nonNumericExpiration, "banana".getBytes());

        TimeUnit.MILLISECONDS.sleep(100L);

        GetResponse fetched = ch.basicGet("fruits", true);
        assertThat(fetched).isNotNull();
    }
}
/**
 * Disposes of a message once the max dead lettered count has been exceeded.
 *
 * <p>With no error routing key configured the message is acknowledged and dropped. Otherwise
 * it is republished with that routing key, to the configured error exchange or, when no
 * exchange is configured, to the default exchange ({@code ""}), and then acknowledged.
 *
 * @param envelope   packaging data for the message
 * @param properties content header data for the message
 * @param body       the message body
 * @throws IOException if publishing or acknowledging on the channel fails
 */
private void proceedAfterMaxDeadLetteredCount(Envelope envelope, AMQP.BasicProperties properties, byte[] body)
        throws IOException {
    String routingKey = rabbitMQProperties.get(RabbitMQConstants.MESSAGE_ERROR_QUEUE_ROUTING_KEY);
    String exchangeName = rabbitMQProperties.get(RabbitMQConstants.MESSAGE_ERROR_EXCHANGE_NAME);

    if (StringUtils.isEmpty(routingKey)) {
        // No routing key: there is nowhere to send the message, so discard it.
        channel.basicAck(envelope.getDeliveryTag(), false);
        log.info("The max dead lettered count exceeded. "
                + "No 'rabbitmq.message.error.queue.routing.key' specified for publishing the message. "
                + "Hence the message with message id: " + properties.getMessageId()
                + " and delivery tag: " + envelope.getDeliveryTag()
                + " on the queue: " + queueName + " will discard.");
        return;
    }

    if (StringUtils.isNotEmpty(exchangeName)) {
        // Publish to the configured exchange with the routing key.
        channel.basicPublish(exchangeName, routingKey, properties, body);
        channel.basicAck(envelope.getDeliveryTag(), false);
        log.info("The max dead lettered count exceeded. Hence message with message id: "
                + properties.getMessageId() + " and delivery tag: " + envelope.getDeliveryTag()
                + " publish to the exchange: " + exchangeName
                + " with the routing key: " + routingKey + ".");
    } else {
        // Publish to the default exchange with the routing key.
        channel.basicPublish("", routingKey, properties, body);
        channel.basicAck(envelope.getDeliveryTag(), false);
        log.info("The max dead lettered count exceeded. Hence message with message id: "
                + properties.getMessageId() + " and delivery tag: " + envelope.getDeliveryTag()
                + " publish to the default exchange with the routing key: " + routingKey + ".");
    }
}
/**
 * Builds a message context from an AMQP delivery and injects it into the configured sequence.
 *
 * <p>If no sequence is available an error is logged and nothing is injected. After
 * injection, the {@code SET_ROLLBACK_ONLY} property of the message context decides the result.
 *
 * @param properties  the AMQP basic properties
 * @param body        the message body
 * @param inboundName name of the inbound endpoint, recorded on the message context
 * @return {@code false} when building the message raises an {@code AxisFault} or when the
 *         rollback-only property is {@code true} (as a Boolean or a String); {@code true} otherwise
 */
public boolean onMessage(AMQP.BasicProperties properties, byte[] body, String inboundName) {
    org.apache.synapse.MessageContext msgCtx = createMessageContext();
    try {
        MessageContext axis2MsgCtx =
                ((org.apache.synapse.core.axis2.Axis2MessageContext) msgCtx).getAxis2MessageContext();
        RabbitMQUtils.buildMessage(properties, body, axis2MsgCtx);
        axis2MsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, RabbitMQUtils.getTransportHeaders(properties));

        if (seq == null) {
            log.error("Sequence: " + injectingSeq + " not found");
        } else {
            if (log.isDebugEnabled()) {
                log.debug("injecting message to sequence : " + injectingSeq);
            }
            seq.setErrorHandler(onErrorSeq);
            msgCtx.setProperty(SynapseConstants.IS_INBOUND, true);
            msgCtx.setProperty(SynapseConstants.INBOUND_ENDPOINT_NAME, inboundName);
            msgCtx.setProperty(SynapseConstants.ARTIFACT_NAME,
                               SynapseConstants.FAIL_SAFE_MODE_INBOUND_ENDPOINT + inboundName);
            synapseEnvironment.injectInbound(msgCtx, seq, sequential);
        }

        // The rollback flag may have been stored either as a Boolean or as its String form.
        Object rollbackFlag = msgCtx.getProperty(RabbitMQConstants.SET_ROLLBACK_ONLY);
        boolean rollbackOnly = false;
        if (rollbackFlag instanceof Boolean) {
            rollbackOnly = (Boolean) rollbackFlag;
        } else if (rollbackFlag instanceof String) {
            rollbackOnly = Boolean.parseBoolean((String) rollbackFlag);
        }
        return !rollbackOnly;
    } catch (AxisFault axisFault) {
        log.error("Error when trying to read incoming message ...", axisFault);
        return false;
    }
}
/**
 * Prints the consumer tag, envelope, properties and body of a delivery to standard error.
 * The message is not acknowledged here (the ack call is left commented out).
 *
 * @param consumerTag the consumer tag passed in by the client library
 * @param envelope    packaging data for the message
 * @param properties  content header data for the message
 * @param body        the raw message bytes, decoded as UTF-8 for printing
 * @throws IOException if the UTF-8 charset is unavailable
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    System.err.println("consumerTag: " + consumerTag);
    System.err.println("envelope: " + envelope);
    System.err.println("properties: " + properties);
    // Decode with an explicit charset so the output does not depend on the platform default.
    System.err.println("body: " + new String(body, "UTF-8"));
    //channel.basicAck(envelope.getDeliveryTag(), false);
}
/**
 * Publishes five persistent "Hello RabbitMQ!" messages with custom headers and a 10 second
 * expiration to routing key {@code test001} on the default exchange.
 *
 * <p>The channel and connection are closed in {@code finally} blocks so they are released
 * even when publishing fails.
 *
 * @param args unused
 * @throws Exception if connecting, publishing or closing fails
 */
public static void main(String[] args) throws Exception {
    // 1. Create and configure the ConnectionFactory.
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constant.ip);
    connectionFactory.setPort(Constant.port);
    connectionFactory.setVirtualHost("/");

    // 2. Create the connection through the factory.
    Connection connection = connectionFactory.newConnection();
    try {
        // 3. Create a channel on the connection.
        Channel channel = connection.createChannel();
        try {
            // Custom headers.
            Map<String, Object> headers = new HashMap<>();
            headers.put("my1", "111");
            headers.put("my2", "222");
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("10000")
                .headers(headers)
                .build();

            // 4. Send the data through the channel. The payload is encoded as UTF-8 to match
            //    the contentEncoding declared above.
            byte[] payload = "Hello RabbitMQ!".getBytes("UTF-8");
            for (int i = 0; i < 5; i++) {
                // Arguments: exchange, routingKey, properties, body.
                channel.basicPublish("", "test001", properties, payload);
            }
        } finally {
            // 5. Always close the channel ...
            channel.close();
        }
    } finally {
        // ... and the connection.
        connection.close();
    }
}
/**
 * Prints the consumer tag, envelope, properties and body of a delivery to standard error.
 *
 * @param consumerTag the consumer tag passed in by the client library
 * @param envelope    packaging data for the message
 * @param properties  content header data for the message
 * @param body        the raw message bytes, decoded as UTF-8 for printing
 * @throws IOException if the UTF-8 charset is unavailable
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    System.err.println("consumerTag: " + consumerTag);
    System.err.println("envelope: " + envelope);
    System.err.println("properties: " + properties);
    // Decode with an explicit charset so the output does not depend on the platform default.
    System.err.println("body: " + new String(body, "UTF-8"));
}
/**
 * Starts and activates a child span for a delivery, and stores the span and scope in
 * {@code LocalSpanContext} under {@code COMPONENT_NAME}.
 *
 * <p>Nothing happens when {@code thiz} is a {@code TracingConsumer} wrapper or when the
 * caller check for {@code TracingConsumer.handleDelivery} matches.
 *
 * @param thiz  the consumer instance receiving the delivery
 * @param props the delivery's properties; must be an {@code AMQP.BasicProperties}
 */
public static void handleDeliveryStart(Object thiz, Object props) {
    // The caller check stays directly in this method: it inspects call-stack frames by position.
    if (WrapperProxy.isWrapper(thiz, TracingConsumer.class)
        || AgentRuleUtil.callerEquals(1, 3, "io.opentracing.contrib.rabbitmq.TracingConsumer.handleDelivery")) {
        return;
    }

    final AMQP.BasicProperties basicProperties = (AMQP.BasicProperties)props;
    final Tracer globalTracer = GlobalTracer.get();
    final Span childSpan = TracingUtils.buildChildSpan(basicProperties, null, globalTracer);
    final Scope activeScope = globalTracer.activateSpan(childSpan);
    LocalSpanContext.set(COMPONENT_NAME, childSpan, activeScope);
}
/**
 * Publishes five persistent messages, each carrying its index in the {@code "num"} header, to
 * exchange {@code test_ack_exchange} with routing key {@code ack.save}.
 *
 * <p>The channel and connection are closed in {@code finally} blocks; the original code
 * never closed them.
 *
 * @param args unused
 * @throws Exception if connecting, publishing or closing fails
 */
public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constant.ip);
    connectionFactory.setPort(Constant.port);
    connectionFactory.setVirtualHost("/");

    Connection connection = connectionFactory.newConnection();
    try {
        Channel channel = connection.createChannel();
        try {
            String exchange = "test_ack_exchange";
            String routingKey = "ack.save";
            for (int i = 0; i < 5; i++) {
                Map<String, Object> headers = new HashMap<String, Object>();
                headers.put("num", i);
                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
                String msg = "Hello RabbitMQ ACK Message " + i;
                // mandatory=true; the payload is UTF-8 to match the declared contentEncoding.
                channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes("UTF-8"));
            }
        } finally {
            channel.close();
        }
    } finally {
        connection.close();
    }
}
/**
 * End-to-end check of the basic consume flow: declare a direct exchange, bind a
 * server-named queue, register an acknowledging consumer, publish one message and verify
 * that exactly that message was received.
 */
@Test
void basic_consume_case() throws IOException, TimeoutException, InterruptedException {
    final String exchange = "test-exchange";
    final String key = "test.key";

    try (Connection connection = new MockConnectionFactory().newConnection()) {
        assertThat(connection).isInstanceOf(MockConnection.class);

        try (Channel ch = connection.createChannel()) {
            assertThat(ch).isInstanceOf(MockChannel.class);

            ch.exchangeDeclare(exchange, "direct", true);
            String boundQueue = ch.queueDeclare().getQueue();
            ch.queueBind(boundQueue, exchange, key);

            List<String> received = new ArrayList<>();
            DefaultConsumer ackingConsumer = new DefaultConsumer(ch) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    long tag = envelope.getDeliveryTag();
                    received.add(new String(body));
                    // (process the message components here ...)
                    ch.basicAck(tag, false);
                }
            };
            ch.basicConsume(boundQueue, false, "myConsumerTag", ackingConsumer);

            ch.basicPublish(exchange, key, null, "Hello, world!".getBytes());

            // Leave time for the delivery to reach the consumer.
            TimeUnit.MILLISECONDS.sleep(200L);
            assertThat(received).containsExactly("Hello, world!");
        }
    }
}
/**
 * Handles one delivery for "consumer B": prints the payload and pauses for 200 ms. The
 * message is not acknowledged here (the ack call is left commented out).
 *
 * @param consumerTag the consumer tag passed in by the client library
 * @param envelope    packaging data for the message; supplies the delivery tag
 * @param properties  content header data for the message (only forwarded to {@code super})
 * @param body        the raw message bytes, decoded as UTF-8
 * @throws IOException propagated from the superclass handler or the UTF-8 decoding
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    super.handleDelivery(consumerTag, envelope, properties, body);
    String message = new String(body, "UTF-8");
    System.out.printf("in consumer B (delivery tag is %d): %s\n", envelope.getDeliveryTag(), message);
    try {
        TimeUnit.MILLISECONDS.sleep(200);
    } catch (InterruptedException e) {
        // Do not swallow the interruption: restore the flag for the calling thread.
        Thread.currentThread().interrupt();
    }
    //channel.basicAck(envelope.getDeliveryTag(), false);
}
/**
 * Starts and activates a span for a publish call, stores the span and scope in
 * {@code LocalSpanContext}, and returns the result of {@code inject} for the properties.
 *
 * @param exchange   the exchange name; must be a {@code String}
 * @param routingKey the routing key; must be a {@code String}
 * @param props      the message properties; must be an {@code AMQP.BasicProperties}
 * @return the properties returned by {@code inject(properties, span, tracer)}
 */
public static AMQP.BasicProperties enterPublish(final Object exchange, final Object routingKey, final Object props) {
    final AMQP.BasicProperties basicProperties = (AMQP.BasicProperties)props;
    final Tracer globalTracer = GlobalTracer.get();

    final String exchangeName = (String)exchange;
    final String key = (String)routingKey;
    final Span publishSpan = TracingUtils.buildSpan(exchangeName, key, basicProperties, globalTracer);

    LocalSpanContext.set(SpanDecorator.COMPONENT_NAME, publishSpan, globalTracer.activateSpan(publishSpan));
    return inject(basicProperties, publishSpan, globalTracer);
}
/**
 * Creates a message with every field given explicitly.
 *
 * @param id           identifier of the message
 * @param exchangeName name of the exchange recorded on the message
 * @param routingKey   routing key recorded on the message
 * @param props        AMQP properties of the message
 * @param body         message payload (stored as given, not copied)
 * @param expiryTime   expiry time of the message
 * @param redelivered  whether the message is flagged as redelivered
 */
private Message(int id, String exchangeName, String routingKey, AMQP.BasicProperties props, byte[] body, long expiryTime, boolean redelivered) {
    // Identity and routing.
    this.id = id;
    this.routingKey = routingKey;
    this.exchangeName = exchangeName;
    // Content.
    this.body = body;
    this.props = props;
    // Delivery state.
    this.redelivered = redelivered;
    this.expiryTime = expiryTime;
}
/**
 * Publishes three messages through one connection, then opens a second connection from the
 * same factory and checks that a consumer receives all three in publication order.
 */
@Test
void can_consume_messages_published_in_a_previous_connection() throws InterruptedException {
    MockConnectionFactory factory = new MockConnectionFactory();

    try (MockConnection publisherConnection = factory.newConnection();
         MockChannel publisherChannel = publisherConnection.createChannel()) {
        queue("numbers").declare(publisherChannel);
        for (String number : Arrays.asList("one", "two", "three")) {
            publisherChannel.basicPublish("", "numbers", null, number.getBytes());
        }
    }

    try (MockConnection consumerConnection = factory.newConnection();
         MockChannel consumerChannel = consumerConnection.createChannel()) {
        List<String> received = new ArrayList<>();
        // Starting at -2 permits, a permit only becomes available after the third release.
        Semaphore allDelivered = new Semaphore(-2);

        consumerChannel.basicConsume("numbers", new DefaultConsumer(consumerChannel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) {
                received.add(new String(body));
                allDelivered.release();
            }
        });

        assertThat(allDelivered.tryAcquire(1, TimeUnit.SECONDS)).as("Messages have been delivered").isTrue();
        assertThat(received).containsExactly("one", "two", "three");
    }
}
/**
 * Switches this channel to confirm mode.
 *
 * @return a new {@code Confirm.SelectOk} reply
 * @throws IllegalStateException if a transaction is set on this channel
 */
@Override
public AMQP.Confirm.SelectOk confirmSelect() {
    final boolean transactional = transaction != null;
    if (transactional) {
        throw new IllegalStateException("A transactional channel cannot be put into confirm mode");
    }

    confirmMode = true;
    return new AMQImpl.Confirm.SelectOk();
}
/**
 * Commits the transaction currently set on this channel.
 *
 * @return a new {@code Tx.CommitOk} reply
 * @throws IllegalStateException if no transaction is set on this channel
 */
@Override
public AMQP.Tx.CommitOk txCommit() {
    final boolean noTransaction = transaction == null;
    if (noTransaction) {
        throw new IllegalStateException("No started transaction (make sure you called txSelect before txCommit");
    }

    transaction.commit();
    return new AMQImpl.Tx.CommitOk();
}
/**
 * Repeated check that a consumer receives all 101 messages published right after it was
 * registered; each message must show up within 200 ms of the previous poll.
 */
@RepeatedTest(31)
void basicConsume_concurrent_queue_access() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        String declaredQueue = ch.queueDeclare().getQueue();
        BlockingQueue<String> received = new LinkedBlockingQueue<>();

        // The consumer is registered with the empty queue name, as in the publish-side
        // declaration above which returns the generated name.
        ch.basicConsume("", new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) {
                received.offer(new String(body));
            }
        });

        final int totalMessages = 101;
        for (int sent = 0; sent < totalMessages; sent++) {
            ch.basicPublish("", declaredQueue, null, "test message".getBytes());
        }
        for (int seen = 0; seen < totalMessages; seen++) {
            assertThat(received.poll(200L, TimeUnit.MILLISECONDS)).isNotNull();
        }
    }
}
/**
 * Publishes one message, consumes and acknowledges it, and then expects the mock tracer to
 * report two finished spans and no active span.
 *
 * @param tracer the mock tracer supplied to the test
 */
@Test
public void basicConsume(final MockTracer tracer) throws IOException, InterruptedException {
    final String exchange = "basicConsumeExchange";
    final String queue = "basicConsumeQueue";
    final String key = "#";

    channel.exchangeDeclare(exchange, "direct", true);
    channel.queueDeclare(queue, true, false, false, null);
    channel.queueBind(queue, exchange, key);

    channel.basicPublish(exchange, key, null, "Hello, world!".getBytes());

    final CountDownLatch delivered = new CountDownLatch(1);
    channel.basicConsume(queue, false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body) throws IOException {
            channel.basicAck(envelope.getDeliveryTag(), false);
            delivered.countDown();
        }
    });
    delivered.await(15, TimeUnit.SECONDS);

    // Poll up to ten times, one second apart, until both spans are reported as finished.
    List<MockSpan> spans = tracer.finishedSpans();
    int triesLeft = 10;
    while (triesLeft > 0 && spans.size() < 2) {
        TimeUnit.SECONDS.sleep(1L);
        spans = tracer.finishedSpans();
        triesLeft--;
    }

    assertEquals(2, spans.size());
    assertNull(tracer.activeSpan());
}
/**
 * Publishes a message without properties followed by one with priority 2 and expects the
 * prioritised message to be fetched first.
 */
@Test
void no_priority_is_considered_zero() {
    try (MockConnection connection = new MockConnectionFactory().newConnection();
         MockChannel ch = connection.createChannel()) {
        String priorityQueue = dynamicQueue().withMaxPriority(10).declare(ch).getQueue();

        ch.basicPublish("", priorityQueue, null, "first".getBytes());
        AMQP.BasicProperties priorityTwo = new AMQP.BasicProperties.Builder().priority(2).build();
        ch.basicPublish("", priorityQueue, priorityTwo, "second".getBytes());

        for (String expected : new String[] {"second", "first"}) {
            assertThat(new String(ch.basicGet("", true).getBody())).isEqualTo(expected);
        }
    }
}
/**
 * Publishes one message, rejects it with requeue on its first delivery, and checks that the
 * envelope of the second delivery is flagged as redelivered.
 */
@Test
void redelivered_message_should_have_redelivery_marked_as_true() throws IOException, TimeoutException, InterruptedException {
    try (Connection conn = new MockConnectionFactory().newConnection()) {
        // Counts two deliveries: the initial one and the redelivery.
        CountDownLatch messagesToBeProcessed = new CountDownLatch(2);
        try (Channel channel = conn.createChannel()) {
            queue("fruits").declare(channel);
            // Diamond operator instead of the raw type, so the reference is properly typed.
            AtomicReference<Envelope> redeliveredMessageEnvelope = new AtomicReference<>();
            channel.basicConsume("fruits", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) {
                    if (messagesToBeProcessed.getCount() == 1) {
                        // Second delivery: remember its envelope for the assertions below.
                        redeliveredMessageEnvelope.set(envelope);
                        runAndEatExceptions(messagesToBeProcessed::countDown);
                    } else {
                        // First delivery: reject with requeue=true to trigger a redelivery.
                        runAndEatExceptions(() -> channel.basicNack(envelope.getDeliveryTag(), false, true));
                        runAndEatExceptions(messagesToBeProcessed::countDown);
                    }
                }
            });
            channel.basicPublish("", "fruits", null, "banana".getBytes());
            // NOTE(review): 1000 seconds is an unusually long timeout for a unit test; confirm the intended unit.
            final boolean finishedProperly = messagesToBeProcessed.await(1000, TimeUnit.SECONDS);
            assertThat(finishedProperly).isTrue();
            assertThat(redeliveredMessageEnvelope.get()).isNotNull();
            assertThat(redeliveredMessageEnvelope.get().isRedeliver()).isTrue();
        }
    }
}
/**
 * Publishes a message with an expiration of "200", waits 400 ms and checks that the message
 * can no longer be fetched from the queue.
 */
@Test
void message_ttl_in_publishers_reject_messages_after_expiration_is_reached() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel ch = connection.createChannel()) {
        ch.queueDeclare("fruits", true, false, false, null);

        AMQP.BasicProperties shortLived = new AMQP.BasicProperties.Builder().expiration("200").build();
        ch.basicPublish("", "fruits", shortLived, "banana".getBytes());

        // Wait for longer than the expiration before fetching.
        TimeUnit.MILLISECONDS.sleep(400L);

        GetResponse fetched = ch.basicGet("fruits", true);
        assertThat(fetched).isNull();
    }
}