类com.rabbitmq.client.Envelope源码实例Demo

下面列出了怎么用com.rabbitmq.client.Envelope的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: code   文件: MyConsumer.java
/**
 * Handles one delivery: prints the body, pauses for 2000 ms, then negatively
 * acknowledges (with requeue) a message whose "num" header equals 0 and
 * acknowledges every other message.
 *
 * @param consumerTag the consumer tag associated with this consumer
 * @param envelope    delivery metadata; supplies the delivery tag used for ack/nack
 * @param properties  message properties; the optional "num" header selects ack vs. nack
 * @param body        the raw message body
 * @throws IOException if the ack/nack cannot be sent on the channel
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    System.err.println("body: " + new String(body));
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Restore the interrupt flag so the thread that runs this callback can still see the interruption.
        Thread.currentThread().interrupt();
    }
    // Headers are optional: the header table or the "num" entry may be missing. A missing
    // (or non-Integer) value is treated as "not zero" instead of failing with an exception.
    Object num = properties.getHeaders() == null ? null : properties.getHeaders().get("num");
    if (Integer.valueOf(0).equals(num)) {
        // nack and put the message back on the queue
        channel.basicNack(envelope.getDeliveryTag(), false, true);
    } else {
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
 
源代码2 项目: rabbitmq-mock   文件: ComplexUseCasesTests.java
/**
 * Publishes one message to the "fruits" queue (declared with a message TTL and the
 * "rejected-ex" dead-letter exchange) and expects exactly one message to arrive at the
 * consumer of the "rejected" queue bound to that exchange.
 */
@Test
void expired_message_should_be_consumable_after_being_dead_lettered() throws IOException, TimeoutException, InterruptedException {
    try (Connection conn = new MockConnectionFactory().newConnection();
         Channel channel = conn.createChannel()) {
        // Dead-letter side: fanout exchange with a durable queue bound to it.
        channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.FANOUT);
        channel.queueDeclare("rejected", true, false, false, null);
        channel.queueBindNoWait("rejected", "rejected-ex", "unused", null);
        // Source queue whose messages are routed to the dead-letter exchange.
        queue("fruits").withMessageTtl(10L).withDeadLetterExchange("rejected-ex").declare(channel);

        final List<String> received = new ArrayList<>();
        final DefaultConsumer collector = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) {
                received.add(new String(body));
            }
        };
        channel.basicConsume("rejected", collector);

        channel.basicPublish("", "fruits", null, "banana".getBytes());
        // Give the message time to be moved to the "rejected" queue.
        TimeUnit.MILLISECONDS.sleep(100L);
        assertThat(received).hasSize(1);
    }
}
 
源代码3 项目: rabbitmq-mock   文件: ComplexUseCasesTests.java
/**
 * Publishes two messages to a "fruits" queue declared with {@code withMessageTtl(-1L)}
 * and expects that none of them reaches the consumer of that queue.
 */
@Test
void multiple_expired_messages_are_not_delivered_to_consumer() throws IOException, TimeoutException, InterruptedException {
    try (Connection conn = new MockConnectionFactory().newConnection();
         Channel channel = conn.createChannel()) {
        queue("fruits").withMessageTtl(-1L).declare(channel);

        final List<String> received = new ArrayList<>();
        final DefaultConsumer collector = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) {
                received.add(new String(body));
            }
        };
        channel.basicConsume("fruits", collector);

        for (String fruit : new String[] {"banana", "orange"}) {
            channel.basicPublish("", "fruits", null, fruit.getBytes());
        }
        TimeUnit.MILLISECONDS.sleep(100L);
        assertThat(received).hasSize(0);
    }
}
 
源代码4 项目: java-study   文件: C.java
/**
 * Connects to the broker at 127.0.0.1, declares {@code QUEUE_NAME} and prints every
 * message delivered from it. Messages are consumed with autoAck set to {@code true}.
 */
public static void main(String[] argv) throws Exception {
	// Create the connection factory and set the RabbitMQ address.
	ConnectionFactory connectionFactory = new ConnectionFactory();
	connectionFactory.setHost("127.0.0.1");
	// Open a new connection and a channel on it.
	Connection connection = connectionFactory.newConnection();
	Channel channel = connection.createChannel();
	// Declare the queue to listen on. Queue declaration is idempotent: the queue is created
	// if it does not exist, and an existing queue is left untouched.
	channel.queueDeclare(QUEUE_NAME, false, false, false, null);
	System.out.println("C [*] Waiting for messages. To exit press CTRL+C");
	// DefaultConsumer implements Consumer; handleDelivery is the callback invoked for each
	// message delivered on the channel. The second argument enables automatic acknowledgement.
	channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
		@Override
		public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			String text = new String(body, "UTF-8");
			System.out.println("C [x] Received '" + text + "'");
		}
	});
}
 
源代码5 项目: util4j   文件: ProducerExchangeConsumer_Direct.java
/**
 * Starts consumer no. 1: declares {@code QUEUE_NAME1}, binds it to {@code EXCHANGE_NAME}
 * with {@code ROUTER_KEY_1} and prints every delivered message together with its headers.
 *
 * <p>The delivery is acknowledged manually only when the consumer is registered with
 * {@code autoAck == false}; acknowledging a delivery that the broker already auto-acked
 * is a channel-level protocol error.
 *
 * @throws Exception if the connection, channel, declaration or subscription fails
 */
public void consumer1() throws Exception {
    // 1. Obtain the connection.
    Connection connection = RabbitMqConnectionFactoy.getConnection();
    // 2. Open a channel.
    Channel channel = connection.createChannel();
    // 3. Declare the queue.
    channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
    // Bind the queue to the exchange.
    channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, ROUTER_KEY_1);
    // Original note: a prefetch of N stops the server from pushing more once N messages are
    // unacknowledged at the client; the call is left disabled, as in the original.
    //channel.basicQos(1);
    // 4. Define the consumer of the queue.
    DefaultConsumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                throws IOException {
            // Decode the body as UTF-8 text.
            String message = new String(body, "UTF-8");
            // Concatenate the header map itself: getHeaders() may be null when no headers were set,
            // and string concatenation prints "null" instead of throwing.
            System.out.println("-->消费者1号,收到消息,msg :" + message + ",header:" + properties.getHeaders());
            if (!autoAck) {
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    };
    channel.basicConsume(QUEUE_NAME1, autoAck, consumer);
}
 
源代码6 项目: util4j   文件: RpcTest.java
/**
 * Opens the connection and channel, declares the reply queue and registers a consumer
 * that completes the pending call whose key equals the reply's correlation id.
 *
 * @throws Exception if the connection, channel, declaration or subscription fails
 */
public void init() throws Exception {
    // 1. Obtain the connection.
    connection = RabbitMqConnectionFactoy.getConnection();
    // 2. Open the channel.
    channel = connection.createChannel();
    // Name of the temporary queue that receives the replies.
    replyQueueName = channel.queueDeclare().getQueue();
    DefaultConsumer replyConsumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                byte[] body) throws IOException {
            // Look up the call waiting for this correlation id; replies nobody waits for are ignored.
            CompletableFuture<String> pending = call.get(properties.getCorrelationId());
            if (pending == null) {
                return;
            }
            pending.complete(new String(body, "UTF-8"));
        }
    };
    channel.basicConsume(replyQueueName, autoAck, replyConsumer);
}
 
源代码7 项目: util4j   文件: ProducerExchangeConsumer_Topic.java
/**
 * Starts consumer 1A: declares {@code QUEUE_NAME1}, binds it to {@code EXCHANGE_NAME}
 * with the routing key "test.a" and prints every delivered message together with its headers.
 *
 * <p>The delivery is acknowledged manually only when the consumer is registered with
 * {@code autoAck == false}; acknowledging a delivery that the broker already auto-acked
 * is a channel-level protocol error.
 *
 * @throws Exception if the connection, channel, declaration or subscription fails
 */
public void consumer1A() throws Exception {
    // 1. Obtain the connection.
    Connection connection = RabbitMqConnectionFactoy.getConnection();
    // 2. Open a channel.
    Channel channel = connection.createChannel();
    // 3. Declare the queue.
    channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
    // Bind the queue to the exchange (original note: this binding receives only the
    // odd-numbered messages - presumably decided by the producer; verify there).
    channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "test.a");
    // Original note: a prefetch of N stops the server from pushing more once N messages are
    // unacknowledged at the client; the call is left disabled, as in the original.
    //channel.basicQos(1);
    // 4. Define the consumer of the queue.
    DefaultConsumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                throws IOException {
            // Decode the body as UTF-8 text.
            String message = new String(body, "UTF-8");
            // Concatenate the header map itself: getHeaders() may be null when no headers were set,
            // and string concatenation prints "null" instead of throwing.
            System.out.println("-->消费者1A号,收到消息,msg :" + message + ",header:" + properties.getHeaders());
            if (!autoAck) {
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    };
    channel.basicConsume(QUEUE_NAME1, autoAck, consumer);
}
 
源代码8 项目: Insights   文件: MessageSubscriberFactory.java
/**
 * Creates a channel, declares and binds a queue derived from the routing key (dots
 * replaced by underscores) and subscribes a consumer that forwards every delivery to
 * the given response handler. The routing key is also used as the consumer tag.
 *
 * @param routingKey      binding key on {@code MessageConstants.EXCHANGE_NAME}
 * @param responseHandler handler that receives the channel and every delivery
 * @throws Exception if any channel operation fails
 */
public void registerSubscriber(String routingKey, final EngineSubscriberResponseHandler responseHandler) throws Exception {
	final Channel subscriberChannel = connection.createChannel();
	final String subscriberQueue = routingKey.replace(".", "_");
	subscriberChannel.queueDeclare(subscriberQueue, true, false, false, null);
	subscriberChannel.queueBind(subscriberQueue, MessageConstants.EXCHANGE_NAME, routingKey);
	subscriberChannel.basicQos(ApplicationConfigProvider.getInstance().getMessageQueue().getPrefetchCount());
	responseHandler.setChannel(subscriberChannel);
	log.debug("prefetchCount "+ApplicationConfigProvider.getInstance().getMessageQueue().getPrefetchCount() );
	// autoAck is false here: acknowledging is left to the response handler.
	subscriberChannel.basicConsume(subscriberQueue, false, routingKey, new DefaultConsumer(subscriberChannel) {
		@Override
		public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
				byte[] body) throws IOException {
			responseHandler.handleDelivery(consumerTag, envelope, properties, body);
		}
	});
}
 
源代码9 项目: conductor   文件: AMQPObservableQueueTest.java
/**
 * Builds {@code bound} mocked {@link GetResponse} objects. Each response returns mocked
 * properties with a random UUID message id, a mocked envelope with a random delivery tag,
 * the body "{}" and a message count of {@code bound - i} for the i-th element.
 *
 * @param random source of the delivery tags
 * @param bound  number of responses to create
 * @return the mocked responses in creation order
 */
List<GetResponse> buildQueue(final Random random, final int bound) {
	// Diamond instead of the raw LinkedList type, so the assignment is type-checked.
	final LinkedList<GetResponse> queue = new LinkedList<>();
	for (int i = 0; i < bound; i++) {
		AMQP.BasicProperties props = Mockito.mock(AMQP.BasicProperties.class);
		Mockito.when(props.getMessageId()).thenReturn(UUID.randomUUID().toString());
		Envelope envelope = Mockito.mock(Envelope.class);
		Mockito.when(envelope.getDeliveryTag()).thenReturn(random.nextLong());
		GetResponse response = Mockito.mock(GetResponse.class);
		Mockito.when(response.getProps()).thenReturn(props);
		Mockito.when(response.getEnvelope()).thenReturn(envelope);
		Mockito.when(response.getBody()).thenReturn("{}".getBytes());
		Mockito.when(response.getMessageCount()).thenReturn(bound - i);
		queue.add(response);
	}
	return queue;
}
 
源代码10 项目: EDDI   文件: DifferPublisherTest.java
/**
 * Verifies that {@code negativeDeliveryAck} nacks the delivery without requeue,
 * republishes the body to the "failed" routing key and waits for publisher confirms.
 */
@Test
public void negativeDeliveryAck() throws IOException, TimeoutException, InterruptedException {
    //setup
    final long deliveryTag = 1L;
    final byte[] body = new byte[0];
    final Envelope envelope = new Envelope(deliveryTag, false, "someExchange", "someRoutingKey");
    final Delivery delivery = new Delivery(envelope, new AMQP.BasicProperties(), body);

    //test
    differPublisher.negativeDeliveryAck(delivery);

    //assert
    Mockito.verify(channel).basicNack(eq(deliveryTag), eq(false), eq(false));
    Mockito.verify(channel)
            .basicPublish(eq(EDDI_EXCHANGE), eq(MESSAGE_CREATED_EDDI_FAILED_ROUTING_KEY), eq(null), eq(body));
    Mockito.verify(channel).waitForConfirmsOrDie(eq(TIMEOUT_CONFIRMS_IN_MILLIS));
}
 
源代码11 项目: roboconf-platform   文件: RoboconfConsumer.java
/**
 * Deserializes the delivered body into a message, logs its type and routing key and
 * adds it to the message queue. Deserialization failures are logged and reported to
 * the message queue instead of being propagated.
 */
@Override
public void handleDelivery( String consumerTag, Envelope envelope, BasicProperties properties, byte[] body )
throws IOException {

	try {
		Message received = SerializationUtils.deserializeObject( body );
		String messageType = received.getClass().getSimpleName();
		this.logger.finer( this.sourceName + " received a message " + messageType
				+ " on routing key '" + envelope.getRoutingKey() + "'.");

		this.messageQueue.add( received );

	} catch( ClassNotFoundException | IOException ex ) {
		String errorType = ex.getClass().getSimpleName();
		this.logger.severe( this.sourceName + ": a message could not be deserialized. => " + errorType );
		Utils.logException( this.logger, ex );
		this.messageQueue.errorWhileReceivingMessage();
	}
}
 
源代码12 项目: flowing-retail-old   文件: RabbitMqConsumer.java
/**
 * Connects to the broker on localhost, declares the durable queue
 * "flowing-retail-" + name, binds it to the fanout exchange and starts an auto-ack
 * consumer that passes every message, decoded as UTF-8, to the event handler.
 *
 * @throws Exception if the connection or any channel operation fails
 */
protected void connect() throws Exception {
  final ConnectionFactory connectionFactory = new ConnectionFactory();
  connectionFactory.setHost("localhost");
  channel = connectionFactory.newConnection().createChannel();

  final String queueName = "flowing-retail-" + name;
  channel.queueDeclare(queueName, true, false, false, null);
  // publish/subscribe model
  channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
  channel.queueBind(queueName, EXCHANGE_NAME, "*");

  System.out.println(" [*] Waiting for messages.");

  channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      final String text = new String(body, "UTF-8");
      System.out.println(" [x] Received '" + text + "'");
      eventHandler.handleEvent(text);
    }
  });
}
 
源代码13 项目: zipkin-reporter-java   文件: ITRabbitMQSender.java
/**
 * Consumes one message from the sender's queue and returns its body.
 *
 * <p>Waits up to 10 seconds for a delivery and fails the assertion on timeout or when
 * the delivered body is {@code null}.
 *
 * @return the body of the first message delivered
 * @throws Exception if subscribing fails or the wait is interrupted
 */
byte[] readMessage() throws Exception {
  final CountDownLatch countDown = new CountDownLatch(1);
  final AtomicReference<byte[]> result = new AtomicReference<>();

  // Don't close this as it invalidates the sender's connection!
  Channel channel = sender.localChannel();
  channel.basicConsume(sender.queue, true, new DefaultConsumer(channel) {
    @Override public void handleDelivery(String consumerTag, Envelope envelope,
        AMQP.BasicProperties properties, byte[] body) {
      result.set(body);
      countDown.countDown();
    }
  });
  assertThat(countDown.await(10, TimeUnit.SECONDS))
      .withFailMessage("Timed out waiting to read message")
      .isTrue();
  // Check the delivered body, not the AtomicReference holder: the holder itself is never
  // null, so asserting on it could not detect a null body.
  assertThat(result.get())
      .withFailMessage("handleDelivery set null body")
      .isNotNull();
  return result.get();
}
 
源代码14 项目: rabbitmq-cdi   文件: ConsumerHolder.java
/**
 * Passes the delivery to the consumer and then acknowledges it when the consumer
 * returned {@code true}, or negatively acknowledges it (no requeue) otherwise.
 *
 * @param consumerTag the consumer tag of the delivery
 * @param message     the delivery to consume
 * @throws IOException if consuming or the ack action fails
 */
void deliverWithAck(String consumerTag, Delivery message) throws IOException {
  final Envelope envelope = message.getEnvelope();
  final long deliveryTag = envelope.getDeliveryTag();
  LOGGER.debug("Consuming message {} for consumer tag {}", envelope, consumerTag);
  final boolean consumed =
      consumer.consume(consumerTag, envelope, message.getProperties(), message.getBody());
  if (!consumed) {
    invokeAckAction(ackChannel -> {
      ackChannel.basicNack(deliveryTag, false, false);
      LOGGER.debug("Not acknowledged {}", envelope);
    });
    return;
  }
  invokeAckAction(ackChannel -> {
    ackChannel.basicAck(deliveryTag, false);
    LOGGER.debug("Acknowledged {}", message);
  });
}
 
源代码15 项目: rabbitmq-cdi   文件: TestClient.java
/**
 * Manual test client: consumes "product.catalog_item.sync" with autoAck and prints a
 * progress line for every 1000th message. The latch is never counted down in this
 * method, so {@code await()} keeps the client running until it is interrupted.
 */
public static void main(String[] args) {
  final CountDownLatch keepAlive = new CountDownLatch(1);
  final ConnectionFactory connectionFactory = new ConnectionFactory();
  connectionFactory.setUsername("guest");
  connectionFactory.setPassword("guest");
  connectionFactory.setHost("127.0.0.1");
  try (Connection connection = connectionFactory.newConnection();
      Channel consumeChannel = connection.createChannel()) {
    final AtomicLong received = new AtomicLong();
    final DefaultConsumer counter = new DefaultConsumer(consumeChannel) {
      @Override
      public void handleDelivery(String tag, Envelope envelope, BasicProperties properties,
          byte[] body) throws IOException {
        final long total = received.incrementAndGet();
        if (total % 1000 != 0) {
          return;
        }
        System.out.println("Received " + total + " messages so far.");
      }
    };
    final String consumerTag = consumeChannel.basicConsume("product.catalog_item.sync", true, counter);
    System.out.println(consumerTag);
    keepAlive.await();
  } catch (Exception e) {
    e.printStackTrace();
  }
}
 
源代码16 项目: rabbitmq-cdi   文件: EventConsumerTest.java
/**
 * Verifies that a decodable message is consumed successfully and that the decoded
 * event is fired on the event sink selected for its type.
 */
@SuppressWarnings("boxing")
@Test
public void testHandleDelivery() throws Exception {
  final byte[] payload = "the message".getBytes();
  final TestEvent expectedEvent = new TestEvent();

  when(decoder.willDecode(null)).thenReturn(true);
  when(decoder.decode(payload)).thenReturn(expectedEvent);
  when(eventSink.select(TestEvent.class)).thenReturn(testEventSink);

  final Envelope envelope = new Envelope(123L, false, null, null);
  final boolean consumed = consumer.consume("consumerTag", envelope, new BasicProperties(), payload);

  assertTrue(consumed);
  verify(testEventSink).fire(expectedEvent);
}
 
源代码17 项目: rabbitmq-cdi   文件: ConsumerHolderTest.java
/**
 * Verifies that a delivery the event consumer accepts is acknowledged with its delivery tag.
 */
@Test
void deliverWithAckSuccess() throws IOException {
  final byte[] payload = "some body".getBytes();
  final BasicProperties props = MessageProperties.BASIC;
  final Envelope envelope = new Envelope(123L, false, "exchange", "routingKey");
  final Delivery delivery = new Delivery(envelope, props, payload);
  sut = new ConsumerHolder(eventConsumerMock, "queue", true, PREFETCH_COUNT,
      consumerChannelFactoryMock, declarationsListMock, declarerRepositoryMock);

  when(consumerChannelFactoryMock.createChannel()).thenReturn(channelMock);
  when(eventConsumerMock.consume("consumerTag", envelope, props, payload)).thenReturn(true);

  sut.activate();
  sut.deliverWithAck("consumerTag", delivery);

  verify(channelMock).basicAck(123L, false);
}
 
源代码18 项目: rabbitmq-cdi   文件: ConsumerHolderTest.java
/**
 * Verifies that a delivery the event consumer rejects is negatively acknowledged
 * (single message, no requeue) with its delivery tag.
 */
@Test
void deliverWithAckSendFailed() throws IOException {
  final byte[] payload = "some body".getBytes();
  final BasicProperties props = MessageProperties.BASIC;
  final Envelope envelope = new Envelope(123L, false, "exchange", "routingKey");
  final Delivery delivery = new Delivery(envelope, props, payload);
  sut = new ConsumerHolder(eventConsumerMock, "queue", true, PREFETCH_COUNT,
      consumerChannelFactoryMock, declarationsListMock, declarerRepositoryMock);

  when(consumerChannelFactoryMock.createChannel()).thenReturn(channelMock);
  when(eventConsumerMock.consume("consumerTag", envelope, props, payload)).thenReturn(false);

  sut.activate();
  sut.deliverWithAck("consumerTag", delivery);

  verify(channelMock).basicNack(123L, false, false);
}
 
源代码19 项目: rabbitmq-cdi   文件: ConsumerHolderTest.java
/**
 * Verifies that an {@link IOException} thrown while sending the nack is propagated
 * out of {@code deliverWithAck}.
 */
@Test
void deliverWithAckFailedAck() throws IOException {
  final byte[] payload = "some body".getBytes();
  final BasicProperties props = MessageProperties.BASIC;
  final Envelope envelope = new Envelope(123L, false, "exchange", "routingKey");
  final Delivery delivery = new Delivery(envelope, props, payload);
  final IOException nackFailure = new IOException("some error");
  sut = new ConsumerHolder(eventConsumerMock, "queue", true, PREFETCH_COUNT,
      consumerChannelFactoryMock, declarationsListMock, declarerRepositoryMock);

  when(consumerChannelFactoryMock.createChannel()).thenReturn(channelMock);
  when(eventConsumerMock.consume("consumerTag", envelope, props, payload)).thenReturn(false);
  doThrow(nackFailure).when(channelMock).basicNack(123L, false, false);

  sut.activate();
  assertThrows(IOException.class, () -> sut.deliverWithAck("consumerTag", delivery));
}
 
/**
 * Buffers a delivered message for later processing.
 *
 * <p>A redelivered message whose tag is already in {@code recoveredTags} or
 * {@code pendingAck} is not buffered again; if the tag is in {@code recoveredTags} it is
 * (re-)added to {@code pendingAck}. Every other message is added to {@code pendingAck}
 * and to the holding buffer.
 */
@Override
public void handleDelivery(String consumer_Tag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
  final long deliveryTag = envelope.getDeliveryTag();
  if (envelope.isRedeliver()) {
    final boolean alreadyRecovered = recoveredTags.contains(deliveryTag);
    if (alreadyRecovered || pendingAck.contains(deliveryTag)) {
      if (alreadyRecovered) {
        pendingAck.add(deliveryTag);
      }
      // Known redelivery: do not buffer the payload a second time.
      return;
    }
  }

  // Acknowledgements are sent at the end of the window after adding to idempotency manager
  pendingAck.add(deliveryTag);
  holdingBuffer.add(new KeyValPair<Long, byte[]>(deliveryTag, body));
  logger.debug("Received Async message: {}  buffersize: {} ", new String(body), holdingBuffer.size());
}
 
源代码21 项目: skywalking   文件: RabbitMQConsumerInterceptor.java
/**
 * Creates an entry span for a consumed RabbitMQ delivery and restores the trace context
 * from the message headers.
 *
 * <p>The envelope is read from {@code allArguments[1]} and the message properties from
 * {@code allArguments[2]}; the broker URL comes from the enhanced instance's dynamic field.
 */
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
    MethodInterceptResult result) throws Throwable {
    ContextCarrier contextCarrier = new ContextCarrier();
    String url = (String) objInst.getSkyWalkingDynamicField();
    Envelope envelope = (Envelope) allArguments[1];
    AMQP.BasicProperties properties = (AMQP.BasicProperties) allArguments[2];

    String exchange = envelope.getExchange();
    String routingKey = envelope.getRoutingKey();
    String operationName = OPERATE_NAME_PREFIX + "Topic/" + exchange + "Queue/" + routingKey
        + CONSUMER_OPERATE_NAME_SUFFIX;
    AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, null).start(System.currentTimeMillis());
    Tags.MQ_BROKER.set(activeSpan, url);
    Tags.MQ_TOPIC.set(activeSpan, exchange);
    Tags.MQ_QUEUE.set(activeSpan, routingKey);
    activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
    SpanLayer.asMQ(activeSpan);

    // Copy every carrier item that has a matching, non-null message header.
    CarrierItem item = contextCarrier.items();
    while (item.hasNext()) {
        item = item.next();
        Object headerValue = properties.getHeaders() == null
            ? null
            : properties.getHeaders().get(item.getHeadKey());
        if (headerValue != null) {
            item.setHeadValue(headerValue.toString());
        }
    }
    ContextManager.extract(contextCarrier);

}
 
/**
 * Runs the interceptor's before/after hooks for a delivery that carries an SW8 trace
 * header and expects exactly one trace segment to be stored.
 */
@Test
public void TestRabbitMQConsumerInterceptor() throws Throwable {
    Map<String, Object> headers = new HashMap<String, Object>();
    headers.put(SW8CarrierItem.HEADER_NAME, "1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=");
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build();
    Envelope envelope = new Envelope(1111, false, "", "rabbitmq-test");
    // Argument layout used by the interceptor: index 1 is the envelope, index 2 the properties.
    Object[] arguments = new Object[] {0, envelope, properties};

    rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null, arguments, null, null);
    rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null, arguments, null, null);

    List<TraceSegment> storedSegments = segmentStorage.getTraceSegments();
    Assert.assertThat(storedSegments.size(), is(1));
}
 
源代码23 项目: tracee   文件: TraceeMessagePropertiesConverter.java
/**
 * Converts incoming AMQP properties and, when the filter configuration allows processing
 * of the {@code AsyncProcess} channel, puts the context found under {@code TPIC_HEADER}
 * (after filtering denied params) into the backend. An invocation id is generated if
 * necessary in every case.
 */
@Override
public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelope envelope, String charset) {
	final MessageProperties converted = super.toMessageProperties(source, envelope, charset);

	final TraceeFilterConfiguration config = backend.getConfiguration(profile);
	if (config.shouldProcessContext(AsyncProcess)) {
		// Values are stored as type of LongStringHelper.ByteArrayLongString - but it's private
		final Object rawContext = converted.getHeaders().get(TPIC_HEADER);
		final Map<String, String> contextMap = transformToTraceeContextMap((Map<String, ?>) rawContext);
		final boolean hasContext = contextMap != null && !contextMap.isEmpty();
		if (hasContext) {
			backend.putAll(config.filterDeniedParams(contextMap, AsyncProcess));
		}
	}

	Utilities.generateInvocationIdIfNecessary(backend);
	return converted;
}
 
/**
 * Runs the interceptor's before/after hooks for a delivery with an empty header map
 * and expects exactly one trace segment to be stored.
 */
@Test
public void testRabbitMQConsumerInterceptorWithEmptyHeaders() throws Throwable {
    Map<String, Object> emptyHeaders = new HashMap<String, Object>();
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(emptyHeaders).build();
    Envelope envelope = new Envelope(1111, false, "", "rabbitmq-test");
    // Argument layout used by the interceptor: index 1 is the envelope, index 2 the properties.
    Object[] arguments = new Object[] {0, envelope, properties};

    rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null, arguments, null, null);
    rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null, arguments, null, null);

    List<TraceSegment> storedSegments = segmentStorage.getTraceSegments();
    Assert.assertThat(storedSegments.size(), is(1));
}
 
/**
 * Delivers one message to the consumer from an executor thread and verifies that the
 * same text can be taken from the transfer queue.
 */
@Test
public void testConsumerSingleMessage() throws Exception {
  TransferQueue<RabbitMessage> messages = new LinkedTransferQueue<>();

  Channel channel = mock(Channel.class);

  final Consumer consumer = new StreamSetsMessageConsumer(channel, messages);
  final Envelope envelope = new Envelope(1L, false, EXCHANGE_NAME, QUEUE_NAME);

  executor.submit(new Runnable() {
    @Override
    public void run() {
      try {
        // Encode with the same charset the assertion below decodes with, so the test
        // does not depend on the platform default charset.
        consumer.handleDelivery("consumerTag", envelope, null, TEST_MESSAGE_1.getBytes(StandardCharsets.UTF_8));
      } catch (IOException ignored) {
        // no op
      }
    }
  });

  // take() blocks until the consumer has handed the message over.
  RabbitMessage message = messages.take();
  assertEquals(TEST_MESSAGE_1, new String(message.getBody(), StandardCharsets.UTF_8));
}
 
源代码26 项目: micro-integrator   文件: RabbitMQConsumer.java
/**
 * Called when a basic.deliver is received for this consumer.
 *
 * <p>The message is injected into the mediation flow. On success it is acknowledged
 * (unless autoAck is enabled). On failure it is rejected without requeue, unless its
 * first "x-death" entry reports a count above {@code maxDeadLetteredCount}, in which case
 * it is handed to {@link #proceedAfterMaxDeadLetteredCount}. A message without any
 * headers is treated like a message without an "x-death" header.
 *
 * @param consumerTag the consumer tag associated with the consumer
 * @param envelope    packaging data for the message
 * @param properties  content header data for the message
 * @param body        the message body (opaque, client-specific byte array)
 * @throws IOException if the consumer encounters an I/O error while processing the message
 * @see Envelope
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
        throws IOException {
    boolean successful = injectHandler.onMessage(properties, body, inboundName);
    if (successful) {
        if (!autoAck) {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    } else {
        // Headers are optional: getHeaders() may be null for a message published without
        // any, which must not turn a failed mediation into a NullPointerException.
        List<HashMap<String, Object>> xDeathHeader = properties.getHeaders() == null
                ? null
                : (ArrayList<HashMap<String, Object>>) properties.getHeaders().get("x-death");
        // check if message has been already dead-lettered
        if (xDeathHeader != null && xDeathHeader.size() > 0 && maxDeadLetteredCount != -1) {
            Long count = (Long) xDeathHeader.get(0).get("count");
            if (count <= maxDeadLetteredCount) {
                channel.basicReject(envelope.getDeliveryTag(), false);
                log.info("The rejected message with message id: " + properties.getMessageId() + " and " +
                        "delivery tag: " + envelope.getDeliveryTag() + " on the queue: " + queueName + " is " +
                        "dead-lettered " + count + " time(s).");
            } else {
                // handle the message after exceeding the max dead-lettered count
                proceedAfterMaxDeadLetteredCount(envelope, properties, body);
            }
        } else {
            // the message might be dead-lettered or discard if an error occurred in the mediation flow
            channel.basicReject(envelope.getDeliveryTag(), false);
            log.info("The rejected message with message id: " + properties.getMessageId() + " and " +
                    "delivery tag: " + envelope.getDeliveryTag() + " on the queue: " + queueName + " will " +
                    "discard or dead-lettered.");
        }
    }
}
 
源代码27 项目: micro-integrator   文件: RabbitMQConsumer.java
/**
 * Handles a message that exceeded the max dead-lettered count: it is acknowledged in
 * every case and, when an error routing key is configured, first republished to the
 * configured error exchange (or to the default exchange when none is configured).
 *
 * @param envelope   packaging data for the message
 * @param properties content header data for the message
 * @param body       the message body
 * @throws IOException if publishing or acknowledging fails
 */
private void proceedAfterMaxDeadLetteredCount(Envelope envelope, AMQP.BasicProperties properties, byte[] body)
        throws IOException {
    final String errorRoutingKey =
            rabbitMQProperties.get(RabbitMQConstants.MESSAGE_ERROR_QUEUE_ROUTING_KEY);
    final String errorExchange =
            rabbitMQProperties.get(RabbitMQConstants.MESSAGE_ERROR_EXCHANGE_NAME);

    if (!StringUtils.isNotEmpty(errorRoutingKey)) {
        // no routing key configured: discard the message
        channel.basicAck(envelope.getDeliveryTag(), false);
        log.info("The max dead lettered count exceeded. " +
                "No 'rabbitmq.message.error.queue.routing.key' specified for publishing the message. " +
                "Hence the message with message id: " + properties.getMessageId() + " and delivery tag: " +
                envelope.getDeliveryTag() + " on the queue: " + queueName + " will discard.");
        return;
    }

    if (StringUtils.isNotEmpty(errorExchange)) {
        // publish message to the given exchange with the routing key
        channel.basicPublish(errorExchange, errorRoutingKey, properties, body);
        channel.basicAck(envelope.getDeliveryTag(), false);
        log.info("The max dead lettered count exceeded. Hence message with message id: " +
                properties.getMessageId() + " and delivery tag: " + envelope.getDeliveryTag() +
                " publish to the exchange: " + errorExchange + " with the routing key: " + errorRoutingKey + ".");
    } else {
        // publish message to the default exchange with the routing key
        channel.basicPublish("", errorRoutingKey, properties, body);
        channel.basicAck(envelope.getDeliveryTag(), false);
        log.info("The max dead lettered count exceeded. Hence message with message id: " +
                properties.getMessageId() + " and delivery tag: " + envelope.getDeliveryTag() + " publish to the " +
                "default exchange with the routing key: " + errorRoutingKey + ".");
    }
}
 
源代码28 项目: code   文件: MyConsumer.java
/**
 * Prints the consumer tag, envelope, properties and body of every delivery to
 * standard error, one line each, after a separator line.
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
	System.err.println("-----------consume message----------");
	System.err.println("consumerTag: ".concat(String.valueOf(consumerTag)));
	System.err.println("envelope: ".concat(String.valueOf(envelope)));
	System.err.println("properties: ".concat(String.valueOf(properties)));
	System.err.println("body: ".concat(new String(body)));
}
 
源代码29 项目: code   文件: MyConsumer.java
/**
 * Dumps one delivery to standard error: a separator line followed by the consumer tag,
 * the envelope, the properties and the body decoded with the platform default charset.
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    final String tagLine = "consumerTag: " + consumerTag;
    System.err.println(tagLine);
    final String envelopeLine = "envelope: " + envelope;
    System.err.println(envelopeLine);
    final String propertiesLine = "properties: " + properties;
    System.err.println(propertiesLine);
    final String bodyLine = "body: " + new String(body);
    System.err.println(bodyLine);
}
 
源代码30 项目: nifi   文件: ConsumeAMQPTest.java
/**
 * Publishes three messages, runs the processor once with a batch size of 1 and closes it,
 * then checks that: one flow file ("hello") was transferred, delivery tag 0 was acked,
 * delivery tag 2 was nack'ed, and a delivery handled after the close (tag 3) is nack'ed too.
 */
@Test
public void testMessagesRejectedOnStop() throws TimeoutException, IOException {
    // Routing set-up for the in-JVM test connection: "myExchange" -> "key1" -> "queue1".
    final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1"));
    final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");

    final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);

    try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
        // Three messages are published; only the first is expected to be transferred below.
        sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
        sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
        sender.publish("good-bye".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");

        LocalConsumeAMQP proc = new LocalConsumeAMQP(connection);
        TestRunner runner = TestRunners.newTestRunner(proc);
        runner.setProperty(ConsumeAMQP.HOST, "injvm");
        runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
        runner.setProperty(ConsumeAMQP.BATCH_SIZE, "1");

        // Run once, then close the processor so the remaining deliveries are handled "on stop".
        runner.run();
        proc.close();

        runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 1);

        final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
        helloFF.assertContentEquals("hello");


        // A single cumulative ack should be used
        // NOTE(review): each check calls connection.createChannel() again - presumably
        // TestConnection hands back the same TestChannel every time; confirm there.
        assertTrue(((TestChannel) connection.createChannel()).isAck(0));

        // Messages 1 and 2 will have been delivered but on stop should be rejected. They will be rejected
        // cumulatively, though, so only delivery Tag 2 will be nack'ed explicitly
        assertTrue(((TestChannel) connection.createChannel()).isNack(2));

        // Any newly delivered messages should also be immediately nack'ed.
        proc.getAMQPWorker().getConsumer().handleDelivery("123", new Envelope(3, false, "myExchange", "key1"), new BasicProperties(), new byte[0]);
        assertTrue(((TestChannel) connection.createChannel()).isNack(3));
    }
}