下面列出了怎么用org.springframework.messaging.handler.annotation.Payload的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Receives access-log messages from {@code QueueConstants.QUEUE_ACCESS_LOGS} and stores them.
 *
 * <p>The payload map is converted to a {@code GatewayAccessLogs}. When the converted entry has
 * an IP, the region returned by {@code ipRegionService} is attached. The use time is the
 * difference between the response time and the request time. The entry is then inserted
 * through {@code gatewayLogsMapper}. Any exception is logged and not rethrown.
 *
 * @param access access-log fields keyed by name; {@code null} is ignored
 */
@RabbitListener(queues = QueueConstants.QUEUE_ACCESS_LOGS)
public void accessLogsQueue(@Payload Map access) {
    try {
        if (access == null) {
            return;
        }
        GatewayAccessLogs entry = BeanConvertUtils.mapToObject(access, GatewayAccessLogs.class);
        if (entry == null) {
            return;
        }
        if (entry.getIp() != null) {
            entry.setRegion(ipRegionService.getRegion(entry.getIp()));
        }
        // Elapsed time between request and response, taken from the two timestamps.
        entry.setUseTime(entry.getResponseTime().getTime() - entry.getRequestTime().getTime());
        gatewayLogsMapper.insert(entry);
    } catch (Exception e) {
        log.error("error:", e);
    }
}
/**
 * Consumes order messages from the binding described by the
 * {@code spring.rabbitmq.listener.order.*} properties and acknowledges each one manually.
 *
 * <p>Example configuration:
 * <pre>
 * spring.rabbitmq.listener.order.queue.name=queue-2
 * spring.rabbitmq.listener.order.queue.durable=true
 * spring.rabbitmq.listener.order.exchange.name=exchange-1
 * spring.rabbitmq.listener.order.exchange.durable=true
 * spring.rabbitmq.listener.order.exchange.type=topic
 * spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
 * spring.rabbitmq.listener.order.key=springboot.*
 * </pre>
 *
 * @param order the order payload; its id is printed to standard error
 * @param channel the channel on which the acknowledgement is sent
 * @param headers the message headers; the delivery tag is read from
 *        {@code AmqpHeaders.DELIVERY_TAG}
 * @throws Exception propagated from the acknowledgement call
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
                durable = "${spring.rabbitmq.listener.order.queue.durable}"),
        exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
                durable = "${spring.rabbitmq.listener.order.exchange.durable}",
                type = "${spring.rabbitmq.listener.order.exchange.type}",
                ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
        key = "${spring.rabbitmq.listener.order.key}"
    )
)
@RabbitHandler
public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order,
        Channel channel,
        @Headers Map<String, Object> headers) throws Exception {
    System.err.println("--------------------------------------");
    System.err.println("消费端order: " + order.getId());
    // Manual ACK for this delivery only (multiple = false).
    final Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(tag, false);
}
/**
 * Updates derived counters for the given entity.
 *
 * <p>Only {@code Favorite} instances are handled: the method logs the event and calls
 * {@code updateFavoriteCount} with the favorite's object type and object id. The
 * {@code JPAThreadLocal} background flag is switched on for the duration of the call and
 * always switched off again afterwards.
 *
 * <p>NOTE(review): the original header said this runs when objects are deleted, while the
 * log message and inline comment refer to a newly created favorite. Confirm with the caller.
 *
 * @param obj the entity that triggered the update
 */
private void updateCounts(@Payload BaseEntity obj) {
    try {
        // Original note: no need to worry about this flag being overwritten.
        JPAThreadLocal.setBackground(true);
        if (!(obj instanceof Favorite)) {
            return;
        }
        // A favorite was created: bump the count on the business object it points to.
        log.info("create favorite, update favorite count.");
        Favorite created = (Favorite) obj;
        updateFavoriteCount(created.getObjType(), created.getObjId());
    } finally {
        JPAThreadLocal.setBackground(false);
    }
}
/**
 * Builds a {@code CreditApplication} from a submitted-application event and passes it to
 * {@code scoringApplicationService} for scoring.
 *
 * <p>The applicant's name and address come from the event's first applicant. The financial
 * figures come from the household's monthly expenses and earning capacity.
 *
 * @param applicationSubmittedEvent the event received on
 *        {@code ApplicationProcessChannels.APPLICATION_SUBMITTED}
 */
@StreamListener(ApplicationProcessChannels.APPLICATION_SUBMITTED)
public void receiveApplicationSubmission(@Payload ApplicationSubmittedEvent applicationSubmittedEvent) {
    Applicant applicant = applicationSubmittedEvent.getFirstApplicant();
    MonthlyExpenses expenses = applicationSubmittedEvent.getHousehold().getMonthlyExpenses();
    EarningCapacity earnings = applicationSubmittedEvent.getHousehold().getEarningCapacity();

    ApplicationNumber number = new ApplicationNumber(applicationSubmittedEvent.getApplicationNumber());
    CreditApplication application = new CreditApplication.CreditApplicationBuilder(number)
            .withApplicant(
                    applicant.getFirstName(),
                    applicant.getLastName(),
                    applicant.getAddress().getStreet(),
                    applicant.getAddress().getPostCode(),
                    applicant.getAddress().getCity())
            .withFinancialSituation(
                    new Money(expenses.getCostOfLiving()),
                    new Money(earnings.getFurtherIncome()),
                    new Money(expenses.getRent()),
                    new Money(earnings.getSalaryFirstApplicant()))
            .build();

    scoringApplicationService.scoreApplication(application);
}
/**
 * Handles chat messages from the {@code message} topic and resolves {@code /giphy} commands.
 *
 * <p>When the message has no image yet and its trimmed text starts with {@code /giphy}, the
 * text after the command is used as a search query for {@code giphy}. If an image is found it
 * is stored on the message, and the updated message is sent through {@code kafka}. A command
 * without a search term is ignored.
 *
 * @param message the chat message; its {@code message} and {@code image} fields are read and
 *        {@code image} may be written
 * @param headers the message headers, used to start the consumer span
 * @throws Exception propagated from the tracing, lookup or send calls
 */
@KafkaListener(topics = "message")
public void process(@Payload Message message, @Headers MessageHeaders headers) throws Exception {
    Span span = kafka.startConsumerSpan("process", headers);
    try (Scope scope = tracer.scopeManager().activate(span, true)) {
        System.out.println("Received message: " + message.message);
        if (message.image == null && message.message.trim().startsWith("/giphy")) {
            // String.split drops trailing empty strings, so a bare "/giphy" yields an empty
            // array; indexing [1] unconditionally would throw.
            String[] parts = message.message.split("/giphy");
            String query = parts.length > 1 ? parts[1].trim() : "";
            if (query.isEmpty()) {
                System.out.println("Giphy requested without a search term, ignoring.");
                return;
            }
            System.out.println("Giphy requested: " + query);
            message.image = giphy.query(query);
            if (message.image != null) {
                kafka.sendMessage(message);
                System.out.println("Updated message, url=" + message.image);
            }
        }
    }
}
/**
 * Consumes notifications from the queue named by {@code notification.queue.name}.
 *
 * <p>The notification is logged first. If {@code failureManager} says this notification id
 * should fail, an exception is thrown. Otherwise the notification is saved through
 * {@code consumedMessageService}.
 *
 * @param notification the notification payload
 * @param id the value of the {@code RqueueMessageHeaders.ID} header
 * @throws Exception when {@code failureManager.shouldFail} returns {@code true} for this
 *         notification's id
 */
@RqueueListener(
    value = "${notification.queue.name}",
    numRetries = "${notification.queue.retry.count}",
    active = "${notification.queue.active}")
public void onMessage(
    @Payload Notification notification, @Header(RqueueMessageHeaders.ID) String id)
    throws Exception {
    log.info("Notification: {}, Id: {}", notification, id);
    final boolean mustFail = failureManager.shouldFail(notification.getId());
    if (mustFail) {
        throw new Exception("Failing notification task to be retried" + notification);
    }
    consumedMessageService.save(notification);
}
/**
 * Fixture method with an empty body; only its parameter list carries meaning.
 *
 * <p>It declares one parameter per {@code @Payload} variation: plain, {@code required=false},
 * {@code required=true} on a {@code Locale}, with the value {@code "foo.bar"}, combined with
 * {@code @MyValid}, {@code @Validated} without {@code @Payload}, and no annotation at all.
 *
 * <p>NOTE(review): presumably looked up reflectively by the resolver tests. Confirm against
 * the test setup before changing parameter order or annotations.
 */
@SuppressWarnings("unused")
private void handleMessage(
@Payload String param,
@Payload(required=false) String paramNotRequired,
@Payload(required=true) Locale nonConvertibleRequiredParam,
@Payload("foo.bar") String paramWithSpelExpression,
@MyValid @Payload String validParam,
@Validated String validParamNotAnnotated,
String paramNotAnnotated) {
}
/**
 * Checks {@code supportsParameter} with default resolution on and off.
 *
 * <p>A {@code @Payload}-annotated parameter must be supported in both modes. A
 * {@code String} parameter without {@code @Payload} must be supported only when default
 * resolution is enabled.
 */
@Test
public void supportsParameter() {
    // Default resolution enabled: annotated and un-annotated parameters are both accepted.
    PayloadMethodArgumentResolver lenient = createResolver(null, true);
    assertTrue(lenient.supportsParameter(this.testMethod.annotPresent(Payload.class).arg()));
    assertTrue(lenient.supportsParameter(this.testMethod.annotNotPresent(Payload.class).arg(String.class)));

    // Default resolution disabled: only the annotated parameter is accepted.
    PayloadMethodArgumentResolver strict = createResolver(null, false);
    assertTrue(strict.supportsParameter(this.testMethod.annotPresent(Payload.class).arg()));
    assertFalse(strict.supportsParameter(this.testMethod.annotNotPresent(Payload.class).arg(String.class)));
}
/**
 * Reacts to messages on {@code Sink.INPUT} whose {@code type} header is
 * {@code TodoCreatedEvent}.
 *
 * <p>The event is logged. Then an {@code EmailCreateCommand} with a random UUID and a
 * generated e-mail address is passed to {@code commanderHandler}.
 *
 * @param event the todo-created event
 * @throws Exception propagated from the command handler
 */
@StreamListener(
    target = Sink.INPUT,
    condition = "headers['type']=='TodoCreatedEvent'"
)
public void todoCreated(@Payload TodoCreatedEvent event) throws Exception {
    LOG.info("Todo created");
    LOG.info("when = " + event.when());
    LOG.info("todo = " + event.getTodo().toString());

    final String commandId = UUID.randomUUID().toString();
    // The current epoch second is embedded in the generated address.
    final String address = "test" + Instant.now().getEpochSecond() + "@gmail.com";
    final Email draft = new Email("Alexsandro", address, EmailState.CREATED);
    commanderHandler.create(new EmailCreateCommand(commandId, draft));
}
/**
 * Resolves a {@code String} parameter without {@code @Payload} from a single data buffer
 * and expects the original text back.
 */
@Test
public void string() {
    final String expected = "foo";
    MethodParameter target = this.testMethod.annotNotPresent(Payload.class).arg(String.class);
    Object resolved = resolveValue(target, Mono.just(toDataBuffer(expected)), null);
    assertEquals(expected, resolved);
}
/**
 * Handles a customer-created event received on {@code ScoringChannels.CUSTOMER_CREATED}.
 *
 * <p>The customer is fetched from the URL carried by the event. The scoring result for the
 * customer's application number is loaded or initialized, updated with the city check and
 * the current date, and saved. The saved result is then handed to
 * {@code notifyInCaseOfFinalizedScoring}.
 *
 * @param customerCreatedEvent the event carrying the customer URL
 */
@StreamListener(ScoringChannels.CUSTOMER_CREATED)
public void receiveCustomerCreatedEvent(@Payload CustomerCreatedEvent customerCreatedEvent) {
    LOGGER.info("Received Customer Created Event: " + customerCreatedEvent.toString());

    Customer fetched = restTemplate.getForObject(customerCreatedEvent.getCustomerUrl(), Customer.class);
    LOGGER.info("Received Customer from Event: " + fetched.toString());

    ScoringResult scoring = loadOrInitializeScoringResult(fetched.getApplicationNumber());
    scoring.setLegitCity(fetched.isCityLegit());
    scoring.setLastUpdate(new Date());

    ScoringResult persisted = scoringResultRepository.save(scoring);
    notifyInCaseOfFinalizedScoring(persisted);
}
/**
 * Marks a credit application as acceptably scored.
 *
 * <p>The status is looked up by the event's application number, its scoring result is set
 * to {@code "ACCEPTABLE"}, its scoring-done date is set to the event's creation time, and it
 * is saved.
 *
 * @param scoringDoneEvent the event received on
 *        {@code ApplicationProcessChannels.SCORING_POSITIVE}
 */
@StreamListener(ApplicationProcessChannels.SCORING_POSITIVE)
public void receiveScoringPositiveEvent(@Payload ScoringDoneEvent scoringDoneEvent) {
    CreditApplicationStatus applicationStatus =
            creditApplicationStatusRepository.findByApplicationNumber(scoringDoneEvent.getApplicationNumber());

    applicationStatus.setScoringResult("ACCEPTABLE");
    applicationStatus.setScoringDoneDate(scoringDoneEvent.getCreationTime());

    creditApplicationStatusRepository.save(applicationStatus);
}
/**
 * Receives a shell script for a job over {@code /tty/{jobId}/shell}, validates the resulting
 * command and passes it to {@code ttyService} for execution.
 *
 * @param jobId the job id taken from the destination
 * @param script the message payload, used as the command input
 * @param headers the message headers, passed to {@code validate}
 */
@MessageMapping("/tty/{jobId}/shell")
public void shell(@DestinationVariable String jobId, @Payload String script, MessageHeaders headers) {
    // Build a SHELL command addressed to the job.
    TtyCmd.In command = new TtyCmd.In()
            .setId(jobId)
            .setAction(TtyCmd.Action.SHELL)
            .setInput(script);

    validate(command, headers);
    ttyService.execute(command);
}
/**
 * Logs the id, text and date of every message received from the topic named by
 * {@code haydikodlayalim.kafka.topic}.
 *
 * @param message the received message
 */
@KafkaListener(
    topics = "${haydikodlayalim.kafka.topic}",
    groupId = "${haydikodlayalim.kafka.group.id}"
)
public void listen(@Payload KMessage message) {
    final Object id = message.getId();
    final Object text = message.getMessage();
    final Object date = message.getMessageDate();
    log.info("Message received.. MessageID : {} Message: {} Date : {}", id, text, date);
}
/**
 * Resolves a method argument from the message payload.
 *
 * <p>A {@code @Payload} annotation with an expression is rejected. An empty payload yields
 * {@code null} only when the annotation is present and marks the payload as not required;
 * otherwise it is reported as a validation failure. A payload already assignable to the
 * parameter type is validated and returned unchanged. Any other payload is converted by the
 * configured converter and validated.
 *
 * @param parameter the method parameter to resolve
 * @param message the message whose payload is used
 * @return the payload, the converted payload, or {@code null} for an optional empty payload
 * @throws IllegalStateException if the annotation declares an expression
 * @throws MethodArgumentNotValidException if a required payload is empty
 * @throws MessageConversionException if the converter returns {@code null}
 * @throws Exception propagated from validation or conversion
 */
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
    Payload payloadAnnotation = parameter.getParameterAnnotation(Payload.class);
    boolean declaresExpression =
            payloadAnnotation != null && StringUtils.hasText(payloadAnnotation.expression());
    if (declaresExpression) {
        throw new IllegalStateException("@Payload SpEL expressions not supported by this resolver");
    }

    Object rawPayload = message.getPayload();
    if (isEmptyPayload(rawPayload)) {
        boolean optional = payloadAnnotation != null && !payloadAnnotation.required();
        if (optional) {
            return null;
        }
        String name = getParameterName(parameter);
        BindingResult errors = new BeanPropertyBindingResult(rawPayload, name);
        errors.addError(new ObjectError(name, "Payload value must not be empty"));
        throw new MethodArgumentNotValidException(message, parameter, errors);
    }

    Class<?> targetClass = parameter.getParameterType();
    if (ClassUtils.isAssignable(targetClass, rawPayload.getClass())) {
        // No conversion needed: the payload already fits the parameter type.
        validate(message, parameter, rawPayload);
        return rawPayload;
    }

    Object converted;
    if (this.converter instanceof SmartMessageConverter) {
        converted = ((SmartMessageConverter) this.converter).fromMessage(message, targetClass, parameter);
    }
    else {
        converted = this.converter.fromMessage(message, targetClass);
    }
    if (converted == null) {
        throw new MessageConversionException(message,
                "No converter found to convert to " + targetClass + ", message=" + message);
    }
    validate(message, parameter, converted);
    return converted;
}
/**
 * Registers a user who joined the chat room.
 *
 * <p>The sender is stored in the session attributes under {@code "username"}, added to the
 * {@code onlineUsers} set in Redis, and the chat message is published as JSON on the
 * {@code userStatus} channel. Any exception is logged and not rethrown.
 *
 * @param chatMessage the join message; its sender is the user name
 * @param headerAccessor accessor for the session attributes
 */
@MessageMapping("/chat.addUser")
public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
    LOGGER.info("User added in Chatroom:" + chatMessage.getSender());

    try {
        // Remember the user name on the session.
        headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());

        // Track the user as online and publish the join message.
        redisTemplate.opsForSet().add(onlineUsers, chatMessage.getSender());
        redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage));
    } catch (Exception ex) {
        LOGGER.error(ex.getMessage(), ex);
    }
}
/**
 * Stores the sender of a join message in the session attributes under {@code "username"}
 * and returns the message for {@code WebSocketConstants.CHAT_TOPIC}.
 *
 * @param chatMessage the join message
 * @param headerAccessor accessor for the session attributes
 * @return the same {@code chatMessage} instance
 */
@MessageMapping("/chat/addUser")
@SendTo(WebSocketConstants.CHAT_TOPIC)
public ChatMessage addUser(
        @Payload ChatMessage chatMessage,
        SimpMessageHeaderAccessor headerAccessor) {
    // Remember the user name on the session before returning the message.
    headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());

    return chatMessage;
}
/**
 * Records on the credit-application status that the customer has been entered.
 *
 * <p>The customer is fetched from the URL carried by the event. The status for the
 * customer's application number is updated with the customer's {@code updated} value and
 * saved.
 *
 * @param customerCreatedEvent the event received on
 *        {@code ApplicationProcessChannels.CUSTOMER_CREATED}
 */
@StreamListener(ApplicationProcessChannels.CUSTOMER_CREATED)
public void receiveCustomerCreatedEvent(@Payload CustomerCreatedEvent customerCreatedEvent) {
    Customer fetched = restTemplate.getForObject(customerCreatedEvent.getCustomerUrl(), Customer.class);

    CreditApplicationStatus applicationStatus =
            creditApplicationStatusRepository.findByApplicationNumber(fetched.getApplicationNumber());
    applicationStatus.setCustomerEntered(fetched.getUpdated());

    creditApplicationStatusRepository.save(applicationStatus);
}
/**
 * Handles the {@code shell-client} connection setup.
 *
 * <p>The requester is added to {@code CLIENTS} and removed again when the connection's
 * close signal terminates. The method then sends {@code "OPEN"} on the
 * {@code client-status} route and logs every string the client sends back.
 *
 * @param requester the requester for the connecting client
 * @param client the client identifier sent as the setup payload
 */
@ConnectMapping("shell-client")
void connectShellClientAndAskForTelemetry(RSocketRequester requester,
        @Payload String client) {
    requester.rsocket()
            .onClose()
            .doFirst(() -> {
                // Register the newly connected client.
                log.info("Client: {} CONNECTED.", client);
                CLIENTS.add(requester);
            })
            .doOnError(failure -> {
                // An error on the close signal is reported as a closed channel.
                log.warn("Channel to client {} CLOSED", client);
            })
            .doFinally(signal -> {
                // Whatever the outcome, forget the client once the connection ends.
                CLIENTS.remove(requester);
                log.info("Client {} DISCONNECTED", client);
            })
            .subscribe();

    // Call back to the client and log each value of the stream it returns.
    requester.route("client-status")
            .data("OPEN")
            .retrieveFlux(String.class)
            .doOnNext(reading -> log.info("Client: {} Free Memory: {}.", client, reading))
            .subscribe();
}
/**
 * Fixture method with an empty body; only its parameter list carries meaning.
 *
 * <p>It declares one parameter per {@code @Payload} variation: plain, {@code required=false},
 * {@code required=true} on a {@code Locale}, with the value {@code "foo.bar"}, combined with
 * {@code @MyValid}, {@code @Validated} without {@code @Payload}, and no annotation at all.
 *
 * <p>NOTE(review): presumably looked up reflectively by the resolver tests. Confirm against
 * the test setup before changing parameter order or annotations.
 */
@SuppressWarnings("unused")
private void handleMessage(
@Payload String param,
@Payload(required=false) String paramNotRequired,
@Payload(required=true) Locale nonConvertibleRequiredParam,
@Payload("foo.bar") String paramWithSpelExpression,
@MyValid @Payload String validParam,
@Validated String validParamNotAnnotated,
String paramNotAnnotated) {
}
/**
 * Fills in the server-side fields of an activity record and returns it for
 * {@code /topic/tracker}.
 *
 * <p>The user login comes from the principal. The session id and IP address come from the
 * STOMP session. The time is the current time formatted with {@code dateTimeFormatter}.
 *
 * @param activityDTO the activity sent by the client; it is modified in place
 * @param stompHeaderAccessor accessor for the session id and session attributes
 * @param principal the user the message was sent by
 * @return the same {@code activityDTO} instance with the fields above set
 */
@SubscribeMapping("/topic/activity")
@SendTo("/topic/tracker")
public ActivityDTO sendActivity(@Payload ActivityDTO activityDTO, StompHeaderAccessor stompHeaderAccessor, Principal principal) {
    // The principal is the single source of the login; setting it once avoids a value
    // that would be overwritten immediately.
    activityDTO.setUserLogin(principal.getName());
    activityDTO.setSessionId(stompHeaderAccessor.getSessionId());
    activityDTO.setIpAddress(stompHeaderAccessor.getSessionAttributes().get(IP_ADDRESS).toString());
    // Same epoch-millisecond value as Calendar.getInstance().getTimeInMillis(), without
    // allocating a Calendar.
    activityDTO.setTime(dateTimeFormatter.print(System.currentTimeMillis()));
    log.debug("Sending user tracking data {}", activityDTO);
    return activityDTO;
}
/**
 * Batch listener for the topic named by {@code kafka.topic.batchConsumerTopic}.
 *
 * <p>Logs every payload of the batch together with its {@code partition@offset}. The three
 * lists are read by the same index, so element {@code i} of each is assumed to describe the
 * same record.
 *
 * @param payloads the record values of the batch
 * @param partitionIds the {@code KafkaHeaders.RECEIVED_PARTITION_ID} header values
 * @param offsets the {@code KafkaHeaders.OFFSET} header values
 */
@KafkaListener(topics = "${kafka.topic.batchConsumerTopic}", containerFactory = "kafkaListenerContainerFactoryForBatchConsumer", groupId = "batchConsumer")
public void receive(@Payload List<String> payloads,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Long> partitionIds,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    LOGGER.info("Received group=batchConsumer with batch group data: ");
    for (int i = 0; i < payloads.size(); ++i) {
        // The label names the two values joined by "@" in the second argument.
        LOGGER.info("---------------- payload='{}' from partition@offset='{}'",
                payloads.get(i), partitionIds.get(i) + "@" + offsets.get(i));
    }
}
/**
 * Prints the first applicant of a submitted application and asks
 * {@code personRatingQueryService} to rate that person by name and address.
 *
 * @param applicationSubmittedEvent the event received on
 *        {@code CreditAgencyChannels.APPLICATION_SUBMITTED}
 */
@StreamListener(CreditAgencyChannels.APPLICATION_SUBMITTED)
public void receiveApplicationSubmission(@Payload ApplicationSubmittedEvent applicationSubmittedEvent) {
    System.out.println("received " + applicationSubmittedEvent.getFirstApplicant().toString());

    Applicant person = applicationSubmittedEvent.getFirstApplicant();
    personRatingQueryService.ratePerson(
            person.getFirstName(),
            person.getLastName(),
            person.getAddress().getStreet(),
            person.getAddress().getPostCode(),
            person.getAddress().getCity());
}
/**
 * Fills in the server-side fields of an activity record and returns it for
 * {@code /topic/tracker}.
 *
 * <p>The user login comes from the principal. The session id and IP address come from the
 * STOMP session. The time is the current instant.
 *
 * @param activityDTO the activity sent by the client; it is modified in place
 * @param stompHeaderAccessor accessor for the session id and session attributes
 * @param principal the user the message was sent by
 * @return the same {@code activityDTO} instance with the fields above set
 */
@MessageMapping("/topic/activity")
@SendTo("/topic/tracker")
public ActivityDTO sendActivity(
        @Payload ActivityDTO activityDTO,
        StompHeaderAccessor stompHeaderAccessor,
        Principal principal) {
    // Who sent it.
    activityDTO.setUserLogin(principal.getName());

    // Where it came from.
    activityDTO.setSessionId(stompHeaderAccessor.getSessionId());
    activityDTO.setIpAddress(stompHeaderAccessor.getSessionAttributes().get(IP_ADDRESS).toString());

    // When it was received.
    activityDTO.setTime(Instant.now());

    log.debug("Sending user tracking data {}", activityDTO);
    return activityDTO;
}
/**
 * Consumes API-scan results from {@code QueueConstants.QUEUE_SCAN_API_RESOURCE}.
 *
 * <p>The JSON payload is parsed into a {@code SystemApiScanSaveDTO}. Its tenant is set on
 * {@code BaseContextHandler}, and the DTO is then saved in batch through
 * {@code systemApiService}.
 *
 * @param param the scan result as a JSON string
 */
@RabbitListener(queues = QueueConstants.QUEUE_SCAN_API_RESOURCE)
public void scanApiResourceRabbitListener(@Payload String param) {
    SystemApiScanSaveDTO scanResult = JSONObject.parseObject(param, SystemApiScanSaveDTO.class);

    // Set the tenant before the save call.
    BaseContextHandler.setTenant(scanResult.getTenant());

    this.systemApiService.batchSave(scanResult);
}
/**
 * Stores every chat message received from the {@code message} topic through {@code redis},
 * inside a consumer tracing span.
 *
 * @param message the chat message
 * @param headers the message headers, used to start the consumer span
 * @throws Exception propagated from the tracing or storage calls
 */
@KafkaListener(topics = "message")
public void process(@Payload Message message, @Headers MessageHeaders headers) throws Exception {
    Span consumerSpan = kafka.startConsumerSpan("process", headers);

    // The scope is held only so that it is closed when the block exits.
    try (Scope ignored = tracer.scopeManager().activate(consumerSpan, true)) {
        System.out.println("Received message: " + message.message);

        redis.addMessage(message);
        System.out.println("Added message to room.");
    }
}
/**
 * Stamps an incoming chat message with the current time, saves it through
 * {@code chatHistoryDao} and returns it for {@code /topic/all}.
 *
 * @param message the message fields; a {@code "timestamp"} entry is added or replaced
 * @return the same map instance, including the timestamp
 */
@MessageMapping("/all")
@SendTo("/topic/all")
public Map<String, String> post(@Payload Map<String, String> message) {
    final String now = String.valueOf(System.currentTimeMillis());
    message.put("timestamp", now);

    chatHistoryDao.save(message);
    return message;
}
/**
 * Test listener method: records that it was invoked, then checks that the payload is
 * {@code "my payload"} and that the {@code myCounter} header is {@code 24}.
 *
 * @param content the resolved payload
 * @param counter the resolved {@code myCounter} header
 */
public void resolveCustomHeaderNameAndPayloadWithHeaderNameSet(
        @Payload String content,
        @Header(name = "myCounter") int counter) {
    // Mark the invocation before asserting.
    invocations.put("resolveCustomHeaderNameAndPayloadWithHeaderNameSet", Boolean.TRUE);

    assertEquals("Wrong @Payload resolution", "my payload", content);
    assertEquals("Wrong @Header resolution", 24, counter);
}
/**
 * Prints the recipient address of a received e-mail message to standard output.
 *
 * @param message the received e-mail message
 */
public void process(@Payload EmailMessage message) {
    final String line = "Received <" + message.getToAddress() + ">";
    System.out.println(line);
}
/**
 * Logs a chat message and sends it to {@code /topic/public} through
 * {@code simpMessageSendingOperations}.
 *
 * @param chatMessage the message to send
 */
public void sendMsg(@Payload ChatMessage chatMessage) {
    final String logLine = "Send msg by simpMessageSendingOperations:" + chatMessage.toString();
    LOGGER.info(logLine);

    simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage);
}