下面列出了怎么用org.springframework.messaging.handler.annotation.Headers的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Consumes an order message and acknowledges it manually.
 *
 * <p>The queue, exchange and routing key of the binding are resolved from
 * configuration properties, for example:
 * <pre>
 * spring.rabbitmq.listener.order.queue.name=queue-2
 * spring.rabbitmq.listener.order.queue.durable=true
 * spring.rabbitmq.listener.order.exchange.name=exchange-1
 * spring.rabbitmq.listener.order.exchange.durable=true
 * spring.rabbitmq.listener.order.exchange.type=topic
 * spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
 * spring.rabbitmq.listener.order.key=springboot.*
 * </pre>
 *
 * @param order the order payload
 * @param channel the channel on which the acknowledgement is sent
 * @param headers all message headers; must contain {@link AmqpHeaders#DELIVERY_TAG}
 * @throws Exception if the delivery tag header is missing or the acknowledgement fails
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
                durable = "${spring.rabbitmq.listener.order.queue.durable}"),
        exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
                durable = "${spring.rabbitmq.listener.order.exchange.durable}",
                type = "${spring.rabbitmq.listener.order.exchange.type}",
                ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
        key = "${spring.rabbitmq.listener.order.key}"
    )
)
@RabbitHandler
public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order,
        Channel channel,
        @Headers Map<String, Object> headers) throws Exception {
    System.err.println("--------------------------------------");
    System.err.println("消费端order: " + order.getId());

    // Fail with a descriptive error instead of a bare NullPointerException /
    // ClassCastException when the delivery tag header is absent or malformed.
    Object deliveryTag = headers.get(AmqpHeaders.DELIVERY_TAG);
    if (!(deliveryTag instanceof Long)) {
        throw new IllegalStateException(
                "Missing or invalid " + AmqpHeaders.DELIVERY_TAG + " header: " + deliveryTag);
    }

    // Manual ACK of this single delivery (multiple = false).
    channel.basicAck((Long) deliveryTag, false);
}
/**
 * Filter between the {@code udpFilter} and {@code udpRouter} channels.
 *
 * <p>Currently accepts every message. The headers are available for filtering
 * decisions; the source id, IP address and port are expected under the keys
 * {@code "id"}, {@code "ip_address"} and {@code "ip_port"} respectively.
 *
 * @param message the message payload
 * @param headers all message headers
 * @return {@code true} for every message (nothing is filtered out yet)
 * @author wliduo[[email protected]]
 * @date 2020/5/20 15:30
 */
@Filter(inputChannel = "udpFilter", outputChannel = "udpRouter")
public boolean filter(String message, @Headers Map<String, Object> headers) {
    // The header values were previously read into unused locals via
    // headers.get(...).toString(), which threw a NullPointerException whenever a
    // header was absent. Look them up (null-safely) only once they are needed,
    // e.g. headers.get("ip_address") for IP-based filtering.
    //
    // Example payload filter (disabled): reject messages that contain no '-',
    // i.e. return false when message.indexOf("-") < 0.
    return true;
}
/**
 * Handles a message from the {@code message} topic and resolves {@code /giphy} commands.
 *
 * <p>When the message has no image yet and its trimmed text starts with
 * {@code /giphy}, the remainder of the text is used as the query for the giphy
 * lookup. If an image is found, the updated message is sent again.
 *
 * @param message the received message
 * @param headers the message headers, used to start the consumer span
 * @throws Exception if processing fails
 */
@KafkaListener(topics = "message")
public void process(@Payload Message message, @Headers MessageHeaders headers) throws Exception {
    final String command = "/giphy";
    Span span = kafka.startConsumerSpan("process", headers);
    try (Scope scope = tracer.scopeManager().activate(span, true)) {
        System.out.println("Received message: " + message.message);
        if (message.image == null && message.message.trim().startsWith(command)) {
            // Strip the command prefix instead of split("/giphy")[1]: split drops
            // trailing empty strings, so a bare "/giphy" produced an empty array
            // and an ArrayIndexOutOfBoundsException.
            String query = message.message.trim().substring(command.length()).trim();
            System.out.println("Giphy requested: " + query);
            message.image = giphy.query(query);
            if (message.image != null) {
                kafka.sendMessage(message);
                System.out.println("Updated message, url=" + message.image);
            }
        }
    }
}
/**
 * Receives a mail object from the {@code JMSType.SEND_MAIL} destination and sends it.
 *
 * <p>Supported payloads are {@code MimeMessage} and {@code SimpleMailMessage}.
 *
 * @param obj the message payload
 * @param headers the message headers (only printed)
 * @param message the underlying message (only printed)
 * @param session the session (only printed)
 * @throws IllegalArgumentException if the payload is neither a {@code MimeMessage}
 *         nor a {@code SimpleMailMessage}
 */
@JmsListener(destination = JMSType.SEND_MAIL)
public void sendMail(@Payload Object obj, @Headers MessageHeaders headers, Message message, Session session) {
    log.info("recived mail: {}", obj);
    System.out.println("-------------------------");
    System.out.println("obj:" + obj);
    System.out.println("headers:" + headers);
    System.out.println("message:" + message);
    System.out.println("session:" + session);
    System.out.println("-------------------------");

    if (obj instanceof MimeMessage) {
        mailSender.send((MimeMessage) obj);
    } else if (obj instanceof SimpleMailMessage) {
        mailSender.send((SimpleMailMessage) obj);
    } else {
        // Previously any other payload was blindly cast to SimpleMailMessage,
        // surfacing as an uninformative ClassCastException.
        throw new IllegalArgumentException(
                "Unsupported mail payload type: " + (obj == null ? "null" : obj.getClass().getName()));
    }
}
/**
 * Receives a JMS message from the herd incoming queue and handles it as an S3 notification.
 *
 * @param payload the message payload
 * @param allHeaders the JMS headers (only logged)
 */
@JmsListener(id = HerdJmsDestinationResolver.SQS_DESTINATION_HERD_INCOMING,
    containerFactory = "jmsListenerContainerFactory",
    destination = HerdJmsDestinationResolver.SQS_DESTINATION_HERD_INCOMING)
public void processMessage(String payload, @Headers Map<Object, Object> allHeaders)
{
    final String queueName = HerdJmsDestinationResolver.SQS_DESTINATION_HERD_INCOMING;

    LOGGER.info("JMS message received from the queue. jmsQueueName=\"{}\" jmsMessageHeaders=\"{}\" jmsMessagePayload={}",
        queueName, allHeaders, payload);

    // Try to handle the payload as an S3 notification; nothing more to do on success.
    if (processS3Notification(payload))
    {
        return;
    }

    // No processor accepted the message, so record the failure.
    LOGGER.error("Failed to process message from the JMS queue. jmsQueueName=\"{}\" jmsMessagePayload={}",
        queueName, payload);
}
/**
 * Router for messages arriving on the {@code udpRouter} channel.
 *
 * <p>Currently every message is routed to {@code udpHandle1}. The headers are
 * available for routing decisions; the source id, IP address and port are
 * expected under the keys {@code "id"}, {@code "ip_address"} and
 * {@code "ip_port"} respectively.
 *
 * @param message the message payload
 * @param headers all message headers
 * @return the name of the channel that should handle the message
 * @author wliduo[[email protected]]
 * @date 2020/5/20 15:35
 */
@Router(inputChannel = "udpRouter")
public String router(String message, @Headers Map<String, Object> headers) {
    // The header values were previously read into unused locals via
    // headers.get(...).toString(), which threw a NullPointerException whenever a
    // header was absent. Look them up (null-safely) only once a routing rule
    // actually needs them.
    //
    // The unreachable `if (false)` placeholder that returned "udpHandle2" was
    // removed; add a real condition here to route to that handler.
    return "udpHandle1";
}
/**
 * Checks which parameter declarations the resolver reports as supported:
 * an {@code @Headers} map, {@code MessageHeaders} and header accessors are
 * accepted, an {@code @Headers String} is not.
 */
@Test
public void supportsParameter() {
    MethodParameter candidate =
            this.resolvable.annotPresent(Headers.class).arg(Map.class, String.class, Object.class);
    assertTrue(this.resolver.supportsParameter(candidate));

    candidate = this.resolvable.arg(MessageHeaders.class);
    assertTrue(this.resolver.supportsParameter(candidate));

    candidate = this.resolvable.arg(MessageHeaderAccessor.class);
    assertTrue(this.resolver.supportsParameter(candidate));

    candidate = this.resolvable.arg(TestMessageHeaderAccessor.class);
    assertTrue(this.resolver.supportsParameter(candidate));

    candidate = this.resolvable.annotPresent(Headers.class).arg(String.class);
    assertFalse(this.resolver.supportsParameter(candidate));
}
/**
 * Resolves the {@code @Headers Map<String, Object>} parameter against the test
 * message and checks that the resulting map exposes the "foo" header.
 */
@Test
public void resolveArgumentAnnotated() throws Exception {
    MethodParameter headersMapParam =
            this.resolvable.annotPresent(Headers.class).arg(Map.class, String.class, Object.class);

    Object result = this.resolver.resolveArgument(headersMapParam, this.message);
    assertTrue(result instanceof Map);

    @SuppressWarnings("unchecked")
    Map<String, Object> resolvedHeaders = (Map<String, Object>) result;
    assertEquals("bar", resolvedHeaders.get("foo"));
}
/**
 * Signature-only fixture: the body is intentionally empty.
 *
 * <p>Never invoked directly (hence the suppressed "unused" warning); presumably
 * its parameters are looked up reflectively by the tests — confirm against the
 * test setup.
 */
@SuppressWarnings("unused")
private void handleMessage(@Headers Map<String, Object> param1, @Headers String param2,
        MessageHeaders param3, MessageHeaderAccessor param4, TestMessageHeaderAccessor param5) {
    // no-op
}
/**
 * Verifies the resolver's answer for each declared parameter shape: supported
 * for an {@code @Headers} map, {@code MessageHeaders} and accessor types,
 * unsupported for an {@code @Headers String}.
 */
@Test
public void supportsParameter() {
    MethodParameter annotatedMap =
            this.resolvable.annotPresent(Headers.class).arg(Map.class, String.class, Object.class);
    assertTrue(this.resolver.supportsParameter(annotatedMap));

    MethodParameter messageHeaders = this.resolvable.arg(MessageHeaders.class);
    assertTrue(this.resolver.supportsParameter(messageHeaders));

    MethodParameter accessor = this.resolvable.arg(MessageHeaderAccessor.class);
    assertTrue(this.resolver.supportsParameter(accessor));

    MethodParameter customAccessor = this.resolvable.arg(TestMessageHeaderAccessor.class);
    assertTrue(this.resolver.supportsParameter(customAccessor));

    MethodParameter annotatedString = this.resolvable.annotPresent(Headers.class).arg(String.class);
    assertFalse(this.resolver.supportsParameter(annotatedString));
}
/**
 * Resolves the {@code @Headers Map<String, Object>} parameter and checks that
 * the "foo" header is present with value "bar".
 */
@Test
@SuppressWarnings("unchecked")
public void resolveArgumentAnnotated() {
    MethodParameter headersMapParam =
            this.resolvable.annotPresent(Headers.class).arg(Map.class, String.class, Object.class);

    Map<String, Object> resolvedHeaders = resolveArgument(headersMapParam);

    assertEquals("bar", resolvedHeaders.get("foo"));
}
/**
 * Signature-only fixture with an intentionally empty body.
 *
 * <p>It is never called directly (the "unused" warning is suppressed);
 * presumably the tests inspect its parameters reflectively — verify against
 * the test setup.
 */
@SuppressWarnings("unused")
private void handleMessage(@Headers Map<String, Object> param1, @Headers String param2,
        MessageHeaders param3, MessageHeaderAccessor param4, TestMessageHeaderAccessor param5) {
    // no-op
}
/**
 * Records its own invocation and asserts that the payload and the injected
 * headers carry the expected values.
 *
 * @param content the message payload, expected to be "my payload"
 * @param headers the injected headers, expected to contain the JMS message id
 *        and the custom "customInt" header
 */
public void resolveHeaders(String content, @Headers Map<String, Object> headers) {
    this.invocations.put("resolveHeaders", true);

    assertEquals("Wrong payload resolution", "my payload", content);
    assertNotNull("headers not injected", headers);

    Object messageId = headers.get(JmsHeaders.MESSAGE_ID);
    assertEquals("Missing JMS message id header", "abcd-1234", messageId);

    Object customInt = headers.get("customInt");
    assertEquals("Missing custom header", 1234, customInt);
}
/**
 * Signature-only fixture; the body is deliberately empty.
 *
 * <p>Not called directly (the "unused" warning is suppressed); presumably its
 * parameter declarations are read reflectively by the tests — confirm against
 * the test setup.
 */
@SuppressWarnings("unused")
private void handleMessage(@Headers Map<String, ?> param1, @Headers String param2,
        MessageHeaders param3, MessageHeaderAccessor param4, TestMessageHeaderAccessor param5) {
    // no-op
}
/**
 * Marks "resolveHeaders" as invoked, then checks the payload and the injected
 * header map.
 *
 * @param content the message payload, expected to be "my payload"
 * @param headers the injected headers, expected to hold the JMS message id and
 *        a "customInt" entry
 */
public void resolveHeaders(String content, @Headers Map<String, Object> headers) {
    this.invocations.put("resolveHeaders", true);

    assertEquals("Wrong payload resolution", "my payload", content);
    assertNotNull("headers not injected", headers);

    Object jmsMessageId = headers.get(JmsHeaders.MESSAGE_ID);
    assertEquals("Missing JMS message id header", "abcd-1234", jmsMessageId);

    Object customHeader = headers.get("customInt");
    assertEquals("Missing custom header", 1234, customHeader);
}
/**
 * Handles a message from the {@code message} topic by storing it via
 * {@code redis.addMessage}, inside a consumer span started from the headers.
 *
 * @param message the received message
 * @param headers the message headers, used to start the consumer span
 * @throws Exception if processing fails
 */
@KafkaListener(topics = "message")
public void process(@Payload Message message, @Headers MessageHeaders headers) throws Exception {
    Span consumerSpan = kafka.startConsumerSpan("process", headers);
    // The scope is closed automatically when the try block exits.
    try (Scope activeScope = tracer.scopeManager().activate(consumerSpan, true)) {
        System.out.println("Received message: " + message.message);

        redis.addMessage(message);

        System.out.println("Added message to room.");
    }
}
/**
 * Supports {@code Map} parameters annotated with {@code @Headers}, parameters
 * declared exactly as {@code MessageHeaders}, and any
 * {@code MessageHeaderAccessor} (sub)type.
 */
@Override
public boolean supportsParameter(MethodParameter parameter) {
    Class<?> declaredType = parameter.getParameterType();

    if (parameter.hasParameterAnnotation(Headers.class) && Map.class.isAssignableFrom(declaredType)) {
        return true;
    }
    if (MessageHeaders.class == declaredType) {
        return true;
    }
    return MessageHeaderAccessor.class.isAssignableFrom(declaredType);
}
/**
 * Signature-only fixture whose body is empty on purpose.
 *
 * <p>Never invoked directly (the "unused" warning is suppressed); presumably
 * the tests look up its parameters reflectively — confirm against the test
 * setup.
 */
@SuppressWarnings("unused")
private void handleMessage(@Headers Map<String, ?> param1, @Headers String param2,
        MessageHeaders param3, MessageHeaderAccessor param4, TestMessageHeaderAccessor param5) {
    // no-op
}
/**
 * Flags "resolveHeaders" as invoked and asserts the expected payload and
 * header values.
 *
 * @param content the message payload, expected to be "my payload"
 * @param headers the injected headers, expected to include the JMS message id
 *        and the "customInt" header
 */
public void resolveHeaders(String content, @Headers Map<String, Object> headers) {
    invocations.put("resolveHeaders", true);

    assertEquals("Wrong payload resolution", "my payload", content);
    assertNotNull("headers not injected", headers);

    Object idHeader = headers.get(JmsHeaders.MESSAGE_ID);
    assertEquals("Missing JMS message id header", "abcd-1234", idHeader);

    Object intHeader = headers.get("customInt");
    assertEquals("Missing custom header", 1234, intHeader);
}
/**
 * Receives a book published to the "input" topic exchange with routing key
 * "event", logs it, and hands it to the service together with the
 * "amqp_replyTo" header value.
 *
 * @param book the received book
 * @param headers the message headers
 */
@RabbitListener(
    bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "input", durable = "true", autoDelete = "false", type = "topic"),
        key = "event"))
public void newBook(Book book, @Headers Map<String, String> headers) {
    LOG.info("Received new book with bookname = " + book.getName());
    LOG.info("Headers = " + headers);

    String replyTo = headers.get("amqp_replyTo");
    this.service.sendBook(book, replyTo);
}
/**
 * Receives a book published to the "input" topic exchange with routing key
 * "event2", logs it, and hands it to the service together with the
 * "amqp_replyTo" header value.
 *
 * @param book the received book
 * @param headers the message headers
 */
@RabbitListener(
    bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "input", durable = "true", autoDelete = "false", type = "topic"),
        key = "event2"))
public void newBook2(Book book, @Headers Map<String, String> headers) {
    LOG.info("newBook2 Received new book with bookname = " + book.getName());
    LOG.info("newBook2 Headers = " + headers);

    String replyTo = headers.get("amqp_replyTo");
    this.service.sendBook(book, replyTo);
}
/**
 * Receives a JMS message from the search index update queue and forwards it,
 * unchanged, to {@code processMessageWithRetry}.
 *
 * @param payload the message payload
 * @param allHeaders the JMS headers
 */
@JmsListener(
    id = HerdJmsDestinationResolver.SQS_DESTINATION_SEARCH_INDEX_UPDATE_QUEUE,
    containerFactory = "jmsListenerContainerFactory",
    destination = HerdJmsDestinationResolver.SQS_DESTINATION_SEARCH_INDEX_UPDATE_QUEUE)
public void processMessage(String payload, @Headers Map<Object, Object> allHeaders)
{
    // All of the actual work happens in the private helper.
    processMessageWithRetry(payload, allHeaders);
}
/**
 * Processes a JMS message from the storage policy selector job queue.
 *
 * <p>The payload is unmarshalled into a {@code StoragePolicySelection} and passed to the
 * storage policy processor service. Failures are logged and not rethrown: as a warning when
 * the storage unit is already in the ARCHIVED status, as an error otherwise.
 *
 * @param payload the message payload
 * @param allHeaders the JMS headers (only logged)
 */
@JmsListener(id = HerdJmsDestinationResolver.SQS_DESTINATION_STORAGE_POLICY_SELECTOR_JOB_SQS_QUEUE,
    containerFactory = "storagePolicyProcessorJmsListenerContainerFactory",
    destination = HerdJmsDestinationResolver.SQS_DESTINATION_STORAGE_POLICY_SELECTOR_JOB_SQS_QUEUE)
public void processMessage(String payload, @Headers Map<Object, Object> allHeaders)
{
    LOGGER.info("Message received from the JMS queue. jmsQueueName=\"{}\" jmsMessageHeaders=\"{}\" jmsMessagePayload={}",
        HerdJmsDestinationResolver.SQS_DESTINATION_STORAGE_POLICY_SELECTOR_JOB_SQS_QUEUE, allHeaders, payload);

    // Process the message as storage policy selection message.
    try
    {
        // Process messages coming from the storage policy selector job.
        StoragePolicySelection storagePolicySelection = jsonHelper.unmarshallJsonToObject(StoragePolicySelection.class, payload);

        LOGGER.debug("Received storage policy selection message: businessObjectDataKey={} storagePolicyKey={} storagePolicyVersion={}",
            jsonHelper.objectToJson(storagePolicySelection.getBusinessObjectDataKey()),
            jsonHelper.objectToJson(storagePolicySelection.getStoragePolicyKey()), storagePolicySelection.getStoragePolicyVersion());

        // Process the storage policy selection message.
        storagePolicyProcessorService.processStoragePolicySelectionMessage(storagePolicySelection);
    }
    catch (RuntimeException | IOException e)
    {
        // The exception message may be null; guard against it so that the error handling itself cannot
        // fail with a NullPointerException and hide the original failure.
        String errorMessage = e.getMessage();

        // Log a warning message if storage unit status is already ARCHIVED. Such error case is typically caused by a duplicate SQS message.
        if (e instanceof IllegalArgumentException && errorMessage != null &&
            errorMessage.startsWith(String.format("Storage unit status is \"%s\"", StorageUnitStatusEntity.ARCHIVED)))
        {
            LOGGER.warn("Failed to process message from the JMS queue. jmsQueueName=\"{}\" jmsMessagePayload={}",
                HerdJmsDestinationResolver.SQS_DESTINATION_STORAGE_POLICY_SELECTOR_JOB_SQS_QUEUE, payload, e);
        }
        // Otherwise, log an error.
        else
        {
            LOGGER.error("Failed to process message from the JMS queue. jmsQueueName=\"{}\" jmsMessagePayload={}",
                HerdJmsDestinationResolver.SQS_DESTINATION_STORAGE_POLICY_SELECTOR_JOB_SQS_QUEUE, payload, e);
        }
    }
}
/**
 * Supports every parameter that is neither a {@code Message} nor a
 * {@code MessageHeaders} (sub)type and carries neither {@code @Header} nor
 * {@code @Headers}.
 */
@Override
public boolean supportsParameter(MethodParameter parameter) {
    Class<?> declaredType = parameter.getParameterType();

    // Message and header container types are not supported here.
    if (Message.class.isAssignableFrom(declaredType) || MessageHeaders.class.isAssignableFrom(declaredType)) {
        return false;
    }

    // Neither are parameters explicitly bound to one or all headers.
    boolean headerBound = parameter.hasParameterAnnotation(Header.class)
            || parameter.hasParameterAnnotation(Headers.class);
    return !headerBound;
}
/**
 * Listener on {@code Processor.INPUT} that records the arguments it was
 * invoked with, in order: payload, header map, content type.
 *
 * @param fooPojo the message payload
 * @param headers all message headers
 * @param contentType the value of the content type header
 */
@StreamListener(Processor.INPUT)
public void receive(
        @Payload StreamListenerTestUtils.FooPojo fooPojo,
        @Headers Map<String, Object> headers,
        @Header(MessageHeaders.CONTENT_TYPE) String contentType) {
    receivedArguments.add(fooPojo);
    receivedArguments.add(headers);
    receivedArguments.add(contentType);
}
/**
 * Listener whose payload parameter is bound to {@code Processor.INPUT}; it
 * records the arguments it was invoked with, in order: payload, header map,
 * content type.
 *
 * @param fooPojo the message payload
 * @param headers all message headers
 * @param contentType the value of the content type header
 */
@StreamListener
public void receive(@Input(Processor.INPUT) @Payload StreamListenerTestUtils.FooPojo fooPojo,
        @Headers Map<String, Object> headers,
        @Header(MessageHeaders.CONTENT_TYPE) String contentType) {
    receivedArguments.add(fooPojo);
    receivedArguments.add(headers);
    receivedArguments.add(contentType);
}
/**
 * Listener on the "QueueListenerTest" queue that captures everything it
 * receives and then counts down the latch.
 *
 * @param message the message body
 * @param senderId the optional "SenderId" header
 * @param allHeaders all message headers as a map
 * @param asSqsHeaders the same headers as typed SQS headers
 */
@RuntimeUse
@SqsListener("QueueListenerTest")
public void receiveMessage(
        String message,
        @Header(value = "SenderId", required = false) String senderId,
        @Headers Map<String, Object> allHeaders,
        SqsMessageHeaders asSqsHeaders) {
    LOGGER.debug("Received message with content {}", message);

    // Capture the raw inputs.
    this.receivedMessages.add(message);
    this.senderId = senderId;
    this.allHeaders = allHeaders;

    // Capture the typed SQS header values.
    this.approximateReceiveCount = asSqsHeaders.getApproximateReceiveCount();
    this.approximateFirstReceiveTimestamp = asSqsHeaders.getApproximateFirstReceiveTimestamp();
    this.timestamp = asSqsHeaders.getTimestamp();
    this.sentTimestamp = asSqsHeaders.getSentTimestamp();

    // Count down last, after all state has been recorded.
    getCountDownLatch().countDown();
}
/**
 * Listener on the "testQueue" queue that stores the received payload and
 * headers in the corresponding fields.
 *
 * @param payload the message payload
 * @param headers all message headers
 */
@RuntimeUse
@SqsListener("testQueue")
public void receive(@Payload String payload, @Headers Map<String, String> headers) {
    this.headers = headers;
    this.payload = payload;
}
/**
 * Supports {@code @Headers}-annotated {@code Map} parameters, parameters
 * declared exactly as {@code MessageHeaders}, and {@code MessageHeaderAccessor}
 * (sub)types.
 */
@Override
public boolean supportsParameter(MethodParameter parameter) {
    Class<?> type = parameter.getParameterType();

    boolean annotatedHeadersMap =
            parameter.hasParameterAnnotation(Headers.class) && Map.class.isAssignableFrom(type);
    if (annotatedHeadersMap) {
        return true;
    }
    return MessageHeaders.class == type || MessageHeaderAccessor.class.isAssignableFrom(type);
}
/**
 * Returns {@code true} for a {@code Map} parameter annotated with
 * {@code @Headers}, for a parameter of exactly type {@code MessageHeaders},
 * and for any {@code MessageHeaderAccessor} (sub)type.
 */
@Override
public boolean supportsParameter(MethodParameter parameter) {
    Class<?> declaredType = parameter.getParameterType();

    if (parameter.hasParameterAnnotation(Headers.class) && Map.class.isAssignableFrom(declaredType)) {
        return true;
    }
    if (MessageHeaders.class == declaredType) {
        return true;
    }
    return MessageHeaderAccessor.class.isAssignableFrom(declaredType);
}