类org.springframework.messaging.handler.annotation.Headers源码实例Demo

下面列出了怎么用org.springframework.messaging.handler.annotation.Headers的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: code   文件: RabbitReceiver.java
/**
 * 
 * 	spring.rabbitmq.listener.order.queue.name=queue-2
	spring.rabbitmq.listener.order.queue.durable=true
	spring.rabbitmq.listener.order.exchange.name=exchange-1
	spring.rabbitmq.listener.order.exchange.durable=true
	spring.rabbitmq.listener.order.exchange.type=topic
	spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
	spring.rabbitmq.listener.order.key=springboot.*
 * @param order
 * @param channel
 * @param headers
 * @throws Exception
 */
@RabbitListener(bindings = @QueueBinding(
		value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", 
		durable="${spring.rabbitmq.listener.order.queue.durable}"),
		exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", 
		durable="${spring.rabbitmq.listener.order.exchange.durable}", 
		type= "${spring.rabbitmq.listener.order.exchange.type}", 
		ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
		key = "${spring.rabbitmq.listener.order.key}"
		)
)
@RabbitHandler
public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, 
		Channel channel, 
		@Headers Map<String, Object> headers) throws Exception {
	System.err.println("--------------------------------------");
	System.err.println("消费端order: " + order.getId());
	Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
	//手工ACK
	channel.basicAck(deliveryTag, false);
}
 
源代码2 项目: ProjectStudy   文件: UdpServer.java
/**
   * 过滤器
   *
   * @param message
* @param headers
   * @return boolean
   * @throws
   * @author wliduo[[email protected]]
   * @date 2020/5/20 15:30
   */
  @Filter(inputChannel = "udpFilter", outputChannel = "udpRouter")
  public boolean filter(String message, @Headers Map<String, Object> headers) {
      // 获取来源Id
      String id = headers.get("id").toString();
      // 获取来源IP,可以进行IP过滤
      String ip = headers.get("ip_address").toString();
      // 获取来源Port
      String port = headers.get("ip_port").toString();
      // 信息数据过滤
      /*if (message.indexOf("-") < 0) {
          // 没有-的数据会被过滤
          return false;
      }*/
      return true;
  }
 
源代码3 项目: Mastering-Distributed-Tracing   文件: App.java
@KafkaListener(topics = "message")
public void process(@Payload Message message, @Headers MessageHeaders headers) throws Exception {
    Span span = kafka.startConsumerSpan("process", headers);
    try (Scope scope = tracer.scopeManager().activate(span, true)) {
        System.out.println("Received message: " + message.message);
        if (message.image == null && message.message.trim().startsWith("/giphy")) {
            String query = message.message.split("/giphy")[1].trim();
            System.out.println("Giphy requested: " + query);
            message.image = giphy.query(query);
            if (message.image != null) {
                kafka.sendMessage(message);
                System.out.println("Updated message, url=" + message.image);
            }
        }
    }
}
 
@JmsListener(destination = JMSType.SEND_MAIL)
public void sendMail(@Payload Object obj, @Headers MessageHeaders headers, Message message, Session session) {
	log.info("recived mail: {}", obj);

	System.out.println("-------------------------");
	System.out.println("obj:" + obj);
	System.out.println("headers:" + headers);
	System.out.println("message:" + message);
	System.out.println("session:" + session);
	System.out.println("-------------------------");

	if (obj instanceof MimeMessage) {
		mailSender.send((MimeMessage) obj);
	} else {
		mailSender.send((SimpleMailMessage) obj);
	}
}
 
源代码5 项目: herd   文件: HerdJmsMessageListener.java
/**
 * Processes a JMS message.
 *
 * @param payload the message payload.
 * @param allHeaders the JMS headers.
 */
@JmsListener(id = HerdJmsDestinationResolver.SQS_DESTINATION_HERD_INCOMING,
    containerFactory = "jmsListenerContainerFactory", destination = HerdJmsDestinationResolver.SQS_DESTINATION_HERD_INCOMING)
public void processMessage(String payload, @Headers Map<Object, Object> allHeaders)
{
    LOGGER.info("JMS message received from the queue. jmsQueueName=\"{}\" jmsMessageHeaders=\"{}\" jmsMessagePayload={}",
        HerdJmsDestinationResolver.SQS_DESTINATION_HERD_INCOMING, allHeaders, payload);

    // Process the message as S3 notification.
    boolean messageProcessed = processS3Notification(payload);

    if (!messageProcessed)
    {
        // The message was not processed, log the error.
        LOGGER.error("Failed to process message from the JMS queue. jmsQueueName=\"{}\" jmsMessagePayload={}",
            HerdJmsDestinationResolver.SQS_DESTINATION_HERD_INCOMING, payload);
    }
}
 
源代码6 项目: ProjectStudy   文件: UdpServer.java
/**
   * 路由分发处理器
   *
   * @param message
* @param headers
   * @return java.lang.String
   * @throws
   * @author wliduo[[email protected]]
   * @date 2020/5/20 15:35
   */
  @Router(inputChannel = "udpRouter")
  public String router(String message, @Headers Map<String, Object> headers) {
      // 获取来源Id
      String id = headers.get("id").toString();
      // 获取来源IP,可以进行IP过滤
      String ip = headers.get("ip_address").toString();
      // 获取来源Port
      String port = headers.get("ip_port").toString();
      // 筛选,走那个处理器
      if (false) {
          return "udpHandle2";
      }
      return "udpHandle1";
  }
 
@Test
public void supportsParameter() {

	assertTrue(this.resolver.supportsParameter(
			this.resolvable.annotPresent(Headers.class).arg(Map.class, String.class, Object.class)));

	assertTrue(this.resolver.supportsParameter(this.resolvable.arg(MessageHeaders.class)));
	assertTrue(this.resolver.supportsParameter(this.resolvable.arg(MessageHeaderAccessor.class)));
	assertTrue(this.resolver.supportsParameter(this.resolvable.arg(TestMessageHeaderAccessor.class)));

	assertFalse(this.resolver.supportsParameter(this.resolvable.annotPresent(Headers.class).arg(String.class)));
}
 
@Test
public void resolveArgumentAnnotated() throws Exception {
	MethodParameter param = this.resolvable.annotPresent(Headers.class).arg(Map.class, String.class, Object.class);
	Object resolved = this.resolver.resolveArgument(param, this.message);

	assertTrue(resolved instanceof Map);
	@SuppressWarnings("unchecked")
	Map<String, Object> headers = (Map<String, Object>) resolved;
	assertEquals("bar", headers.get("foo"));
}
 
@SuppressWarnings("unused")
private void handleMessage(
		@Headers Map<String, Object> param1,
		@Headers String param2,
		MessageHeaders param3,
		MessageHeaderAccessor param4,
		TestMessageHeaderAccessor param5) {
}
 
@Test
public void supportsParameter() {

	assertTrue(this.resolver.supportsParameter(
			this.resolvable.annotPresent(Headers.class).arg(Map.class, String.class, Object.class)));

	assertTrue(this.resolver.supportsParameter(this.resolvable.arg(MessageHeaders.class)));
	assertTrue(this.resolver.supportsParameter(this.resolvable.arg(MessageHeaderAccessor.class)));
	assertTrue(this.resolver.supportsParameter(this.resolvable.arg(TestMessageHeaderAccessor.class)));

	assertFalse(this.resolver.supportsParameter(this.resolvable.annotPresent(Headers.class).arg(String.class)));
}
 
@Test
@SuppressWarnings("unchecked")
public void resolveArgumentAnnotated() {
	MethodParameter param = this.resolvable.annotPresent(Headers.class).arg(Map.class, String.class, Object.class);
	Map<String, Object> headers = resolveArgument(param);
	assertEquals("bar", headers.get("foo"));
}
 
@SuppressWarnings("unused")
private void handleMessage(
		@Headers Map<String, Object> param1,
		@Headers String param2,
		MessageHeaders param3,
		MessageHeaderAccessor param4,
		TestMessageHeaderAccessor param5) {
}
 
public void resolveHeaders(String content, @Headers Map<String, Object> headers) {
	this.invocations.put("resolveHeaders", true);
	assertEquals("Wrong payload resolution", "my payload", content);
	assertNotNull("headers not injected", headers);
	assertEquals("Missing JMS message id header", "abcd-1234", headers.get(JmsHeaders.MESSAGE_ID));
	assertEquals("Missing custom header", 1234, headers.get("customInt"));
}
 
@SuppressWarnings("unused")
private void handleMessage(
		@Headers Map<String, ?> param1,
		@Headers String param2,
		MessageHeaders param3,
		MessageHeaderAccessor param4,
		TestMessageHeaderAccessor param5) {
}
 
public void resolveHeaders(String content, @Headers Map<String, Object> headers) {
	this.invocations.put("resolveHeaders", true);
	assertEquals("Wrong payload resolution", "my payload", content);
	assertNotNull("headers not injected", headers);
	assertEquals("Missing JMS message id header", "abcd-1234", headers.get(JmsHeaders.MESSAGE_ID));
	assertEquals("Missing custom header", 1234, headers.get("customInt"));
}
 
源代码16 项目: Mastering-Distributed-Tracing   文件: App.java
@KafkaListener(topics = "message")
public void process(@Payload Message message, @Headers MessageHeaders headers) throws Exception {
    Span span = kafka.startConsumerSpan("process", headers);
    try (Scope scope = tracer.scopeManager().activate(span, true)) {
        System.out.println("Received message: " + message.message);
        redis.addMessage(message);
        System.out.println("Added message to room.");
    }
}
 
@Override
public boolean supportsParameter(MethodParameter parameter) {
	Class<?> paramType = parameter.getParameterType();
	return ((parameter.hasParameterAnnotation(Headers.class) && Map.class.isAssignableFrom(paramType)) ||
			MessageHeaders.class == paramType ||
			MessageHeaderAccessor.class.isAssignableFrom(paramType));
}
 
@SuppressWarnings("unused")
private void handleMessage(
		@Headers Map<String, ?> param1,
		@Headers String param2,
		MessageHeaders param3,
		MessageHeaderAccessor param4,
		TestMessageHeaderAccessor param5) {
}
 
public void resolveHeaders(String content, @Headers Map<String, Object> headers) {
	invocations.put("resolveHeaders", true);
	assertEquals("Wrong payload resolution", "my payload", content);
	assertNotNull("headers not injected", headers);
	assertEquals("Missing JMS message id header", "abcd-1234", headers.get(JmsHeaders.MESSAGE_ID));
	assertEquals("Missing custom header", 1234, headers.get("customInt"));
}
 
源代码20 项目: spring-cloud-contract   文件: RabbitManager.java
@RabbitListener(
		bindings = @QueueBinding(
				value = @Queue, exchange = @Exchange(value = "input",
						durable = "true", autoDelete = "false", type = "topic"),
				key = "event"))
public void newBook(Book book, @Headers Map<String, String> headers) {
	LOG.info("Received new book with bookname = " + book.getName());
	LOG.info("Headers = " + headers);
	this.service.sendBook(book, headers.get("amqp_replyTo"));
}
 
源代码21 项目: spring-cloud-contract   文件: RabbitManager.java
@RabbitListener(
		bindings = @QueueBinding(
				value = @Queue, exchange = @Exchange(value = "input",
						durable = "true", autoDelete = "false", type = "topic"),
				key = "event2"))
public void newBook2(Book book, @Headers Map<String, String> headers) {
	LOG.info("newBook2 Received new book with bookname = " + book.getName());
	LOG.info("newBook2 Headers = " + headers);
	this.service.sendBook(book, headers.get("amqp_replyTo"));
}
 
源代码22 项目: herd   文件: SearchIndexUpdateJmsMessageListener.java
/**
 * Processes a JMS message.
 *
 * @param payload the message payload
 * @param allHeaders the JMS headers
 */
@JmsListener(id = HerdJmsDestinationResolver.SQS_DESTINATION_SEARCH_INDEX_UPDATE_QUEUE,
    containerFactory = "jmsListenerContainerFactory",
    destination = HerdJmsDestinationResolver.SQS_DESTINATION_SEARCH_INDEX_UPDATE_QUEUE)
public void processMessage(String payload, @Headers Map<Object, Object> allHeaders)
{
    // Call the process message with retry private method.
    processMessageWithRetry(payload, allHeaders);
}
 
/**
 * Processes a JMS message.
 *
 * @param payload the message payload
 * @param allHeaders the JMS headers
 */
@JmsListener(id = HerdJmsDestinationResolver.SQS_DESTINATION_STORAGE_POLICY_SELECTOR_JOB_SQS_QUEUE,
    containerFactory = "storagePolicyProcessorJmsListenerContainerFactory",
    destination = HerdJmsDestinationResolver.SQS_DESTINATION_STORAGE_POLICY_SELECTOR_JOB_SQS_QUEUE)
public void processMessage(String payload, @Headers Map<Object, Object> allHeaders)
{
    LOGGER.info("Message received from the JMS queue. jmsQueueName=\"{}\" jmsMessageHeaders=\"{}\" jmsMessagePayload={}",
        HerdJmsDestinationResolver.SQS_DESTINATION_STORAGE_POLICY_SELECTOR_JOB_SQS_QUEUE, allHeaders, payload);

    // Process the message as storage policy selection message.
    try
    {
        // Process messages coming from the storage policy selector job.
        StoragePolicySelection storagePolicySelection = jsonHelper.unmarshallJsonToObject(StoragePolicySelection.class, payload);

        LOGGER.debug("Received storage policy selection message: businessObjectDataKey={} storagePolicyKey={} storagePolicyVersion={}",
            jsonHelper.objectToJson(storagePolicySelection.getBusinessObjectDataKey()),
            jsonHelper.objectToJson(storagePolicySelection.getStoragePolicyKey()), storagePolicySelection.getStoragePolicyVersion());

        // Process the storage policy selection message.
        storagePolicyProcessorService.processStoragePolicySelectionMessage(storagePolicySelection);
    }
    catch (RuntimeException | IOException e)
    {
        // Log a warning message if storage unit status is already ARCHIVED. Such error case is typically caused by a duplicate SQS message.
        if (e instanceof IllegalArgumentException &&
            e.getMessage().startsWith(String.format("Storage unit status is \"%s\"", StorageUnitStatusEntity.ARCHIVED)))
        {
            LOGGER.warn("Failed to process message from the JMS queue. jmsQueueName=\"{}\" jmsMessagePayload={}",
                HerdJmsDestinationResolver.SQS_DESTINATION_STORAGE_POLICY_SELECTOR_JOB_SQS_QUEUE, payload, e);
        }
        // Otherwise, log an error.
        else
        {
            LOGGER.error("Failed to process message from the JMS queue. jmsQueueName=\"{}\" jmsMessagePayload={}",
                HerdJmsDestinationResolver.SQS_DESTINATION_STORAGE_POLICY_SELECTOR_JOB_SQS_QUEUE, payload, e);
        }
    }
}
 
@Override
public boolean supportsParameter(MethodParameter parameter) {
	return (!Message.class.isAssignableFrom(parameter.getParameterType())
			&& !MessageHeaders.class.isAssignableFrom(parameter.getParameterType())
			&& !parameter.hasParameterAnnotation(Header.class)
			&& !parameter.hasParameterAnnotation(Headers.class));
}
 
@StreamListener(Processor.INPUT)
public void receive(@Payload StreamListenerTestUtils.FooPojo fooPojo,
		@Headers Map<String, Object> headers,
		@Header(MessageHeaders.CONTENT_TYPE) String contentType) {
	this.receivedArguments.add(fooPojo);
	this.receivedArguments.add(headers);
	this.receivedArguments.add(contentType);
}
 
@StreamListener
public void receive(
		@Input(Processor.INPUT) @Payload StreamListenerTestUtils.FooPojo fooPojo,
		@Headers Map<String, Object> headers,
		@Header(MessageHeaders.CONTENT_TYPE) String contentType) {
	this.receivedArguments.add(fooPojo);
	this.receivedArguments.add(headers);
	this.receivedArguments.add(contentType);
}
 
源代码27 项目: spring-cloud-aws   文件: QueueListenerTest.java
@RuntimeUse
@SqsListener("QueueListenerTest")
public void receiveMessage(String message,
		@Header(value = "SenderId", required = false) String senderId,
		@Headers Map<String, Object> allHeaders, SqsMessageHeaders asSqsHeaders) {
	LOGGER.debug("Received message with content {}", message);
	this.receivedMessages.add(message);
	this.senderId = senderId;
	this.allHeaders = allHeaders;
	this.approximateReceiveCount = asSqsHeaders.getApproximateReceiveCount();
	this.approximateFirstReceiveTimestamp = asSqsHeaders
			.getApproximateFirstReceiveTimestamp();
	this.timestamp = asSqsHeaders.getTimestamp();
	this.sentTimestamp = asSqsHeaders.getSentTimestamp();
	this.getCountDownLatch().countDown();
}
 
源代码28 项目: spring-cloud-aws   文件: QueueMessageHandlerTest.java
@RuntimeUse
@SqsListener("testQueue")
public void receive(@Payload String payload,
		@Headers Map<String, String> headers) {
	this.payload = payload;
	this.headers = headers;
}
 
@Override
public boolean supportsParameter(MethodParameter parameter) {
	Class<?> paramType = parameter.getParameterType();
	return ((parameter.hasParameterAnnotation(Headers.class) && Map.class.isAssignableFrom(paramType)) ||
			MessageHeaders.class == paramType || MessageHeaderAccessor.class.isAssignableFrom(paramType));
}
 
@Override
public boolean supportsParameter(MethodParameter parameter) {
	Class<?> paramType = parameter.getParameterType();
	return ((parameter.hasParameterAnnotation(Headers.class) && Map.class.isAssignableFrom(paramType)) ||
			MessageHeaders.class == paramType || MessageHeaderAccessor.class.isAssignableFrom(paramType));
}
 
 类方法
 同包方法