org.mockito.internal.stubbing.answers.ReturnsElementsOf#com.rabbitmq.client.AMQP源码实例Demo

下面列出了org.mockito.internal.stubbing.answers.ReturnsElementsOf#com.rabbitmq.client.AMQP 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: SpringBoot-Course   文件: MyConsumer.java
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    super.handleDelivery(consumerTag, envelope, properties, body);

    String message = new String(body, "UTF-8");
    System.out.printf("in consumer B (delivery tag is %d): %s\n", envelope.getDeliveryTag(), message);

    try {
        TimeUnit.MILLISECONDS.sleep(2000);
    } catch (InterruptedException e) {

    }

    // 第二个参数为 false 表示不批量签收
    channel.basicAck(envelope.getDeliveryTag(), false);
}
 
源代码2 项目: code   文件: MyConsumer.java
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    System.err.println("body: " + new String(body));
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if ((Integer) properties.getHeaders().get("num") == 0) {
        // noack 重回队列
        channel.basicNack(envelope.getDeliveryTag(), false, true);
    } else {
        channel.basicAck(envelope.getDeliveryTag(), false);
    }

}
 
源代码3 项目: rabbitmq-mock   文件: MockQueue.java
public boolean publish(String exchangeName, String routingKey, AMQP.BasicProperties props, byte[] body) {
    boolean queueLengthLimitReached = queueLengthLimitReached() || queueLengthBytesLimitReached();
    if (queueLengthLimitReached && arguments.overflow() == AmqArguments.Overflow.REJECT_PUBLISH) {
        return true;
    }
    Message message = new Message(
        messageSequence.incrementAndGet(),
        exchangeName,
        routingKey,
        props,
        body,
        computeExpiryTime(props)
    );
    if (message.expiryTime != -1) {
        LOGGER.debug(localized("Message published expiring at " + Instant.ofEpochMilli(message.expiryTime)) + ": " + message);
    } else {
        LOGGER.debug(localized("Message published" + ": " + message));
    }
    messages.offer(message);
    if (queueLengthLimitReached) {
        deadLetterWithReason(messages.poll(), DeadLettering.ReasonType.MAX_LEN);
    }
    return true;
}
 
源代码4 项目: rabbitmq-mock   文件: DeadLettering.java
@SuppressWarnings("unchecked")
public AMQP.BasicProperties prependOn(AMQP.BasicProperties props) {
    Map<String, Object> headers = Optional.ofNullable(props.getHeaders()).map(HashMap::new).orElseGet(HashMap::new);

    List<Map<String, Object>> xDeathHeader = (List<Map<String, Object>>) headers.computeIfAbsent(X_DEATH_HEADER, key -> new ArrayList<>());

    Optional<Map<String, Object>> previousEvent = xDeathHeader.stream()
        .filter(this::sameQueueAndReason)
        .findFirst();

    final Map<String, Object> currentEvent;
    if (previousEvent.isPresent()) {
        xDeathHeader.remove(previousEvent.get());
        currentEvent = incrementCount(previousEvent.get());
    } else {
        currentEvent = asHeaderEntry();
    }
    xDeathHeader.add(0, currentEvent);

    return props.builder().headers(Collections.unmodifiableMap(headers)).build();
}
 
源代码5 项目: rabbitmq-mock   文件: ComplexUseCasesTests.java
@Test
void multiple_expired_messages_are_not_delivered_to_consumer() throws IOException, TimeoutException, InterruptedException {
    try (Connection conn = new MockConnectionFactory().newConnection()) {
        try (Channel channel = conn.createChannel()) {
            queue("fruits").withMessageTtl(-1L).declare(channel);

            List<String> messages = new ArrayList<>();
            channel.basicConsume("fruits", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) {
                    messages.add(new String(body));
                }
            });

            channel.basicPublish("", "fruits", null, "banana".getBytes());
            channel.basicPublish("", "fruits", null, "orange".getBytes());
            TimeUnit.MILLISECONDS.sleep(100L);
            assertThat(messages).hasSize(0);
        }
    }
}
 
源代码6 项目: localization_nifi   文件: AMQPUtilsTest.java
@Test
public void validateUpdateFlowFileAttributesWithAmqpProperties() {
    PublishAMQP processor = new PublishAMQP();
    ProcessSession processSession = new MockProcessSession(new SharedSessionState(processor, new AtomicLong()),
            processor);
    FlowFile sourceFlowFile = processSession.create();
    BasicProperties amqpProperties = new AMQP.BasicProperties.Builder()
            .contentType("text/plain").deliveryMode(2)
            .priority(1).userId("joe")
            .build();
    FlowFile f2 = AMQPUtils.updateFlowFileAttributesWithAmqpProperties(amqpProperties, sourceFlowFile,
            processSession);

    assertEquals("text/plain", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + "contentType"));
    assertEquals("joe", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + "userId"));
    assertEquals("2", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + "deliveryMode"));
}
 
源代码7 项目: rabbitmq-mock   文件: BindableMockExchange.java
@Override
public boolean publish(String previousExchangeName, String routingKey, AMQP.BasicProperties props, byte[] body) {
    Set<Receiver> matchingReceivers = matchingReceivers(routingKey, props)
        .map(receiverRegistry::getReceiver)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toSet());

    if (matchingReceivers.isEmpty()) {
        return getAlternateExchange().map(e -> {
            LOGGER.debug(localized("message to alternate " + e));
            return e.publish(name, routingKey, props, body);
        }).orElse(false);
    } else {
        matchingReceivers
            .forEach(e -> {
                LOGGER.debug(localized("message to " + e));
                e.publish(name, routingKey, props, body);
            });
        return true;
    }
}
 
源代码8 项目: rabbitmq-mock   文件: ExtensionTest.java
@Test
void nominal_use() {
    try (MockConnection conn = new MockConnectionFactory().newConnection()) {
        try (MockChannel channel = conn.createChannel()) {
            String queueName = dynamicQueue().withMaxPriority(10).declare(channel).getQueue();

            channel.basicPublish("", queueName, new AMQP.BasicProperties.Builder().priority(2).build(), "first".getBytes());
            channel.basicPublish("", queueName, new AMQP.BasicProperties.Builder().priority(6).build(), "second".getBytes());
            channel.basicPublish("", queueName, new AMQP.BasicProperties.Builder().priority(4).build(), "third".getBytes());


            assertThat(new String(channel.basicGet("", true).getBody())).isEqualTo("second");
            assertThat(new String(channel.basicGet("", true).getBody())).isEqualTo("third");
            assertThat(new String(channel.basicGet("", true).getBody())).isEqualTo("first");
        }
    }
}
 
源代码9 项目: rabbitmq-mock   文件: MockChannel.java
@Override
public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) {
    if (props != null && DIRECT_REPLY_TO_QUEUE.equals(props.getReplyTo())) {
        props = props.builder().replyTo(directReplyToQueue).build();
    }
    boolean delivered = getTransactionOrNode().basicPublish(exchange, routingKey, mandatory, immediate, nullToEmpty(props), body);
    if (!delivered && mandatory) {
        for (ReturnListener returnListener : returnListeners) {
            try {
                returnListener.handleReturn(312, "No route", exchange, routingKey, props, body);
            } catch (IOException | RuntimeException e) {
                LOGGER.warn("ConfirmListener threw an exception " + returnListener, e);
            }
        }
    }
    metricsCollectorWrapper.basicPublish(this);
    if (confirmMode) {
        safelyInvokeConfirmListeners();
        nextPublishSeqNo++;
    }
}
 
源代码10 项目: java-study   文件: C.java
public static void main(String[] argv) throws Exception {
		// 创建连接工厂
		ConnectionFactory factory = new ConnectionFactory();
//		设置RabbitMQ地址
		factory.setHost("127.0.0.1");
//		创建一个新的连接
		Connection connection = factory.newConnection();
//		创建一个频道
		Channel channel = connection.createChannel();
//		声明要关注的队列 -- 在RabbitMQ中,队列声明是幂等性的(一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同),也就是说,如果不存在,就创建,如果存在,不会对已经存在的队列产生任何影响。
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		System.out.println("C [*] Waiting for messages. To exit press CTRL+C");
//		DefaultConsumer类实现了Consumer接口,通过传入一个频道,告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("C [x] Received '" + message + "'");
			}
		};
//		自动回复队列应答 -- RabbitMQ中的消息确认机制
		channel.basicConsume(QUEUE_NAME, true, consumer);
	}
 
源代码11 项目: rabbitmq-mock   文件: ExtensionTest.java
@Test
void non_long_message_ttl_in_publishers_is_not_used() throws IOException, TimeoutException, InterruptedException {
    try (Connection conn = new MockConnectionFactory().newConnection()) {
        try (Channel channel = conn.createChannel()) {
            channel.queueDeclare("fruits", true, false, false, null);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration("test")
                .build();
            channel.basicPublish("", "fruits", properties, "banana".getBytes());
            TimeUnit.MILLISECONDS.sleep(100L);
            GetResponse getResponse = channel.basicGet("fruits", true);
            assertThat(getResponse).isNotNull();
        }
    }
}
 
源代码12 项目: micro-integrator   文件: RabbitMQConsumer.java
/**
 * The message will publish to the exchange with routing key or discard
 *
 * @param envelope   packaging data for the message
 * @param properties content header data for the message
 * @param body       the message body
 * @throws IOException
 */
private void proceedAfterMaxDeadLetteredCount(Envelope envelope, AMQP.BasicProperties properties, byte[] body)
        throws IOException {
    String routingKey =
            rabbitMQProperties.get(RabbitMQConstants.MESSAGE_ERROR_QUEUE_ROUTING_KEY);
    String exchangeName =
            rabbitMQProperties.get(RabbitMQConstants.MESSAGE_ERROR_EXCHANGE_NAME);
    if (StringUtils.isNotEmpty(routingKey) && StringUtils.isNotEmpty(exchangeName)) {
        // publish message to the given exchange with the routing key
        channel.basicPublish(exchangeName, routingKey, properties, body);
        channel.basicAck(envelope.getDeliveryTag(), false);
        log.info("The max dead lettered count exceeded. Hence message with message id: " +
                properties.getMessageId() + " and delivery tag: " + envelope.getDeliveryTag() +
                " publish to the exchange: " + exchangeName + " with the routing key: " + routingKey + ".");
    } else if (StringUtils.isNotEmpty(routingKey) && StringUtils.isEmpty(exchangeName)) {
        // publish message to the default exchange with the routing key
        channel.basicPublish("", routingKey, properties, body);
        channel.basicAck(envelope.getDeliveryTag(), false);
        log.info("The max dead lettered count exceeded. Hence message with message id: " +
                properties.getMessageId() + " and delivery tag: " + envelope.getDeliveryTag() + " publish to the " +
                "default exchange with the routing key: " + routingKey + ".");
    } else {
        // discard the message
        channel.basicAck(envelope.getDeliveryTag(), false);
        log.info("The max dead lettered count exceeded. " +
                "No 'rabbitmq.message.error.queue.routing.key' specified for publishing the message. " +
                "Hence the message with message id: " + properties.getMessageId() + " and delivery tag: " +
                envelope.getDeliveryTag() + " on the queue: " + queueName + " will discard.");
    }
}
 
源代码13 项目: micro-integrator   文件: RabbitMQInjectHandler.java
/**
 * Determine the message builder to use, set the message payload to the message context and
 * inject the message.
 *
 * @param properties  the AMQP basic properties
 * @param body        the message body
 * @param inboundName Inbound Name
 * @return delivery status of the message
 */
public boolean onMessage(AMQP.BasicProperties properties, byte[] body, String inboundName) {
    org.apache.synapse.MessageContext msgCtx = createMessageContext();
    try {
        MessageContext axis2MsgCtx = ((org.apache.synapse.core.axis2.Axis2MessageContext) msgCtx)
                .getAxis2MessageContext();
        RabbitMQUtils.buildMessage(properties, body, axis2MsgCtx);
        axis2MsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, RabbitMQUtils.getTransportHeaders(properties));

        if (seq != null) {
            if (log.isDebugEnabled()) {
                log.debug("injecting message to sequence : " + injectingSeq);
            }
            seq.setErrorHandler(onErrorSeq);
            msgCtx.setProperty(SynapseConstants.IS_INBOUND, true);
            msgCtx.setProperty(SynapseConstants.INBOUND_ENDPOINT_NAME, inboundName);
            msgCtx.setProperty(SynapseConstants.ARTIFACT_NAME,
                               SynapseConstants.FAIL_SAFE_MODE_INBOUND_ENDPOINT + inboundName);
            synapseEnvironment.injectInbound(msgCtx, seq, sequential);
        } else {
            log.error("Sequence: " + injectingSeq + " not found");
        }

        Object rollbackProperty = msgCtx.getProperty(RabbitMQConstants.SET_ROLLBACK_ONLY);
        if ((rollbackProperty instanceof Boolean && ((Boolean) rollbackProperty)) ||
            (rollbackProperty instanceof String && Boolean.parseBoolean((String) rollbackProperty))) {
            return false;
        }

    } catch (AxisFault axisFault) {
        log.error("Error when trying to read incoming message ...", axisFault);
        return false;
    }
    return true;
}
 
源代码14 项目: code   文件: MyConsumer.java
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    System.err.println("consumerTag: " + consumerTag);
    System.err.println("envelope: " + envelope);
    System.err.println("properties: " + properties);
    System.err.println("body: " + new String(body));

    //channel.basicAck(envelope.getDeliveryTag(), false);

}
 
源代码15 项目: code   文件: Producer.java
public static void main(String[] args) throws Exception {
	//1 创建一个ConnectionFactory, 并进行配置
	ConnectionFactory connectionFactory = new ConnectionFactory();
	connectionFactory.setHost(Constant.ip);
	connectionFactory.setPort(Constant.port);
	connectionFactory.setVirtualHost("/");
	
	//2 通过连接工厂创建连接
	Connection connection = connectionFactory.newConnection();
	
	//3 通过connection创建一个Channel
	Channel channel = connection.createChannel();

	// 自定义属性
	Map<String, Object> headers = new HashMap<>();
	headers.put("my1", "111");
	headers.put("my2", "222");

	AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
			.deliveryMode(2)
			.contentEncoding("UTF-8")
			.expiration("10000")
			.headers(headers)
			.build();
	
	//4 通过Channel发送数据
	for(int i=0; i < 5; i++){
		String msg = "Hello RabbitMQ!";
		//1 exchange   2 routingKey
		channel.basicPublish("", "test001", properties, msg.getBytes());
	}

	//5 记得要关闭相关的连接
	channel.close();
	connection.close();
}
 
源代码16 项目: code   文件: MyConsumer.java
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
	System.err.println("-----------consume message----------");
	System.err.println("consumerTag: " + consumerTag);
	System.err.println("envelope: " + envelope);
	System.err.println("properties: " + properties);
	System.err.println("body: " + new String(body));
}
 
public static void handleDeliveryStart(Object thiz, Object props) {
  if (WrapperProxy.isWrapper(thiz, TracingConsumer.class))
    return;

  if (AgentRuleUtil.callerEquals(1, 3, "io.opentracing.contrib.rabbitmq.TracingConsumer.handleDelivery"))
    return;

  final AMQP.BasicProperties properties = (AMQP.BasicProperties)props;
  final Tracer tracer = GlobalTracer.get();
  final Span span = TracingUtils.buildChildSpan(properties, null, tracer);
  final Scope scope = tracer.activateSpan(span);
  LocalSpanContext.set(COMPONENT_NAME, span, scope);
}
 
源代码18 项目: code   文件: Producer.java
public static void main(String[] args) throws Exception {
	
	ConnectionFactory connectionFactory = new ConnectionFactory();
	connectionFactory.setHost(Constant.ip);
	connectionFactory.setPort(Constant.port);
	connectionFactory.setVirtualHost("/");
	
	Connection connection = connectionFactory.newConnection();
	Channel channel = connection.createChannel();
	
	String exchange = "test_ack_exchange";
	String routingKey = "ack.save";

	for(int i =0; i<5; i ++){
		
		Map<String, Object> headers = new HashMap<String, Object>();
		headers.put("num", i);
		
		AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
				.deliveryMode(2)
				.contentEncoding("UTF-8")
				.headers(headers)
				.build();
		String msg = "Hello RabbitMQ ACK Message " + i;
		channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
	}
	
}
 
源代码19 项目: rabbitmq-mock   文件: IntegrationTest.java
@Test
void basic_consume_case() throws IOException, TimeoutException, InterruptedException {
    String exchangeName = "test-exchange";
    String routingKey = "test.key";

    try (Connection conn = new MockConnectionFactory().newConnection()) {
        assertThat(conn).isInstanceOf(MockConnection.class);

        try (Channel channel = conn.createChannel()) {
            assertThat(channel).isInstanceOf(MockChannel.class);

            channel.exchangeDeclare(exchangeName, "direct", true);
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, exchangeName, routingKey);

            List<String> messages = new ArrayList<>();
            channel.basicConsume(queueName, false, "myConsumerTag",
                new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body) throws IOException {
                        long deliveryTag = envelope.getDeliveryTag();
                        messages.add(new String(body));
                        // (process the message components here ...)
                        channel.basicAck(deliveryTag, false);
                    }
                });

            byte[] messageBodyBytes = "Hello, world!".getBytes();
            channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

            TimeUnit.MILLISECONDS.sleep(200L);

            assertThat(messages).containsExactly("Hello, world!");
        }
    }
}
 
源代码20 项目: SpringBoot-Course   文件: MyConsumer.java
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    super.handleDelivery(consumerTag, envelope, properties, body);

    String message = new String(body, "UTF-8");
    System.out.printf("in consumer B (delivery tag is %d): %s\n", envelope.getDeliveryTag(), message);

    try {
        TimeUnit.MILLISECONDS.sleep(200);
    } catch (InterruptedException e) {

    }

    //channel.basicAck(envelope.getDeliveryTag(), false);
}
 
源代码21 项目: java-specialagent   文件: RabbitMQAgentIntercept.java
public static AMQP.BasicProperties enterPublish(final Object exchange, final Object routingKey, final Object props) {
  final AMQP.BasicProperties properties = (AMQP.BasicProperties)props;
  final Tracer tracer = GlobalTracer.get();
  final Span span = TracingUtils.buildSpan((String)exchange, (String)routingKey, properties, tracer);

  final Scope scope = tracer.activateSpan(span);
  LocalSpanContext.set(SpanDecorator.COMPONENT_NAME, span, scope);

  return inject(properties, span, tracer);
}
 
源代码22 项目: rabbitmq-mock   文件: Message.java
private Message(int id, String exchangeName, String routingKey, AMQP.BasicProperties props, byte[] body, long expiryTime, boolean redelivered) {
    this.id = id;
    this.exchangeName = exchangeName;
    this.routingKey = routingKey;
    this.props = props;
    this.body = body;
    this.expiryTime = expiryTime;
    this.redelivered = redelivered;
}
 
源代码23 项目: rabbitmq-mock   文件: ComplexUseCasesTests.java
@Test
void can_consume_messages_published_in_a_previous_connection() throws InterruptedException {
    MockConnectionFactory connectionFactory = new MockConnectionFactory();
    try (MockConnection conn = connectionFactory.newConnection()) {
        try (MockChannel channel = conn.createChannel()) {
            queue("numbers").declare(channel);
            Arrays.asList("one", "two", "three").stream().forEach(message ->
                channel.basicPublish("", "numbers", null, message.getBytes())
            );
        }
    }

    try (MockConnection conn = connectionFactory.newConnection()) {
        try (MockChannel channel = conn.createChannel()) {

            List<String> messages = new ArrayList<>();
            Semaphore deliveries = new Semaphore(-2);

            channel.basicConsume("numbers", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) {
                    messages.add(new String(body));
                    deliveries.release();
                }
            });

            assertThat(deliveries.tryAcquire(1, TimeUnit.SECONDS)).as("Messages have been delivered").isTrue();

            assertThat(messages).containsExactly("one", "two", "three");
        }
    }
}
 
源代码24 项目: rabbitmq-mock   文件: MockChannel.java
@Override
public AMQP.Confirm.SelectOk confirmSelect() {
    if (transaction != null) {
        throw new IllegalStateException("A transactional channel cannot be put into confirm mode");
    }
    confirmMode = true;
    return new AMQImpl.Confirm.SelectOk();
}
 
源代码25 项目: rabbitmq-mock   文件: MockChannel.java
@Override
public AMQP.Tx.CommitOk txCommit() {
    if (transaction == null) {
        throw new IllegalStateException("No started transaction (make sure you called txSelect before txCommit");
    }
    transaction.commit();
    return new AMQImpl.Tx.CommitOk();
}
 
源代码26 项目: rabbitmq-mock   文件: ChannelTest.java
@RepeatedTest(31)
void basicConsume_concurrent_queue_access() throws IOException, TimeoutException, InterruptedException {
    try (Connection conn = new MockConnectionFactory().newConnection()) {
        try (Channel channel = conn.createChannel()) {
            String queueName = channel.queueDeclare().getQueue();

            BlockingQueue<String> messages = new LinkedBlockingQueue<>();
            channel.basicConsume("", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) {
                    messages.offer(new String(body));
                }
            });

            int totalMessages = 101;
            for (int i = 1; i <= totalMessages; i++) {
                channel.basicPublish("", queueName, null, "test message".getBytes());
            }
            for (int i = 1; i <= totalMessages; i++) {
                assertThat(messages.poll(200L, TimeUnit.MILLISECONDS)).isNotNull();
            }
        }
    }
}
 
源代码27 项目: java-specialagent   文件: RabbitMQTest.java
@Test
public void basicConsume(final MockTracer tracer) throws IOException, InterruptedException {
  final String exchangeName = "basicConsumeExchange";
  final String queueName = "basicConsumeQueue";
  final String routingKey = "#";

  channel.exchangeDeclare(exchangeName, "direct", true);
  channel.queueDeclare(queueName, true, false, false, null);
  channel.queueBind(queueName, exchangeName, routingKey);

  final byte[] messageBodyBytes = "Hello, world!".getBytes();
  channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

  final CountDownLatch latch = new CountDownLatch(1);
  channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body) throws IOException {
      final long deliveryTag = envelope.getDeliveryTag();
      channel.basicAck(deliveryTag, false);
      latch.countDown();
    }
  });

  latch.await(15, TimeUnit.SECONDS);
  List<MockSpan> finishedSpans = tracer.finishedSpans();
  for (int tries = 10; tries > 0 && finishedSpans.size() < 2; --tries) {
    TimeUnit.SECONDS.sleep(1L);
    finishedSpans = tracer.finishedSpans();
  }

  assertEquals(2, finishedSpans.size());
  assertNull(tracer.activeSpan());
}
 
源代码28 项目: rabbitmq-mock   文件: ExtensionTest.java
@Test
void no_priority_is_considered_zero() {
    try (MockConnection conn = new MockConnectionFactory().newConnection()) {
        try (MockChannel channel = conn.createChannel()) {
            String queueName = dynamicQueue().withMaxPriority(10).declare(channel).getQueue();

            channel.basicPublish("", queueName, null, "first".getBytes());
            channel.basicPublish("", queueName, new AMQP.BasicProperties.Builder().priority(2).build(), "second".getBytes());


            assertThat(new String(channel.basicGet("", true).getBody())).isEqualTo("second");
            assertThat(new String(channel.basicGet("", true).getBody())).isEqualTo("first");
        }
    }
}
 
源代码29 项目: rabbitmq-mock   文件: IntegrationTest.java
@Test
void redelivered_message_should_have_redelivery_marked_as_true() throws IOException, TimeoutException, InterruptedException {
    try (Connection conn = new MockConnectionFactory().newConnection()) {
        CountDownLatch messagesToBeProcessed = new CountDownLatch(2);
        try (Channel channel = conn.createChannel()) {
            queue("fruits").declare(channel);
            AtomicReference<Envelope> redeliveredMessageEnvelope = new AtomicReference();

            channel.basicConsume("fruits", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) {
                    if(messagesToBeProcessed.getCount() == 1){
                        redeliveredMessageEnvelope.set(envelope);
                        runAndEatExceptions(messagesToBeProcessed::countDown);

                    }else{
                        runAndEatExceptions(() -> channel.basicNack(envelope.getDeliveryTag(), false, true));
                        runAndEatExceptions(messagesToBeProcessed::countDown);
                    }

                }
            });

            channel.basicPublish("", "fruits", null, "banana".getBytes());

            final boolean finishedProperly = messagesToBeProcessed.await(1000, TimeUnit.SECONDS);
            assertThat(finishedProperly).isTrue();
            assertThat(redeliveredMessageEnvelope.get()).isNotNull();
            assertThat(redeliveredMessageEnvelope.get().isRedeliver()).isTrue();
        }
    }
}
 
源代码30 项目: rabbitmq-mock   文件: ExtensionTest.java
@Test
void message_ttl_in_publishers_reject_messages_after_expiration_is_reached() throws IOException, TimeoutException, InterruptedException {
    try (Connection conn = new MockConnectionFactory().newConnection()) {
        try (Channel channel = conn.createChannel()) {
            channel.queueDeclare("fruits", true, false, false, null);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration("200")
                .build();
            channel.basicPublish("", "fruits", properties, "banana".getBytes());
            TimeUnit.MILLISECONDS.sleep(400L);
            GetResponse getResponse = channel.basicGet("fruits", true);
            assertThat(getResponse).isNull();
        }
    }
}