类org.springframework.messaging.PollableChannel源码实例Demo

下面列出了如何使用 org.springframework.messaging.PollableChannel 这个 API 类的实例代码及写法,也可以点击链接到 GitHub 查看源代码。

/**
 * Sends a byte-array payload to the {@code inputChannel} bean and checks that the
 * same text can be read back from the pollable {@code outputChannel} bean.
 */
@Test
public void sendAndReceiveMessage() {
	this.contextRunner
			.withUserConfiguration(PollableConfiguration.class, CommonConfiguration.class)
			.run((context) -> {
				MessageChannel inbound = context.getBean("inputChannel", MessageChannel.class);
				inbound.send(MessageBuilder
						.withPayload("I am a message (sendAndReceiveMessage).".getBytes())
						.build());

				PollableChannel outbound = context.getBean("outputChannel", PollableChannel.class);
				Message<?> received = outbound.receive(RECEIVE_TIMEOUT_MS);

				assertThat(received).isNotNull();
				assertThat(received.getPayload()).isInstanceOf(byte[].class);
				// The payload travels as raw bytes; decode it before comparing the text.
				assertThat(new String((byte[]) received.getPayload()))
						.isEqualTo("I am a message (sendAndReceiveMessage).");
			});
}
 
/**
 * Switches the inbound adapter to manual acknowledgement, acknowledges the received
 * message through the {@code ACKNOWLEDGEMENT} header and checks that no further
 * message arrives and that the deprecation notice was written to the captured output.
 */
@Test
@SuppressWarnings("deprecation")
public void sendAndReceiveMessageManualAckThroughAcknowledgementHeader() {
	this.contextRunner
			.withUserConfiguration(PollableConfiguration.class, CommonConfiguration.class)
			.run((context) -> {
				PubSubInboundChannelAdapter inboundAdapter = context.getBean(PubSubInboundChannelAdapter.class);
				inboundAdapter.setAckMode(AckMode.MANUAL);

				MessageChannel inbound = context.getBean("inputChannel", MessageChannel.class);
				inbound.send(MessageBuilder
						.withPayload("I am a message (sendAndReceiveMessageManualAckThroughAcknowledgementHeader).".getBytes())
						.build());

				PollableChannel outbound = context.getBean("outputChannel", PollableChannel.class);

				// First poll: the message must be there and carry the ack handle.
				Message<?> first = outbound.receive(RECEIVE_TIMEOUT_MS);
				assertThat(first).isNotNull();
				AckReplyConsumer ackHandle =
						(AckReplyConsumer) first.getHeaders().get(GcpPubSubHeaders.ACKNOWLEDGEMENT);
				assertThat(ackHandle).isNotNull();
				ackHandle.ack();

				// Second poll: after the ack nothing should be delivered within the timeout.
				Message<?> second = outbound.receive(RECEIVE_TIMEOUT_MS);
				assertThat(second).isNull();

				assertThat(this.outputCaptureRule.getOut()).contains("ACKNOWLEDGEMENT header is deprecated");
			});
}
 
/**
 * Stubs the SQS client for a request with a zero-second wait time and checks that
 * {@code receive()} without arguments yields a message whose payload is the SQS body.
 */
@Test
void receiveMessage_withoutTimeout_returnsTextMessage() throws Exception {
	// Arrange
	ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
			.withWaitTimeSeconds(0)
			.withMaxNumberOfMessages(1)
			.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
			.withMessageAttributeNames("All");
	ReceiveMessageResult stubbedResult = new ReceiveMessageResult().withMessages(
			Collections.singleton(
					new com.amazonaws.services.sqs.model.Message().withBody("content")));

	AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
	when(amazonSqs.receiveMessage(expectedRequest)).thenReturn(stubbedResult);

	PollableChannel channelUnderTest = new QueueMessageChannel(amazonSqs, "http://testQueue");

	// Act
	Message<?> received = channelUnderTest.receive();

	// Assert
	assertThat(received).isNotNull();
	assertThat(received.getPayload()).isEqualTo("content");
}
 
/**
 * Stubs the SQS client for a request with a wait time of two seconds and checks that
 * {@code receive(2)} yields a message whose payload is the SQS body.
 */
@Test
void receiveMessage_withSpecifiedTimeout_returnsTextMessage() throws Exception {
	// Arrange
	ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
			.withWaitTimeSeconds(2)
			.withMaxNumberOfMessages(1)
			.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
			.withMessageAttributeNames("All");
	ReceiveMessageResult stubbedResult = new ReceiveMessageResult().withMessages(
			Collections.singleton(
					new com.amazonaws.services.sqs.model.Message().withBody("content")));

	AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
	when(amazonSqs.receiveMessage(expectedRequest)).thenReturn(stubbedResult);

	PollableChannel channelUnderTest = new QueueMessageChannel(amazonSqs, "http://testQueue");

	// Act
	Message<?> received = channelUnderTest.receive(2);

	// Assert
	assertThat(received).isNotNull();
	assertThat(received.getPayload()).isEqualTo("content");
}
 
/**
 * Stubs the SQS client to answer a two-second-wait request with an empty message
 * list and checks that {@code receive(2)} returns {@code null}.
 */
@Test
void receiveMessage_withSpecifiedTimeout_returnsNull() throws Exception {
	// Arrange
	ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
			.withWaitTimeSeconds(2)
			.withMaxNumberOfMessages(1)
			.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
			.withMessageAttributeNames("All");
	ReceiveMessageResult emptyResult =
			new ReceiveMessageResult().withMessages(Collections.emptyList());

	AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
	when(amazonSqs.receiveMessage(expectedRequest)).thenReturn(emptyResult);

	PollableChannel channelUnderTest = new QueueMessageChannel(amazonSqs, "http://testQueue");

	// Act
	Message<?> received = channelUnderTest.receive(2);

	// Assert
	assertThat(received).isNull();
}
 
/**
 * Stubs the SQS client to answer a zero-second-wait request with an empty message
 * list and checks that {@code receive(0)} returns {@code null}.
 */
@Test
void receiveMessage_withoutDefaultTimeout_returnsNull() throws Exception {
	// Arrange
	ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
			.withWaitTimeSeconds(0)
			.withMaxNumberOfMessages(1)
			.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
			.withMessageAttributeNames("All");
	ReceiveMessageResult emptyResult =
			new ReceiveMessageResult().withMessages(Collections.emptyList());

	AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
	when(amazonSqs.receiveMessage(expectedRequest)).thenReturn(emptyResult);

	PollableChannel channelUnderTest = new QueueMessageChannel(amazonSqs, "http://testQueue");

	// Act
	Message<?> received = channelUnderTest.receive(0);

	// Assert
	assertThat(received).isNull();
}
 
/**
 * Receives a message from the given channel, which must be a {@link PollableChannel}.
 *
 * @param channel the channel to poll; must not be {@code null}
 * @param timeout timeout handed to {@code receive(long)}; a negative value selects the
 * no-argument {@code receive()} instead
 * @return the received message, or {@code null} if the channel returned none
 */
@Nullable
protected final Message<?> doReceive(MessageChannel channel, long timeout) {
	Assert.notNull(channel, "MessageChannel is required");
	Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages");

	PollableChannel pollable = (PollableChannel) channel;
	Message<?> received;
	if (timeout < 0) {
		received = pollable.receive();
	}
	else {
		received = pollable.receive(timeout);
	}

	if (received == null && logger.isTraceEnabled()) {
		logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout);
	}

	return received;
}
 
/**
 * Receives a message from the given channel, which must be a {@link PollableChannel}.
 *
 * @param channel the channel to poll; must not be {@code null}
 * @param timeout timeout handed to {@code receive(long)}; a negative value selects the
 * no-argument {@code receive()} instead
 * @return the received message, or {@code null} if the channel returned none
 */
@Nullable
protected final Message<?> doReceive(MessageChannel channel, long timeout) {
	Assert.notNull(channel, "MessageChannel is required");
	Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages");

	PollableChannel pollable = (PollableChannel) channel;
	Message<?> received;
	if (timeout < 0) {
		received = pollable.receive();
	}
	else {
		received = pollable.receive(timeout);
	}

	if (received == null && logger.isTraceEnabled()) {
		logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout);
	}

	return received;
}
 
源代码9 项目: localization_nifi   文件: SpringContextDelegate.java
/**
 * Loads the Spring application context named by {@code configName} and looks up the
 * two optional channel beans ({@code SpringNiFiConstants.FROM_NIFI} and
 * {@code SpringNiFiConstants.TO_NIFI}).
 * <p>
 * While the context is created, the thread context class loader is replaced by the
 * class loader of this class; the previous loader is restored in a {@code finally}
 * block.
 *
 * @param configName configuration location passed to
 * {@link ClassPathXmlApplicationContext}
 */
private SpringContextDelegate(String configName) {
    this.configName = configName;
    Thread currentThread = Thread.currentThread();
    ClassLoader previousLoader = currentThread.getContextClassLoader();
    currentThread.setContextClassLoader(this.getClass().getClassLoader());
    if (logger.isDebugEnabled()) {
        logger.debug("Using " + currentThread.getContextClassLoader()
                + " as context class loader while loading Spring Context '" + configName + "'.");
    }
    try {
        this.applicationContext = new ClassPathXmlApplicationContext(configName);

        // Channel that carries messages from NiFi into Spring (optional).
        boolean hasFromNiFi = this.applicationContext.containsBean(SpringNiFiConstants.FROM_NIFI);
        this.toSpringChannel = hasFromNiFi
                ? this.applicationContext.getBean(SpringNiFiConstants.FROM_NIFI, MessageChannel.class)
                : null;
        if (hasFromNiFi && logger.isDebugEnabled()) {
            logger.debug("Spring Application Context defined in '" + configName
                    + "' is capable of receiving messages from NiFi since 'fromNiFi' channel was discovered.");
        }

        // Channel that carries messages from Spring back to NiFi (optional).
        boolean hasToNiFi = this.applicationContext.containsBean(SpringNiFiConstants.TO_NIFI);
        this.fromSpringChannel = hasToNiFi
                ? this.applicationContext.getBean(SpringNiFiConstants.TO_NIFI, PollableChannel.class)
                : null;
        if (hasToNiFi && logger.isDebugEnabled()) {
            logger.debug("Spring Application Context defined in '" + configName
                    + "' is capable of sending messages to " + "NiFi since 'toNiFi' channel was discovered.");
        }

        boolean headless = this.toSpringChannel == null && this.fromSpringChannel == null;
        if (logger.isInfoEnabled() && headless) {
            logger.info("Spring Application Context is headless since neither 'fromNiFi' nor 'toNiFi' channels were defined. "
                    + "No data will be exchanged.");
        }
    } finally {
        currentThread.setContextClassLoader(previousLoader);
    }
}
 
/**
 * Sends a byte-array message with three custom String headers and checks that the
 * payload and the headers are present on the message read from {@code outputChannel},
 * together with the {@code ORIGINAL_MESSAGE} header.
 */
@Test
public void sendAndReceiveMessageAsString() {
	this.contextRunner
			.withUserConfiguration(PollableConfiguration.class, CommonConfiguration.class)
			.run((context) -> {
		Map<String, Object> headers = new HashMap<>();
		// Only String values for now..
		headers.put("storm", "lift your skinny fists");
		headers.put("static", "lift your skinny fists");
		headers.put("sleep", "lift your skinny fists");

		// Parameterized instead of the raw Message type, so the payload type is checked.
		Message<byte[]> originalMessage = MessageBuilder.createMessage(
				"I am a message (sendAndReceiveMessageAsString).".getBytes(),
				new MessageHeaders(headers));
		context.getBean("inputChannel", MessageChannel.class).send(originalMessage);

		Message<?> message =
				context.getBean("outputChannel", PollableChannel.class).receive(RECEIVE_TIMEOUT_MS);
		assertThat(message).isNotNull();
		assertThat(message.getPayload()).isInstanceOf(byte[].class);
		String payload = new String((byte[]) message.getPayload());
		assertThat(payload).isEqualTo("I am a message (sendAndReceiveMessageAsString).");

		// Six headers are expected in total: the three custom ones asserted below plus three others.
		assertThat(message.getHeaders().size()).isEqualTo(6);
		assertThat(message.getHeaders().get("storm")).isEqualTo("lift your skinny fists");
		assertThat(message.getHeaders().get("static")).isEqualTo("lift your skinny fists");
		assertThat(message.getHeaders().get("sleep")).isEqualTo("lift your skinny fists");
		assertThat(message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE)).isNotNull();
	});
}
 
/**
 * With manual acknowledgement enabled, nacks the first delivery, expects the message
 * to be delivered again, acks the redelivery and then expects no further message.
 */
@Test
public void sendAndReceiveMessageManualAck() {
	this.contextRunner
			.withUserConfiguration(PollableConfiguration.class, CommonConfiguration.class)
			.run((context) -> {
				PubSubInboundChannelAdapter inboundAdapter = context.getBean(PubSubInboundChannelAdapter.class);
				inboundAdapter.setAckMode(AckMode.MANUAL);

				MessageChannel inbound = context.getBean("inputChannel", MessageChannel.class);
				inbound.send(MessageBuilder
						.withPayload("I am a message (sendAndReceiveMessageManualAck).".getBytes())
						.build());

				PollableChannel outbound = context.getBean("outputChannel", PollableChannel.class);

				// First delivery: reject it so that it is delivered again.
				Message<?> firstDelivery = outbound.receive(RECEIVE_TIMEOUT_MS);
				assertThat(firstDelivery).isNotNull();
				BasicAcknowledgeablePubsubMessage firstOriginal = (BasicAcknowledgeablePubsubMessage)
						firstDelivery.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE);
				assertThat(firstOriginal).isNotNull();
				firstOriginal.nack();

				// Redelivery: accept it this time.
				Message<?> redelivery = outbound.receive(RECEIVE_TIMEOUT_MS);
				assertThat(redelivery).isNotNull();
				BasicAcknowledgeablePubsubMessage secondOriginal = (BasicAcknowledgeablePubsubMessage)
						redelivery.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE);
				assertThat(secondOriginal).isNotNull();
				secondOriginal.ack();

				// Nothing should be left once the message has been acknowledged.
				Message<?> afterAck = outbound.receive(RECEIVE_TIMEOUT_MS);
				assertThat(afterAck).isNull();
			});
}
 
/**
 * Registers a spied publish callback on the message handler, sends a message and
 * checks that the message is received and that {@code onSuccess} is invoked exactly
 * once within one second.
 */
@Test
public void sendAndReceiveMessagePublishCallback() {
	this.contextRunner
			.withUserConfiguration(PollableConfiguration.class, CommonConfiguration.class)
			.run((context) -> {
				// No-op callback wrapped in a spy so that invocations can be verified.
				ListenableFutureCallback<String> noOpCallback = new ListenableFutureCallback<String>() {
					@Override
					public void onFailure(Throwable ex) {
					}

					@Override
					public void onSuccess(String result) {
					}
				};
				ListenableFutureCallback<String> callbackSpy = Mockito.spy(noOpCallback);

				PubSubMessageHandler publisher = context.getBean(PubSubMessageHandler.class);
				publisher.setPublishCallback(callbackSpy);

				MessageChannel inbound = context.getBean("inputChannel", MessageChannel.class);
				inbound.send(MessageBuilder
						.withPayload("I am a message (sendAndReceiveMessagePublishCallback).".getBytes())
						.build());

				PollableChannel outbound = context.getBean("outputChannel", PollableChannel.class);
				Message<?> received = outbound.receive(RECEIVE_TIMEOUT_MS);
				assertThat(received).isNotNull();

				Awaitility.await()
						.atMost(1, TimeUnit.SECONDS)
						.untilAsserted(() -> verify(callbackSpy, times(1)).onSuccess(any()));
			});
}
 
源代码13 项目: CogStack-Pipeline   文件: RemoteConfiguration.java
/**
 * Creates the {@link MessageChannelPartitionHandler} bean.
 * <p>
 * Requests are sent through a {@link MessagingTemplate} whose default channel is
 * {@code reqChannel}; replies are read from {@code repChannel}.
 *
 * @param reqChannel channel qualified as {@code requestChannel}
 * @param repChannel channel qualified as {@code aggregatedReplyChannel}
 * @return the configured partition handler
 */
@Bean
@Qualifier("partitionHandler")
public MessageChannelPartitionHandler partitionHandler(
        @Qualifier("requestChannel") MessageChannel reqChannel,
        @Qualifier("aggregatedReplyChannel") PollableChannel repChannel) {
    MessagingTemplate requestTemplate = new MessagingTemplate();
    requestTemplate.setDefaultChannel(reqChannel);
    requestTemplate.setReceiveTimeout(partitionHandlerTimeout);

    MessageChannelPartitionHandler configured = new MessageChannelPartitionHandler();
    configured.setGridSize(gridSize);
    configured.setStepName("compositeSlaveStep");
    configured.setReplyChannel(repChannel);
    configured.setMessagingOperations(requestTemplate);
    return configured;
}
 
/**
 * Receives a message from the given channel, which must be a {@link PollableChannel},
 * using this instance's {@code receiveTimeout}.
 *
 * @param channel the channel to poll; must not be {@code null}
 * @return the received message, or {@code null} if the channel returned none
 */
@Override
protected final Message<?> doReceive(MessageChannel channel) {
	Assert.notNull(channel, "'channel' is required");
	Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages");

	PollableChannel pollable = (PollableChannel) channel;
	long timeout = this.receiveTimeout;

	Message<?> received;
	if (timeout < 0) {
		// A negative timeout selects the no-argument receive().
		received = pollable.receive();
	}
	else {
		received = pollable.receive(timeout);
	}

	if (received == null && this.logger.isTraceEnabled()) {
		this.logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout);
	}

	return received;
}
 
/**
 * Resolves {@code destination} to a channel bean name, looks that bean up as a
 * {@link PollableChannel} and polls it.
 *
 * @param destination logical destination name
 * @param timeout how long to wait
 * @param timeUnit unit of {@code timeout}; converted to milliseconds
 * @return the message returned by the channel's {@code receive(long)}
 * @throws IllegalStateException wrapping any exception raised during lookup or receive
 */
@Override
public Message<?> receive(String destination, long timeout, TimeUnit timeUnit) {
	try {
		String channelName = this.destinationResolver
				.resolvedDestination(destination, DefaultChannels.INPUT);
		PollableChannel pollable = this.context.getBean(channelName, PollableChannel.class);
		long timeoutMillis = timeUnit.toMillis(timeout);
		return pollable.receive(timeoutMillis);
	}
	catch (Exception e) {
		log.error("Exception occurred while trying to read a message from "
				+ " a channel with name [" + destination + "]", e);
		throw new IllegalStateException(e);
	}
}
 
/**
 * Looks up the bean named {@code destination} as a {@link PollableChannel} and polls it.
 *
 * @param destination name of the channel bean
 * @param timeout how long to wait
 * @param timeUnit unit of {@code timeout}; converted to milliseconds
 * @return the message returned by the channel's {@code receive(long)}
 * @throws IllegalStateException wrapping any exception raised during lookup or receive
 */
@Override
public Message<?> receive(String destination, long timeout, TimeUnit timeUnit) {
	try {
		PollableChannel pollable = this.context.getBean(destination, PollableChannel.class);
		long timeoutMillis = timeUnit.toMillis(timeout);
		return pollable.receive(timeoutMillis);
	}
	catch (Exception e) {
		log.error("Exception occurred while trying to read a message from "
				+ " a channel with name [" + destination + "]", e);
		throw new IllegalStateException(e);
	}
}
 
/**
 * Starts an application with the {@code sink} function definition, sends
 * {@code "John Doe"} as bytes to the input destination and checks that the
 * {@code result} channel receives a message with the payload {@code "John Doe"}.
 */
@Test
public void testSinkFromConsumer() {
	SpringApplicationBuilder appBuilder = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(SinkFromConsumer.class))
					.web(WebApplicationType.NONE);

	try (ConfigurableApplicationContext context = appBuilder.run(
			"--spring.cloud.function.definition=sink",
			"--spring.jmx.enabled=false")) {

		InputDestination source = context.getBean(InputDestination.class);
		PollableChannel result = context.getBean("result", PollableChannel.class);

		byte[] rawPayload = "John Doe".getBytes();
		source.send(new GenericMessage<byte[]>(rawPayload));

		Object receivedPayload = result.receive(10000).getPayload();
		assertThat(receivedPayload).isEqualTo("John Doe");
	}
}
 
/**
 * Consumer bean that wraps each received string in a {@link GenericMessage}, sends it
 * to the {@code result} channel and then prints the string to standard output.
 *
 * @param result channel that receives the wrapped strings
 * @return the consumer
 */
@Bean
public Consumer<String> sink(PollableChannel result) {
	return payload -> {
		GenericMessage<String> outgoing = new GenericMessage<String>(payload);
		result.send(outgoing);
		System.out.println(payload);
	};
}
 
源代码19 项目: spring-cloud-aws   文件: QueueMessageChannelTest.java
/**
 * Stubs an SQS message carrying a String attribute named
 * {@code MessageHeaders.CONTENT_TYPE} and checks that the received message exposes
 * it as a header equal to the original {@link MimeType}.
 */
@Test
void receiveMessage_withMimeTypeMessageAttribute_shouldCopyToHeaders()
		throws Exception {
	// Arrange
	MimeType mimeType = new MimeType("test", "plain", Charset.forName("UTF-8"));
	MessageAttributeValue contentTypeAttribute = new MessageAttributeValue()
			.withDataType(MessageAttributeDataTypes.STRING)
			.withStringValue(mimeType.toString());

	ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
			.withWaitTimeSeconds(0)
			.withMaxNumberOfMessages(1)
			.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
			.withMessageAttributeNames("All");
	ReceiveMessageResult stubbedResult = new ReceiveMessageResult().withMessages(
			new com.amazonaws.services.sqs.model.Message()
					.withBody("Hello")
					.withMessageAttributes(Collections.singletonMap(
							MessageHeaders.CONTENT_TYPE, contentTypeAttribute)));

	AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
	when(amazonSqs.receiveMessage(expectedRequest)).thenReturn(stubbedResult);

	PollableChannel channelUnderTest = new QueueMessageChannel(amazonSqs, "http://testQueue");

	// Act
	Message<?> received = channelUnderTest.receive();

	// Assert
	Object contentTypeHeader = received.getHeaders().get(MessageHeaders.CONTENT_TYPE);
	assertThat(contentTypeHeader).isEqualTo(mimeType);
}
 
源代码20 项目: spring-cloud-aws   文件: QueueMessageChannelTest.java
/**
 * Stubs an SQS message carrying a String attribute and checks that the received
 * message exposes it as a header with the same name and value.
 */
@Test
void receiveMessage_withStringMessageHeader_shouldBeReceivedAsQueueMessageAttribute()
		throws Exception {
	// Arrange
	String headerName = "MyHeader";
	String headerValue = "Header value";
	MessageAttributeValue stringAttribute = new MessageAttributeValue()
			.withDataType(MessageAttributeDataTypes.STRING)
			.withStringValue(headerValue);

	ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
			.withWaitTimeSeconds(0)
			.withMaxNumberOfMessages(1)
			.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
			.withMessageAttributeNames("All");
	ReceiveMessageResult stubbedResult = new ReceiveMessageResult().withMessages(
			new com.amazonaws.services.sqs.model.Message()
					.withBody("Hello")
					.withMessageAttributes(
							Collections.singletonMap(headerName, stringAttribute)));

	AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
	when(amazonSqs.receiveMessage(expectedRequest)).thenReturn(stubbedResult);

	PollableChannel channelUnderTest = new QueueMessageChannel(amazonSqs, "http://testQueue");

	// Act
	Message<?> received = channelUnderTest.receive();

	// Assert
	Object actualHeader = received.getHeaders().get(headerName);
	assertThat(actualHeader).isEqualTo(headerValue);
}
 
源代码21 项目: spring-cloud-aws   文件: QueueMessageChannelTest.java
/**
 * Stubs an SQS message whose numeric attribute names
 * {@code java.util.concurrent.atomic.AtomicInteger} as its target class and checks
 * that receiving it fails with an {@link IllegalArgumentException}.
 */
@Test
void receiveMessage_withIncompatibleNumericMessageHeader_shouldThrowAnException()
		throws Exception {
	// Arrange
	AtomicInteger unsupportedNumber = new AtomicInteger(17);
	MessageAttributeValue atomicIntegerAttribute = new MessageAttributeValue()
			.withDataType(MessageAttributeDataTypes.NUMBER
					+ ".java.util.concurrent.atomic.AtomicInteger")
			.withStringValue(String.valueOf(unsupportedNumber));

	HashMap<String, MessageAttributeValue> attributes = new HashMap<>();
	attributes.put("atomicInteger", atomicIntegerAttribute);

	ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
			.withWaitTimeSeconds(0)
			.withMaxNumberOfMessages(1)
			.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
			.withMessageAttributeNames("All");
	ReceiveMessageResult stubbedResult = new ReceiveMessageResult().withMessages(
			new com.amazonaws.services.sqs.model.Message()
					.withBody("Hello")
					.withMessageAttributes(attributes));

	AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
	when(amazonSqs.receiveMessage(expectedRequest)).thenReturn(stubbedResult);

	PollableChannel channelUnderTest = new QueueMessageChannel(amazonSqs, "http://testQueue");

	// Assert
	assertThatThrownBy(channelUnderTest::receive)
			.isInstanceOf(IllegalArgumentException.class)
			.hasMessageContaining(
					"Cannot convert String [17] to target class [java.util.concurrent.atomic.AtomicInteger]");
}
 
源代码22 项目: spring-cloud-aws   文件: QueueMessageChannelTest.java
/**
 * Stubs an SQS message whose numeric attribute names the target class
 * {@code class.not.Found} and checks that receiving it fails with a
 * {@code MessagingException} describing the missing class.
 */
@Test
void receiveMessage_withMissingNumericMessageHeaderTargetClass_shouldThrowAnException()
		throws Exception {
	// Arrange
	MessageAttributeValue unknownClassAttribute = new MessageAttributeValue()
			.withDataType(MessageAttributeDataTypes.NUMBER + ".class.not.Found")
			.withStringValue("12");

	HashMap<String, MessageAttributeValue> attributes = new HashMap<>();
	attributes.put("classNotFound", unknownClassAttribute);

	ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
			.withWaitTimeSeconds(0)
			.withMaxNumberOfMessages(1)
			.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
			.withMessageAttributeNames("All");
	ReceiveMessageResult stubbedResult = new ReceiveMessageResult().withMessages(
			new com.amazonaws.services.sqs.model.Message()
					.withBody("Hello")
					.withMessageAttributes(attributes));

	AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
	when(amazonSqs.receiveMessage(expectedRequest)).thenReturn(stubbedResult);

	PollableChannel channelUnderTest = new QueueMessageChannel(amazonSqs, "http://testQueue");

	// Assert
	assertThatThrownBy(channelUnderTest::receive)
			.isInstanceOf(MessagingException.class)
			.hasMessageContaining(
					"Message attribute with value '12' and data type 'Number.class.not.Found' could not be converted"
							+ " into a Number because target class was not found.");
}
 
源代码23 项目: spring-cloud-aws   文件: QueueMessageChannelTest.java
/**
 * Stubs an SQS message carrying a binary attribute and checks that the received
 * message exposes it as a header equal to the original {@link ByteBuffer}.
 */
@Test
void receiveMessage_withBinaryMessageHeader_shouldBeReceivedAsByteBufferMessageAttribute()
		throws Exception {
	// Arrange
	String headerName = "MyHeader";
	ByteBuffer headerValue = ByteBuffer.wrap("My binary data!".getBytes());
	MessageAttributeValue binaryAttribute = new MessageAttributeValue()
			.withDataType(MessageAttributeDataTypes.BINARY)
			.withBinaryValue(headerValue);

	ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
			.withWaitTimeSeconds(0)
			.withMaxNumberOfMessages(1)
			.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
			.withMessageAttributeNames("All");
	ReceiveMessageResult stubbedResult = new ReceiveMessageResult().withMessages(
			new com.amazonaws.services.sqs.model.Message()
					.withBody("Hello")
					.withMessageAttributes(
							Collections.singletonMap(headerName, binaryAttribute)));

	AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
	when(amazonSqs.receiveMessage(expectedRequest)).thenReturn(stubbedResult);

	PollableChannel channelUnderTest = new QueueMessageChannel(amazonSqs, "http://testQueue");

	// Act
	Message<?> received = channelUnderTest.receive();

	// Assert
	Object actualHeader = received.getHeaders().get(headerName);
	assertThat(actualHeader).isEqualTo(headerValue);
}
 
源代码24 项目: spring-cloud-aws   文件: QueueMessageChannelTest.java
/**
 * Stubs an SQS message carrying the {@code MessageHeaders.ID} attribute as a String
 * and checks that the received message exposes that header as a {@link UUID} equal
 * to the original value.
 */
@Test
void receiveMessage_withIdOfTypeString_IdShouldBeConvertedToUuid() throws Exception {
	// Arrange
	AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
	UUID uuid = UUID.randomUUID();
	when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue")
			.withWaitTimeSeconds(0).withMaxNumberOfMessages(1)
			.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
			.withMessageAttributeNames("All"))).thenReturn(new ReceiveMessageResult()
					.withMessages(new com.amazonaws.services.sqs.model.Message()
							.withBody("Hello")
							.withMessageAttributes(Collections.singletonMap(
									MessageHeaders.ID,
									new MessageAttributeValue()
											.withDataType(
													MessageAttributeDataTypes.STRING)
											.withStringValue(uuid.toString())))));

	PollableChannel messageChannel = new QueueMessageChannel(amazonSqs,
			"http://testQueue");

	// Act
	Message<?> receivedMessage = messageChannel.receive();

	// Assert
	Object idMessageHeader = receivedMessage.getHeaders().get(MessageHeaders.ID);
	// isInstanceOf reports the actual type on failure, unlike a bare boolean check.
	assertThat(idMessageHeader).isInstanceOf(UUID.class);
	assertThat(idMessageHeader).isEqualTo(uuid);
}
 
源代码25 项目: nifi   文件: SpringContextDelegate.java
/**
 * Loads the Spring application context named by {@code configName} and looks up the
 * two optional channel beans ({@code SpringNiFiConstants.FROM_NIFI} and
 * {@code SpringNiFiConstants.TO_NIFI}).
 * <p>
 * While the context is created, the thread context class loader is replaced by the
 * class loader of this class; the previous loader is restored in a {@code finally}
 * block.
 *
 * @param configName configuration location passed to
 * {@link ClassPathXmlApplicationContext}
 */
private SpringContextDelegate(String configName) {
    this.configName = configName;
    Thread currentThread = Thread.currentThread();
    ClassLoader previousLoader = currentThread.getContextClassLoader();
    currentThread.setContextClassLoader(this.getClass().getClassLoader());
    if (logger.isDebugEnabled()) {
        logger.debug("Using " + currentThread.getContextClassLoader()
                + " as context class loader while loading Spring Context '" + configName + "'.");
    }
    try {
        this.applicationContext = new ClassPathXmlApplicationContext(configName);

        // Channel that carries messages from NiFi into Spring (optional).
        boolean hasFromNiFi = this.applicationContext.containsBean(SpringNiFiConstants.FROM_NIFI);
        this.toSpringChannel = hasFromNiFi
                ? this.applicationContext.getBean(SpringNiFiConstants.FROM_NIFI, MessageChannel.class)
                : null;
        if (hasFromNiFi && logger.isDebugEnabled()) {
            logger.debug("Spring Application Context defined in '" + configName
                    + "' is capable of receiving messages from NiFi since 'fromNiFi' channel was discovered.");
        }

        // Channel that carries messages from Spring back to NiFi (optional).
        boolean hasToNiFi = this.applicationContext.containsBean(SpringNiFiConstants.TO_NIFI);
        this.fromSpringChannel = hasToNiFi
                ? this.applicationContext.getBean(SpringNiFiConstants.TO_NIFI, PollableChannel.class)
                : null;
        if (hasToNiFi && logger.isDebugEnabled()) {
            logger.debug("Spring Application Context defined in '" + configName
                    + "' is capable of sending messages to " + "NiFi since 'toNiFi' channel was discovered.");
        }

        boolean headless = this.toSpringChannel == null && this.fromSpringChannel == null;
        if (logger.isInfoEnabled() && headless) {
            logger.info("Spring Application Context is headless since neither 'fromNiFi' nor 'toNiFi' channels were defined. "
                    + "No data will be exchanged.");
        }
    } finally {
        currentThread.setContextClassLoader(previousLoader);
    }
}
 
/**
 * Bean factory method that supplies a new {@link QueueChannel}, exposed through the
 * {@link PollableChannel} interface.
 *
 * @return a fresh queue-backed channel
 */
@Bean
public PollableChannel fromProcessorChannel() {
	PollableChannel queueBacked = new QueueChannel();
	return queueBacked;
}
 
/**
 * Bean factory method that supplies a new {@link QueueChannel}, exposed through the
 * {@link PollableChannel} interface.
 *
 * @return a fresh queue-backed channel
 */
@Bean
public PollableChannel unsortedChannel() {
	PollableChannel queueBacked = new QueueChannel();
	return queueBacked;
}
 
/**
 * Bean factory method that supplies a new {@link QueueChannel}, exposed through the
 * {@link PollableChannel} interface.
 *
 * @return a fresh queue-backed channel
 */
@Bean
public PollableChannel sortedChannel() {
	PollableChannel queueBacked = new QueueChannel();
	return queueBacked;
}
 
/**
 * Bean factory method that supplies a new {@link QueueChannel}, exposed through the
 * {@link PollableChannel} interface.
 *
 * @return a fresh queue-backed channel
 */
@Bean
public PollableChannel result() {
	PollableChannel queueBacked = new QueueChannel();
	return queueBacked;
}
 
源代码30 项目: spring-cloud-aws   文件: QueueMessageChannelTest.java
/**
 * Stubs an SQS message carrying one numeric attribute per supported number type
 * (Double, Long, Integer, Byte, Short, Float, BigInteger, BigDecimal) and checks that
 * each one is exposed as a header equal to the original value.
 */
@Test
void receiveMessage_withNumericMessageHeaders_shouldBeReceivedAsQueueMessageAttributes()
		throws Exception {
	// Arrange
	double expectedDouble = 1234.56;
	long expectedLong = 1234L;
	int expectedInteger = 1234;
	byte expectedByte = 2;
	short expectedShort = 12;
	float expectedFloat = 1234.56f;
	BigInteger expectedBigInteger = new BigInteger("616416546156");
	BigDecimal expectedBigDecimal = new BigDecimal("7834938");

	// Each attribute encodes its target class in the data type suffix.
	HashMap<String, MessageAttributeValue> attributes = new HashMap<>();
	attributes.put("double", new MessageAttributeValue()
			.withDataType(MessageAttributeDataTypes.NUMBER + ".java.lang.Double")
			.withStringValue(String.valueOf(expectedDouble)));
	attributes.put("long", new MessageAttributeValue()
			.withDataType(MessageAttributeDataTypes.NUMBER + ".java.lang.Long")
			.withStringValue(String.valueOf(expectedLong)));
	attributes.put("integer", new MessageAttributeValue()
			.withDataType(MessageAttributeDataTypes.NUMBER + ".java.lang.Integer")
			.withStringValue(String.valueOf(expectedInteger)));
	attributes.put("byte", new MessageAttributeValue()
			.withDataType(MessageAttributeDataTypes.NUMBER + ".java.lang.Byte")
			.withStringValue(String.valueOf(expectedByte)));
	attributes.put("short", new MessageAttributeValue()
			.withDataType(MessageAttributeDataTypes.NUMBER + ".java.lang.Short")
			.withStringValue(String.valueOf(expectedShort)));
	attributes.put("float", new MessageAttributeValue()
			.withDataType(MessageAttributeDataTypes.NUMBER + ".java.lang.Float")
			.withStringValue(String.valueOf(expectedFloat)));
	attributes.put("bigInteger", new MessageAttributeValue()
			.withDataType(MessageAttributeDataTypes.NUMBER + ".java.math.BigInteger")
			.withStringValue(String.valueOf(expectedBigInteger)));
	attributes.put("bigDecimal", new MessageAttributeValue()
			.withDataType(MessageAttributeDataTypes.NUMBER + ".java.math.BigDecimal")
			.withStringValue(String.valueOf(expectedBigDecimal)));

	ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
			.withWaitTimeSeconds(0)
			.withMaxNumberOfMessages(1)
			.withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
			.withMessageAttributeNames("All");
	ReceiveMessageResult stubbedResult = new ReceiveMessageResult().withMessages(
			new com.amazonaws.services.sqs.model.Message()
					.withBody("Hello")
					.withMessageAttributes(attributes));

	AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
	when(amazonSqs.receiveMessage(expectedRequest)).thenReturn(stubbedResult);

	PollableChannel channelUnderTest = new QueueMessageChannel(amazonSqs, "http://testQueue");

	// Act
	Message<?> received = channelUnderTest.receive();

	// Assert
	assertThat(received.getHeaders().get("double")).isEqualTo(expectedDouble);
	assertThat(received.getHeaders().get("long")).isEqualTo(expectedLong);
	assertThat(received.getHeaders().get("integer")).isEqualTo(expectedInteger);
	assertThat(received.getHeaders().get("byte")).isEqualTo(expectedByte);
	assertThat(received.getHeaders().get("short")).isEqualTo(expectedShort);
	assertThat(received.getHeaders().get("float")).isEqualTo(expectedFloat);
	assertThat(received.getHeaders().get("bigInteger")).isEqualTo(expectedBigInteger);
	assertThat(received.getHeaders().get("bigDecimal")).isEqualTo(expectedBigDecimal);
}
 
 类方法
 同包方法