下面列出了javax.jms.MessageEOFException#org.apache.qpid.proton.amqp.messaging.Section 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Converts a message that carries only an {@code AmqpValue} body to the core format and
 * back, then checks that the result has no header and still exposes a String payload
 * wrapped in an {@code AmqpValue}.
 *
 * @throws Exception if the conversion fails.
 */
@Test
public void testBodyOnlyEncodeDecode() throws Exception {
    final MessageImpl source = (MessageImpl) Proton.message();
    source.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));

    final ICoreMessage coreMessage = encodeAndCreateAMQPMessage(source).toCore();
    final AMQPMessage outboundMessage = AMQPConverter.getInstance().fromCore(coreMessage, null);

    // Only a body was set on the source message, so no header is expected.
    assertNull(outboundMessage.getHeader());

    final Section bodySection = outboundMessage.getBody();
    assertNotNull(bodySection);
    assertTrue(bodySection instanceof AmqpValue);

    final Object payload = ((AmqpValue) bodySection).getValue();
    assertTrue(payload instanceof String);
}
/**
 * Converts a durable message with an {@code AmqpValue} body to the core format and back,
 * then checks that the result carries a header and still exposes a String payload
 * wrapped in an {@code AmqpValue}.
 *
 * @throws Exception if the conversion fails.
 */
@Test
public void testHeaderButNoPropertiesEncodeDecode() throws Exception {
    final MessageImpl source = (MessageImpl) Proton.message();
    source.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
    source.setDurable(true);

    final ICoreMessage coreMessage = encodeAndCreateAMQPMessage(source).toCore();
    final AMQPMessage outboundMessage = AMQPConverter.getInstance().fromCore(coreMessage, null);

    // The durable flag was set on the source message, so a header is expected.
    assertNotNull(outboundMessage.getHeader());

    final Section bodySection = outboundMessage.getBody();
    assertNotNull(bodySection);
    assertTrue(bodySection instanceof AmqpValue);

    final Object payload = ((AmqpValue) bodySection).getValue();
    assertTrue(payload instanceof String);
}
/**
 * Stores the given value as the body of the parent message.
 * <p>
 * A {@code null} value installs {@code NULL_OBJECT_BODY} and clears the cached encoding.
 * A supported value is encoded into {@code encodedBody} and the parent receives a body
 * decoded from those bytes, so the stored body is a copy of the caller's object.
 *
 * @param value
 *        the object to place in the message body, may be {@code null}.
 *
 * @throws IOException declared by the method signature.
 * @throws IllegalArgumentException if the value's type is not supported.
 */
@Override
public void setObject(Serializable value) throws IOException {
    if (value == null) {
        parent.setBody(NULL_OBJECT_BODY);
        encodedBody = null;
        return;
    }

    if (!isSupportedAmqpValueObjectType(value)) {
        // TODO: Data and AmqpSequence?
        throw new IllegalArgumentException("Encoding this object type with the AMQP type system is not supported: " + value.getClass().getName());
    }

    // Encoding followed by decoding is heavy-weight, but it yields an independent copy
    // of the incoming value and proves now, rather than at send time, that the value can
    // be encoded. The bytes are kept for later getObject and copyInto calls.
    encodedBody = AmqpCodec.encode(new AmqpValue(value));
    parent.setBody(AmqpCodec.decode(encodedBody));
}
/**
 * Installs the given section as the body of this message.
 * <p>
 * A {@code null} section, or an {@code AmqpValue} wrapping {@code null}, resets the
 * message to an empty body. An {@code AmqpValue} wrapping a {@code Map} becomes the
 * backing map of this message. Anything else is rejected.
 *
 * @param body
 *        the section to use as the message body, may be {@code null}.
 *
 * @throws IllegalStateException if the section is not an {@code AmqpValue}, or if the
 *         {@code AmqpValue} wraps something other than a {@code Map}.
 */
@SuppressWarnings("unchecked")
@Override
void setBody(Section body) {
    if (body == null) {
        initializeEmptyBody();
    } else if (body instanceof AmqpValue) {
        Object o = ((AmqpValue) body).getValue();
        if (o == null) {
            initializeEmptyBody();
        } else if (o instanceof Map) {
            messageBodyMap = (Map<String, Object>) o;
            super.setBody(body);
        } else {
            // Report the type of the wrapped content: the section itself is always an
            // AmqpValue on this path, so naming the section type would tell the reader nothing.
            throw new IllegalStateException("Unexpected amqp-value body content type: " + o.getClass().getSimpleName());
        }
    } else {
        throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
    }
}
/**
 * Sets a String on a newly created object message facade and verifies that the body of
 * the underlying message is a {@code Data} section holding the Java-serialized form of
 * that String.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testSetObjectOnNewMessage() throws Exception {
    final String content = "myStringContent";

    final AmqpJmsObjectMessageFacade facade = createNewObjectMessageFacade(false);
    facade.setObject(content);

    // Build the expected bytes by serializing the same content locally.
    final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (ObjectOutputStream serializer = new ObjectOutputStream(sink)) {
        serializer.writeObject(content);
        serializer.flush();
    }
    final byte[] expectedBytes = sink.toByteArray();

    // Compare against the bytes held by the underlying message.
    final Section bodySection = facade.getBody();
    assertNotNull(bodySection);
    assertEquals(Data.class, bodySection.getClass());

    final byte[] actualBytes = ((Data) bodySection).getValue().getArray();
    assertArrayEquals("Underlying message data section did not contain the expected bytes", expectedBytes, actualBytes);
}
/**
 * Creates a message from the supplied sections by passing them, unchanged and in the
 * same order, to the {@code MessageImpl} constructor.
 *
 * @param header the header section.
 * @param deliveryAnnotations the delivery annotations section.
 * @param messageAnnotations the message annotations section.
 * @param properties the properties section.
 * @param applicationProperties the application properties section.
 * @param body the body section.
 * @param footer the footer section.
 *
 * @return the newly created message.
 */
public static Message create(Header header, DeliveryAnnotations deliveryAnnotations,
                             MessageAnnotations messageAnnotations, Properties properties,
                             ApplicationProperties applicationProperties, Section body, Footer footer) {
    final Message created = new MessageImpl(
        header,
        deliveryAnnotations,
        messageAnnotations,
        properties,
        applicationProperties,
        body,
        footer);
    return created;
}
/**
 * Creates a message from the supplied sections by delegating to
 * {@code Message.Factory.create} with the same arguments in the same order.
 *
 * @param header the header section.
 * @param deliveryAnnotations the delivery annotations section.
 * @param messageAnnotations the message annotations section.
 * @param properties the properties section.
 * @param applicationProperties the application properties section.
 * @param body the body section.
 * @param footer the footer section.
 *
 * @return the message returned by the factory.
 */
public static Message message(Header header, DeliveryAnnotations deliveryAnnotations,
                              MessageAnnotations messageAnnotations, Properties properties,
                              ApplicationProperties applicationProperties, Section body, Footer footer) {
    final Message built = Message.Factory.create(
        header,
        deliveryAnnotations,
        messageAnnotations,
        properties,
        applicationProperties,
        body,
        footer);
    return built;
}
/**
 * Asserts that the given message exists and that its body is a {@code Data} section
 * whose binary value equals the expected payload.
 *
 * @param expectedPayload the bytes the body is expected to contain.
 * @param msgNum the message number, used only in assertion failure messages.
 * @param message the received message to check.
 */
private void validateMessage(byte[] expectedPayload, int msgNum, AmqpMessage message) {
    assertNotNull("failed at " + msgNum, message);

    final Section bodySection = message.getWrappedMessage().getBody();
    assertNotNull("No message body for msg " + msgNum, bodySection);
    assertTrue("Unexpected message body type for msg " + bodySection.getClass(), bodySection instanceof Data);

    final Binary expected = new Binary(expectedPayload, 0, expectedPayload.length);
    final Binary actual = ((Data) bodySection).getValue();
    assertEquals("Unexpected body content for msg", expected, actual);
}
/**
 * Returns the AMQP Section forming the body of this message, decoded as a fresh copy
 * from the encoded message data. Modifying the returned value does not alter what is
 * encoded in the original message.
 *
 * @return the Section that makes up the body of this message.
 */
public final Section getBody() {
    ensureScanning();

    // Only AmqpSequence, AmqpValue and Data body sections are handled, so the scan is
    // restricted to those types. A message may hold a Footer but no body; the filter
    // keeps such a section, or any unhandled type, from being returned as the body.
    return scanForMessageSection(
        remainingBodyPosition < 0 ? 0 : remainingBodyPosition,
        AmqpSequence.class,
        AmqpValue.class,
        Data.class);
}
/**
 * Opens a receiver on "myQueue" that prints the content of every message whose body is
 * an {@code AmqpValue}.
 *
 * @param connection the connection on which the receiver is created.
 */
public void example3(ProtonConnection connection) {
    connection.createReceiver("myQueue").handler((delivery, message) -> {
        // By default, the receiver automatically accepts (and settles) the delivery
        // when the handler returns if no other disposition has already been applied.
        final Section payload = message.getBody();
        if (!(payload instanceof AmqpValue)) {
            return;
        }

        final Object content = ((AmqpValue) payload).getValue();
        System.out.println("Received message with content: " + content);
    }).open();
}
/**
 * Opens the connection, attaches a receiver to "queue://foo" that prints String message
 * content, and sends one "Hello World from client" message to that address through an
 * anonymous sender.
 *
 * @param connection the connection to open and use for both receiving and sending.
 */
private static void helloWorldSendAndConsumeExample(ProtonConnection connection) {
    connection.open();

    // Receive messages from queue "foo" (using an ActiveMQ style address as example).
    final String address = "queue://foo";

    connection.createReceiver(address).handler((delivery, received) -> {
        // By default, the receiver automatically accepts (and settles) the delivery
        // when the handler returns, if no other disposition has been applied.
        // To change this and always manage dispositions yourself, use the
        // setAutoAccept method on the receiver.
        final Section payload = received.getBody();
        if (!(payload instanceof AmqpValue)) {
            return;
        }

        final String content = (String) ((AmqpValue) payload).getValue();
        System.out.println("Received message with content: " + content);
    }).open();

    // The sender is anonymous (no address), so the message itself carries the destination.
    final ProtonSender sender = connection.createSender(null);
    final Message outgoing = message(address, "Hello World from client");

    // Can optionally add an openHandler or sendQueueDrainHandler
    // to await remote sender open completing or credit to send being
    // granted. But here we will just buffer the send immediately.
    sender.open();
    System.out.println("Sending message to server");
    sender.send(outgoing, delivery -> {
        final String report = String.format(
            "The message was received by the server: remote state=%s, remotely settled=%s",
            delivery.getRemoteState(), delivery.remotelySettled());
        System.out.println(report);
    });
}
/**
 * Asserts through the test context that the message has an {@code AmqpValue} body and
 * returns the value it wraps.
 *
 * @param context the test context used for the assertions.
 * @param msg the message whose body is inspected.
 *
 * @return the value wrapped by the message's {@code AmqpValue} body.
 */
private Object getMessageBody(TestContext context, Message msg) {
    final Section bodySection = msg.getBody();

    context.assertNotNull(bodySection);
    context.assertTrue(bodySection instanceof AmqpValue);

    final AmqpValue wrapper = (AmqpValue) bodySection;
    return wrapper.getValue();
}
/**
 * Asserts through the test context that the message has an {@code AmqpValue} body and
 * returns the value it wraps.
 *
 * @param context the test context used for the assertions.
 * @param msg the message whose body is inspected.
 *
 * @return the value wrapped by the message's {@code AmqpValue} body.
 */
private Object getMessageBody(TestContext context, Message msg) {
    final Section bodySection = msg.getBody();

    context.assertNotNull(bodySection);
    context.assertTrue(bodySection instanceof AmqpValue);

    final AmqpValue wrapper = (AmqpValue) bodySection;
    return wrapper.getValue();
}
/**
 * Asserts through the test context that the message has an {@code AmqpValue} body and
 * returns the value it wraps.
 *
 * @param context the test context used for the assertions.
 * @param msg the message whose body is inspected.
 *
 * @return the value wrapped by the message's {@code AmqpValue} body.
 */
private Object getMessageBody(TestContext context, Message msg) {
    final Section bodySection = msg.getBody();

    context.assertNotNull(bodySection);
    context.assertTrue(bodySection instanceof AmqpValue);

    final AmqpValue wrapper = (AmqpValue) bodySection;
    return wrapper.getValue();
}
/**
 * Decodes the cached encoded body and returns the object it carries.
 *
 * @return {@code null} when there is no encoded body or it decodes to nothing; otherwise
 *         the value of the decoded {@code AmqpValue} or {@code AmqpSequence} section.
 *
 * @throws IOException declared by the method signature.
 * @throws ClassNotFoundException declared by the method signature.
 * @throws UnsupportedOperationException if the decoded body is a {@code Data} section.
 * @throws IllegalStateException if the decoded body is of any other section type.
 */
@Override
public Serializable getObject() throws IOException, ClassNotFoundException {
    if (encodedBody == null) {
        return null;
    }

    final Section decoded = AmqpCodec.decode(encodedBody);
    if (decoded == null) {
        return null;
    }

    if (decoded instanceof AmqpValue) {
        // TODO: This is assuming the object can be immediately returned, and is
        //       deeply Serializable. We will actually have to ensure elements are
        //       Serializable and e.g convert the Uint/Ubyte etc wrappers.
        return (Serializable) ((AmqpValue) decoded).getValue();
    }

    if (decoded instanceof Data) {
        // TODO: return as byte[]? ByteBuffer?
        throw new UnsupportedOperationException("Data support still to be added");
    }

    if (decoded instanceof AmqpSequence) {
        // TODO: This is assuming the object can be immediately returned, and is
        //       deeply Serializable. We will actually have to ensure elements are
        //       Serializable and e.g convert the Uint/Ubyte etc wrappers.
        return (Serializable) ((AmqpSequence) decoded).getValue();
    }

    throw new IllegalStateException("Unexpected body type: " + decoded.getClass().getSimpleName());
}
/**
 * Returns the text carried by the body of this message.
 * <p>
 * A {@code Data} body is decoded with the configured {@code charset}; a missing or
 * zero-length binary yields an empty string. An {@code AmqpValue} body must wrap a
 * String or {@code null}, which is returned as is.
 *
 * @return the message text, or {@code null} if there is no body or the
 *         {@code AmqpValue} wraps {@code null}.
 *
 * @throws JMSException if the {@code Data} bytes cannot be decoded with the charset.
 * @throws IllegalStateException if the body or the {@code AmqpValue} content has an
 *         unexpected type.
 */
@Override
public String getText() throws JMSException {
    final Section section = getBody();
    if (section == null) {
        return null;
    }

    if (section instanceof Data) {
        final Binary payload = ((Data) section).getValue();
        if (payload == null || payload.getLength() == 0) {
            return "";
        }

        // Wrap only the slice of the backing array that belongs to this Binary.
        final ByteBuffer bytes = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
        try {
            return String.valueOf(charset.newDecoder().decode(bytes));
        } catch (CharacterCodingException e) {
            throw JmsExceptionSupport.create("Cannot decode String in " + charset.displayName(), e);
        }
    }

    if (section instanceof AmqpValue) {
        final Object content = ((AmqpValue) section).getValue();
        if (content != null && !(content instanceof String)) {
            throw new IllegalStateException("Unexpected amqp-value body content type: " + content.getClass().getSimpleName());
        }
        return (String) content;
    }

    throw new IllegalStateException("Unexpected message body type: " + section.getClass().getSimpleName());
}
/**
 * Given an AMQP Section encode it and return the buffer holding the encoded value.
 * <p>
 * The encoder obtained from {@code getEncoder()} is always detached from the output
 * buffer before this method exits, including when encoding fails, so a failed call does
 * not leave the encoder pointing at a partially written buffer.
 *
 * @param section
 *        the AMQP Section value to encode.
 *
 * @return a buffer holding the encoded bytes of the given AMQP Section object, or
 *         {@code null} if the given section is {@code null}.
 */
public static ByteBuf encode(Section section) {
    if (section == null) {
        return null;
    }

    AmqpWritableBuffer buffer = new AmqpWritableBuffer();

    EncoderImpl encoder = getEncoder();
    encoder.setByteBuffer(buffer);
    try {
        encoder.writeObject(section);
    } finally {
        // Always release the buffer reference, even when writeObject throws.
        encoder.setByteBuffer((WritableBuffer) null);
    }

    return buffer.getBuffer();
}
/**
 * Given an encoded AMQP Section, decode the value previously written there.
 * <p>
 * The thread-local decoder is always detached from the input buffer before this method
 * exits, including when decoding fails, so a failed call does not leave the decoder
 * holding a reference to the caller's data.
 *
 * @param encoded
 *        the AMQP Section value to decode.
 *
 * @return a Section object read from its encoded form, or {@code null} if the given
 *         buffer is {@code null} or has no readable bytes.
 */
public static Section decode(ByteBuf encoded) {
    if (encoded == null || !encoded.isReadable()) {
        return null;
    }

    DecoderImpl decoder = TLS_CODEC.get().decoder;
    decoder.setByteBuffer(encoded.nioBuffer());
    try {
        Section result = (Section) decoder.readObject();
        encoded.resetReaderIndex();
        return result;
    } finally {
        // Always release the buffer reference, even when readObject or the cast throws.
        decoder.setByteBuffer(null);
    }
}
/**
 * Asserts that the given section is a {@code Data} section whose binary value is present
 * and has the expected length.
 *
 * @param body the body section to check.
 * @param length the expected length of the binary value.
 */
private void assertDataBodyAsExpected(Section body, int length) {
    assertNotNull("Expected body section to be present", body);
    assertEquals("Unexpected body section type", Data.class, body.getClass());

    final Data dataSection = (Data) body;
    final Binary payload = dataSection.getValue();
    assertNotNull(payload);

    final int actualLength = payload.getLength();
    assertEquals("Unexpected body length", length, actualLength);
}
/**
 * Sets a String on a newly created AMQP-typed object message facade and verifies that
 * the body of the underlying message is an {@code AmqpValue} wrapping that String.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testSetObjectOnNewAmqpTypedMessage() throws Exception {
    final String content = "myStringContent";

    final AmqpJmsObjectMessageFacade facade = createNewObjectMessageFacade(true);
    facade.setObject(content);

    // Inspect the body held by the underlying message.
    final Section bodySection = facade.getBody();
    assertNotNull(bodySection);
    assertEquals(AmqpValue.class, bodySection.getClass());

    final Object wrapped = ((AmqpValue) bodySection).getValue();
    assertEquals("Underlying message body did not contain the expected content", content, wrapped);
}
/**
 * Verifies that a newly created stream message facade exposes a body section and that
 * the section is an {@code AmqpSequence}.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testNewMessageToSendContainsAmqpSequenceBody() throws Exception {
    final AmqpJmsStreamMessageFacade facade = createNewStreamMessageFacade();

    final Section bodySection = facade.getBody();
    assertNotNull("Body section was not present", bodySection);

    final boolean isSequence = bodySection instanceof AmqpSequence;
    assertTrue("Body section was not of expected type: " + bodySection.getClass(), isSequence);
}
/**
 * Connects to the AMQP bridge, attaches {@code ExampleOne.RECEIVERS_COUNT} receivers and
 * logs every message they receive. The calling thread then blocks on standard input;
 * once input is available the open receivers and the connection are closed and the
 * total number of received messages is logged.
 * <p>
 * The receivers share one group id when {@code ExampleOne.IS_SAME_GROUP_ID} is set,
 * otherwise each receiver gets its own group id suffixed with its index.
 *
 * @param vertx the Vert.x instance used to create the AMQP client.
 */
public void run(Vertx vertx) {

    this.receivers = new ProtonReceiver[ExampleOne.RECEIVERS_COUNT];

    ProtonClient client = ProtonClient.create(vertx);

    client.connect(AmqpBridgeReceiver.BRIDGE_HOST, AmqpBridgeReceiver.BRIDGE_PORT, ar -> {

        if (ar.succeeded()) {

            this.received = 0;

            this.connection = ar.result();
            this.connection.open();

            log.info("Connected as {}", this.connection.getContainer());

            for (int i = 0; i < this.receivers.length; i++) {

                if (ExampleOne.IS_SAME_GROUP_ID) {
                    this.receivers[i] = this.connection.createReceiver(String.format("%s/group.id/%s", ExampleOne.TOPIC, ExampleOne.GROUP_ID_PREFIX));
                } else {
                    this.receivers[i] = this.connection.createReceiver(String.format("%s/group.id/%s%d", ExampleOne.TOPIC, ExampleOne.GROUP_ID_PREFIX, i));
                }

                // The lambda below needs an effectively final copy of the loop index.
                int index = i;

                this.receivers[i].handler((delivery, message) -> {

                    this.received++;

                    Section body = message.getBody();

                    if (body instanceof Data) {
                        // A Binary may be a view onto a larger array, so honour its offset
                        // and length instead of decoding the whole backing array. The
                        // charset is named explicitly rather than relying on the platform default.
                        org.apache.qpid.proton.amqp.Binary binary = ((Data) body).getValue();
                        String text = binary == null
                            ? ""
                            : new String(binary.getArray(), binary.getArrayOffset(), binary.getLength(),
                                java.nio.charset.StandardCharsets.UTF_8);
                        log.info("Message received {} by receiver {} ...", text, index);

                    } else if (body instanceof AmqpValue) {

                        Object amqpValue = ((AmqpValue) body).getValue();
                        // encoded as String
                        if (amqpValue instanceof String) {
                            String content = (String) amqpValue;
                            log.info("Message received {} by receiver {} ...", content, index);
                        // encoded as a List
                        } else if (amqpValue instanceof List) {
                            List<?> list = (List<?>) amqpValue;
                            log.info("Message received {} by receiver {} ...", list, index);
                        // encoded as an array
                        } else if (amqpValue instanceof Object[]) {
                            Object[] array = (Object[]) amqpValue;
                            log.info("Message received {} by receiver {} ...", array, index);
                        // encoded as a Map
                        } else if (amqpValue instanceof Map) {
                            Map<?, ?> map = (Map<?, ?>) amqpValue;
                            log.info("Message received {} by receiver {} ...", map, index);
                        }
                    }

                    // default is AT_LEAST_ONCE QoS (unsettled) so we need to send disposition (settle) to sender
                    delivery.disposition(Accepted.getInstance(), true);

                    MessageAnnotations messageAnnotations = message.getMessageAnnotations();
                    if (messageAnnotations != null) {
                        Map<Symbol, Object> annotations = messageAnnotations.getValue();
                        Object partition = annotations.get(Symbol.getSymbol("x-opt-bridge.partition"));
                        Object offset = annotations.get(Symbol.getSymbol("x-opt-bridge.offset"));
                        Object key = annotations.get(Symbol.getSymbol("x-opt-bridge.key"));
                        Object topic = annotations.get(Symbol.valueOf("x-opt-bridge.topic"));
                        log.info("... on topic {} partition {} [{}], key = {}", topic, partition, offset, key);
                    }
                }).open();
            }
        } else {
            // Report the failure instead of silently leaving the example idle.
            log.error("Unable to connect to the AMQP bridge", ar.cause());
        }
    });

    try {
        System.in.read();

        // Receiver slots and the connection stay null when the connection attempt
        // failed or has not completed yet, so guard against both before closing.
        for (ProtonReceiver receiver : this.receivers) {
            if (receiver != null && receiver.isOpen()) {
                receiver.close();
            }
        }
        if (this.connection != null) {
            this.connection.close();
        }

        log.info("Total received {}", this.received);

    } catch (IOException e) {
        log.error("Failed while waiting on standard input", e);
    }
}
/**
 * Attaches a receiver and an anonymous sender to the bridge over a single connection,
 * sends one message to the topic and expects the receiver to get a {@code Data} body
 * equal to the sent text, while the sender's delivery ends in the Accepted state.
 *
 * @param context the Vert.x test context used for verification and completion.
 * @throws InterruptedException if interrupted while awaiting completion.
 */
@Test
void sendReceiveInMultiplexing(VertxTestContext context) throws InterruptedException {
    final String topic = "sendReceiveInMultiplexing";
    kafkaCluster.createTopic(topic, 1, 1);

    final ProtonClient protonClient = ProtonClient.create(this.vertx);
    final Checkpoint consumed = context.checkpoint();

    protonClient.connect(AmqpBridgeTest.BRIDGE_HOST, AmqpBridgeTest.BRIDGE_PORT, connectResult -> {
        if (!connectResult.succeeded()) {
            context.failNow(connectResult.cause());
            return;
        }

        final ProtonConnection connection = connectResult.result();
        connection.open();

        final String sentBody = "Simple message from " + connection.getContainer();
        final Message outgoing = ProtonHelper.message(topic, sentBody);

        final ProtonReceiver receiver = connection.createReceiver(topic + "/group.id/my_group");
        receiver.handler((delivery, incoming) -> {
            final Section incomingBody = incoming.getBody();
            if (!(incomingBody instanceof Data)) {
                return;
            }

            final byte[] payload = ((Data) incomingBody).getValue().getArray();
            log.info("Message received {}", new String(payload));

            // default is AT_LEAST_ONCE QoS (unsettled) so we need to send disposition (settle) to sender
            delivery.disposition(Accepted.getInstance(), true);
            context.verify(() -> assertThat(sentBody, is(new String(payload))));
            consumed.flag();
        })
            .setPrefetch(this.bridgeConfig.getAmqpConfig().getFlowCredit()).open();

        final ProtonSender sender = connection.createSender(null);
        sender.open();

        sender.send(ProtonHelper.tag("my_tag"), outgoing, sent -> {
            log.info("Message delivered {}", sent.getRemoteState());
            context.verify(() -> assertThat(Accepted.getInstance(), is(sent.getRemoteState())));
        });
    });

    assertThat(context.awaitCompletion(60, TimeUnit.SECONDS), is(true));
}
/**
 * Produces one String record to a single-partition topic and expects an AMQP receiver
 * attached through the bridge to get it as a {@code Data} body, with message annotations
 * reporting the topic, partition 0, offset 0 and no key.
 *
 * @param context the Vert.x test context used for verification and completion.
 * @throws InterruptedException if interrupted while awaiting completion.
 * @throws ExecutionException declared by the method signature.
 * @throws TimeoutException declared by the method signature.
 */
@Test
void receiveSimpleMessage(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException {
    final String topic = "receiveSimpleMessage";
    kafkaCluster.createTopic(topic, 1, 1);

    final String sentBody = "Simple message";

    final Checkpoint consumed = context.checkpoint();
    kafkaCluster.produceStrings(topic, sentBody, 1, 0);

    final ProtonClient protonClient = ProtonClient.create(this.vertx);
    protonClient.connect(AmqpBridgeTest.BRIDGE_HOST, AmqpBridgeTest.BRIDGE_PORT, connectResult -> {
        if (!connectResult.succeeded()) {
            context.failNow(connectResult.cause());
            return;
        }

        final ProtonConnection connection = connectResult.result();
        connection.open();

        final ProtonReceiver receiver = connection.createReceiver(topic + "/group.id/my_group");
        receiver.handler((delivery, incoming) -> {
            final Section incomingBody = incoming.getBody();
            if (!(incomingBody instanceof Data)) {
                return;
            }

            final byte[] payload = ((Data) incomingBody).getValue().getArray();

            // default is AT_LEAST_ONCE QoS (unsettled) so we need to send disposition (settle) to sender
            delivery.disposition(Accepted.getInstance(), true);

            // get topic, partition, offset and key from AMQP annotations
            final MessageAnnotations annotations = incoming.getMessageAnnotations();

            context.verify(() -> {
                assertThat(annotations, notNullValue());

                final String annotatedTopic = String.valueOf(annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION)));
                assertThat(annotatedTopic, notNullValue());

                final Integer annotatedPartition = (Integer) annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION));
                assertThat(annotatedPartition, notNullValue());

                final Long annotatedOffset = (Long) annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION));
                assertThat(annotatedOffset, notNullValue());

                final Object annotatedKey = annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION));
                assertThat(annotatedKey, nullValue());

                log.info("Message consumed topic={} partition={} offset={}, key={}, value={}",
                    annotatedTopic, annotatedPartition, annotatedOffset, annotatedKey, new String(payload));

                assertThat(annotatedTopic, is(topic));
                assertThat(annotatedPartition, is(0));
                assertThat(annotatedOffset, is(0L));
                assertThat(sentBody, is(new String(payload)));
            });
            consumed.flag();
        })
            .setPrefetch(this.bridgeConfig.getAmqpConfig().getFlowCredit()).open();
    });

    assertThat(context.awaitCompletion(60, TimeUnit.SECONDS), is(true));
}
/**
 * Produces one String record to partition 1 of a two-partition topic and expects an AMQP
 * receiver that filters on partition 1 to get it as a {@code Data} body, with message
 * annotations reporting the topic, partition 1, offset 0 and no key.
 * <p>
 * All assertions made inside the message handler run within {@code context.verify} so
 * that a failure is reported to the test context instead of being thrown from the
 * handler, and so that nothing dereferences the annotations after the not-null check
 * has failed.
 *
 * @param context the Vert.x test context used for verification and completion.
 * @throws InterruptedException if interrupted while awaiting completion.
 * @throws ExecutionException declared by the method signature.
 * @throws TimeoutException declared by the method signature.
 */
@Test
void receiveSimpleMessageFromPartition(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException {
    String topic = "receiveSimpleMessageFromPartition";
    kafkaCluster.createTopic(topic, 2, 1);

    String sentBody = "Simple message";

    // Futures for wait
    Checkpoint consume = context.checkpoint();
    kafkaCluster.produceStrings(topic, sentBody, 1, 1);

    ProtonClient client = ProtonClient.create(this.vertx);
    client.connect(AmqpBridgeTest.BRIDGE_HOST, AmqpBridgeTest.BRIDGE_PORT, ar -> {
        if (ar.succeeded()) {

            ProtonConnection connection = ar.result();
            connection.open();

            ProtonReceiver receiver = connection.createReceiver(topic + "/group.id/my_group");

            Source source = (Source) receiver.getSource();

            // filter on specific partition
            Map<Symbol, Object> map = new HashMap<>();
            map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_FILTER), 1);
            source.setFilter(map);

            receiver.handler((delivery, message) -> {

                Section body = message.getBody();
                if (body instanceof Data) {
                    byte[] value = ((Data) body).getValue().getArray();

                    // default is AT_LEAST_ONCE QoS (unsettled) so we need to send disposition (settle) to sender
                    delivery.disposition(Accepted.getInstance(), true);

                    // get topic, partition, offset and key from AMQP annotations
                    MessageAnnotations annotations = message.getMessageAnnotations();

                    context.verify(() -> {
                        assertThat(annotations, notNullValue());

                        String topicAnnotation = String.valueOf(annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION)));
                        assertThat(topicAnnotation, notNullValue());
                        Integer partitionAnnotation = (Integer) annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION));
                        assertThat(partitionAnnotation, notNullValue());
                        Long offsetAnnotation = (Long) annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION));
                        assertThat(offsetAnnotation, notNullValue());
                        Object keyAnnotation = annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION));
                        assertThat(keyAnnotation, nullValue());
                        log.info("Message consumed topic={} partition={} offset={}, key={}, value={}",
                            topicAnnotation, partitionAnnotation, offsetAnnotation, keyAnnotation, new String(value));

                        assertThat(topicAnnotation, is(topic));
                        assertThat(partitionAnnotation, is(1));
                        assertThat(offsetAnnotation, is(0L));
                        assertThat(sentBody, is(new String(value)));
                    });
                    consume.flag();
                }
            })
                .setPrefetch(this.bridgeConfig.getAmqpConfig().getFlowCredit()).open();
        } else {
            context.failNow(ar.cause());
        }
    });

    assertThat(context.awaitCompletion(60, TimeUnit.SECONDS), is(true));
}
/**
 * Produces 11 String records to partition 0 of a topic and expects an AMQP receiver that
 * filters on partition 0 and offset 10 to get a {@code Data} body of "value-10", with
 * message annotations reporting the topic, partition 0, offset 10 and key "key-10".
 * <p>
 * All assertions made inside the message handler run within {@code context.verify} so
 * that a failure is reported to the test context instead of being thrown from the
 * handler, and so that nothing dereferences the annotations after the not-null check
 * has failed.
 *
 * @param context the Vert.x test context used for verification and completion.
 * @throws InterruptedException if interrupted while awaiting completion.
 * @throws ExecutionException declared by the method signature.
 * @throws TimeoutException declared by the method signature.
 */
@Test
void receiveSimpleMessageFromPartitionAndOffset(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException {
    String topic = "receiveSimpleMessageFromPartitionAndOffset";
    kafkaCluster.createTopic(topic, 1, 1);

    // Futures for wait
    Checkpoint consume = context.checkpoint();
    kafkaCluster.produceStrings(topic, 11, 0);

    ProtonClient client = ProtonClient.create(this.vertx);
    client.connect(AmqpBridgeTest.BRIDGE_HOST, AmqpBridgeTest.BRIDGE_PORT, ar -> {
        if (ar.succeeded()) {

            ProtonConnection connection = ar.result();
            connection.open();

            ProtonReceiver receiver = connection.createReceiver(topic + "/group.id/my_group");

            Source source = (Source) receiver.getSource();

            // filter on specific partition and starting offset
            Map<Symbol, Object> map = new HashMap<>();
            map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_FILTER), 0);
            map.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_FILTER), (long) 10);
            source.setFilter(map);

            receiver.handler((delivery, message) -> {

                Section body = message.getBody();
                if (body instanceof Data) {
                    byte[] value = ((Data) body).getValue().getArray();

                    // default is AT_LEAST_ONCE QoS (unsettled) so we need to send disposition (settle) to sender
                    delivery.disposition(Accepted.getInstance(), true);

                    // get topic, partition, offset and key from AMQP annotations
                    MessageAnnotations annotations = message.getMessageAnnotations();

                    context.verify(() -> {
                        assertThat(annotations, notNullValue());

                        String topicAnnotation = String.valueOf(annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION)));
                        assertThat(topicAnnotation, notNullValue());
                        Integer partitionAnnotation = (Integer) annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION));
                        assertThat(partitionAnnotation, notNullValue());
                        Long offsetAnnotation = (Long) annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION));
                        assertThat(offsetAnnotation, notNullValue());
                        Object keyAnnotation = annotations.getValue().get(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION));
                        assertThat(keyAnnotation, notNullValue());
                        log.info("Message consumed topic={} partition={} offset={}, key={}, value={}",
                            topicAnnotation, partitionAnnotation, offsetAnnotation, keyAnnotation, new String(value));

                        assertThat(topicAnnotation, is(topic));
                        assertThat(partitionAnnotation, is(0));
                        assertThat(offsetAnnotation, is(10L));
                        assertThat(keyAnnotation, is("key-10"));
                        assertThat(new String(value), is("value-10"));
                    });
                    consume.flag();
                }
            })
                .setPrefetch(this.bridgeConfig.getAmqpConfig().getFlowCredit()).open();
        } else {
            context.failNow(ar.cause());
        }
    });

    assertThat(context.awaitCompletion(60, TimeUnit.SECONDS), is(true));
}
/**
 * Installs the server-side handlers on a newly accepted connection: it mirrors open,
 * close and disconnect events, opens every session, prints the content of messages that
 * arrive on receivers, and sends a "Hello World from Server!" message once a second on
 * every sender until the connection is disconnected.
 *
 * @param vertx the Vert.x instance used to schedule the periodic sends.
 * @param connection the connection to configure.
 */
private static void helloProcessConnection(Vertx vertx, ProtonConnection connection) {
    connection.openHandler(opened -> {
        System.out.println("Client connected: " + connection.getRemoteContainer());
        connection.open();
    }).closeHandler(closed -> {
        System.out.println("Client closing amqp connection: " + connection.getRemoteContainer());
        connection.close();
        connection.disconnect();
    }).disconnectHandler(disconnected -> {
        System.out.println("Client socket disconnected: " + connection.getRemoteContainer());
        connection.disconnect();
    })
        .sessionOpenHandler(session -> session.open());

    connection.receiverOpenHandler(receiver -> {
        // Echoing the remote target is rather naive, for example use only. Proper servers
        // should ensure that they advertise their own Target settings that actually
        // reflect what is in place. The request may have also been for a dynamic address.
        receiver.setTarget(receiver.getRemoteTarget())
            .handler((delivery, received) -> {
                final String messageAddress = received.getAddress();
                final String address = messageAddress != null
                    ? messageAddress
                    : receiver.getRemoteTarget().getAddress();

                final Section payload = received.getBody();
                if (!(payload instanceof AmqpValue)) {
                    return;
                }

                final String content = (String) ((AmqpValue) payload).getValue();
                System.out.println("message to:" + address + ", body: " + content);
            }).open();
    });

    connection.senderOpenHandler(sender -> {
        System.out.println("Sending to client from: " + sender.getRemoteSource().getAddress());

        // Echoing the remote source is rather naive, for example use only. Proper servers
        // should ensure that they advertise their own Source settings that actually
        // reflect what is in place. The request may have also been for a dynamic address.
        sender.setSource(sender.getRemoteSource());
        sender.open();

        vertx.setPeriodic(1000, timerId -> {
            if (connection.isDisconnected()) {
                vertx.cancelTimer(timerId);
                return;
            }

            System.out.println("Sending message to client");
            final Message outgoing = message("Hello World from Server!");
            sender.send(outgoing, delivery -> {
                System.out.println("The message was received by the client.");
            });
        });
    });
}
/**
 * Creates a test server whose connections mirror open, close and disconnect events,
 * open every session and trace the messages arriving on receivers. When the server's
 * detach-link flag is set, a receiver is instead opened and then immediately closed
 * with a "Failed Subscriber Requested" error condition.
 *
 * @return the newly created test server.
 * @throws Exception declared by the method signature.
 */
private TestServer createServer() throws Exception {
    return new TestServer(vertx, (connection) -> {
        connection.openHandler(opened -> {
            LOG.trace("Client connected: " + connection.getRemoteContainer());
            connection.open();
        }).closeHandler(closed -> {
            LOG.trace("Client closing amqp connection: " + connection.getRemoteContainer());
            connection.close();
            connection.disconnect();
        }).disconnectHandler(disconnected -> {
            LOG.trace("Client socket disconnected: " + connection.getRemoteContainer());
            connection.disconnect();
        })
            .sessionOpenHandler(session -> session.open());

        connection.receiverOpenHandler(receiver -> {
            if (!server.getDetachLink()) {
                LOG.trace("Receiving from client to: " + receiver.getRemoteTarget().getAddress());

                // Echoing the remote target is rather naive, for example use only. Proper
                // servers should ensure that they advertise their own Target settings that
                // actually reflect what is in place. The request may have also been for a
                // dynamic address.
                receiver.setTarget(receiver.getRemoteTarget());

                receiver.handler((delivery, received) -> {
                    final String messageAddress = received.getAddress();
                    final String address = messageAddress != null
                        ? messageAddress
                        : receiver.getRemoteTarget().getAddress();

                    final Section payload = received.getBody();
                    final String content = payload instanceof AmqpValue
                        ? (String) ((AmqpValue) payload).getValue()
                        : "unknown";

                    LOG.trace("message to:" + address + ", body: " + content);
                });

                receiver.closeHandler(closeResult -> closeResult.result().close());
            }

            receiver.open();

            if (server.getDetachLink()) {
                receiver.setCondition(new ErrorCondition(Symbol.getSymbol("Failed Subscriber Requested"), ""));
                receiver.close();
            }
        });
    });
}
/**
 * Creates a test server whose connections mirror open, close and disconnect events,
 * open every session and trace the messages arriving on receivers. When the server's
 * detach-link flag is set, a receiver is instead opened and then immediately closed
 * with a "Failed Subscriber Requested" error condition.
 *
 * @return the newly created test server.
 * @throws Exception declared by the method signature.
 */
private TestServer createServer() throws Exception {
    return new TestServer(vertx, (connection) -> {
        connection.openHandler(opened -> {
            LOG.trace("Client connected: " + connection.getRemoteContainer());
            connection.open();
        }).closeHandler(closed -> {
            LOG.trace("Client closing amqp connection: " + connection.getRemoteContainer());
            connection.close();
            connection.disconnect();
        }).disconnectHandler(disconnected -> {
            LOG.trace("Client socket disconnected: " + connection.getRemoteContainer());
            connection.disconnect();
        })
            .sessionOpenHandler(session -> session.open());

        connection.receiverOpenHandler(receiver -> {
            if (!server.getDetachLink()) {
                LOG.trace("Receiving from client to: " + receiver.getRemoteTarget().getAddress());

                // Echoing the remote target is rather naive, for example use only. Proper
                // servers should ensure that they advertise their own Target settings that
                // actually reflect what is in place. The request may have also been for a
                // dynamic address.
                receiver.setTarget(receiver.getRemoteTarget());

                receiver.handler((delivery, received) -> {
                    final String messageAddress = received.getAddress();
                    final String address = messageAddress != null
                        ? messageAddress
                        : receiver.getRemoteTarget().getAddress();

                    final Section payload = received.getBody();
                    final String content = payload instanceof AmqpValue
                        ? (String) ((AmqpValue) payload).getValue()
                        : "unknown";

                    LOG.trace("message to:" + address + ", body: " + content);
                });

                receiver.closeHandler(closeResult -> closeResult.result().close());
            }

            receiver.open();

            if (server.getDetachLink()) {
                receiver.setCondition(new ErrorCondition(Symbol.getSymbol("Failed Subscriber Requested"), ""));
                receiver.close();
            }
        });
    });
}
/**
 * Creates a test server whose connections mirror open, close and disconnect events,
 * open every session and trace the messages arriving on receivers. When the server's
 * detach-link flag is set, a receiver is instead opened and then immediately closed
 * with a "Failed Subscriber Requested" error condition.
 *
 * @return the newly created test server.
 * @throws Exception declared by the method signature.
 */
private TestServer createServer() throws Exception {
    return new TestServer(vertx, (connection) -> {
        connection.openHandler(opened -> {
            LOG.trace("Client connected: " + connection.getRemoteContainer());
            connection.open();
        }).closeHandler(closed -> {
            LOG.trace("Client closing amqp connection: " + connection.getRemoteContainer());
            connection.close();
            connection.disconnect();
        }).disconnectHandler(disconnected -> {
            LOG.trace("Client socket disconnected: " + connection.getRemoteContainer());
            connection.disconnect();
        })
            .sessionOpenHandler(session -> session.open());

        connection.receiverOpenHandler(receiver -> {
            if (!server.getDetachLink()) {
                LOG.trace("Receiving from client to: " + receiver.getRemoteTarget().getAddress());

                // Echoing the remote target is rather naive, for example use only. Proper
                // servers should ensure that they advertise their own Target settings that
                // actually reflect what is in place. The request may have also been for a
                // dynamic address.
                receiver.setTarget(receiver.getRemoteTarget());

                receiver.handler((delivery, received) -> {
                    final String messageAddress = received.getAddress();
                    final String address = messageAddress != null
                        ? messageAddress
                        : receiver.getRemoteTarget().getAddress();

                    final Section payload = received.getBody();
                    final String content = payload instanceof AmqpValue
                        ? (String) ((AmqpValue) payload).getValue()
                        : "unknown";

                    LOG.trace("message to:" + address + ", body: " + content);
                });

                receiver.closeHandler(closeResult -> closeResult.result().close());
            }

            receiver.open();

            if (server.getDetachLink()) {
                receiver.setCondition(new ErrorCondition(Symbol.getSymbol("Failed Subscriber Requested"), ""));
                receiver.close();
            }
        });
    });
}