下面列出了javax.jms.MessageEOFException#org.apache.qpid.proton.amqp.messaging.Data 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Verifies that an AMQP message whose body is a {@code Data} section is handed to the
 * subscriber with a {@code byte[]} payload holding the bytes that were put into the section.
 */
@Test
public void testSourceWithDataContent() {
    final String address = UUID.randomUUID().toString();
    final Map<String, Object> connectorConfig = getConfig(address);
    final List<Message<byte[]>> received = new ArrayList<>();
    final AtomicBoolean subscribed = new AtomicBoolean();

    // wire the connector's publisher to a subscriber collecting into "received"
    provider = new AmqpConnector();
    provider.setup(executionHolder);
    final PublisherBuilder<? extends Message<?>> publisher =
            provider.getPublisherBuilder(new MapBasedConfig(connectorConfig));
    publisher.to(createSubscriber(received, subscribed)).run();

    // wait for the subscription and for the channel to report readiness
    await().until(subscribed::get);
    await().until(() -> provider.isReady(connectorConfig.get(CHANNEL_NAME_ATTRIBUTE).toString()));

    final List<String> words = new ArrayList<>();
    words.add("hello");
    words.add("world");

    // produce a single message carrying the list's string form as binary data
    usage.produce(address, 1, () -> new Data(new Binary(words.toString().getBytes())));

    await().atMost(2, TimeUnit.MINUTES).until(() -> !received.isEmpty());

    final byte[] payload = received.get(0).getPayload();
    assertThat(new String(payload))
            .isEqualTo(words.toString());
}
/**
 * Verifies that the client builds and sends a request message from the given
 * headers and payload, and that it arms a timer which cancels the request
 * when no response arrives in time.
 */
@Test
public void testCreateAndSendRequestSendsProperRequestMessage() {

    // GIVEN a request-response client with a request timeout of 200 ms
    client.setRequestTimeout(200);

    // WHEN a request carrying some headers and a payload is sent
    final JsonObject payload = new JsonObject().put("key", "value");
    final Map<String, Object> props = Collections.singletonMap("test-key", "test-value");
    client.createAndSendRequest("get", props, payload.toBuffer(), s -> {});

    // THEN a message is handed to the sender ...
    final ArgumentCaptor<Message> messageCaptor = ArgumentCaptor.forClass(Message.class);
    verify(sender).send(messageCaptor.capture(), VertxMockSupport.anyHandler());
    final Message sentMessage = messageCaptor.getValue();
    assertThat(sentMessage).isNotNull();

    // ... whose body is a Data section holding the payload bytes
    assertThat(sentMessage.getBody()).isNotNull();
    assertThat(sentMessage.getBody()).isInstanceOf(Data.class);
    final Buffer body = MessageHelper.getPayload(sentMessage);
    assertThat(body.getBytes()).isEqualTo(payload.toBuffer().getBytes());

    // ... and whose application properties contain the headers
    assertThat(sentMessage.getApplicationProperties()).isNotNull();
    assertThat(sentMessage.getApplicationProperties().getValue().get("test-key")).isEqualTo("test-value");

    // and a timer has been set up that fires after 200 ms
    verify(vertx).setTimer(eq(200L), VertxMockSupport.anyHandler());
}
/**
 * Converts a Kafka consumer record into an AMQP message addressed to the given address.
 * <p>
 * The record's partition, offset and topic are always added as message annotations;
 * the record's key is added only when it is not {@code null}. The record's value
 * becomes the message body, wrapped in a {@code Data} section.
 *
 * @param address The address to set on the created message.
 * @param record The Kafka record to convert.
 * @return The newly created AMQP message.
 */
@Override
public Message toMessage(String address, KafkaConsumerRecord<String, byte[]> record) {
    Message message = Proton.message();
    message.setAddress(address);

    // put message annotations about partition, offset, topic and key (if not null)
    Map<Symbol, Object> map = new HashMap<>();
    map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), record.partition());
    map.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION), record.offset());
    // records without a key must not produce a null-valued annotation
    if (record.key() != null) {
        map.put(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION), record.key());
    }
    map.put(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION), record.topic());

    MessageAnnotations messageAnnotations = new MessageAnnotations(map);
    message.setMessageAnnotations(messageAnnotations);

    message.setBody(new Data(new Binary(record.value())));
    return message;
}
/**
 * Processes a received message.
 * <p>
 * The message is logged at INFO level (truncated to {@code MAX_MESSAGE_DUMP_LENGTH}
 * characters), then its body is parsed as JSON. A message is passed to
 * {@code handleInvalidMessage} when its body is not a {@code Data} section, when the
 * section carries no binary value, when its {@code test-id} does not match this
 * instance's test id, or when it has no {@code timestamp}. All other messages are
 * passed to {@code handleValidMessage}.
 *
 * @param message The received message.
 */
private void handleMessage(final Message message) {
    if (log.isInfoEnabled()) {
        String str = message.toString();
        if (str.length() > MAX_MESSAGE_DUMP_LENGTH) {
            str = str.substring(0, MAX_MESSAGE_DUMP_LENGTH) + "…";
        }
        log.info("Received message - {}", str);
    }

    var body = message.getBody();
    if (!(body instanceof Data)) {
        handleInvalidMessage(message);
        return;
    }

    var binary = ((Data) body).getValue();
    if (binary == null) {
        // a Data section without any binary content cannot carry the expected JSON
        handleInvalidMessage(message);
        return;
    }

    // Only the [offset, offset + length) window of the backing array belongs to this
    // Binary; the array itself may be larger, so copy just that window.
    var content = Buffer.buffer(binary.getLength())
            .appendBytes(binary.getArray(), binary.getArrayOffset(), binary.getLength());

    // NOTE(review): content that is not valid JSON presumably still makes the JsonObject
    // constructor throw; confirm whether that should also go to handleInvalidMessage.
    var json = new JsonObject(content);
    var testId = json.getString("test-id");
    var timestamp = json.getLong("timestamp");
    if (!this.testId.equals(testId) || timestamp == null) {
        handleInvalidMessage(message);
        return;
    }

    handleValidMessage(message, timestamp, json);
}
/**
 * Checks that converting a bytes message to AMQP yields a {@code Data} body
 * whose binary content equals the bytes written to the source message.
 */
@Test
public void testConvertBytesMessageToAmqpMessageWithDataBody() throws Exception {
    final byte[] payload = new byte[]{8, 16, 24, 32};

    final ServerJMSBytesMessage source = createBytesMessage();
    source.writeBytes(payload);
    source.encode();

    final AMQPMessage converted = AMQPConverter.getInstance().fromCore(source.getInnerMessage(), null);

    // the body must be present and must be a Data section
    assertNotNull(converted.getBody());
    assertTrue(converted.getBody() instanceof Data);

    // the section's value must be a Binary of the expected size and content
    final Binary actual = ((Data) converted.getBody()).getValue();
    assertTrue(actual instanceof Binary);
    assertEquals(4, actual.getLength());

    final Binary expected = new Binary(payload);
    assertTrue(expected.equals(actual));
}
/**
 * Checks that an object message flagged with the "unknown" original encoding is
 * converted to an AMQP message with a non-empty {@code Data} body that
 * deserializes to a {@code UUID}.
 */
@Test
public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
    final ServerJMSObjectMessage source = createObjectMessage(TEST_OBJECT_VALUE);
    source.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
    source.encode();

    final AMQPMessage converted = AMQPConverter.getInstance().fromCore(source.getInnerMessage(), null);

    assertNotNull(converted.getBody());
    assertTrue(converted.getBody() instanceof Data);

    // the data section must actually carry some bytes
    final Data dataSection = (Data) converted.getBody();
    assertTrue(dataSection.getValue().getLength() != 0);

    // and those bytes must deserialize back into a UUID
    final Object restored = deserialize(dataSection.getValue().getArray());
    assertNotNull(restored);
    assertTrue(restored instanceof UUID);
}
/**
 * Checks that a text message flagged with the "data" original encoding is converted
 * to an AMQP message whose {@code Data} body holds the text as UTF-8 bytes.
 */
@Test
public void testConvertTextMessageCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
    final String text = "myTextMessageContent";

    final ServerJMSTextMessage source = createTextMessage(text);
    source.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
    source.encode();

    final AMQPMessage converted = AMQPConverter.getInstance().fromCore(source.getInnerMessage(), null);

    assertNotNull(converted.getBody());
    assertTrue(converted.getBody() instanceof Data);

    final Binary binary = ((Data) converted.getBody()).getValue();
    assertTrue(binary instanceof Binary);

    // decode only the window of the backing array that belongs to the binary
    final String decoded = new String(
            binary.getArray(), binary.getArrayOffset(), binary.getLength(), StandardCharsets.UTF_8);
    assertEquals(text, decoded);
}
/**
 * Converts a text message flagged with the "data" original encoding and checks that
 * the resulting AMQP message has a {@code Data} body holding the text as UTF-8 bytes.
 */
@Test
public void testConvertTextMessageContentNotStoredCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
    final String text = "myTextMessageContent";

    final ServerJMSTextMessage source = createTextMessage(text);
    source.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
    source.encode();

    final AMQPMessage converted = AMQPConverter.getInstance().fromCore(source.getInnerMessage(), null);

    assertNotNull(converted.getBody());
    assertTrue(converted.getBody() instanceof Data);

    final Binary binary = ((Data) converted.getBody()).getValue();
    assertTrue(binary instanceof Binary);

    // decode only the window of the backing array that belongs to the binary
    final String decoded = new String(
            binary.getArray(), binary.getArrayOffset(), binary.getLength(), StandardCharsets.UTF_8);
    assertEquals(text, decoded);
}
/**
 * Verifies that the text facade returns the original string when the received
 * message was decoded from a {@code Data} section holding the string's UTF-8 bytes.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testGetTextUsingReceivedMessageWithDataSectionContainingStringBytes() throws Exception {
    String encodedString = "myEncodedString";
    byte[] encodedBytes = encodedString.getBytes(Charset.forName("UTF-8"));

    // encode a described Data section by hand, then decode it into a message
    org.apache.qpid.proton.codec.Data payloadData = org.apache.qpid.proton.codec.Data.Factory.create();
    payloadData.putDescribedType(new DataDescribedType(new Binary(encodedBytes)));
    Binary b = payloadData.encode();

    Message message = Message.Factory.create();
    int decoded = message.decode(b.getArray(), b.getArrayOffset(), b.getLength());
    // expected value first: every encoded byte must have been consumed by the decoder
    assertEquals(b.getLength(), decoded);

    AmqpJmsTextMessageFacade amqpTextMessageFacade = createReceivedTextMessageFacade(createMockAmqpConsumer(), message);

    assertEquals(encodedString, amqpTextMessageFacade.getText());
}
/**
 * Verifies that asking the text facade for its text fails with a {@code JMSException}
 * when the received {@code Data} section holds UTF-16 encoded bytes.
 */
@Test
public void testGetTextWithUnknownEncodedDataThrowsJMSException() throws Exception {
    final byte[] utf16Bytes = "myEncodedString".getBytes(Charset.forName("UTF-16"));

    final Message received = Message.Factory.create();
    received.setBody(new Data(new Binary(utf16Bytes)));

    final AmqpJmsTextMessageFacade facade =
            createReceivedTextMessageFacade(createMockAmqpConsumer(), received);

    try {
        facade.getText();
        fail("expected exception not thrown");
    } catch (JMSException expected) {
        // this is the outcome the test is looking for
    }
}
/**
 * Test that receiving an empty data body with the content type set to
 * {@value AmqpMessageSupport#OCTET_STREAM_CONTENT_TYPE} yields a BytesMessage when
 * the message carries no other annotation indicating its JMS message type.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateBytesMessageFromDataWithEmptyBinaryAndContentType() throws Exception {
    final Message source = Proton.message();
    source.setBody(new Data(new Binary(new byte[0])));
    source.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE.toString());

    final JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(source)).asJmsMessage();

    // the decoded message must be a bytes message ...
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsBytesMessage.class, decoded.getClass());

    // ... backed by the bytes message facade
    final JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsBytesMessageFacade.class, decodedFacade.getClass());
}
/**
 * Test that receiving an empty data body without any content type yields a
 * BytesMessage when the message carries no other annotation indicating its
 * JMS message type.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateBytesMessageFromDataWithEmptyBinaryAndNoContentType() throws Exception {
    final Message source = Proton.message();
    source.setBody(new Data(new Binary(new byte[0])));

    // precondition: no content type has been set on the source message
    assertNull(source.getContentType());

    final JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(source)).asJmsMessage();

    // the decoded message must be a bytes message ...
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsBytesMessage.class, decoded.getClass());

    // ... backed by the bytes message facade
    final JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsBytesMessageFacade.class, decodedFacade.getClass());
}
/**
 * Test that receiving an empty data body with the content type set to
 * {@value AmqpMessageSupport#SERIALIZED_JAVA_OBJECT_CONTENT_TYPE} yields an
 * ObjectMessage when the message carries no other annotation indicating its
 * JMS message type.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateObjectMessageFromDataWithContentTypeAndEmptyBinary() throws Exception {
    final Message source = Proton.message();
    source.setBody(new Data(new Binary(new byte[0])));
    source.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());

    final JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(source)).asJmsMessage();

    // the decoded message must be an object message ...
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsObjectMessage.class, decoded.getClass());

    // ... backed by the object message facade ...
    final JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsObjectMessageFacade.class, decodedFacade.getClass());

    // ... which in turn uses the serialized-object delegate
    final AmqpObjectTypeDelegate delegate = ((AmqpJmsObjectMessageFacade) decodedFacade).getDelegate();
    assertTrue("Unexpected delegate type: " + delegate, delegate instanceof AmqpSerializedObjectDelegate);
}
/**
 * Test that setting an object on a new message results in the expected
 * content in the body section of the underlying message.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testSetObjectOnNewMessage() throws Exception {
    String content = "myStringContent";

    AmqpJmsObjectMessageFacade amqpObjectMessageFacade = createNewObjectMessageFacade(false);
    amqpObjectMessageFacade.setObject(content);

    // serialize the same object independently to obtain the expected bytes;
    // try-with-resources closes the stream even if serialization fails
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
        oos.writeObject(content);
        oos.flush();
    }
    byte[] bytes = baos.toByteArray();

    // retrieve the bytes from the underlying message, check they match expectation
    Section section = amqpObjectMessageFacade.getBody();
    assertNotNull(section);
    assertEquals(Data.class, section.getClass());
    assertArrayEquals("Underlying message data section did not contain the expected bytes", bytes, ((Data) section).getValue().getArray());
}
/**
 * Verifies that clearing the body after an input stream has been obtained leaves
 * the facade with an empty {@code Data} body, and that a stream obtained afterwards
 * reports end of stream immediately.
 */
@Test
public void testClearBodyWithExistingInputStream() throws Exception {
    final byte[] content = "myBytes".getBytes();

    final Message received = Message.Factory.create();
    received.setBody(new Data(new Binary(content)));

    final AmqpJmsBytesMessageFacade facade =
            createReceivedBytesMessageFacade(createMockAmqpConsumer(), received);

    // obtain a stream first so that one already exists when the body gets cleared
    @SuppressWarnings("unused")
    InputStream unused = facade.getInputStream();

    facade.clearBody();

    final int readResult = facade.getInputStream().read(new byte[1]);
    assertEquals("Expected no data from facade, but got some", END_OF_STREAM, readResult);

    assertDataBodyAsExpected(facade.getBody(), 0);
}
/**
 * Verifies that the input stream obtained from a received message with a
 * {@code Data} body yields exactly the bytes of that body and then signals
 * end of stream.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testInputStreamUsingReceivedMessageWithDataSection() throws Exception {
    byte[] bytes = "myBytes".getBytes();

    Message message = Message.Factory.create();
    message.setBody(new Data(new Binary(bytes)));

    AmqpJmsBytesMessageFacade amqpBytesMessageFacade = createReceivedBytesMessageFacade(createMockAmqpConsumer(), message);
    InputStream bytesStream = amqpBytesMessageFacade.getInputStream();
    assertNotNull(bytesStream);

    // retrieve the expected bytes, check they match; the read count is asserted as
    // well because read(byte[]) is allowed to return fewer bytes than requested
    byte[] receivedBytes = new byte[bytes.length];
    assertEquals(bytes.length, bytesStream.read(receivedBytes));
    assertTrue(Arrays.equals(bytes, receivedBytes));

    // verify no more bytes remain, i.e EOS
    assertEquals("Expected input stream to be at end but data was returned", END_OF_STREAM, bytesStream.read(new byte[1]));
}
/**
 * Asserts that the given body is a {@code Data} section holding a non-null
 * binary value of the given length.
 *
 * @param body The body section to inspect.
 * @param length The expected length of the section's binary value.
 */
private void assertDataBodyAsExpected(Section body, int length) {
    assertNotNull("Expected body section to be present", body);
    assertEquals("Unexpected body section type", Data.class, body.getClass());

    final Data dataSection = (Data) body;
    final Binary content = dataSection.getValue();
    assertNotNull(content);
    assertEquals("Unexpected body length", length, content.getLength());
}
/**
 * Creates a mocked message with a fixed message id and the given address and subject.
 * <p>
 * When a payload is given, the mock additionally reports a {@code text/plain}
 * content type and a {@code Data} body holding the payload's bytes.
 *
 * @param to The address the mock should return.
 * @param payload The payload to expose as the body, or {@code null} for no body.
 * @param subject The subject the mock should return.
 * @return The mocked message.
 */
private Message getFakeMessage(final String to, final Buffer payload, final String subject) {
    final Message fake = mock(Message.class);
    when(fake.getMessageId()).thenReturn("the-message-id");
    when(fake.getSubject()).thenReturn(subject);
    when(fake.getAddress()).thenReturn(to);

    if (payload == null) {
        // nothing more to stub for a message without payload
        return fake;
    }

    final Data body = new Data(new Binary(payload.getBytes()));
    when(fake.getContentType()).thenReturn("text/plain");
    when(fake.getBody()).thenReturn(body);
    return fake;
}
/**
 * Verifies that the bytes facade reports a body for a received message
 * whose {@code Data} section contains some bytes.
 */
@Test
public void testHasBodyWithContent() throws Exception {
    final byte[] content = "myOrigBytes".getBytes();

    final Message received = Message.Factory.create();
    received.setBody(new Data(new Binary(content)));

    final AmqpJmsBytesMessageFacade facade =
            createReceivedBytesMessageFacade(createMockAmqpConsumer(), received);

    assertTrue(facade.hasBody());
}
/**
 * Verifies that a request carrying a malformed payload is answered by the
 * endpoint with a response having status code 400.
 */
@SuppressWarnings("unchecked")
@Test
public void testHandleMessageSendsResponseForMalformedPayload() {

    // GIVEN a request whose body consists of three arbitrary bytes
    final Message request = ProtonHelper.message();
    request.setSubject("get");
    request.setReplyTo(REPLY_RESOURCE.toString());
    request.setCorrelationId(UUID.randomUUID().toString());
    request.setBody(new Data(new Binary(new byte[] { 0x01, 0x02, 0x03 })));

    final ProtonDelivery delivery = mock(ProtonDelivery.class);

    final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true);
    endpoint.onLinkAttach(connection, sender, REPLY_RESOURCE);

    // WHEN the endpoint processes the request
    endpoint.handleRequestMessage(connection, receiver, resource, delivery, request);

    // THEN the message is settled with the Accepted outcome
    verify(delivery).disposition(argThat(d -> d instanceof Accepted), eq(Boolean.TRUE));

    // and nothing is sent over the event bus
    verify(eventBus, never()).request(eq(EVENT_BUS_ADDRESS), any(JsonObject.class), any(DeliveryOptions.class), any(Handler.class));

    // and a response with status 400 is sent back via the sender
    verify(sender).send(argThat(m -> hasStatusCode(m, HttpURLConnection.HTTP_BAD_REQUEST)));

    // and one credit is issued on the receiver
    verify(receiver).flow(1);
}
/**
 * Verifies that the object facade returns {@code null} for a received message that
 * has the serialized-Java-object content type and a {@code Data} section without a value.
 */
@Test
public void testGetObjectUsingReceivedMessageWithDataSectionContainingNothingReturnsNull() throws Exception {
    final Message received = Message.Factory.create();
    received.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
    received.setBody(new Data(null));

    final AmqpJmsObjectMessageFacade facade =
            createReceivedObjectMessageFacade(createMockAmqpConsumer(), received);

    assertNull("Expected null object", facade.getObject());
}
/**
 * Puts the given payload into an AMQP message's body as a <em>Data</em> section.
 * <p>
 * Nothing is changed on the message when the payload is {@code null}.
 *
 * @param message The message to set the payload on.
 * @param contentType The type of the payload. The message's <em>content-type</em> property
 *                    is set only if neither this nor the payload parameter is {@code null}.
 * @param payload The payload, or {@code null} if no payload is to be conveyed in the message body.
 *
 * @throws NullPointerException If message is {@code null}.
 */
public static void setPayload(final Message message, final String contentType, final byte[] payload) {
    Objects.requireNonNull(message);

    if (payload == null) {
        // no payload: leave both body and content type untouched
        return;
    }

    final Data body = new Data(new Binary(payload));
    message.setBody(body);

    if (contentType != null) {
        message.setContentType(contentType);
    }
}
/**
 * Verifies that reading the JSON payload of a message whose body is not JSON
 * fails with a {@code DecodeException}.
 */
@Test
public void testGetJsonPayloadHandlesMalformedJson() {
    // four arbitrary bytes that do not form a JSON document
    final byte[] notJson = new byte[] { 0x01, 0x02, 0x03, 0x04 };

    final Message message = ProtonHelper.message();
    message.setBody(new Data(new Binary(notJson)));

    assertThatThrownBy(() -> MessageHelper.getJsonPayload(message)).isInstanceOf(DecodeException.class);
}
/**
 * Verifies that reading a message's payload as a string yields a non-null result
 * for two byte sequences that are not valid UTF-8.
 */
@Test
public void testGetPayloadAsStringHandlesNonCharacterPayload() {
    final byte[] invalidTwoByteSequence = new byte[] { (byte) 0xc3, (byte) 0x28 };
    final byte[] invalidFourByteSequence = new byte[] { (byte) 0xf0, (byte) 0x28, (byte) 0x8c, (byte) 0xbc };

    final Message message = ProtonHelper.message();

    message.setBody(new Data(new Binary(invalidTwoByteSequence)));
    assertThat(MessageHelper.getPayloadAsString(message)).isNotNull();

    message.setBody(new Data(new Binary(invalidFourByteSequence)));
    assertThat(MessageHelper.getPayloadAsString(message)).isNotNull();
}
/**
 * Verifies that reading the JSON payload of a message with an empty Data
 * section yields {@code null}.
 */
@Test
public void testGetJsonPayloadHandlesEmptyDataSection() {
    final Data emptySection = new Data(new Binary(new byte[0]));

    final Message message = ProtonHelper.message();
    message.setBody(emptySection);

    assertThat(MessageHelper.getJsonPayload(message)).isNull();
}
/**
 * Creates a test message with the given content type and payload.
 * <p>
 * NOTE(review): the body and content type are set here and then
 * {@code MessageHelper.setJsonPayload} is invoked with the same payload; presumably
 * that call overwrites one or both of them, so confirm which settings take effect.
 *
 * @param contentType The content type to set on the message.
 * @param payload The payload; its UTF-8 bytes are put into a Data section unless it is {@code null}.
 * @return The created message.
 */
private Message createTestMessage(final String contentType, final String payload) {
    final Message testMessage = createTestMessageWithoutBody(payload);

    if (payload != null) {
        final byte[] utf8Bytes = payload.getBytes(StandardCharsets.UTF_8);
        testMessage.setBody(new Data(new Binary(utf8Bytes)));
    }

    testMessage.setContentType(contentType);
    MessageHelper.setJsonPayload(testMessage, payload);
    return testMessage;
}
/**
 * Reads a {@code Data} section's value from the decoder's buffer.
 * <p>
 * The next byte is taken as the encoding code. For {@code VBIN8} the size is one
 * unsigned byte, for {@code VBIN32} it is a 32-bit integer, and for {@code NULL}
 * a {@code Data} without a value is returned. The binary content is copied out of
 * the buffer into a new array.
 *
 * @return The decoded section.
 * @throws ProtonException if the encoding code is none of the above.
 * @throws IllegalArgumentException if the encoded size is negative or exceeds the
 *         number of bytes remaining in the buffer.
 */
@Override
public Data readValue() {
    ReadableBuffer buffer = getDecoder().getBuffer();
    byte encodingCode = buffer.get();

    int size = 0;

    switch (encodingCode) {
        case EncodingCodes.VBIN8:
            // single size byte, interpreted as unsigned
            size = buffer.get() & 0xFF;
            break;
        case EncodingCodes.VBIN32:
            size = buffer.getInt();
            break;
        case EncodingCodes.NULL:
            return new Data(null);
        default:
            throw new ProtonException("Expected Binary type but found encoding: " + encodingCode);
    }

    // A 32-bit size with the top bit set arrives here as a negative int. It would pass
    // the "remaining" check below and then fail in the array allocation, so reject it
    // explicitly.
    if (size < 0) {
        throw new IllegalArgumentException("Binary data size " + size + " is negative; " +
                                           "the encoded size exceeds the supported maximum");
    }

    if (size > buffer.remaining()) {
        throw new IllegalArgumentException("Binary data size " + size + " is specified to be greater than the " +
                                           "amount of data available ("+ buffer.remaining()+")");
    }

    byte[] data = new byte[size];
    buffer.get(data, 0, size);

    return new Data(new Binary(data));
}
/**
 * Writes the given {@code Data} section to the encoder's buffer.
 * <p>
 * Three bytes are emitted first (the described-type indicator, the small-ulong
 * encoding code and this type's descriptor code), then the section's binary value
 * is written through the encoder.
 *
 * @param data The section to write.
 */
@Override
public void write(Data data) {
    final WritableBuffer target = getEncoder().getBuffer();

    // descriptor preamble
    target.put(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
    target.put(EncodingCodes.SMALLULONG);
    target.put(DESCRIPTOR_CODE);

    // described value
    getEncoder().writeBinary(data.getValue());
}
/**
 * Verifies that creating a stream message facade for a received message whose
 * body is a {@code Data} section fails with an {@code IllegalStateException}.
 */
@Test
public void testCreateWithUnexpectedBodySectionTypeThrowsISE() throws Exception {
    final Message received = Message.Factory.create();
    received.setBody(new Data(new Binary(new byte[0])));

    try {
        createReceivedStreamMessageFacade(createMockAmqpConsumer(), received);
        fail("expected exception to be thrown");
    } catch (IllegalStateException expected) {
        // this is the outcome the test is looking for
    }
}
/**
 * Encodes a message whose body is a {@code Data} section of the given size and
 * checks both the encoded bytes and the reported encoded length.
 * <p>
 * The expected encoding comes from {@code generateExpectedDataSectionBytes}; the
 * target array is sized to exactly that expectation.
 *
 * @param bytesLength The number of payload bytes to put into the data section.
 */
void doMessageEncodingWithDataBodySectionTestImpl(int bytesLength)
{
    byte[] bytes = generateByteArray(bytesLength);

    byte[] expectedBytes = generateExpectedDataSectionBytes(bytes);
    byte[] encodedBytes = new byte[expectedBytes.length];

    Message msg = Message.Factory.create();
    msg.setBody(new Data(new Binary(bytes)));

    int encodedLength = msg.encode(encodedBytes, 0, encodedBytes.length);

    assertArrayEquals("Encoded bytes do not match expectation", expectedBytes, encodedBytes);
    // expected value first: the encoder must report having filled the whole array
    assertEquals("Encoded length different than expected length", encodedBytes.length, encodedLength);
}