类org.springframework.messaging.handler.annotation.Header源码实例Demo

下面列出了怎么用org.springframework.messaging.handler.annotation.Header的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: SpringBootBucket   文件: AsyncRPCServer.java
@RabbitListener(queues = QUEUE_ASYNC_RPC_WITH_FIXED_REPLY)
    public void processAsyncRpcFixed(User user,
                                     @Header(AmqpHeaders.REPLY_TO) String replyTo,
                                     @Header(AmqpHeaders.CORRELATION_ID) byte[] correlationId) {
//        String body = new String(message.getBody(), Charset.forName("UTF-8"));
//        User user = JacksonUtil.json2Bean(body, new TypeReference<User>(){});
        logger.info("user.name={}", user.getName());
        logger.info("use a fixed reply queue={}, correlationId={}", replyTo, new String(correlationId));
        ListenableFuture<String> asyncResult = asyncTask.expensiveOperation(user.getName());
        asyncResult.addCallback(new ListenableFutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                amqpTemplate.convertAndSend(replyTo, (Object) result, m -> {
                    //https://stackoverflow.com/questions/42382307/messageproperties-setcorrelationidstring-is-not-working
                    m.getMessageProperties().setCorrelationId(new String(correlationId));
                    return m;
                });
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.error("接受到QUEUE_ASYNC_RPC_WITH_FIXED_REPLY失败", ex);
            }
        });
    }
 
源代码2 项目: SpringBootBucket   文件: AsyncRPCServer.java
@RabbitListener(queues = QUEUE_ASYNC_RPC)
public void processAsyncRpc(Message message, @Header(AmqpHeaders.REPLY_TO) String replyTo) {
    String body = new String(message.getBody(), Charset.forName("UTF-8"));
 User user = JacksonUtil.json2Bean(body, new TypeReference<User>(){});
    logger.info("recevie message {} and reply to {}, user.name={}", body, replyTo, user.getName());
    if (replyTo.startsWith("amq.rabbitmq.reply-to")) {
        logger.debug("starting with version 3.4.0, the RabbitMQ server now supports Direct reply-to");
    } else {
        logger.info("fall back to using a temporary reply queue");
    }
    ListenableFuture<String> asyncResult = asyncTask.expensiveOperation(body);
    asyncResult.addCallback(new ListenableFutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            amqpTemplate.convertAndSend(replyTo, result);
        }

        @Override
        public void onFailure(Throwable ex) {
            logger.error("接受到QUEUE_ASYNC_RPC失败", ex);
        }
    });
}
 
源代码3 项目: spring-jms   文件: Receiver.java
@JmsListener(destination = "${destination.order}")
public void receiveOrder(String orderNumber,
    @Header(JmsHeaders.MESSAGE_ID) String messageId) {
  LOGGER.info("received OrderNumber='{}' with MessageId='{}'",
      orderNumber, messageId);

  LOGGER.info("sending Status='Accepted' with CorrelationId='{}'",
      messageId);

  jmsTemplate.send(statusDestination, messageCreator -> {
    TextMessage message =
        messageCreator.createTextMessage("Accepted");
    message.setJMSCorrelationID(messageId);
    return message;
  });
}
 
源代码4 项目: rqueue   文件: MessageListener.java
@RqueueListener(
    value = "${notification.queue.name}",
    numRetries = "${notification.queue.retry.count}",
    active = "${notification.queue.active}")
public void onMessage(
    @Payload Notification notification, @Header(RqueueMessageHeaders.ID) String id)
    throws Exception {
  log.info("Notification: {}, Id: {}", notification, id);
  if (failureManager.shouldFail(notification.getId())) {
    throw new Exception("Failing notification task to be retried" + notification);
  }
  consumedMessageService.save(notification);
}
 
源代码5 项目: rqueue   文件: MessageListener.java
@RqueueListener(
    value = "${email.queue.name}",
    deadLetterQueue = "${email.dead.letter.queue.name}",
    numRetries = "${email.queue.retry.count}",
    visibilityTimeout = "${email.execution.time}",
    active = "${email.queue.active}")
public void onMessage(Email email, @Header(RqueueMessageHeaders.MESSAGE) RqueueMessage message)
    throws Exception {
  log.info("Email: {} Message: {}", email, message);
  if (failureManager.shouldFail(email.getId())) {
    throw new Exception("Failing email task to be retried" + email);
  }
  consumedMessageService.save(email);
}
 
@SuppressWarnings({"unused", "OptionalUsedAsFieldOrParameterType"})
public void handleMessage(
		@Header String param1,
		@Header(name = "name", defaultValue = "bar") String param2,
		@Header(name = "name", defaultValue = "#{systemProperties.systemProperty}") String param3,
		@Header(name = "#{systemProperties.systemProperty}") String param4,
		String param5,
		@Header("foo") Optional<String> param6,
		@Header("nativeHeaders.param1") String nativeHeaderParam1) {
}
 
@SuppressWarnings({"unused", "OptionalUsedAsFieldOrParameterType"})
public void handleMessage(
		@Header String param1,
		@Header(name = "name", defaultValue = "bar") String param2,
		@Header(name = "name", defaultValue = "#{systemProperties.systemProperty}") String param3,
		@Header(name = "#{systemProperties.systemProperty}") String param4,
		String param5,
		@Header("foo") Optional<String> param6,
		@Header("nativeHeaders.param1") String nativeHeaderParam1) {
}
 
public void handleMessage(
		@Header String param1,
		@Header(name = "name", defaultValue = "bar") String param2,
		@Header(name = "name", defaultValue = "#{systemProperties.systemProperty}") String param3,
		@Header(name = "#{systemProperties.systemProperty}") String param4,
		String param5,
		@Header("foo") Optional<String> param6,
		@Header("nativeHeaders.param1") String nativeHeaderParam1) {
}
 
@KafkaListener(topics = "${kafka.topic.batchConsumerTopic}", containerFactory = "kafkaListenerContainerFactoryForBatchConsumer", groupId = "batchConsumer")
public void receive(@Payload List<String> payloads,
                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Long> partitionIds,
                    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    LOGGER.info("Received group=batchConsumer with batch group data: ");
    for (int i = 0; i< payloads.size(); ++i) {
        LOGGER.info("---------------- payload='{}' from [email protected]='{}'", payloads.get(i), partitionIds.get(i)+"@"+offsets.get(i));
    }

}
 
源代码10 项目: cloud-espm-cloud-native   文件: LocalListner.java
/**
 * This method is used to process messages from queue.
 * 
 * @param in
 * @param channel
 * @param tag
 * @throws IOException
 * @throws InterruptedException
 */
@RabbitListener(queues = "espm.salesOrders")
public void recieve(SalesOrder in, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
		throws IOException, InterruptedException {

	SalesOrderRepository repo = appContext.getBean(SalesOrderRepository.class);
	try {
		if (!repo.existsById(in.getSalesOrderId())) {
			repo.save(in);
			logger.info(in.getSalesOrderId() + " created");
			channel.basicAck(tag, false);
			value = initialValue;

		} else {
			logger.error(in.getSalesOrderId() + " already Exists, Deleting from Queue");
			channel.basicAck(tag, false);

		}
	} catch (DataIntegrityViolationException e) {
		logger.error(in.getSalesOrderId() + " is an invalid Sales-Order, Deleting from Queue");
		channel.basicNack(tag, false, false);

	} catch (CannotCreateTransactionException ccte) {
		logger.error("Unable to connect to DB");
		logger.error("Backing  for " + value);
		TimeUnit.MILLISECONDS.sleep(value);
		if (value <= maxVal)
			value = value * multiplier;
		channel.basicNack(tag, false, true);

	}

}
 
源代码11 项目: spring-cloud-gray   文件: MessageAckListener.java
@StreamListener(TestSourceInput.INPUT)
    public void receive(Object msg, @Header(AmqpHeaders.CHANNEL) Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {
        log.info("Received3:" + msg);
        boolean request = false;//true=重新发送
//        channel.basicReject(deliveryTag, request);
        channel.basicAck(deliveryTag, false);
    }
 
源代码12 项目: java-tutorial   文件: KafkaConsumer.java
/**
 * 监听kafka.tut 的 topic
 *
 * @param record
 * @param topic  topic
 */
@KafkaListener(id = "tut", topics = "kafka.tut")
public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    //判断是否NULL
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());

    if (kafkaMessage.isPresent()) {
        //获取消息
        Object message = kafkaMessage.get();

        logger.info("Receive: +++++++++++++++ Topic:" + topic);
        logger.info("Receive: +++++++++++++++ Record:" + record);
        logger.info("Receive: +++++++++++++++ Message:" + message);
    }
}
 
源代码13 项目: spring-boot-aws-mock   文件: Sample2Listener.java
@SqsListener(value = "sample2", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void receive(String message, @Header("SenderId") String senderId, Acknowledgment ack) throws IOException {
    log.info("[sample2][Queue] senderId: {}, message: {}", senderId, message);
    PointDto messageObject = objectMapper.readValue(message, PointDto.class);
    pointRepository.save(messageObject.toEntity());
    ack.acknowledge();
    countDownLatch.countDown();
    log.info("[sample2] Success Ack");
}
 
源代码14 项目: spring-boot-aws-mock   文件: Sample3Listener.java
@SqsListener(value = "sample3", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void receive(String message, @Header("SenderId") String senderId, Acknowledgment ack) throws IOException {
    log.info("[sample3][Queue] senderId: {}, message: {}", senderId, message);
    PointDto messageObject = objectMapper.readValue(message, PointDto.class);
    try{
        pointRepository.save(messageObject.toEntity());
        ack.acknowledge();
        countDownLatch.countDown();
        log.info("[sample3] Success Ack");
    } catch (Exception e){
        log.error("[sample3] Point Save Fail: "+ message, e);
    }
}
 
源代码15 项目: spring-cloud-gcp   文件: ReceiverConfiguration.java
@ServiceActivator(inputChannel = "pubSubInputChannel")
public void messageReceiver(Person payload,
		@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
	LOGGER.info("Message arrived! Payload: " + payload);
	this.processedPersonsList.add(payload);
	message.ack();
}
 
@SuppressWarnings("unused")
private void handleMessage(
		@Header String param1,
		@Header(name = "name", defaultValue = "bar") String param2,
		@Header(name = "name", defaultValue = "#{systemProperties.systemProperty}") String param3,
		String param4,
		@Header("nativeHeaders.param1") String nativeHeaderParam1) {
}
 
源代码17 项目: hawkbit-examples   文件: DmfReceiverService.java
/**
 * Handle the incoming Message from Queue with the property
 * (hawkbit.device.simulator.amqp.receiverConnectorQueueFromSp).
 *
 * @param message
 *            the incoming message
 * @param type
 *            the action type
 * @param thingId
 *            the thing id in message header
 * @param tenant
 *            the device belongs to
 */
@RabbitListener(queues = "${hawkbit.device.simulator.amqp.receiverConnectorQueueFromSp}")
public void recieveMessageSp(final Message message, @Header(MessageHeaderKey.TYPE) final String type,
        @Header(name = MessageHeaderKey.THING_ID, required = false) final String thingId,
        @Header(MessageHeaderKey.TENANT) final String tenant) {
    final MessageType messageType = MessageType.valueOf(type);

    if (MessageType.EVENT == messageType) {
        checkContentTypeJson(message);
        handleEventMessage(message, thingId);
        return;
    }

    if (MessageType.THING_DELETED == messageType) {
        checkContentTypeJson(message);
        repository.remove(tenant, thingId);
        return;
    }

    if (MessageType.PING_RESPONSE == messageType) {
        final String correlationId = message.getMessageProperties().getCorrelationId();
        if (!openPings.remove(correlationId)) {
            LOGGER.error("Unknown PING_RESPONSE received for correlationId: {}.", correlationId);
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Got ping response from tenant {} with correlationId {} with timestamp {}", tenant,
                    correlationId, new String(message.getBody(), StandardCharsets.UTF_8));
        }

        return;
    }

    LOGGER.info("No valid message type property.");
}
 
源代码18 项目: spring-kafka   文件: Receiver.java
@KafkaListener(id = "batch-listener", topics = "${kafka.topic.batch}")
public void receive(List<String> data,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
  LOGGER.info("start of batch receive");
  for (int i = 0; i < data.size(); i++) {
    LOGGER.info("received message='{}' with partition-offset='{}'", data.get(i),
        partitions.get(i) + "-" + offsets.get(i));
    // handle message

    latch.countDown();
  }
  LOGGER.info("end of batch receive");
}
 
@Override
public boolean supportsParameter(MethodParameter parameter) {
	return (!Message.class.isAssignableFrom(parameter.getParameterType())
			&& !MessageHeaders.class.isAssignableFrom(parameter.getParameterType())
			&& !parameter.hasParameterAnnotation(Header.class)
			&& !parameter.hasParameterAnnotation(Headers.class));
}
 
@StreamListener(Processor.INPUT)
public void receive(@Payload StreamListenerTestUtils.FooPojo fooPojo,
		@Headers Map<String, Object> headers,
		@Header(MessageHeaders.CONTENT_TYPE) String contentType) {
	this.receivedArguments.add(fooPojo);
	this.receivedArguments.add(headers);
	this.receivedArguments.add(contentType);
}
 
源代码21 项目: tutorials   文件: SpringCloudSQS.java
@SqsListener(QUEUE_NAME)
public void receiveMessage(String message, @Header("SenderId") String senderId) {
    logger.info("Received message: {}, having SenderId: {}", message, senderId);
    if (countDownLatch != null) {
        countDownLatch.countDown();
    }
}
 
源代码22 项目: spring-cloud-aws   文件: QueueListenerTest.java
@RuntimeUse
@SqsListener("QueueListenerTest")
public void receiveMessage(String message,
		@Header(value = "SenderId", required = false) String senderId,
		@Headers Map<String, Object> allHeaders, SqsMessageHeaders asSqsHeaders) {
	LOGGER.debug("Received message with content {}", message);
	this.receivedMessages.add(message);
	this.senderId = senderId;
	this.allHeaders = allHeaders;
	this.approximateReceiveCount = asSqsHeaders.getApproximateReceiveCount();
	this.approximateFirstReceiveTimestamp = asSqsHeaders
			.getApproximateFirstReceiveTimestamp();
	this.timestamp = asSqsHeaders.getTimestamp();
	this.sentTimestamp = asSqsHeaders.getSentTimestamp();
	this.getCountDownLatch().countDown();
}
 
源代码23 项目: spring-cloud-aws   文件: QueueMessageHandlerTest.java
@RuntimeUse
@SqsListener("testQueue")
public void receive(@Payload String payload,
		@Header(value = "SenderId", required = false) String senderId) {
	this.senderId = senderId;
	this.payload = payload;
}
 
源代码24 项目: aws-refapp   文件: SqsController.java
@SqsListener(QUEUE_NAME)
private void receiveMessage(MessageToProcess message, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
    LOG.debug("Received SQS message {}", message);

    try {
        this.sqsSendingTextWebSocketHandler.broadcastToSessions(new DataWithTimestamp<>(message, approximateFirstReceiveTimestamp));
    } catch (IOException e) {
        LOG.error("Was not able to push the message to the client.", e);
    }
}
 
@Override
public boolean supportsParameter(MethodParameter parameter) {
	return parameter.hasParameterAnnotation(Header.class);
}
 
@Override
protected NamedValueInfo createNamedValueInfo(MethodParameter parameter) {
	Header annot = parameter.getParameterAnnotation(Header.class);
	Assert.state(annot != null, "No Header annotation");
	return new HeaderNamedValueInfo(annot);
}
 
private HeaderNamedValueInfo(Header annotation) {
	super(annotation.name(), annotation.required(), annotation.defaultValue());
}
 
@Override
public boolean supportsParameter(MethodParameter parameter) {
	return parameter.hasParameterAnnotation(Header.class);
}
 
@Override
protected NamedValueInfo createNamedValueInfo(MethodParameter parameter) {
	Header annot = parameter.getParameterAnnotation(Header.class);
	Assert.state(annot != null, "No Header annotation");
	return new HeaderNamedValueInfo(annot);
}
 
private HeaderNamedValueInfo(Header annotation) {
	super(annotation.name(), annotation.required(), annotation.defaultValue());
}
 
 类方法
 同包方法