下面列出了javax.jms.MessageNotReadableException#org.apache.qpid.jms.message.JmsMessage 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Creates a management {@code QUERY} request message for the given component.
 *
 * <p>The message is an object message carrying a map with a single, empty
 * {@code attributeNames} list as its body. The requested entity type is
 * {@code "org.apache.qpid.dispatch." + component}, set as a binary
 * {@code entityType} property directly on the message facade, and the reply-to
 * is set to this instance's {@code destination}.
 *
 * @param component string representation component name
 * @return created message
 * @throws JMSException if unable to create message
 */
private Message createMessage(String component) throws JMSException {
    logger.debug("Creating request message for: {}", component);
    Message message = session.createObjectMessage();
    message.setBooleanProperty("JMS_AMQP_TYPED_ENCODING", true);
    message.setStringProperty("name", "self");
    message.setStringProperty("operation", "QUERY");
    message.setStringProperty("type", "org.amqp.management");

    // Encode explicitly as UTF-8 so the entity type bytes do not depend on the
    // platform default charset of the JVM sending the request.
    Binary requestFor = new Binary(
            ("org.apache.qpid.dispatch." + component).getBytes(java.nio.charset.StandardCharsets.UTF_8));
    // The binary value is set through the facade rather than a JMS property setter.
    ((JmsMessage) message).getFacade().setProperty("entityType", requestFor);

    HashMap<String,Object> map = new HashMap<>();
    // NOTE(review): an empty attribute list presumably requests all attributes - confirm.
    map.put("attributeNames", new ArrayList<>());
    ((ObjectMessage) message).setObject(map);

    message.setJMSReplyTo(destination);
    return message;
}
/**
 * Copies the payload of a JMS message into the given builder.
 *
 * <p>Text messages contribute their text, bytes messages contribute their body as a
 * {@code ByteBuffer}. Any other message type contributes no payload and is only
 * reported at debug level.
 *
 * @param message the received JMS message
 * @param builder the builder to add the payload to
 * @return the same {@code builder} instance that was passed in
 * @throws JMSException if reading from the JMS message fails
 * @throws IllegalArgumentException if the bytes body length is outside the {@code int} range
 */
private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage message,
        final ExternalMessageBuilder builder) throws JMSException {
    if (message instanceof TextMessage) {
        final TextMessage textMessage = (TextMessage) message;
        builder.withText(textMessage.getText());
        return builder;
    }

    if (message instanceof BytesMessage) {
        final BytesMessage bytes = (BytesMessage) message;
        final long size = bytes.getBodyLength();
        // NOTE(review): the lower bound is Integer.MIN_VALUE, so a negative length would
        // still reach ByteBuffer.allocate below.
        if (size < Integer.MIN_VALUE || size > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Message too large...");
        }
        final ByteBuffer payload = ByteBuffer.allocate((int) size);
        // Fill the buffer's backing array directly; the buffer position is left untouched.
        bytes.readBytes(payload.array());
        builder.withBytes(payload);
        return builder;
    }

    // Unsupported message type: no payload is set, only a debug trace is emitted.
    if (log.isDebugEnabled()) {
        final Destination receivedAt = message.getJMSDestination();
        final Map<String, String> headers = extractHeadersMapFromJmsMessage(message);
        log.debug("Received message at '{}' of unsupported type ({}) with headers: {}",
                receivedAt, message.getClass().getName(), headers);
    }
    return builder;
}
/**
 * Converts an external message into a JMS message created from this instance's session.
 *
 * <p>A text payload yields a text message, otherwise a byte payload yields a bytes
 * message, otherwise a plain message without body is created. Headers are applied via
 * {@code JMSPropertyMapper} and the result is wrapped by {@code JMSMessageWorkaround}.
 *
 * @param externalMessage the message to convert
 * @return the wrapped JMS message
 * @throws JMSException if creating or populating the JMS message fails
 */
private Message toJmsMessage(final ExternalMessage externalMessage) throws JMSException {
    final Optional<String> textPayload = externalMessage.getTextPayload();
    final Message jmsMessage;
    if (!textPayload.isPresent()) {
        if (externalMessage.getBytePayload().isPresent()) {
            final BytesMessage withBytes = session.createBytesMessage();
            final byte[] content =
                    externalMessage.getBytePayload().map(ByteBuffer::array).orElse(new byte[]{});
            withBytes.writeBytes(content);
            jmsMessage = withBytes;
        } else {
            jmsMessage = session.createMessage();
        }
    } else {
        jmsMessage = session.createTextMessage(textPayload.get());
    }

    JMSPropertyMapper.setPropertiesAndApplicationProperties(jmsMessage, externalMessage.getHeaders(), log);

    // wrap the message to prevent Qpid client from setting properties willy-nilly.
    final JmsMessage qpidMessage = (JmsMessage) jmsMessage;
    return JMSMessageWorkaround.wrap(qpidMessage);
}
/**
 * Verifies that a message was sent through the mocked producer within one second
 * and that it carries the expected string properties.
 */
@Override
protected void verifyPublishedMessage() throws Exception {
    final ArgumentCaptor<JmsMessage> captor = ArgumentCaptor.forClass(JmsMessage.class);
    verify(messageProducer, timeout(1000)).send(captor.capture(), any(CompletionListener.class));

    final Message published = captor.getValue();
    assertThat(published).isNotNull();

    // Plain thing id property.
    assertThat(published.getStringProperty("thing_id"))
            .isEqualTo(TestConstants.Things.THING_ID.toString());

    // Properties built from the thing id with a suffix / prefix.
    final String expectedSuffixed = TestConstants.Things.THING_ID + ".some.suffix";
    assertThat(published.getStringProperty("suffixed_thing_id")).isEqualTo(expectedSuffixed);
    final String expectedPrefixed = "some.prefix." + TestConstants.Things.THING_ID;
    assertThat(published.getStringProperty("prefixed_thing_id")).isEqualTo(expectedPrefixed);

    // Constant property and device id.
    assertThat(published.getStringProperty("eclipse")).isEqualTo("ditto");
    assertThat(published.getStringProperty("device_id"))
            .isEqualTo(TestConstants.Things.THING_ID.toString());
}
/**
 * Stubs the given mock session so that it hands out the given consumer, records every
 * created producer in {@code mockProducers}, and creates real text messages.
 *
 * @param mockSession the mocked session to stub
 * @param mockConsumer the consumer returned for any {@code JmsQueue}
 * @throws JMSException declared by the stubbed session methods
 */
private void prepareSession(final Session mockSession, final JmsMessageConsumer mockConsumer) throws JMSException {
    // Every queue consumer request yields the supplied mock consumer.
    doReturn(mockConsumer).when(mockSession).createConsumer(any(JmsQueue.class));

    // Each producer request yields a fresh mock that reports the requested destination.
    final Answer<MessageProducer> producerAnswer = invocation -> {
        final MessageProducer producer = mock(MessageProducer.class);
        doReturn(invocation.getArgument(0)).when(producer).getDestination();
        mockProducers.add(producer);
        return producer;
    };
    doAnswer(producerAnswer).when(mockSession).createProducer(any(Destination.class));

    // Text messages are real JmsTextMessage instances backed by an AMQP facade.
    final Answer<JmsMessage> textMessageAnswer = invocation -> {
        final String text = invocation.getArgument(0);
        final AmqpJmsTextMessageFacade textFacade = new AmqpJmsTextMessageFacade();
        textFacade.initialize(Mockito.mock(AmqpConnection.class));
        final JmsTextMessage result = new JmsTextMessage(textFacade);
        result.setText(text);
        return result;
    };
    doAnswer(textMessageAnswer).when(mockSession).createTextMessage(anyString());
}
/**
 * Sends a thing-modified event to the given actor and verifies that exactly one message
 * containing the event's unique attribute and the expected topic path is published.
 *
 * @param amqpClientActor the actor receiving the outbound signal
 * @param target the target the signal is addressed to
 * @param messageProducerSupplier supplies the mocked producer to verify against
 * @throws JMSException declared by the verified producer and message methods
 */
private static void sendThingEventAndExpectPublish(final ActorRef amqpClientActor,
        final Target target,
        final Supplier<MessageProducer> messageProducerSupplier)
        throws JMSException {
    // A random attribute value makes the published payload identifiable.
    final String marker = UUID.randomUUID().toString();
    final DittoHeaders replyToHeaders =
            DittoHeaders.newBuilder().putHeader("reply-to", target.getAddress()).build();
    final ThingModifiedEvent event =
            TestConstants.thingModified(Collections.emptyList(), Attributes.newBuilder().set("uuid", marker).build())
                    .setDittoHeaders(replyToHeaders);
    final OutboundSignal signal = OutboundSignalFactory.newOutboundSignal(event, singletonList(target));

    amqpClientActor.tell(signal, ActorRef.noSender());

    final ArgumentCaptor<JmsMessage> captor = ArgumentCaptor.forClass(JmsMessage.class);
    final MessageProducer producer = messageProducerSupplier.get();
    verify(producer, timeout(2000).times(1))
            .send(captor.capture(), any(CompletionListener.class));

    final Message published = captor.getValue();
    assertThat(published).isNotNull();
    assertThat(published.getBody(String.class)).contains(marker);

    final String expectedTopic =
            TestConstants.Things.NAMESPACE + "/" + TestConstants.Things.ID + "/" +
                    TopicPath.Group.THINGS.getName() + "/" + TopicPath.Channel.TWIN.getName() + "/" +
                    TopicPath.Criterion.EVENTS.getName() + "/" + TopicPath.Action.MODIFIED.getName();
    assertThat(published.getBody(String.class)).contains(expectedTopic);
}
/**
 * Checks that the delivery mode configured on the producer is applied to sent messages,
 * first for PERSISTENT and then for NON_PERSISTENT.
 *
 * @param bodyType not used by this method; a String body is always sent
 * @throws JMSException if reading the delivery mode from the message fails
 */
public void doTestSendAppliesDeliveryModeWithMessageBody(Class<?> bodyType) throws JMSException {
    JMSProducer jmsProducer = context.createProducer();

    // Persistent delivery.
    jmsProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
    jmsProducer.send(JMS_DESTINATION, "text");
    JmsOutboundMessageDispatch persistentEnvelope = remotePeer.getLastReceivedMessage();
    assertNotNull(persistentEnvelope);
    JmsMessage persistentMessage = persistentEnvelope.getMessage();
    assertEquals(DeliveryMode.PERSISTENT, persistentMessage.getJMSDeliveryMode());

    // Non-persistent delivery.
    jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    jmsProducer.send(JMS_DESTINATION, "text");
    JmsOutboundMessageDispatch nonPersistentEnvelope = remotePeer.getLastReceivedMessage();
    assertNotNull(nonPersistentEnvelope);
    JmsMessage nonPersistentMessage = nonPersistentEnvelope.getMessage();
    assertEquals(DeliveryMode.NON_PERSISTENT, nonPersistentMessage.getJMSDeliveryMode());
}
/**
 * Checks that disabling message IDs on the producer yields a null message ID, and that
 * re-enabling them yields a non-null one.
 *
 * @param bodyType not used by this method; a String body is always sent
 * @throws JMSException if reading the message ID fails
 */
private void doTestSendAppliesDisableMessageIDWithMessageBody(Class<?> bodyType) throws JMSException {
    JMSProducer jmsProducer = context.createProducer();

    // Message IDs disabled.
    jmsProducer.setDisableMessageID(true);
    jmsProducer.send(JMS_DESTINATION, "text");
    JmsOutboundMessageDispatch withoutIdEnvelope = remotePeer.getLastReceivedMessage();
    assertNotNull(withoutIdEnvelope);
    JmsMessage withoutId = withoutIdEnvelope.getMessage();
    assertNull(withoutId.getJMSMessageID());

    // Message IDs enabled again.
    jmsProducer.setDisableMessageID(false);
    jmsProducer.send(JMS_DESTINATION, "text");
    JmsOutboundMessageDispatch withIdEnvelope = remotePeer.getLastReceivedMessage();
    assertNotNull(withIdEnvelope);
    JmsMessage withId = withIdEnvelope.getMessage();
    assertNotNull(withId.getJMSMessageID());
}
/**
 * Checks that disabling timestamps on the producer yields a zero JMS timestamp, and that
 * re-enabling them yields a non-zero one.
 *
 * @param bodyType not used by this method; a String body is always sent
 * @throws JMSException if reading the timestamp fails
 */
private void doTestSendAppliesDisableTimestampWithMessageBody(Class<?> bodyType) throws JMSException {
    JMSProducer jmsProducer = context.createProducer();

    // Timestamps disabled.
    jmsProducer.setDisableMessageTimestamp(true);
    jmsProducer.send(JMS_DESTINATION, "text");
    JmsOutboundMessageDispatch untimedEnvelope = remotePeer.getLastReceivedMessage();
    assertNotNull(untimedEnvelope);
    JmsMessage untimed = untimedEnvelope.getMessage();
    final long disabledTimestamp = untimed.getJMSTimestamp();
    assertTrue(disabledTimestamp == 0);

    // Timestamps enabled again.
    jmsProducer.setDisableMessageTimestamp(false);
    jmsProducer.send(JMS_DESTINATION, "text");
    JmsOutboundMessageDispatch timedEnvelope = remotePeer.getLastReceivedMessage();
    assertNotNull(timedEnvelope);
    JmsMessage timed = timedEnvelope.getMessage();
    final long enabledTimestamp = timed.getJMSTimestamp();
    assertFalse(enabledTimestamp == 0);
}
/**
 * Checks that the priority configured on the producer is applied to sent messages,
 * first for priority 0 and then for priority 7.
 *
 * @param bodyType not used by this method; a String body is always sent
 * @throws JMSException if reading the priority fails
 */
private void doTestSendAppliesPriorityWithMessageBody(Class<?> bodyType) throws JMSException {
    JMSProducer jmsProducer = context.createProducer();

    // Lowest priority.
    jmsProducer.setPriority(0);
    jmsProducer.send(JMS_DESTINATION, "text");
    JmsOutboundMessageDispatch lowEnvelope = remotePeer.getLastReceivedMessage();
    assertNotNull(lowEnvelope);
    JmsMessage lowPriorityMessage = lowEnvelope.getMessage();
    assertEquals(0, lowPriorityMessage.getJMSPriority());

    // Raised priority.
    jmsProducer.setPriority(7);
    jmsProducer.send(JMS_DESTINATION, "text");
    JmsOutboundMessageDispatch highEnvelope = remotePeer.getLastReceivedMessage();
    assertNotNull(highEnvelope);
    JmsMessage highPriorityMessage = highEnvelope.getMessage();
    assertEquals(7, highPriorityMessage.getJMSPriority());
}
/**
 * Checks that a time-to-live of 2000 on the producer yields a positive JMS expiration,
 * and that the default time-to-live yields an expiration of zero.
 *
 * @param bodyType not used by this method; a String body is always sent
 * @throws JMSException if reading the expiration fails
 */
private void doTestSendAppliesTimeToLiveWithMessageBody(Class<?> bodyType) throws JMSException {
    JMSProducer jmsProducer = context.createProducer();

    // Explicit time-to-live.
    jmsProducer.setTimeToLive(2000);
    jmsProducer.send(JMS_DESTINATION, "text");
    JmsOutboundMessageDispatch expiringEnvelope = remotePeer.getLastReceivedMessage();
    assertNotNull(expiringEnvelope);
    JmsMessage expiring = expiringEnvelope.getMessage();
    final long expirationWithTtl = expiring.getJMSExpiration();
    assertTrue(expirationWithTtl > 0);

    // Default time-to-live.
    jmsProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
    jmsProducer.send(JMS_DESTINATION, "text");
    JmsOutboundMessageDispatch nonExpiringEnvelope = remotePeer.getLastReceivedMessage();
    assertNotNull(nonExpiringEnvelope);
    JmsMessage nonExpiring = nonExpiringEnvelope.getMessage();
    final long expirationWithDefault = nonExpiring.getJMSExpiration();
    assertTrue(expirationWithDefault == 0);
}
/**
 * Verifies that a byte array passed to the producer arrives unchanged as the body of
 * the message received by the remote peer.
 */
@Test
public void testBytesBodyIsApplied() throws JMSException {
    final byte[] bodyValue = new byte[] { 0, 1, 2, 3, 4 };

    JMSProducer jmsProducer = context.createProducer();
    jmsProducer.send(JMS_DESTINATION, bodyValue);

    JmsOutboundMessageDispatch received = remotePeer.getLastReceivedMessage();
    assertNotNull(received);

    JmsMessage sent = received.getMessage();
    byte[] actual = sent.getBody(byte[].class);
    assertNotNull(actual);
    assertEquals(bodyValue.length, actual.length);

    // Compare byte by byte so a mismatch is reported at the first differing index.
    int index = 0;
    while (index < actual.length) {
        assertEquals(bodyValue[index], actual[index]);
        ++index;
    }
}
/**
 * Verifies that a durable message with a string amqp-value body decodes to a
 * {@link JmsTextMessage} that reports PERSISTENT delivery on both message and facade.
 */
@Test
public void testPersistentSetFromMessageWithNonDefaultValue() throws Exception {
    MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
    protonMessage.setDurable(true);
    protonMessage.setBody(new AmqpValue("test"));

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();

    // Checks on the decoded JMS message.
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsTextMessage.class, decoded.getClass());
    assertEquals(DeliveryMode.PERSISTENT, decoded.getJMSDeliveryMode());

    // Checks on the underlying facade.
    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsTextMessageFacade.class, decodedFacade.getClass());
    assertTrue(decodedFacade.isPersistent());
}
/**
 * Verifies that a message with priority 8 and a string amqp-value body decodes to a
 * {@link JmsTextMessage} that reports priority 8 on both message and facade.
 */
@Test
public void testMessagePrioritySetFromMessageWithNonDefaultValue() throws Exception {
    MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
    protonMessage.setPriority((short) 8);
    protonMessage.setBody(new AmqpValue("test"));

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();

    // Checks on the decoded JMS message.
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsTextMessage.class, decoded.getClass());
    assertEquals(8, decoded.getJMSPriority());

    // Checks on the underlying facade.
    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsTextMessageFacade.class, decodedFacade.getClass());
    assertEquals(8, decodedFacade.getPriority());
}
/**
 * Verifies that a message with delivery count 2 and a string amqp-value body decodes to
 * a redelivered {@link JmsTextMessage} whose facade and headers report the count of 2.
 */
@Test
public void testDeliveryCountSetFromMessageWithNonDefaultValue() throws Exception {
    MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
    protonMessage.setDeliveryCount(2);
    protonMessage.setBody(new AmqpValue("test"));

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();

    // Checks on the decoded JMS message.
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsTextMessage.class, decoded.getClass());
    assertTrue(decoded.getJMSRedelivered());

    // The facade type is asserted before the cast so a mismatch fails with a clear message.
    assertEquals("Unexpected facade class type", AmqpJmsTextMessageFacade.class, decoded.getFacade().getClass());
    AmqpJmsTextMessageFacade textFacade = (AmqpJmsTextMessageFacade) decoded.getFacade();
    assertNotNull("Facade should not be null", textFacade);

    // The count is checked through each accessor the facade exposes.
    assertEquals(2, textFacade.getRedeliveryCount());
    assertEquals(2, textFacade.getAmqpHeader().getDeliveryCount());
    assertEquals(UnsignedInteger.valueOf(2), textFacade.getHeader().getDeliveryCount());
}
/**
 * Verifies that a message whose {@value AmqpMessageSupport#JMS_MSG_TYPE} annotation is
 * {@value AmqpMessageSupport#JMS_MESSAGE} decodes to a plain {@link JmsMessage} backed
 * by an {@link AmqpJmsMessageFacade}.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateGenericMessageFromMessageTypeAnnotation() throws Exception {
    Map<Symbol, Object> annotations = new HashMap<>();
    annotations.put(AmqpMessageSupport.JMS_MSG_TYPE, AmqpMessageSupport.JMS_MESSAGE);

    Message protonMessage = Proton.message();
    protonMessage.setMessageAnnotations(new MessageAnnotations(annotations));

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsMessageFacade.class, decodedFacade.getClass());
}
/**
 * Verifies that a message whose {@value AmqpMessageSupport#JMS_MSG_TYPE} annotation is
 * {@value AmqpMessageSupport#JMS_BYTES_MESSAGE} decodes to a {@link JmsBytesMessage}
 * backed by an {@link AmqpJmsBytesMessageFacade}.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateBytesMessageFromMessageTypeAnnotation() throws Exception {
    Map<Symbol, Object> annotations = new HashMap<>();
    annotations.put(AmqpMessageSupport.JMS_MSG_TYPE, AmqpMessageSupport.JMS_BYTES_MESSAGE);

    Message protonMessage = Proton.message();
    protonMessage.setMessageAnnotations(new MessageAnnotations(annotations));

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsBytesMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsBytesMessageFacade.class, decodedFacade.getClass());
}
/**
 * Test that a message with the {@value AmqpMessageSupport#JMS_MSG_TYPE}
 * annotation set to {@value AmqpMessageSupport#JMS_TEXT_MESSAGE} is
 * treated as a {@link JmsTextMessage} with {@link AmqpJmsTextMessageFacade}
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateTextMessageFromMessageTypeAnnotation() throws Exception {
    Message message = Proton.message();

    // Annotate the message as a JMS text message; no body is set.
    Map<Symbol, Object> map = new HashMap<Symbol, Object>();
    map.put(AmqpMessageSupport.JMS_MSG_TYPE, AmqpMessageSupport.JMS_TEXT_MESSAGE);
    MessageAnnotations messageAnnotations = new MessageAnnotations(map);
    message.setMessageAnnotations(messageAnnotations);

    JmsMessage jmsMessage = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(message)).asJmsMessage();
    assertNotNull("Message should not be null", jmsMessage);
    assertEquals("Unexpected message class type", JmsTextMessage.class, jmsMessage.getClass());

    JmsMessageFacade facade = jmsMessage.getFacade();
    assertNotNull("Facade should not be null", facade);
    assertEquals("Unexpected facade class type", AmqpJmsTextMessageFacade.class, facade.getClass());
}
/**
 * Verifies that a message whose {@value AmqpMessageSupport#JMS_MSG_TYPE} annotation is
 * {@value AmqpMessageSupport#JMS_STREAM_MESSAGE} decodes to a {@link JmsStreamMessage}
 * backed by an {@link AmqpJmsStreamMessageFacade}.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateStreamMessageFromMessageTypeAnnotation() throws Exception {
    Map<Symbol, Object> annotations = new HashMap<>();
    annotations.put(AmqpMessageSupport.JMS_MSG_TYPE, AmqpMessageSupport.JMS_STREAM_MESSAGE);

    Message protonMessage = Proton.message();
    protonMessage.setMessageAnnotations(new MessageAnnotations(annotations));

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsStreamMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsStreamMessageFacade.class, decodedFacade.getClass());
}
/**
 * Verifies that a message without a body section whose content type is
 * {@value AmqpMessageSupport#SERIALIZED_JAVA_OBJECT_CONTENT_TYPE} decodes to an
 * ObjectMessage using the serialized-object delegate, when no message type annotation
 * is present.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateObjectMessageFromNoBodySectionAndContentType() throws Exception {
    Message protonMessage = Proton.message();
    protonMessage.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsObjectMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsObjectMessageFacade.class, decodedFacade.getClass());

    AmqpJmsObjectMessageFacade objectFacade = (AmqpJmsObjectMessageFacade) decodedFacade;
    AmqpObjectTypeDelegate typeDelegate = objectFacade.getDelegate();
    assertTrue("Unexpected delegate type: " + typeDelegate, typeDelegate instanceof AmqpSerializedObjectDelegate);
}
/**
 * Verifies that an empty data body with content type
 * {@value AmqpMessageSupport#OCTET_STREAM_CONTENT_TYPE} decodes to a BytesMessage when
 * no message type annotation is present.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateBytesMessageFromDataWithEmptyBinaryAndContentType() throws Exception {
    Message protonMessage = Proton.message();
    Binary emptyPayload = new Binary(new byte[0]);
    protonMessage.setBody(new Data(emptyPayload));
    protonMessage.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE.toString());

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsBytesMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsBytesMessageFacade.class, decodedFacade.getClass());
}
/**
 * Verifies that an empty data body without any content type decodes to a BytesMessage
 * when no message type annotation is present.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateBytesMessageFromDataWithEmptyBinaryAndNoContentType() throws Exception {
    Message protonMessage = Proton.message();
    Binary emptyPayload = new Binary(new byte[0]);
    protonMessage.setBody(new Data(emptyPayload));

    // Precondition: the content type was never set on the message under test.
    assertNull(protonMessage.getContentType());

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsBytesMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsBytesMessageFacade.class, decodedFacade.getClass());
}
/**
 * Verifies that an empty data body with content type
 * {@value AmqpMessageSupport#SERIALIZED_JAVA_OBJECT_CONTENT_TYPE} decodes to an
 * ObjectMessage using the serialized-object delegate, when no message type annotation
 * is present.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateObjectMessageFromDataWithContentTypeAndEmptyBinary() throws Exception {
    Message protonMessage = Proton.message();
    Binary emptyPayload = new Binary(new byte[0]);
    protonMessage.setBody(new Data(emptyPayload));
    protonMessage.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsObjectMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsObjectMessageFacade.class, decodedFacade.getClass());

    AmqpJmsObjectMessageFacade objectFacade = (AmqpJmsObjectMessageFacade) decodedFacade;
    AmqpObjectTypeDelegate typeDelegate = objectFacade.getDelegate();
    assertTrue("Unexpected delegate type: " + typeDelegate, typeDelegate instanceof AmqpSerializedObjectDelegate);
}
/**
 * Decodes a message with an empty data body and the given content type, and asserts
 * that it becomes a text message whose facade reports the expected charset.
 *
 * @param contentType the content type to set on the message
 * @param expectedCharset the charset the text facade is expected to report
 * @throws IOException declared for the encode/decode round trip
 */
private void doCreateTextMessageFromDataWithContentTypeTestImpl(String contentType, Charset expectedCharset) throws IOException {
    Message protonMessage = Proton.message();
    Binary emptyPayload = new Binary(new byte[0]);
    protonMessage.setBody(new Data(emptyPayload));
    protonMessage.setContentType(contentType);

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsTextMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsTextMessageFacade.class, decodedFacade.getClass());

    Charset actualCharset = ((AmqpJmsTextMessageFacade) decodedFacade).getCharset();
    assertEquals("Unexpected character set", expectedCharset, actualCharset);
}
/**
 * Verifies that an amqp-value body holding a map decodes to an ObjectMessage using the
 * AMQP typed-object delegate, when no message type annotation is present.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateAmqpObjectMessageFromAmqpValueWithMap() throws Exception {
    Map<String, String> emptyMap = new HashMap<>();
    Message protonMessage = Proton.message();
    protonMessage.setBody(new AmqpValue(emptyMap));

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsObjectMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsObjectMessageFacade.class, decodedFacade.getClass());

    AmqpJmsObjectMessageFacade objectFacade = (AmqpJmsObjectMessageFacade) decodedFacade;
    AmqpObjectTypeDelegate typeDelegate = objectFacade.getDelegate();
    assertTrue("Unexpected delegate type: " + typeDelegate, typeDelegate instanceof AmqpTypedObjectDelegate);
}
/**
 * Verifies that an amqp-value body holding a list decodes to an ObjectMessage using the
 * AMQP typed-object delegate, when no message type annotation is present.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateAmqpObjectMessageFromAmqpValueWithList() throws Exception {
    List<String> emptyList = new ArrayList<>();
    Message protonMessage = Proton.message();
    protonMessage.setBody(new AmqpValue(emptyList));

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsObjectMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsObjectMessageFacade.class, decodedFacade.getClass());

    AmqpJmsObjectMessageFacade objectFacade = (AmqpJmsObjectMessageFacade) decodedFacade;
    AmqpObjectTypeDelegate typeDelegate = objectFacade.getDelegate();
    assertTrue("Unexpected delegate type: " + typeDelegate, typeDelegate instanceof AmqpTypedObjectDelegate);
}
/**
 * Verifies that an amqp-value body holding a value that fits no other category (a UUID
 * here) decodes to an ObjectMessage using the AMQP typed-object delegate, when no
 * message type annotation is present.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateObjectMessageFromAmqpValueWithUncategorisedContent() throws Exception {
    Message protonMessage = Proton.message();
    protonMessage.setBody(new AmqpValue(UUID.randomUUID()));

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsObjectMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsObjectMessageFacade.class, decodedFacade.getClass());

    AmqpJmsObjectMessageFacade objectFacade = (AmqpJmsObjectMessageFacade) decodedFacade;
    AmqpObjectTypeDelegate typeDelegate = objectFacade.getDelegate();
    assertTrue("Unexpected delegate type: " + typeDelegate, typeDelegate instanceof AmqpTypedObjectDelegate);
}
/**
 * Verifies that an amqp-sequence body (built from an empty list in this test) decodes
 * to an ObjectMessage using the AMQP typed-object delegate, when no message type
 * annotation is present.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateObjectMessageMessageFromAmqpSequence() throws Exception {
    List<String> emptyList = new ArrayList<>();
    Message protonMessage = Proton.message();
    protonMessage.setBody(new AmqpSequence(emptyList));

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(protonMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsObjectMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsObjectMessageFacade.class, decodedFacade.getClass());

    AmqpJmsObjectMessageFacade objectFacade = (AmqpJmsObjectMessageFacade) decodedFacade;
    AmqpObjectTypeDelegate typeDelegate = objectFacade.getDelegate();
    assertTrue("Unexpected delegate type: " + typeDelegate, typeDelegate instanceof AmqpTypedObjectDelegate);
}
/**
 * Creates a wrapper around the given message.
 *
 * <p>The wrapper is initialised with the wrapped message's facade, keeps a reference to
 * the wrapped message, and additionally invokes {@code copy(message)}.
 *
 * @param message the message to wrap
 */
private JMSMessageWorkaround(final JmsMessage message) {
super(message.getFacade());
// The superclass state is not meant to be used: all public methods are
// delegated to the wrapped message.
this.message = message;
// Copy the message anyway because "getBody" is final and therefore cannot be
// delegated. NOTE(review): the reason it is final is not known here.
copy(message);
}
/**
 * Builds the receive behaviour of this actor: one handler per known message type, plus
 * a fallback that logs a warning and marks the message as unhandled.
 */
@Override
public Receive createReceive() {
    // Handlers for the known message types, registered in matching order.
    final ReceiveBuilder knownMessages = ReceiveBuilder.create()
            .match(RestartMessageConsumer.class, this::handleRestartMessageConsumer)
            .match(JmsMessage.class, this::handleJmsMessage)
            .match(ResourceStatus.class, this::handleAddressStatus)
            .match(RetrieveAddressStatus.class, ras -> getSender().tell(getCurrentSourceStatus(), getSelf()))
            .match(ConsumerClosedStatusReport.class, this::matchesOwnConsumer, this::handleConsumerClosed)
            .match(CreateMessageConsumerResponse.class, this::messageConsumerCreated)
            .match(Status.Failure.class, this::messageConsumerFailed);

    // Fallback for everything else.
    return knownMessages
            .matchAny(unknown -> {
                log.warning("Unknown message: {}", unknown);
                unhandled(unknown);
            })
            .build();
}