下面列出了怎么用org.springframework.messaging.handler.annotation.Header的API类实例代码及写法,或者点击链接到github查看源代码。
@RabbitListener(queues = QUEUE_ASYNC_RPC_WITH_FIXED_REPLY)
public void processAsyncRpcFixed(User user,
@Header(AmqpHeaders.REPLY_TO) String replyTo,
@Header(AmqpHeaders.CORRELATION_ID) byte[] correlationId) {
// String body = new String(message.getBody(), Charset.forName("UTF-8"));
// User user = JacksonUtil.json2Bean(body, new TypeReference<User>(){});
logger.info("user.name={}", user.getName());
logger.info("use a fixed reply queue={}, correlationId={}", replyTo, new String(correlationId));
ListenableFuture<String> asyncResult = asyncTask.expensiveOperation(user.getName());
asyncResult.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
amqpTemplate.convertAndSend(replyTo, (Object) result, m -> {
//https://stackoverflow.com/questions/42382307/messageproperties-setcorrelationidstring-is-not-working
m.getMessageProperties().setCorrelationId(new String(correlationId));
return m;
});
}
@Override
public void onFailure(Throwable ex) {
logger.error("接受到QUEUE_ASYNC_RPC_WITH_FIXED_REPLY失败", ex);
}
});
}
@RabbitListener(queues = QUEUE_ASYNC_RPC)
public void processAsyncRpc(Message message, @Header(AmqpHeaders.REPLY_TO) String replyTo) {
String body = new String(message.getBody(), Charset.forName("UTF-8"));
User user = JacksonUtil.json2Bean(body, new TypeReference<User>(){});
logger.info("recevie message {} and reply to {}, user.name={}", body, replyTo, user.getName());
if (replyTo.startsWith("amq.rabbitmq.reply-to")) {
logger.debug("starting with version 3.4.0, the RabbitMQ server now supports Direct reply-to");
} else {
logger.info("fall back to using a temporary reply queue");
}
ListenableFuture<String> asyncResult = asyncTask.expensiveOperation(body);
asyncResult.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
amqpTemplate.convertAndSend(replyTo, result);
}
@Override
public void onFailure(Throwable ex) {
logger.error("接受到QUEUE_ASYNC_RPC失败", ex);
}
});
}
@JmsListener(destination = "${destination.order}")
public void receiveOrder(String orderNumber,
@Header(JmsHeaders.MESSAGE_ID) String messageId) {
LOGGER.info("received OrderNumber='{}' with MessageId='{}'",
orderNumber, messageId);
LOGGER.info("sending Status='Accepted' with CorrelationId='{}'",
messageId);
jmsTemplate.send(statusDestination, messageCreator -> {
TextMessage message =
messageCreator.createTextMessage("Accepted");
message.setJMSCorrelationID(messageId);
return message;
});
}
@RqueueListener(
value = "${notification.queue.name}",
numRetries = "${notification.queue.retry.count}",
active = "${notification.queue.active}")
public void onMessage(
@Payload Notification notification, @Header(RqueueMessageHeaders.ID) String id)
throws Exception {
log.info("Notification: {}, Id: {}", notification, id);
if (failureManager.shouldFail(notification.getId())) {
throw new Exception("Failing notification task to be retried" + notification);
}
consumedMessageService.save(notification);
}
@RqueueListener(
value = "${email.queue.name}",
deadLetterQueue = "${email.dead.letter.queue.name}",
numRetries = "${email.queue.retry.count}",
visibilityTimeout = "${email.execution.time}",
active = "${email.queue.active}")
public void onMessage(Email email, @Header(RqueueMessageHeaders.MESSAGE) RqueueMessage message)
throws Exception {
log.info("Email: {} Message: {}", email, message);
if (failureManager.shouldFail(email.getId())) {
throw new Exception("Failing email task to be retried" + email);
}
consumedMessageService.save(email);
}
@SuppressWarnings({"unused", "OptionalUsedAsFieldOrParameterType"})
public void handleMessage(
@Header String param1,
@Header(name = "name", defaultValue = "bar") String param2,
@Header(name = "name", defaultValue = "#{systemProperties.systemProperty}") String param3,
@Header(name = "#{systemProperties.systemProperty}") String param4,
String param5,
@Header("foo") Optional<String> param6,
@Header("nativeHeaders.param1") String nativeHeaderParam1) {
}
@SuppressWarnings({"unused", "OptionalUsedAsFieldOrParameterType"})
public void handleMessage(
@Header String param1,
@Header(name = "name", defaultValue = "bar") String param2,
@Header(name = "name", defaultValue = "#{systemProperties.systemProperty}") String param3,
@Header(name = "#{systemProperties.systemProperty}") String param4,
String param5,
@Header("foo") Optional<String> param6,
@Header("nativeHeaders.param1") String nativeHeaderParam1) {
}
public void handleMessage(
@Header String param1,
@Header(name = "name", defaultValue = "bar") String param2,
@Header(name = "name", defaultValue = "#{systemProperties.systemProperty}") String param3,
@Header(name = "#{systemProperties.systemProperty}") String param4,
String param5,
@Header("foo") Optional<String> param6,
@Header("nativeHeaders.param1") String nativeHeaderParam1) {
}
@KafkaListener(topics = "${kafka.topic.batchConsumerTopic}", containerFactory = "kafkaListenerContainerFactoryForBatchConsumer", groupId = "batchConsumer")
public void receive(@Payload List<String> payloads,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Long> partitionIds,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
LOGGER.info("Received group=batchConsumer with batch group data: ");
for (int i = 0; i< payloads.size(); ++i) {
LOGGER.info("---------------- payload='{}' from [email protected]='{}'", payloads.get(i), partitionIds.get(i)+"@"+offsets.get(i));
}
}
/**
* This method is used to process messages from queue.
*
* @param in
* @param channel
* @param tag
* @throws IOException
* @throws InterruptedException
*/
@RabbitListener(queues = "espm.salesOrders")
public void recieve(SalesOrder in, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException, InterruptedException {
SalesOrderRepository repo = appContext.getBean(SalesOrderRepository.class);
try {
if (!repo.existsById(in.getSalesOrderId())) {
repo.save(in);
logger.info(in.getSalesOrderId() + " created");
channel.basicAck(tag, false);
value = initialValue;
} else {
logger.error(in.getSalesOrderId() + " already Exists, Deleting from Queue");
channel.basicAck(tag, false);
}
} catch (DataIntegrityViolationException e) {
logger.error(in.getSalesOrderId() + " is an invalid Sales-Order, Deleting from Queue");
channel.basicNack(tag, false, false);
} catch (CannotCreateTransactionException ccte) {
logger.error("Unable to connect to DB");
logger.error("Backing for " + value);
TimeUnit.MILLISECONDS.sleep(value);
if (value <= maxVal)
value = value * multiplier;
channel.basicNack(tag, false, true);
}
}
@StreamListener(TestSourceInput.INPUT)
public void receive(Object msg, @Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {
log.info("Received3:" + msg);
boolean request = false;//true=重新发送
// channel.basicReject(deliveryTag, request);
channel.basicAck(deliveryTag, false);
}
/**
* 监听kafka.tut 的 topic
*
* @param record
* @param topic topic
*/
@KafkaListener(id = "tut", topics = "kafka.tut")
public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
//判断是否NULL
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
//获取消息
Object message = kafkaMessage.get();
logger.info("Receive: +++++++++++++++ Topic:" + topic);
logger.info("Receive: +++++++++++++++ Record:" + record);
logger.info("Receive: +++++++++++++++ Message:" + message);
}
}
@SqsListener(value = "sample2", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void receive(String message, @Header("SenderId") String senderId, Acknowledgment ack) throws IOException {
log.info("[sample2][Queue] senderId: {}, message: {}", senderId, message);
PointDto messageObject = objectMapper.readValue(message, PointDto.class);
pointRepository.save(messageObject.toEntity());
ack.acknowledge();
countDownLatch.countDown();
log.info("[sample2] Success Ack");
}
@SqsListener(value = "sample3", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void receive(String message, @Header("SenderId") String senderId, Acknowledgment ack) throws IOException {
log.info("[sample3][Queue] senderId: {}, message: {}", senderId, message);
PointDto messageObject = objectMapper.readValue(message, PointDto.class);
try{
pointRepository.save(messageObject.toEntity());
ack.acknowledge();
countDownLatch.countDown();
log.info("[sample3] Success Ack");
} catch (Exception e){
log.error("[sample3] Point Save Fail: "+ message, e);
}
}
@ServiceActivator(inputChannel = "pubSubInputChannel")
public void messageReceiver(Person payload,
@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
LOGGER.info("Message arrived! Payload: " + payload);
this.processedPersonsList.add(payload);
message.ack();
}
@SuppressWarnings("unused")
private void handleMessage(
@Header String param1,
@Header(name = "name", defaultValue = "bar") String param2,
@Header(name = "name", defaultValue = "#{systemProperties.systemProperty}") String param3,
String param4,
@Header("nativeHeaders.param1") String nativeHeaderParam1) {
}
/**
* Handle the incoming Message from Queue with the property
* (hawkbit.device.simulator.amqp.receiverConnectorQueueFromSp).
*
* @param message
* the incoming message
* @param type
* the action type
* @param thingId
* the thing id in message header
* @param tenant
* the device belongs to
*/
@RabbitListener(queues = "${hawkbit.device.simulator.amqp.receiverConnectorQueueFromSp}")
public void recieveMessageSp(final Message message, @Header(MessageHeaderKey.TYPE) final String type,
@Header(name = MessageHeaderKey.THING_ID, required = false) final String thingId,
@Header(MessageHeaderKey.TENANT) final String tenant) {
final MessageType messageType = MessageType.valueOf(type);
if (MessageType.EVENT == messageType) {
checkContentTypeJson(message);
handleEventMessage(message, thingId);
return;
}
if (MessageType.THING_DELETED == messageType) {
checkContentTypeJson(message);
repository.remove(tenant, thingId);
return;
}
if (MessageType.PING_RESPONSE == messageType) {
final String correlationId = message.getMessageProperties().getCorrelationId();
if (!openPings.remove(correlationId)) {
LOGGER.error("Unknown PING_RESPONSE received for correlationId: {}.", correlationId);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Got ping response from tenant {} with correlationId {} with timestamp {}", tenant,
correlationId, new String(message.getBody(), StandardCharsets.UTF_8));
}
return;
}
LOGGER.info("No valid message type property.");
}
@KafkaListener(id = "batch-listener", topics = "${kafka.topic.batch}")
public void receive(List<String> data,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
LOGGER.info("start of batch receive");
for (int i = 0; i < data.size(); i++) {
LOGGER.info("received message='{}' with partition-offset='{}'", data.get(i),
partitions.get(i) + "-" + offsets.get(i));
// handle message
latch.countDown();
}
LOGGER.info("end of batch receive");
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return (!Message.class.isAssignableFrom(parameter.getParameterType())
&& !MessageHeaders.class.isAssignableFrom(parameter.getParameterType())
&& !parameter.hasParameterAnnotation(Header.class)
&& !parameter.hasParameterAnnotation(Headers.class));
}
@StreamListener(Processor.INPUT)
public void receive(@Payload StreamListenerTestUtils.FooPojo fooPojo,
@Headers Map<String, Object> headers,
@Header(MessageHeaders.CONTENT_TYPE) String contentType) {
this.receivedArguments.add(fooPojo);
this.receivedArguments.add(headers);
this.receivedArguments.add(contentType);
}
@SqsListener(QUEUE_NAME)
public void receiveMessage(String message, @Header("SenderId") String senderId) {
logger.info("Received message: {}, having SenderId: {}", message, senderId);
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
@RuntimeUse
@SqsListener("QueueListenerTest")
public void receiveMessage(String message,
@Header(value = "SenderId", required = false) String senderId,
@Headers Map<String, Object> allHeaders, SqsMessageHeaders asSqsHeaders) {
LOGGER.debug("Received message with content {}", message);
this.receivedMessages.add(message);
this.senderId = senderId;
this.allHeaders = allHeaders;
this.approximateReceiveCount = asSqsHeaders.getApproximateReceiveCount();
this.approximateFirstReceiveTimestamp = asSqsHeaders
.getApproximateFirstReceiveTimestamp();
this.timestamp = asSqsHeaders.getTimestamp();
this.sentTimestamp = asSqsHeaders.getSentTimestamp();
this.getCountDownLatch().countDown();
}
@RuntimeUse
@SqsListener("testQueue")
public void receive(@Payload String payload,
@Header(value = "SenderId", required = false) String senderId) {
this.senderId = senderId;
this.payload = payload;
}
@SqsListener(QUEUE_NAME)
private void receiveMessage(MessageToProcess message, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
LOG.debug("Received SQS message {}", message);
try {
this.sqsSendingTextWebSocketHandler.broadcastToSessions(new DataWithTimestamp<>(message, approximateFirstReceiveTimestamp));
} catch (IOException e) {
LOG.error("Was not able to push the message to the client.", e);
}
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return parameter.hasParameterAnnotation(Header.class);
}
@Override
protected NamedValueInfo createNamedValueInfo(MethodParameter parameter) {
Header annot = parameter.getParameterAnnotation(Header.class);
Assert.state(annot != null, "No Header annotation");
return new HeaderNamedValueInfo(annot);
}
private HeaderNamedValueInfo(Header annotation) {
super(annotation.name(), annotation.required(), annotation.defaultValue());
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return parameter.hasParameterAnnotation(Header.class);
}
@Override
protected NamedValueInfo createNamedValueInfo(MethodParameter parameter) {
Header annot = parameter.getParameterAnnotation(Header.class);
Assert.state(annot != null, "No Header annotation");
return new HeaderNamedValueInfo(annot);
}
private HeaderNamedValueInfo(Header annotation) {
super(annotation.name(), annotation.required(), annotation.defaultValue());
}