下面列出了com.fasterxml.jackson.databind.node.BaseJsonNode#org.springframework.tuple.Tuple 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@StreamListener( Sink.INPUT )
public void processNotification( final String json ) {
log.debug( "processNotification : enter" );
Tuple event = TupleBuilder.fromString( json );
Assert.hasText( event.getString( "eventType" ), "eventType not set" );
Assert.hasText( event.getString( "boardUuid" ), "boardUuid not set" );
Assert.hasText( event.getString( "occurredOn" ), "occurredOn not set" );
String eventType = event.getString( "eventType" );
if( eventType.equals( "BoardInitialized" ) ) {
log.debug( "processNotification : exit, no board should exist in cache if 'BoardInitialized' event is received" );
return;
}
this.service.uncacheTarget( UUID.fromString( event.getString( "boardUuid" ) ) );
log.debug( "processNotification : exit" );
}
private BaseJsonNode toNode(Object value) {
if (value != null) {
if (value instanceof Tuple) {
return toObjectNode((Tuple) value);
}
else if (value instanceof List<?>) {
return toArrayNode((List<?>) value);
}
else if (!value.getClass().isPrimitive()) {
return mapper.getNodeFactory().pojoNode(value);
}
else {
return mapper.valueToTree(value);
}
}
return null;
}
@Transactional
public void processDomainEvent( final Tuple event ) {
log.debug( "processDomainEvent : enter" );
log.debug( "processDomainEvent : event[{}] ", event );
String eventType = event.getString( "eventType" );
switch ( eventType ) {
case "BoardInitialized":
processBoardInitialized( event );
break;
default:
processBoardEvent( event );
break;
}
log.debug( "processDomainEvent : calling publisher.sendNotification( event )" );
this.publisher.sendNotification( event );
log.debug( "processDomainEvent : exit" );
}
public static Tuple toTuple(Tensor tensor) {
ByteBuffer buffer = ByteBuffer.allocate(tensor.numBytes());
tensor.writeTo(buffer);
// Retrieve all bytes in the buffer
buffer.clear();
byte[] bytes = new byte[buffer.capacity()];
buffer.get(bytes, 0, bytes.length);
return TupleBuilder.tuple()
.put(TF_DATA_TYPE, tensor.dataType().name())
.put(TF_SHAPE, tensor.shape())
.put(TF_VALUE, bytes)
.build();
}
@Test
public void longArray() {
long[][] inLongArray = new long[2][2];
inLongArray[0][0] = 0;
inLongArray[0][1] = 1;
inLongArray[1][0] = 2;
inLongArray[1][1] = 3;
Tensor inTensor = Tensor.create(inLongArray);
Tuple tuple = TensorTupleConverter.toTuple(inTensor);
Tensor outTensor = TensorTupleConverter.toTensor(tuple);
long[][] outLongArray = new long[2][2];
outLongArray = outTensor.copyTo(outLongArray);
compareTensors(inTensor, outTensor);
assertArrayEquals(inLongArray, outLongArray);
}
private MutableMessage<?> convertToMutable(Message<?> input) throws Exception{
Object payload = input.getPayload();
if (payload instanceof Tuple && !(payload instanceof MutableTuple)) {
payload = TupleBuilder.mutableTuple().putAll((Tuple) payload).build();
}
else if (payload instanceof String){
String strPayload = input.getPayload().toString();
Iterator<Entry<String, Object>> objects = new ObjectMapper().readValue(strPayload, Map.class).entrySet().iterator();
TupleBuilder tuples = TupleBuilder.mutableTuple();
while (objects.hasNext()){
Entry<String,Object> entry = objects.next();
tuples.put(entry.getKey(), entry.getValue());
}
payload = tuples.build();
}
return new MutableMessage<>(payload, input.getHeaders());
}
@ServiceActivator(inputChannel=Sink.INPUT)
public void process(Message<?> message) {
Object payload = message.getPayload();
if (payload instanceof String) {
try {
payload = jsonToTupleTransformer.transformPayload(payload.toString());
}
catch (Exception e) {
throw new MessageTransformationException(message, e.getMessage(), e);
}
}
if (payload instanceof Tuple) {
processTuple(computeMetricName(message), (Tuple) payload);
}
else {
processPojo(computeMetricName(message), payload);
}
}
@Test
public void testInsertion() {
Tuple tupleA = TupleBuilder.tuple().of("a", "hello1", "b", 42);
Tuple tupleB = TupleBuilder.tuple().of("a", "hello2", "b", null);
Tuple tupleC = TupleBuilder.tuple().of("a", "hello3");
channels.input().send(MessageBuilder.withPayload(tupleA).build());
channels.input().send(MessageBuilder.withPayload(tupleB).build());
channels.input().send(MessageBuilder.withPayload(tupleC).build());
Assert.assertThat(jdbcOperations.queryForObject(
"select count(*) from messages where a = ? and b = ?",
Integer.class, tupleA.getString("a"), tupleA.getInt("b")), is(1));
Assert.assertThat(jdbcOperations.queryForObject(
"select count(*) from messages where a = ? and b IS NULL",
Integer.class, tupleB.getString("a")), is(1));
Assert.assertThat(jdbcOperations.queryForObject(
"select count(*) from messages where a = ? and b IS NULL",
Integer.class, tupleC.getString("a")), is(1));
}
@Publisher( channel = Source.OUTPUT )
public Message<String> sendNotification( Tuple event ) {
String payload = converter.convert( event );
return MessageBuilder
.withPayload( payload )
.setHeader( "x-delay", 1000 )
.build();
}
@Override
public String convert( Tuple source ) {
ObjectNode root = toObjectNode(source);
String json = null;
try {
json = mapper.writeValueAsString(root);
}
catch (Exception e) {
throw new IllegalArgumentException("Tuple to string conversion failed", e);
}
return json;
}
private ObjectNode toObjectNode(Tuple source) {
ObjectNode root = mapper.createObjectNode();
for (int i = 0; i < source.size(); i++) {
Object value = source.getValues().get(i);
String name = source.getFieldNames().get(i);
if (value == null) {
root.putNull(name);
} else {
root.putPOJO(name, toNode(value));
}
}
return root;
}
@Override
public Tuple convert(byte[] source) {
if (source == null) {
return null;
}
try {
return jsonNodeToTupleConverter.convert( mapper.readTree( source ) );
}
catch (Exception e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
}
@PostMapping( "/" )
public ResponseEntity saveEvent( @RequestBody String json ) {
Tuple event = TupleBuilder.fromString( json );
Assert.isTrue( event.hasFieldName( "eventType" ), "eventType is required" );
Assert.isTrue( event.hasFieldName( "boardUuid" ), "boardUuid is required" );
Assert.isTrue( event.hasFieldName( "occurredOn" ), "occurredOn is required" );
this.service.processDomainEvent( event );
return ResponseEntity
.accepted()
.build();
}
@Override
public GatewayFilter apply(Tuple args) {
return (exchange, chain) -> exchange.getSession()
.map(webSession -> {
log.debug("Session id: " + webSession.getId());
webSession.getAttributes().entrySet()
.forEach(entry ->
log.debug(entry.getKey() + " => " +
entry.getValue()));
return webSession;
})
.map(WebSession::save)
.then(chain.filter(exchange));
}
@Override
public GatewayFilter apply(Tuple args) {
return (exchange, chain) -> exchange.getSession()
.map(webSession -> {
log.debug("Session id: " + webSession.getId());
webSession.getAttributes().entrySet()
.forEach(entry ->
log.debug(entry.getKey() + " => " +
entry.getValue()));
return webSession;
})
.map(WebSession::save)
.then(chain.filter(exchange));
}
@Bean
@ConditionalOnMissingBean(name = "tensorflowOutputConverter")
public TensorflowOutputConverter tensorflowOutputConverter() {
// Default implementations serializes the Tensor into Tuple
return new TensorflowOutputConverter<Tuple>() {
@Override
public Tuple convert(Tensor tensor, Map<String, Object> processorContext) {
return TensorTupleConverter.toTuple(tensor);
}
};
}
private Tensor toFeedTensor(Object value) {
if (value instanceof Tensor) {
return (Tensor) value;
}
else if (value instanceof Tuple) {
return TensorTupleConverter.toTensor((Tuple) value);
}
return Tensor.create(value);
}
public static Tensor toTensor(Tuple tuple) {
DataType dataType = DataType.valueOf(tuple.getString(TF_DATA_TYPE));
long[] shape = (long[]) tuple.getValue(TF_SHAPE);
byte[] bytes = (byte[]) tuple.getValue(TF_VALUE);
return Tensor.create(dataType, shape, ByteBuffer.wrap(bytes));
}
@Test(expected = MessageHandlingException.class)
public void testEvaluationIncorrectTupleInput() {
Tuple incompleteInputTuple = TupleBuilder.tuple()
// missing data type
.put(TF_SHAPE, new long[0])
.put(TF_VALUE, new byte[0])
.build();
testEvaluation(incompleteInputTuple);
}
private void processValueForCounter(String counterName, Object value, String[] path) {
String key = path[0];
Object result = null;
if (value instanceof List) {
for (Object item : (List<?>) value) {
processValueForCounter(counterName, item, path);
}
}
else if (value instanceof Tuple) {
Tuple t = (Tuple) value;
if (t.hasFieldName(key)) {
result = t.getValue(key);
}
}
else if (value instanceof Map) {
result = ((Map<?, ?>) value).get(key);
}
if (result != null) {
if (path.length == 1) {
processValue(counterName, result);
}
else {
path = Arrays.copyOfRange(path, 1, path.length);
processValueForCounter(counterName, result, path);
}
}
}
private MutableMessage<?> convertToMutable(Message<?> input) {
Object payload = input.getPayload();
if (payload instanceof Tuple && !(payload instanceof MutableTuple)) {
payload = TupleBuilder.mutableTuple().putAll((Tuple) payload).build();
}
return new MutableMessage<>(payload, input.getHeaders());
}
private void processBoardInitialized( final Tuple event ) {
log.debug( "processBoardInitialized : enter " );
String boardUuid = event.getString( "boardUuid" );
DomainEventsEntity domainEventsEntity = new DomainEventsEntity( boardUuid );
DomainEventEntity domainEventEntity = new DomainEventEntity();
domainEventEntity.setId( UUID.randomUUID().toString() );
Instant occurredOn = Instant.parse( event.getString( "occurredOn" ) );
domainEventEntity.setOccurredOn( LocalDateTime.ofInstant( occurredOn, ZoneOffset.UTC ) );
domainEventEntity.setData( this.toJsonStringConverter.convert( event ) );
domainEventsEntity.getDomainEvents().add( domainEventEntity );
this.domainEventsRepository.save( domainEventsEntity );
}
private void processBoardEvent( final Tuple event ) {
log.debug( "processBoardEvent : enter " );
String boardUuid = event.getString( "boardUuid" );
this.domainEventsRepository.findById( boardUuid )
.ifPresent( found -> {
log.debug( "processBoardEvent : a DomainEventsEntity[{}] was found for boardUuid[{}]. ", found, boardUuid );
DomainEventEntity domainEventEntity = new DomainEventEntity();
domainEventEntity.setId( UUID.randomUUID().toString() );
Instant occurredOn = Instant.parse( event.getString( "occurredOn" ) );
domainEventEntity.setOccurredOn( LocalDateTime.ofInstant( occurredOn, ZoneOffset.UTC ) );
domainEventEntity.setData( toJsonStringConverter.convert( event ) );
found.getDomainEvents().add( domainEventEntity );
this.domainEventsRepository.save( found );
});
}
private void processTuple(String counterName, Tuple tuple) {
String[] path = StringUtils.tokenizeToStringArray(fvcSinkProperties.getFieldName(), ".");
processValueForCounter(counterName, tuple, path);
}
public Tuple getTest() {
return this.test;
}
public void setTest(Tuple test) {
this.test = test;
}
@Test
public void testProcessBoardInitializedEvent() throws Exception {
this.service.processDomainEvent( TupleBuilder.fromString( BOARD_INITIALIZED_EVENT ) );
verify( this.repository, times( 1 ) ).save( any( DomainEventsEntity.class ) );
verify( this.notificationPublisher, times( 1 ) ).sendNotification( any( Tuple.class ) );
}
@Test
public void testProcessBoardRenamedEvent() throws Exception {
DomainEventsEntity domainEventsEntity = createDomainEventsEntity();
when( this.repository.findById( anyString() ) ).thenReturn( Optional.of( domainEventsEntity ) );
this.service.processDomainEvent( TupleBuilder.fromString( BOARD_RENAMED_EVENT ) );
verify( this.repository, times( 1 ) ).findById( anyString() );
verify( this.repository, times( 1 ) ).save( any( DomainEventsEntity.class ) );
verify( this.notificationPublisher, times( 1 ) ).sendNotification( any( Tuple.class ) );
}
@Test
public void testSendNotification() throws Exception {
BlockingQueue<Message<?>> messages = collector.forChannel( source.output() );
Tuple event = TupleBuilder.fromString( BOARD_INITIALIZED_EVENT );
this.notificationPublisher.sendNotification( event );
assertThat( messages, receivesPayloadThat( is( BOARD_INITIALIZED_EVENT ) ) );
}
@Test
public void testSaveEvents() throws Exception {
this.mockMvc.perform( post( "/" ).content( BOARD_INITIALIZED_EVENT ) )
.andDo( print() )
.andExpect( status().isAccepted() );
verify( this.service, times( 1 ) ).processDomainEvent( any( Tuple.class ) );
}