javax.jms.MessageEOFException#org.apache.qpid.proton.amqp.messaging.Section源码实例Demo

下面列出了javax.jms.MessageEOFException#org.apache.qpid.proton.amqp.messaging.Section 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@Test
public void testBodyOnlyEncodeDecode() throws Exception {
   MessageImpl incomingMessage = (MessageImpl) Proton.message();

   incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));

   ICoreMessage core = encodeAndCreateAMQPMessage(incomingMessage).toCore();
   AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core, null);

   assertNull(outboudMessage.getHeader());

   Section body = outboudMessage.getBody();
   assertNotNull(body);
   assertTrue(body instanceof AmqpValue);
   assertTrue(((AmqpValue) body).getValue() instanceof String);
}
 
@Test
public void testHeaderButNoPropertiesEncodeDecode() throws Exception {
   MessageImpl incomingMessage = (MessageImpl) Proton.message();

   incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
   incomingMessage.setDurable(true);

   ICoreMessage core = encodeAndCreateAMQPMessage(incomingMessage).toCore();
   AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core, null);

   assertNotNull(outboudMessage.getHeader());

   Section body = outboudMessage.getBody();
   assertNotNull(body);
   assertTrue(body instanceof AmqpValue);
   assertTrue(((AmqpValue) body).getValue() instanceof String);
}
 
源代码3 项目: qpid-jms   文件: AmqpTypedObjectDelegate.java
@Override
public void setObject(Serializable value) throws IOException {
    if (value == null) {
        parent.setBody(NULL_OBJECT_BODY);
        encodedBody = null;
    } else if (isSupportedAmqpValueObjectType(value)) {
        // Exchange the incoming body value for one that is created from encoding
        // and decoding the value. Save the bytes for subsequent getObject and
        // copyInto calls to use.
        encodedBody = AmqpCodec.encode(new AmqpValue(value));
        Section decodedBody = AmqpCodec.decode(encodedBody);

        // This step requires a heavy-weight operation of both encoding and decoding the
        // incoming body value in order to create a copy such that changes to the original
        // do not affect the stored value, and also verifies we can actually encode it at all
        // now instead of later during send. In the future it makes sense to try to enhance
        // proton such that we can encode the body and use those bytes directly on the
        // message as it is being sent.

        parent.setBody(decodedBody);
    } else {
        // TODO: Data and AmqpSequence?
        throw new IllegalArgumentException("Encoding this object type with the AMQP type system is not supported: " + value.getClass().getName());
    }
}
 
源代码4 项目: qpid-jms   文件: AmqpJmsMapMessageFacade.java
@SuppressWarnings("unchecked")
@Override
void setBody(Section body) {
    if (body == null) {
        initializeEmptyBody();
    } else if (body instanceof AmqpValue) {
        Object o = ((AmqpValue) body).getValue();
        if (o == null) {
            initializeEmptyBody();
        } else if (o instanceof Map) {
            messageBodyMap = (Map<String, Object>) o;
            super.setBody(body);
        } else {
            throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
        }
    } else {
        throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
    }
}
 
源代码5 项目: qpid-jms   文件: AmqpJmsObjectMessageFacadeTest.java
/**
 * Test that setting an object on a new message results in the expected
 * content in the body section of the underlying message.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testSetObjectOnNewMessage() throws Exception {
    String content = "myStringContent";

    AmqpJmsObjectMessageFacade amqpObjectMessageFacade = createNewObjectMessageFacade(false);
    amqpObjectMessageFacade.setObject(content);

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(baos);
    oos.writeObject(content);
    oos.flush();
    oos.close();
    byte[] bytes = baos.toByteArray();

    // retrieve the bytes from the underlying message, check they match expectation
    Section section = amqpObjectMessageFacade.getBody();
    assertNotNull(section);
    assertEquals(Data.class, section.getClass());
    assertArrayEquals("Underlying message data section did not contain the expected bytes", bytes, ((Data) section).getValue().getArray());
}
 
源代码6 项目: qpid-proton-j   文件: Message.java
public static Message create(Header header,
                             DeliveryAnnotations deliveryAnnotations,
                             MessageAnnotations messageAnnotations,
                             Properties properties,
                             ApplicationProperties applicationProperties,
                             Section body,
                             Footer footer) {
    return new MessageImpl(header, deliveryAnnotations,
                           messageAnnotations, properties,
                           applicationProperties, body, footer);
}
 
源代码7 项目: qpid-proton-j   文件: Proton.java
public static Message message(Header header,
                  DeliveryAnnotations deliveryAnnotations, MessageAnnotations messageAnnotations,
                  Properties properties, ApplicationProperties applicationProperties,
                  Section body, Footer footer)
{
    return Message.Factory.create(header, deliveryAnnotations,
                                  messageAnnotations, properties,
                                  applicationProperties, body, footer);
}
 
源代码8 项目: activemq-artemis   文件: AmqpLargeMessageTest.java
private void validateMessage(byte[] expectedPayload, int msgNum, AmqpMessage message) {
   assertNotNull("failed at " + msgNum, message);

   Section body = message.getWrappedMessage().getBody();
   assertNotNull("No message body for msg " + msgNum, body);

   assertTrue("Unexpected message body type for msg " + body.getClass(), body instanceof Data);
   assertEquals("Unexpected body content for msg", new Binary(expectedPayload, 0, expectedPayload.length), ((Data) body).getValue());
}
 
源代码9 项目: activemq-artemis   文件: AMQPMessage.java
/**
 * Retrieves the AMQP Section that composes the body of this message by decoding a
 * fresh copy from the encoded message data.  Changes to the returned value are not
 * reflected in the value encoded in the original message.
 *
 * @return the Section that makes up the body of this message.
 */
public final Section getBody() {
   ensureScanning();

   // We only handle Sections of AmqpSequence, AmqpValue and Data types so we filter on those.
   // There could also be a Footer and no body so this will prevent a faulty return type in case
   // of no body or message type we don't handle.
   return scanForMessageSection(Math.max(0, remainingBodyPosition), AmqpSequence.class, AmqpValue.class, Data.class);
}
 
源代码10 项目: vertx-proton   文件: VertxProtonExamples.java
public void example3(ProtonConnection connection) {
  connection.createReceiver("myQueue").handler((delivery, msg) -> {
    Section body = msg.getBody();
    if (body instanceof AmqpValue) {
      System.out.println("Received message with content: " + ((AmqpValue) body).getValue());
    }
    // By default, the receiver automatically accepts (and settles) the delivery
    // when the handler returns if no other disposition has already been applied.
  }).open();
}
 
源代码11 项目: vertx-proton   文件: HelloWorld.java
private static void helloWorldSendAndConsumeExample(ProtonConnection connection) {
  connection.open();

  // Receive messages from queue "foo" (using an ActiveMQ style address as example).
  String address = "queue://foo";

  connection.createReceiver(address).handler((delivery, msg) -> {
    Section body = msg.getBody();
    if (body instanceof AmqpValue) {
      String content = (String) ((AmqpValue) body).getValue();
      System.out.println("Received message with content: " + content);
    }
    // By default, the receiver automatically accepts (and settles) the delivery
    // when the handler returns, if no other disposition has been applied.
    // To change this and always manage dispositions yourself, use the
    // setAutoAccept method on the receiver.
  }).open();

  // Create an anonymous (no address) sender, have the message carry its destination
  ProtonSender sender = connection.createSender(null);

  // Create a message to send, have it carry its destination for use with the anonymous sender
  Message message = message(address, "Hello World from client");

  // Can optionally add an openHandler or sendQueueDrainHandler
  // to await remote sender open completing or credit to send being
  // granted. But here we will just buffer the send immediately.
  sender.open();
  System.out.println("Sending message to server");
  sender.send(message, delivery -> {
    System.out.println(String.format("The message was received by the server: remote state=%s, remotely settled=%s",
        delivery.getRemoteState(), delivery.remotelySettled()));
  });
}
 
源代码12 项目: vertx-proton   文件: ProtonClientTest.java
private Object getMessageBody(TestContext context, Message msg) {
  Section body = msg.getBody();

  context.assertNotNull(body);
  context.assertTrue(body instanceof AmqpValue);

  return ((AmqpValue) body).getValue();
}
 
源代码13 项目: vertx-proton   文件: ProtonSubscriberIntTest.java
private Object getMessageBody(TestContext context, Message msg) {
  Section body = msg.getBody();

  context.assertNotNull(body);
  context.assertTrue(body instanceof AmqpValue);

  return ((AmqpValue) body).getValue();
}
 
源代码14 项目: vertx-proton   文件: ProtonPublisherIntTest.java
private Object getMessageBody(TestContext context, Message msg) {
  Section body = msg.getBody();

  context.assertNotNull(body);
  context.assertTrue(body instanceof AmqpValue);

  return ((AmqpValue) body).getValue();
}
 
源代码15 项目: qpid-jms   文件: AmqpTypedObjectDelegate.java
@Override
public Serializable getObject() throws IOException, ClassNotFoundException {
    Section body = null;

    if (encodedBody != null) {
        body = AmqpCodec.decode(encodedBody);
    }

    if (body == null) {
        return null;
    } else if (body instanceof AmqpValue) {
        // TODO: This is assuming the object can be immediately returned, and is
        //       deeply Serializable. We will actually have to ensure elements are
        //       Serializable and e.g convert the Uint/Ubyte etc wrappers.
        return (Serializable) ((AmqpValue) body).getValue();
    } else if (body instanceof Data) {
        // TODO: return as byte[]? ByteBuffer?
        throw new UnsupportedOperationException("Data support still to be added");
    } else if (body instanceof AmqpSequence) {
        // TODO: This is assuming the object can be immediately returned, and is
        //       deeply Serializable. We will actually have to ensure elements are
        //       Serializable and e.g convert the Uint/Ubyte etc wrappers.
        return (Serializable) ((AmqpSequence) body).getValue();
    } else {
        throw new IllegalStateException("Unexpected body type: " + body.getClass().getSimpleName());
    }
}
 
源代码16 项目: qpid-jms   文件: AmqpJmsTextMessageFacade.java
@Override
public String getText() throws JMSException {
    Section body = getBody();

    if (body == null) {
        return null;
    } else if (body instanceof Data) {
        Data data = (Data) body;
        if (data.getValue() == null || data.getValue().getLength() == 0) {
            return "";
        } else {
            Binary b = data.getValue();
            ByteBuffer buf = ByteBuffer.wrap(b.getArray(), b.getArrayOffset(), b.getLength());

            try {
                CharBuffer chars = charset.newDecoder().decode(buf);
                return String.valueOf(chars);
            } catch (CharacterCodingException e) {
                throw JmsExceptionSupport.create("Cannot decode String in " + charset.displayName(), e);
            }
        }
    } else if (body instanceof AmqpValue) {
        Object value = ((AmqpValue) body).getValue();

        if (value == null || value instanceof String) {
            return (String) value;
        } else {
            throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName());
        }
    } else {
        throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
    }
}
 
源代码17 项目: qpid-jms   文件: AmqpCodec.java
/**
 * Given an AMQP Section encode it and return the buffer holding the encoded value
 *
 * @param section
 *      the AMQP Section value to encode.
 *
 * @return a buffer holding the encoded bytes of the given AMQP Section object.
 */
public static ByteBuf encode(Section section) {
    if (section == null) {
        return null;
    }

    AmqpWritableBuffer buffer = new AmqpWritableBuffer();

    EncoderImpl encoder = getEncoder();
    encoder.setByteBuffer(buffer);
    encoder.writeObject(section);
    encoder.setByteBuffer((WritableBuffer) null);

    return buffer.getBuffer();
}
 
源代码18 项目: qpid-jms   文件: AmqpCodec.java
/**
 * Given an encoded AMQP Section, decode the value previously written there.
 *
 * @param encoded
 *      the AMQP Section value to decode.
 *
 * @return a Section object read from its encoded form.
 */
public static Section decode(ByteBuf encoded) {
    if (encoded == null || !encoded.isReadable()) {
        return null;
    }

    DecoderImpl decoder = TLS_CODEC.get().decoder;
    decoder.setByteBuffer(encoded.nioBuffer());
    Section result = (Section) decoder.readObject();
    decoder.setByteBuffer(null);
    encoded.resetReaderIndex();

    return result;
}
 
源代码19 项目: qpid-jms   文件: AmqpJmsBytesMessageFacadeTest.java
private void assertDataBodyAsExpected(Section body, int length) {
    assertNotNull("Expected body section to be present", body);
    assertEquals("Unexpected body section type", Data.class, body.getClass());
    Binary value = ((Data) body).getValue();
    assertNotNull(value);
    assertEquals("Unexpected body length", length, value.getLength());
}
 
源代码20 项目: qpid-jms   文件: AmqpJmsObjectMessageFacadeTest.java
/**
 * Test that setting an object on a new message results in the expected
 * content in the body section of the underlying message.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testSetObjectOnNewAmqpTypedMessage() throws Exception {
    String content = "myStringContent";

    AmqpJmsObjectMessageFacade amqpObjectMessageFacade = createNewObjectMessageFacade(true);
    amqpObjectMessageFacade.setObject(content);

    // retrieve the body from the underlying message, check it matches expectation
    Section section = amqpObjectMessageFacade.getBody();
    assertNotNull(section);
    assertEquals(AmqpValue.class, section.getClass());
    assertEquals("Underlying message body did not contain the expected content", content, ((AmqpValue) section).getValue());
}
 
源代码21 项目: qpid-jms   文件: AmqpJmsStreamMessageFacadeTest.java
@Test
public void testNewMessageToSendContainsAmqpSequenceBody() throws Exception {
    AmqpJmsStreamMessageFacade amqpStreamMessageFacade = createNewStreamMessageFacade();

    Section body = amqpStreamMessageFacade.getBody();

    assertNotNull("Body section was not present", body);
    assertTrue("Body section was not of expected type: " + body.getClass(), body instanceof AmqpSequence);
}
 
源代码22 项目: strimzi-kafka-bridge   文件: AmqpBridgeReceiver.java
public void run(Vertx vertx) {

            this.receivers = new ProtonReceiver[ExampleOne.RECEIVERS_COUNT];

            ProtonClient client = ProtonClient.create(vertx);

            client.connect(AmqpBridgeReceiver.BRIDGE_HOST, AmqpBridgeReceiver.BRIDGE_PORT, ar -> {

                if (ar.succeeded()) {

                    this.received = 0;

                    this.connection = ar.result();
                    this.connection.open();

                    log.info("Connected as {}", this.connection.getContainer());

                    for (int i = 0; i < this.receivers.length; i++) {

                        if (ExampleOne.IS_SAME_GROUP_ID) {
                            this.receivers[i] = this.connection.createReceiver(String.format("%s/group.id/%s", ExampleOne.TOPIC, ExampleOne.GROUP_ID_PREFIX));
                        } else {
                            this.receivers[i] = this.connection.createReceiver(String.format("%s/group.id/%s%d", ExampleOne.TOPIC, ExampleOne.GROUP_ID_PREFIX, i));
                        }

                        int index = i;

                        this.receivers[i].handler((delivery, message) -> {

                            this.received++;

                            Section body = message.getBody();

                            if (body instanceof Data) {
                                byte[] value = ((Data) body).getValue().getArray();
                                log.info("Message received {} by receiver {} ...", new String(value), index);

                            } else if (body instanceof AmqpValue) {
                                Object amqpValue = ((AmqpValue) body).getValue();
                                // encoded as String
                                if (amqpValue instanceof String) {
                                    String content = (String) ((AmqpValue) body).getValue();
                                    log.info("Message received {} by receiver {} ...", content, index);
                                // encoded as a List
                                } else if (amqpValue instanceof List) {
                                    List<?> list = (List<?>) ((AmqpValue) body).getValue();
                                    log.info("Message received {} by receiver {} ...", list, index);
                                // encoded as an array
                                } else if (amqpValue instanceof Object[]) {
                                    Object[] array = (Object[]) ((AmqpValue) body).getValue();
                                    log.info("Message received {} by receiver {} ...", array, index);
                                // encoded as a Map
                                } else if (amqpValue instanceof Map) {
                                    Map<?, ?> map = (Map<?, ?>) ((AmqpValue) body).getValue();
                                    log.info("Message received {} by receiver {} ...", map, index);
                                }
                            }

                            // default is AT_LEAST_ONCE QoS (unsettled) so we need to send disposition (settle) to sender
                            delivery.disposition(Accepted.getInstance(), true);

                            MessageAnnotations messageAnnotations = message.getMessageAnnotations();
                            if (messageAnnotations != null) {
                                Object partition = messageAnnotations.getValue().get(Symbol.getSymbol("x-opt-bridge.partition"));
                                Object offset = messageAnnotations.getValue().get(Symbol.getSymbol("x-opt-bridge.offset"));
                                Object key = messageAnnotations.getValue().get(Symbol.getSymbol("x-opt-bridge.key"));
                                Object topic = messageAnnotations.getValue().get(Symbol.valueOf("x-opt-bridge.topic"));
                                log.info("... on topic {} partition {} [{}], key = {}", topic, partition, offset, key);
                            }
                        }).open();
                    }
                }

            });

            try {
                System.in.read();

                for (int i = 0; i < this.receivers.length; i++) {
                    if (this.receivers[i].isOpen())
                        this.receivers[i].close();
                }
                this.connection.close();

                log.info("Total received {}", this.received);

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
 
源代码23 项目: strimzi-kafka-bridge   文件: AmqpBridgeTest.java
@Test
void sendReceiveInMultiplexing(VertxTestContext context) throws InterruptedException {
    String topic = "sendReceiveInMultiplexing";
    kafkaCluster.createTopic(topic, 1, 1);

    ProtonClient client = ProtonClient.create(this.vertx);

    Checkpoint consume = context.checkpoint();
    client.connect(AmqpBridgeTest.BRIDGE_HOST, AmqpBridgeTest.BRIDGE_PORT, ar -> {

        if (ar.succeeded()) {

            ProtonConnection connection = ar.result();
            connection.open();

            String sentBody = "Simple message from " + connection.getContainer();
            Message sentMessage = ProtonHelper.message(topic, sentBody);

            ProtonReceiver receiver = connection.createReceiver(topic + "/group.id/my_group");
            receiver.handler((delivery, receivedMessage) -> {

                Section receivedBody = receivedMessage.getBody();
                if (receivedBody instanceof Data) {
                    byte[] value = ((Data) receivedBody).getValue().getArray();
                    log.info("Message received {}", new String(value));
                    // default is AT_LEAST_ONCE QoS (unsettled) so we need to send disposition (settle) to sender
                    delivery.disposition(Accepted.getInstance(), true);
                    context.verify(() -> assertThat(sentBody, is(new String(value))));
                    consume.flag();
                }
            })
                    .setPrefetch(this.bridgeConfig.getAmqpConfig().getFlowCredit()).open();

            ProtonSender sender = connection.createSender(null);
            sender.open();

            sender.send(ProtonHelper.tag("my_tag"), sentMessage, delivery -> {
                log.info("Message delivered {}", delivery.getRemoteState());
                context.verify(() -> assertThat(Accepted.getInstance(), is(delivery.getRemoteState())));
            });

        } else {
            context.failNow(ar.cause());
        }
    });
    assertThat(context.awaitCompletion(60, TimeUnit.SECONDS), is(true));
}
 
源代码24 项目: strimzi-kafka-bridge   文件: AmqpBridgeTest.java
@Test
void receiveSimpleMessage(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException {
    String topic = "receiveSimpleMessage";
    kafkaCluster.createTopic(topic, 1, 1);

    String sentBody = "Simple message";

    Checkpoint consume = context.checkpoint();
    kafkaCluster.produceStrings(topic, sentBody, 1, 0);

    ProtonClient client = ProtonClient.create(this.vertx);
    client.connect(AmqpBridgeTest.BRIDGE_HOST, AmqpBridgeTest.BRIDGE_PORT, ar -> {
        if (ar.succeeded()) {

            ProtonConnection connection = ar.result();
            connection.open();

            ProtonReceiver receiver = connection.createReceiver(topic + "/group.id/my_group");
            receiver.handler((delivery, message) -> {

                Section body = message.getBody();
                if (body instanceof Data) {
                    byte[] value = ((Data) body).getValue().getArray();

                    // default is AT_LEAST_ONCE QoS (unsettled) so we need to send disposition (settle) to sender
                    delivery.disposition(Accepted.getInstance(), true);

                    // get topic, partition, offset and key from AMQP annotations
                    MessageAnnotations annotations = message.getMessageAnnotations();
                    context.verify(() -> {
                        assertThat(annotations, notNullValue());
                        String topicAnnotation = String.valueOf(annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION)));
                        assertThat(topicAnnotation, notNullValue());
                        Integer partitionAnnotation = (Integer) annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION));
                        assertThat(partitionAnnotation, notNullValue());
                        Long offsetAnnotation = (Long) annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION));
                        assertThat(offsetAnnotation, notNullValue());
                        Object keyAnnotation = annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION));
                        assertThat(keyAnnotation, nullValue());
                        log.info("Message consumed topic={} partition={} offset={}, key={}, value={}",
                                topicAnnotation, partitionAnnotation, offsetAnnotation, keyAnnotation, new String(value));

                        assertThat(topicAnnotation, is(topic));
                        assertThat(partitionAnnotation, is(0));
                        assertThat(offsetAnnotation, is(0L));
                        assertThat(sentBody, is(new String(value)));
                    });
                    consume.flag();
                }
            })
                    .setPrefetch(this.bridgeConfig.getAmqpConfig().getFlowCredit()).open();
        } else {
            context.failNow(ar.cause());
        }
    });
    assertThat(context.awaitCompletion(60, TimeUnit.SECONDS), is(true));
}
 
源代码25 项目: strimzi-kafka-bridge   文件: AmqpBridgeTest.java
@Test
void receiveSimpleMessageFromPartition(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException {
    String topic = "receiveSimpleMessageFromPartition";
    kafkaCluster.createTopic(topic, 2, 1);

    String sentBody = "Simple message";

    // Futures for wait
    Checkpoint consume = context.checkpoint();
    kafkaCluster.produceStrings(topic, sentBody, 1, 1);

    ProtonClient client = ProtonClient.create(this.vertx);

    client.connect(AmqpBridgeTest.BRIDGE_HOST, AmqpBridgeTest.BRIDGE_PORT, ar -> {
        if (ar.succeeded()) {

            ProtonConnection connection = ar.result();
            connection.open();

            ProtonReceiver receiver = connection.createReceiver(topic + "/group.id/my_group");

            Source source = (Source) receiver.getSource();

            // filter on specific partition
            Map<Symbol, Object> map = new HashMap<>();
            map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_FILTER), 1);
            source.setFilter(map);

            receiver.handler((delivery, message) -> {

                Section body = message.getBody();
                if (body instanceof Data) {
                    byte[] value = ((Data) body).getValue().getArray();

                    // default is AT_LEAST_ONCE QoS (unsettled) so we need to send disposition (settle) to sender
                    delivery.disposition(Accepted.getInstance(), true);

                    // get topic, partition, offset and key from AMQP annotations
                    MessageAnnotations annotations = message.getMessageAnnotations();
                    context.verify(() -> assertThat(annotations, notNullValue()));
                    String topicAnnotation = String.valueOf(annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION)));
                    assertThat(topicAnnotation, notNullValue());
                    Integer partitionAnnotation = (Integer) annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION));
                    assertThat(partitionAnnotation, notNullValue());
                    Long offsetAnnotation = (Long) annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION));
                    assertThat(offsetAnnotation, notNullValue());
                    Object keyAnnotation = annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION));
                    assertThat(keyAnnotation, nullValue());
                    log.info("Message consumed topic={} partition={} offset={}, key={}, value={}",
                            topicAnnotation, partitionAnnotation, offsetAnnotation, keyAnnotation, new String(value));

                    assertThat(topicAnnotation, is(topic));
                    assertThat(partitionAnnotation, is(1));
                    assertThat(offsetAnnotation, is(0L));
                    assertThat(sentBody, is(new String(value)));
                    consume.flag();
                }
            })
                    .setPrefetch(this.bridgeConfig.getAmqpConfig().getFlowCredit()).open();
        } else {
            context.failNow(ar.cause());
        }
    });
    assertThat(context.awaitCompletion(60, TimeUnit.SECONDS), is(true));
}
 
源代码26 项目: strimzi-kafka-bridge   文件: AmqpBridgeTest.java
@Test
void receiveSimpleMessageFromPartitionAndOffset(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException {
    String topic = "receiveSimpleMessageFromPartitionAndOffset";
    kafkaCluster.createTopic(topic, 1, 1);

    // Futures for wait
    Checkpoint consume = context.checkpoint();
    kafkaCluster.produceStrings(topic, 11, 0);

    ProtonClient client = ProtonClient.create(this.vertx);

    client.connect(AmqpBridgeTest.BRIDGE_HOST, AmqpBridgeTest.BRIDGE_PORT, ar -> {
        if (ar.succeeded()) {

            ProtonConnection connection = ar.result();
            connection.open();

            ProtonReceiver receiver = connection.createReceiver(topic + "/group.id/my_group");

            Source source = (Source) receiver.getSource();

            // filter on specific partition
            Map<Symbol, Object> map = new HashMap<>();
            map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_FILTER), 0);
            map.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_FILTER), (long) 10);
            source.setFilter(map);

            receiver.handler((delivery, message) -> {

                Section body = message.getBody();
                if (body instanceof Data) {
                    byte[] value = ((Data) body).getValue().getArray();

                    // default is AT_LEAST_ONCE QoS (unsettled) so we need to send disposition (settle) to sender
                    delivery.disposition(Accepted.getInstance(), true);

                    // get topic, partition, offset and key from AMQP annotations
                    MessageAnnotations annotations = message.getMessageAnnotations();
                    context.verify(() -> assertThat(annotations, notNullValue()));
                    String topicAnnotation = String.valueOf(annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION)));
                    assertThat(topicAnnotation, notNullValue());
                    Integer partitionAnnotation = (Integer) annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION));
                    assertThat(partitionAnnotation, notNullValue());
                    Long offsetAnnotation = (Long) annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION));
                    assertThat(offsetAnnotation, notNullValue());
                    Object keyAnnotation = annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION));
                    assertThat(keyAnnotation, notNullValue());
                    log.info("Message consumed topic={} partition={} offset={}, key={}, value={}",
                            topicAnnotation, partitionAnnotation, offsetAnnotation, keyAnnotation, new String(value));

                    assertThat(topicAnnotation, is(topic));
                    assertThat(partitionAnnotation, is(0));
                    assertThat(offsetAnnotation, is(10L));
                    assertThat(keyAnnotation, is("key-10"));
                    assertThat(new String(value), is("value-10"));
                    consume.flag();
                }
            })
                    .setPrefetch(this.bridgeConfig.getAmqpConfig().getFlowCredit()).open();
        } else {
            context.failNow(ar.cause());
        }
    });
    assertThat(context.awaitCompletion(60, TimeUnit.SECONDS), is(true));
}
 
源代码27 项目: vertx-proton   文件: HelloWorldServer.java
private static void helloProcessConnection(Vertx vertx, ProtonConnection connection) {
  connection.openHandler(res -> {
    System.out.println("Client connected: " + connection.getRemoteContainer());
    connection.open();
  }).closeHandler(c -> {
    System.out.println("Client closing amqp connection: " + connection.getRemoteContainer());
    connection.close();
    connection.disconnect();
  }).disconnectHandler(c -> {
    System.out.println("Client socket disconnected: " + connection.getRemoteContainer());
    connection.disconnect();
  })
  .sessionOpenHandler(session -> session.open());

  connection.receiverOpenHandler(receiver -> {
    receiver.setTarget(receiver.getRemoteTarget()) // This is rather naive, for example use only, proper
                                                   // servers should ensure that they advertise their own
                                                   // Target settings that actually reflect what is in place.
                                                   // The request may have also been for a dynamic address.
        .handler((delivery, msg) -> {

          String address = msg.getAddress();
          if (address == null) {
            address = receiver.getRemoteTarget().getAddress();
          }

          Section body = msg.getBody();
          if (body instanceof AmqpValue) {
            String content = (String) ((AmqpValue) body).getValue();
            System.out.println("message to:" + address + ", body: " + content);
          }
        }).open();
  });

  connection.senderOpenHandler(sender -> {
    System.out.println("Sending to client from: " + sender.getRemoteSource().getAddress());
    sender.setSource(sender.getRemoteSource()); // This is rather naive, for example use only, proper
                                                // servers should ensure that they advertise their own
                                                // Source settings that actually reflect what is in place.
                                                // The request may have also been for a dynamic address.
    sender.open();
    vertx.setPeriodic(1000, timer -> {
      if (connection.isDisconnected()) {
        vertx.cancelTimer(timer);
      } else {
        System.out.println("Sending message to client");
        Message m = message("Hello World from Server!");
        sender.send(m, delivery -> {
          System.out.println("The message was received by the client.");
        });
      }
    });
  });

}
 
private TestServer createServer() throws Exception {
  return new TestServer(vertx, (connection) -> {
    connection.openHandler(res -> {
      LOG.trace("Client connected: " + connection.getRemoteContainer());
      connection.open();
    }).closeHandler(c -> {
      LOG.trace("Client closing amqp connection: " + connection.getRemoteContainer());
      connection.close();
      connection.disconnect();
    }).disconnectHandler(c -> {
      LOG.trace("Client socket disconnected: " + connection.getRemoteContainer());
      connection.disconnect();
    })
    .sessionOpenHandler(session -> session.open());

    connection.receiverOpenHandler(receiver -> {
      if(!server.getDetachLink()) {
        LOG.trace("Receiving from client to: " + receiver.getRemoteTarget().getAddress());
        receiver.setTarget(receiver.getRemoteTarget()); // This is rather naive, for example use only, proper
                                                        // servers should ensure that they advertise their own
                                                        // Target settings that actually reflect what is in place.
                                                        // The request may have also been for a dynamic address.
        receiver.handler((delivery, msg) -> {

          String address = msg.getAddress();
          if (address == null) {
            address = receiver.getRemoteTarget().getAddress();
          }

          Section body = msg.getBody();
          String content  = "unknown";
          if (body instanceof AmqpValue) {
            content = (String) ((AmqpValue) body).getValue();
          }

          LOG.trace("message to:" + address + ", body: " + content);
        });

        receiver.closeHandler(s -> {
          s.result().close();
        });
      }

      receiver.open();

      if(server.getDetachLink()) {
        receiver.setCondition(new ErrorCondition(Symbol.getSymbol("Failed Subscriber Requested"), ""));
        receiver.close();
      }
    });
  });
}
 
private TestServer createServer() throws Exception {
  return new TestServer(vertx, (connection) -> {
    connection.openHandler(res -> {
      LOG.trace("Client connected: " + connection.getRemoteContainer());
      connection.open();
    }).closeHandler(c -> {
      LOG.trace("Client closing amqp connection: " + connection.getRemoteContainer());
      connection.close();
      connection.disconnect();
    }).disconnectHandler(c -> {
      LOG.trace("Client socket disconnected: " + connection.getRemoteContainer());
      connection.disconnect();
    })
    .sessionOpenHandler(session -> session.open());

    connection.receiverOpenHandler(receiver -> {
      if(!server.getDetachLink()) {
        LOG.trace("Receiving from client to: " + receiver.getRemoteTarget().getAddress());
        receiver.setTarget(receiver.getRemoteTarget()); // This is rather naive, for example use only, proper
                                                        // servers should ensure that they advertise their own
                                                        // Target settings that actually reflect what is in place.
                                                        // The request may have also been for a dynamic address.
        receiver.handler((delivery, msg) -> {

          String address = msg.getAddress();
          if (address == null) {
            address = receiver.getRemoteTarget().getAddress();
          }

          Section body = msg.getBody();
          String content  = "unknown";
          if (body instanceof AmqpValue) {
            content = (String) ((AmqpValue) body).getValue();
          }

          LOG.trace("message to:" + address + ", body: " + content);
        });

        receiver.closeHandler(s -> {
          s.result().close();
        });
      }

      receiver.open();

      if(server.getDetachLink()) {
        receiver.setCondition(new ErrorCondition(Symbol.getSymbol("Failed Subscriber Requested"), ""));
        receiver.close();
      }
    });
  });
}
 
private TestServer createServer() throws Exception {
  return new TestServer(vertx, (connection) -> {
    connection.openHandler(res -> {
      LOG.trace("Client connected: " + connection.getRemoteContainer());
      connection.open();
    }).closeHandler(c -> {
      LOG.trace("Client closing amqp connection: " + connection.getRemoteContainer());
      connection.close();
      connection.disconnect();
    }).disconnectHandler(c -> {
      LOG.trace("Client socket disconnected: " + connection.getRemoteContainer());
      connection.disconnect();
    })
    .sessionOpenHandler(session -> session.open());

    connection.receiverOpenHandler(receiver -> {
      if(!server.getDetachLink()) {
        LOG.trace("Receiving from client to: " + receiver.getRemoteTarget().getAddress());
        receiver.setTarget(receiver.getRemoteTarget()); // This is rather naive, for example use only, proper
                                                        // servers should ensure that they advertise their own
                                                        // Target settings that actually reflect what is in place.
                                                        // The request may have also been for a dynamic address.
        receiver.handler((delivery, msg) -> {

          String address = msg.getAddress();
          if (address == null) {
            address = receiver.getRemoteTarget().getAddress();
          }

          Section body = msg.getBody();
          String content  = "unknown";
          if (body instanceof AmqpValue) {
            content = (String) ((AmqpValue) body).getValue();
          }

          LOG.trace("message to:" + address + ", body: " + content);
        });

        receiver.closeHandler(s -> {
          s.result().close();
        });
      }

      receiver.open();

      if(server.getDetachLink()) {
        receiver.setCondition(new ErrorCondition(Symbol.getSymbol("Failed Subscriber Requested"), ""));
        receiver.close();
      }
    });
  });
}