下面列出了怎么用org.springframework.messaging.PollableChannel的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void sendAndReceiveMessage() {
this.contextRunner
.withUserConfiguration(PollableConfiguration.class, CommonConfiguration.class)
.run((context) -> {
context.getBean("inputChannel", MessageChannel.class).send(
MessageBuilder.withPayload("I am a message (sendAndReceiveMessage).".getBytes()).build());
Message<?> message =
context.getBean("outputChannel", PollableChannel.class).receive(RECEIVE_TIMEOUT_MS);
assertThat(message).isNotNull();
assertThat(message.getPayload()).isInstanceOf(byte[].class);
String stringPayload = new String((byte[]) message.getPayload());
assertThat(stringPayload).isEqualTo("I am a message (sendAndReceiveMessage).");
});
}
@Test
@SuppressWarnings("deprecation")
public void sendAndReceiveMessageManualAckThroughAcknowledgementHeader() {
this.contextRunner
.withUserConfiguration(PollableConfiguration.class, CommonConfiguration.class)
.run((context) -> {
context.getBean(PubSubInboundChannelAdapter.class).setAckMode(AckMode.MANUAL);
context.getBean("inputChannel", MessageChannel.class).send(
MessageBuilder.withPayload("I am a message (sendAndReceiveMessageManualAckThroughAcknowledgementHeader).".getBytes()).build());
PollableChannel channel = context.getBean("outputChannel", PollableChannel.class);
Message<?> message = channel.receive(RECEIVE_TIMEOUT_MS);
assertThat(message).isNotNull();
AckReplyConsumer acker =
(AckReplyConsumer) message.getHeaders().get(GcpPubSubHeaders.ACKNOWLEDGEMENT);
assertThat(acker).isNotNull();
acker.ack();
message = channel.receive(RECEIVE_TIMEOUT_MS);
assertThat(message).isNull();
assertThat(this.outputCaptureRule.getOut()).contains("ACKNOWLEDGEMENT header is deprecated");
});
}
@Test
void receiveMessage_withoutTimeout_returnsTextMessage() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue")
.withWaitTimeSeconds(0).withMaxNumberOfMessages(1)
.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
.withMessageAttributeNames("All")))
.thenReturn(new ReceiveMessageResult().withMessages(Collections
.singleton(new com.amazonaws.services.sqs.model.Message()
.withBody("content"))));
PollableChannel messageChannel = new QueueMessageChannel(amazonSqs,
"http://testQueue");
// Act
Message<?> receivedMessage = messageChannel.receive();
// Assert
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getPayload()).isEqualTo("content");
}
@Test
void receiveMessage_withSpecifiedTimeout_returnsTextMessage() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue")
.withWaitTimeSeconds(2).withMaxNumberOfMessages(1)
.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
.withMessageAttributeNames("All")))
.thenReturn(new ReceiveMessageResult().withMessages(Collections
.singleton(new com.amazonaws.services.sqs.model.Message()
.withBody("content"))));
PollableChannel messageChannel = new QueueMessageChannel(amazonSqs,
"http://testQueue");
// Act
Message<?> receivedMessage = messageChannel.receive(2);
// Assert
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getPayload()).isEqualTo("content");
}
@Test
void receiveMessage_withSpecifiedTimeout_returnsNull() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue")
.withWaitTimeSeconds(2).withMaxNumberOfMessages(1)
.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
.withMessageAttributeNames("All"))).thenReturn(
new ReceiveMessageResult().withMessages(Collections.emptyList()));
PollableChannel messageChannel = new QueueMessageChannel(amazonSqs,
"http://testQueue");
// Act
Message<?> receivedMessage = messageChannel.receive(2);
// Assert
assertThat(receivedMessage).isNull();
}
@Test
void receiveMessage_withoutDefaultTimeout_returnsNull() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue")
.withWaitTimeSeconds(0).withMaxNumberOfMessages(1)
.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
.withMessageAttributeNames("All"))).thenReturn(
new ReceiveMessageResult().withMessages(Collections.emptyList()));
PollableChannel messageChannel = new QueueMessageChannel(amazonSqs,
"http://testQueue");
// Act
Message<?> receivedMessage = messageChannel.receive(0);
// Assert
assertThat(receivedMessage).isNull();
}
@Nullable
protected final Message<?> doReceive(MessageChannel channel, long timeout) {
Assert.notNull(channel, "MessageChannel is required");
Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages");
Message<?> message = (timeout >= 0 ?
((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive());
if (message == null && logger.isTraceEnabled()) {
logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout);
}
return message;
}
@Nullable
protected final Message<?> doReceive(MessageChannel channel, long timeout) {
Assert.notNull(channel, "MessageChannel is required");
Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages");
Message<?> message = (timeout >= 0 ?
((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive());
if (message == null && logger.isTraceEnabled()) {
logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout);
}
return message;
}
/**
*
*/
private SpringContextDelegate(String configName) {
this.configName = configName;
ClassLoader orig = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
if (logger.isDebugEnabled()) {
logger.debug("Using " + Thread.currentThread().getContextClassLoader()
+ " as context class loader while loading Spring Context '" + configName + "'.");
}
try {
this.applicationContext = new ClassPathXmlApplicationContext(configName);
if (this.applicationContext.containsBean(SpringNiFiConstants.FROM_NIFI)){
this.toSpringChannel = this.applicationContext.getBean(SpringNiFiConstants.FROM_NIFI, MessageChannel.class);
if (logger.isDebugEnabled()) {
logger.debug("Spring Application Context defined in '" + configName
+ "' is capable of receiving messages from NiFi since 'fromNiFi' channel was discovered.");
}
} else {
this.toSpringChannel = null;
}
if (this.applicationContext.containsBean(SpringNiFiConstants.TO_NIFI)){
this.fromSpringChannel = this.applicationContext.getBean(SpringNiFiConstants.TO_NIFI, PollableChannel.class);
if (logger.isDebugEnabled()) {
logger.debug("Spring Application Context defined in '" + configName
+ "' is capable of sending messages to " + "NiFi since 'toNiFi' channel was discovered.");
}
} else {
this.fromSpringChannel = null;
}
if (logger.isInfoEnabled() && this.toSpringChannel == null && this.fromSpringChannel == null){
logger.info("Spring Application Context is headless since neither 'fromNiFi' nor 'toNiFi' channels were defined. "
+ "No data will be exchanged.");
}
} finally {
Thread.currentThread().setContextClassLoader(orig);
}
}
@Test
public void sendAndReceiveMessageAsString() {
this.contextRunner
.withUserConfiguration(PollableConfiguration.class, CommonConfiguration.class)
.run((context) -> {
Map<String, Object> headers = new HashMap<>();
// Only String values for now..
headers.put("storm", "lift your skinny fists");
headers.put("static", "lift your skinny fists");
headers.put("sleep", "lift your skinny fists");
Message originalMessage = MessageBuilder.createMessage("I am a message (sendAndReceiveMessageAsString).".getBytes(),
new MessageHeaders(headers));
context.getBean("inputChannel", MessageChannel.class).send(originalMessage);
Message<?> message =
context.getBean("outputChannel", PollableChannel.class).receive(RECEIVE_TIMEOUT_MS);
assertThat(message).isNotNull();
assertThat(message.getPayload()).isInstanceOf(byte[].class);
String payload = new String((byte[]) message.getPayload());
assertThat(payload).isEqualTo("I am a message (sendAndReceiveMessageAsString).");
assertThat(message.getHeaders().size()).isEqualTo(6);
assertThat(message.getHeaders().get("storm")).isEqualTo("lift your skinny fists");
assertThat(message.getHeaders().get("static")).isEqualTo("lift your skinny fists");
assertThat(message.getHeaders().get("sleep")).isEqualTo("lift your skinny fists");
assertThat(message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE)).isNotNull();
});
}
@Test
public void sendAndReceiveMessageManualAck() {
this.contextRunner
.withUserConfiguration(PollableConfiguration.class, CommonConfiguration.class)
.run((context) -> {
context.getBean(PubSubInboundChannelAdapter.class).setAckMode(AckMode.MANUAL);
context.getBean("inputChannel", MessageChannel.class).send(
MessageBuilder.withPayload("I am a message (sendAndReceiveMessageManualAck).".getBytes()).build());
PollableChannel channel = context.getBean("outputChannel", PollableChannel.class);
Message<?> message = channel.receive(RECEIVE_TIMEOUT_MS);
assertThat(message).isNotNull();
BasicAcknowledgeablePubsubMessage origMessage =
(BasicAcknowledgeablePubsubMessage) message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE);
assertThat(origMessage).isNotNull();
origMessage.nack();
message = channel.receive(RECEIVE_TIMEOUT_MS);
assertThat(message).isNotNull();
origMessage = (BasicAcknowledgeablePubsubMessage)
message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE);
assertThat(origMessage).isNotNull();
origMessage.ack();
message = channel.receive(RECEIVE_TIMEOUT_MS);
assertThat(message).isNull();
});
}
@Test
public void sendAndReceiveMessagePublishCallback() {
this.contextRunner
.withUserConfiguration(PollableConfiguration.class, CommonConfiguration.class)
.run((context) -> {
ListenableFutureCallback<String> callbackSpy = Mockito.spy(
new ListenableFutureCallback<String>() {
@Override
public void onFailure(Throwable ex) {
}
@Override
public void onSuccess(String result) {
}
});
context.getBean(PubSubMessageHandler.class).setPublishCallback(callbackSpy);
context.getBean("inputChannel", MessageChannel.class).send(
MessageBuilder.withPayload("I am a message (sendAndReceiveMessagePublishCallback).".getBytes()).build());
Message<?> message =
context.getBean("outputChannel", PollableChannel.class).receive(RECEIVE_TIMEOUT_MS);
assertThat(message).isNotNull();
Awaitility.await().atMost(1, TimeUnit.SECONDS)
.untilAsserted(() -> verify(callbackSpy, times(1)).onSuccess(any()));
});
}
@Bean
@Qualifier("partitionHandler")
public MessageChannelPartitionHandler partitionHandler(
@Qualifier("requestChannel") MessageChannel reqChannel,
@Qualifier("aggregatedReplyChannel") PollableChannel repChannel) {
MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
handler.setGridSize(gridSize);
handler.setStepName("compositeSlaveStep");
handler.setReplyChannel(repChannel);
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(reqChannel);
template.setReceiveTimeout(partitionHandlerTimeout);
handler.setMessagingOperations(template);
return handler;
}
@Override
protected final Message<?> doReceive(MessageChannel channel) {
Assert.notNull(channel, "'channel' is required");
Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages");
long timeout = this.receiveTimeout;
Message<?> message = (timeout >= 0 ?
((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive());
if (message == null && this.logger.isTraceEnabled()) {
this.logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout);
}
return message;
}
@Override
public Message<?> receive(String destination, long timeout, TimeUnit timeUnit) {
try {
PollableChannel messageChannel = this.context.getBean(this.destinationResolver
.resolvedDestination(destination, DefaultChannels.INPUT),
PollableChannel.class);
return messageChannel.receive(timeUnit.toMillis(timeout));
}
catch (Exception e) {
log.error("Exception occurred while trying to read a message from "
+ " a channel with name [" + destination + "]", e);
throw new IllegalStateException(e);
}
}
@Override
public Message<?> receive(String destination, long timeout, TimeUnit timeUnit) {
try {
PollableChannel messageChannel = this.context.getBean(destination,
PollableChannel.class);
return messageChannel.receive(timeUnit.toMillis(timeout));
}
catch (Exception e) {
log.error("Exception occurred while trying to read a message from "
+ " a channel with name [" + destination + "]", e);
throw new IllegalStateException(e);
}
}
@Test
public void testSinkFromConsumer() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(SinkFromConsumer.class))
.web(WebApplicationType.NONE)
.run("--spring.cloud.function.definition=sink",
"--spring.jmx.enabled=false")) {
InputDestination source = context.getBean(InputDestination.class);
PollableChannel result = context.getBean("result", PollableChannel.class);
source.send(new GenericMessage<byte[]>("John Doe".getBytes()));
assertThat(result.receive(10000).getPayload()).isEqualTo("John Doe");
}
}
@Bean
public Consumer<String> sink(PollableChannel result) {
return s -> {
result.send(new GenericMessage<String>(s));
System.out.println(s);
};
}
@Test
void receiveMessage_withMimeTypeMessageAttribute_shouldCopyToHeaders()
throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
MimeType mimeType = new MimeType("test", "plain", Charset.forName("UTF-8"));
when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue")
.withWaitTimeSeconds(0).withMaxNumberOfMessages(1)
.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
.withMessageAttributeNames("All"))).thenReturn(new ReceiveMessageResult()
.withMessages(new com.amazonaws.services.sqs.model.Message()
.withBody("Hello")
.withMessageAttributes(Collections.singletonMap(
MessageHeaders.CONTENT_TYPE,
new MessageAttributeValue()
.withDataType(
MessageAttributeDataTypes.STRING)
.withStringValue(mimeType.toString())))));
PollableChannel messageChannel = new QueueMessageChannel(amazonSqs,
"http://testQueue");
// Act
Message<?> receivedMessage = messageChannel.receive();
// Assert
assertThat(receivedMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(mimeType);
}
@Test
void receiveMessage_withStringMessageHeader_shouldBeReceivedAsQueueMessageAttribute()
throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
String headerValue = "Header value";
String headerName = "MyHeader";
when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue")
.withWaitTimeSeconds(0).withMaxNumberOfMessages(1)
.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
.withMessageAttributeNames("All")))
.thenReturn(new ReceiveMessageResult().withMessages(
new com.amazonaws.services.sqs.model.Message()
.withBody("Hello")
.withMessageAttributes(Collections.singletonMap(
headerName,
new MessageAttributeValue().withDataType(
MessageAttributeDataTypes.STRING)
.withStringValue(headerValue)))));
PollableChannel messageChannel = new QueueMessageChannel(amazonSqs,
"http://testQueue");
// Act
Message<?> receivedMessage = messageChannel.receive();
// Assert
assertThat(receivedMessage.getHeaders().get(headerName)).isEqualTo(headerValue);
}
@Test
void receiveMessage_withIncompatibleNumericMessageHeader_shouldThrowAnException()
throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>();
AtomicInteger atomicInteger = new AtomicInteger(17);
messageAttributes.put("atomicInteger",
new MessageAttributeValue()
.withDataType(MessageAttributeDataTypes.NUMBER
+ ".java.util.concurrent.atomic.AtomicInteger")
.withStringValue(String.valueOf(atomicInteger)));
when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue")
.withWaitTimeSeconds(0).withMaxNumberOfMessages(1)
.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
.withMessageAttributeNames("All")))
.thenReturn(new ReceiveMessageResult().withMessages(
new com.amazonaws.services.sqs.model.Message()
.withBody("Hello")
.withMessageAttributes(messageAttributes)));
PollableChannel messageChannel = new QueueMessageChannel(amazonSqs,
"http://testQueue");
// Assert
assertThatThrownBy(messageChannel::receive)
.isInstanceOf(IllegalArgumentException.class).hasMessageContaining(
"Cannot convert String [17] to target class [java.util.concurrent.atomic.AtomicInteger]");
}
@Test
void receiveMessage_withMissingNumericMessageHeaderTargetClass_shouldThrowAnException()
throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put("classNotFound",
new MessageAttributeValue()
.withDataType(
MessageAttributeDataTypes.NUMBER + ".class.not.Found")
.withStringValue("12"));
when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue")
.withWaitTimeSeconds(0).withMaxNumberOfMessages(1)
.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
.withMessageAttributeNames("All")))
.thenReturn(new ReceiveMessageResult().withMessages(
new com.amazonaws.services.sqs.model.Message()
.withBody("Hello")
.withMessageAttributes(messageAttributes)));
PollableChannel messageChannel = new QueueMessageChannel(amazonSqs,
"http://testQueue");
// Assert
assertThatThrownBy(messageChannel::receive).isInstanceOf(MessagingException.class)
.hasMessageContaining(
"Message attribute with value '12' and data type 'Number.class.not.Found' could not be converted"
+ " into a Number because target class was not found.");
}
@Test
void receiveMessage_withBinaryMessageHeader_shouldBeReceivedAsByteBufferMessageAttribute()
throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
ByteBuffer headerValue = ByteBuffer.wrap("My binary data!".getBytes());
String headerName = "MyHeader";
when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue")
.withWaitTimeSeconds(0).withMaxNumberOfMessages(1)
.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
.withMessageAttributeNames("All")))
.thenReturn(new ReceiveMessageResult().withMessages(
new com.amazonaws.services.sqs.model.Message()
.withBody("Hello")
.withMessageAttributes(Collections.singletonMap(
headerName,
new MessageAttributeValue().withDataType(
MessageAttributeDataTypes.BINARY)
.withBinaryValue(headerValue)))));
PollableChannel messageChannel = new QueueMessageChannel(amazonSqs,
"http://testQueue");
// Act
Message<?> receivedMessage = messageChannel.receive();
// Assert
assertThat(receivedMessage.getHeaders().get(headerName)).isEqualTo(headerValue);
}
@Test
void receiveMessage_withIdOfTypeString_IdShouldBeConvertedToUuid() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
UUID uuid = UUID.randomUUID();
when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue")
.withWaitTimeSeconds(0).withMaxNumberOfMessages(1)
.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
.withMessageAttributeNames("All"))).thenReturn(new ReceiveMessageResult()
.withMessages(new com.amazonaws.services.sqs.model.Message()
.withBody("Hello")
.withMessageAttributes(Collections.singletonMap(
MessageHeaders.ID,
new MessageAttributeValue()
.withDataType(
MessageAttributeDataTypes.STRING)
.withStringValue(uuid.toString())))));
PollableChannel messageChannel = new QueueMessageChannel(amazonSqs,
"http://testQueue");
// Act
Message<?> receivedMessage = messageChannel.receive();
// Assert
Object idMessageHeader = receivedMessage.getHeaders().get(MessageHeaders.ID);
assertThat(UUID.class.isInstance(idMessageHeader)).isTrue();
assertThat(idMessageHeader).isEqualTo(uuid);
}
/**
*
*/
private SpringContextDelegate(String configName) {
this.configName = configName;
ClassLoader orig = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
if (logger.isDebugEnabled()) {
logger.debug("Using " + Thread.currentThread().getContextClassLoader()
+ " as context class loader while loading Spring Context '" + configName + "'.");
}
try {
this.applicationContext = new ClassPathXmlApplicationContext(configName);
if (this.applicationContext.containsBean(SpringNiFiConstants.FROM_NIFI)){
this.toSpringChannel = this.applicationContext.getBean(SpringNiFiConstants.FROM_NIFI, MessageChannel.class);
if (logger.isDebugEnabled()) {
logger.debug("Spring Application Context defined in '" + configName
+ "' is capable of receiving messages from NiFi since 'fromNiFi' channel was discovered.");
}
} else {
this.toSpringChannel = null;
}
if (this.applicationContext.containsBean(SpringNiFiConstants.TO_NIFI)){
this.fromSpringChannel = this.applicationContext.getBean(SpringNiFiConstants.TO_NIFI, PollableChannel.class);
if (logger.isDebugEnabled()) {
logger.debug("Spring Application Context defined in '" + configName
+ "' is capable of sending messages to " + "NiFi since 'toNiFi' channel was discovered.");
}
} else {
this.fromSpringChannel = null;
}
if (logger.isInfoEnabled() && this.toSpringChannel == null && this.fromSpringChannel == null){
logger.info("Spring Application Context is headless since neither 'fromNiFi' nor 'toNiFi' channels were defined. "
+ "No data will be exchanged.");
}
} finally {
Thread.currentThread().setContextClassLoader(orig);
}
}
@Bean
public PollableChannel fromProcessorChannel() {
return new QueueChannel();
}
@Bean
public PollableChannel unsortedChannel() {
return new QueueChannel();
}
@Bean
public PollableChannel sortedChannel() {
return new QueueChannel();
}
@Bean
public PollableChannel result() {
return new QueueChannel();
}
@Test
void receiveMessage_withNumericMessageHeaders_shouldBeReceivedAsQueueMessageAttributes()
throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>();
double doubleValue = 1234.56;
messageAttributes.put("double",
new MessageAttributeValue()
.withDataType(
MessageAttributeDataTypes.NUMBER + ".java.lang.Double")
.withStringValue(String.valueOf(doubleValue)));
long longValue = 1234L;
messageAttributes.put("long",
new MessageAttributeValue()
.withDataType(
MessageAttributeDataTypes.NUMBER + ".java.lang.Long")
.withStringValue(String.valueOf(longValue)));
int integerValue = 1234;
messageAttributes.put("integer",
new MessageAttributeValue()
.withDataType(
MessageAttributeDataTypes.NUMBER + ".java.lang.Integer")
.withStringValue(String.valueOf(integerValue)));
byte byteValue = 2;
messageAttributes.put("byte",
new MessageAttributeValue()
.withDataType(
MessageAttributeDataTypes.NUMBER + ".java.lang.Byte")
.withStringValue(String.valueOf(byteValue)));
short shortValue = 12;
messageAttributes.put("short",
new MessageAttributeValue()
.withDataType(
MessageAttributeDataTypes.NUMBER + ".java.lang.Short")
.withStringValue(String.valueOf(shortValue)));
float floatValue = 1234.56f;
messageAttributes.put("float",
new MessageAttributeValue()
.withDataType(
MessageAttributeDataTypes.NUMBER + ".java.lang.Float")
.withStringValue(String.valueOf(floatValue)));
BigInteger bigIntegerValue = new BigInteger("616416546156");
messageAttributes.put("bigInteger", new MessageAttributeValue()
.withDataType(MessageAttributeDataTypes.NUMBER + ".java.math.BigInteger")
.withStringValue(String.valueOf(bigIntegerValue)));
BigDecimal bigDecimalValue = new BigDecimal("7834938");
messageAttributes.put("bigDecimal", new MessageAttributeValue()
.withDataType(MessageAttributeDataTypes.NUMBER + ".java.math.BigDecimal")
.withStringValue(String.valueOf(bigDecimalValue)));
when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue")
.withWaitTimeSeconds(0).withMaxNumberOfMessages(1)
.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
.withMessageAttributeNames("All")))
.thenReturn(new ReceiveMessageResult().withMessages(
new com.amazonaws.services.sqs.model.Message()
.withBody("Hello")
.withMessageAttributes(messageAttributes)));
PollableChannel messageChannel = new QueueMessageChannel(amazonSqs,
"http://testQueue");
// Act
Message<?> receivedMessage = messageChannel.receive();
// Assert
assertThat(receivedMessage.getHeaders().get("double")).isEqualTo(doubleValue);
assertThat(receivedMessage.getHeaders().get("long")).isEqualTo(longValue);
assertThat(receivedMessage.getHeaders().get("integer")).isEqualTo(integerValue);
assertThat(receivedMessage.getHeaders().get("byte")).isEqualTo(byteValue);
assertThat(receivedMessage.getHeaders().get("short")).isEqualTo(shortValue);
assertThat(receivedMessage.getHeaders().get("float")).isEqualTo(floatValue);
assertThat(receivedMessage.getHeaders().get("bigInteger"))
.isEqualTo(bigIntegerValue);
assertThat(receivedMessage.getHeaders().get("bigDecimal"))
.isEqualTo(bigDecimalValue);
}