下面列出了如何使用 com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream 的 API 类实例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Writes the content of the given buffer, from buffer index 0 up to its limit,
 * through {@code gen.writeBinary(...)}.
 *
 * <p>Both branches ignore the buffer's current position and leave the caller's
 * buffer state untouched.
 *
 * @param bbuf     buffer whose content is written
 * @param gen      generator that receives the binary value
 * @param provider serializer provider (not used here)
 * @throws IOException if the generator fails to write
 */
@Override
public void serialize(ByteBuffer bbuf, JsonGenerator gen, SerializerProvider provider) throws IOException
{
    // First, the simple case: the buffer wraps an accessible array.
    if (bbuf.hasArray()) {
        // Buffer index 0 lives at arrayOffset() inside the backing array; the offset is
        // non-zero for slices and offset wraps, so starting at array index 0 would emit
        // bytes that do not belong to this buffer.
        gen.writeBinary(bbuf.array(), bbuf.arrayOffset(), bbuf.limit());
        return;
    }
    // No accessible array (e.g. direct or read-only buffer): stream the content instead.
    // Work on a read-only view so that rewinding does not disturb the caller's position.
    ByteBuffer copy = bbuf.asReadOnlyBuffer();
    if (copy.position() > 0) {
        copy.rewind();
    }
    // try-with-resources closes the wrapper even when writeBinary throws.
    try (InputStream in = new ByteBufferBackedInputStream(copy)) {
        gen.writeBinary(in, copy.remaining());
    }
}
@Test
public void shouldSendMessage() throws Exception {
    // given: a keyed message and a Kinesis client stub reporting zero failed records
    final Message<ExampleJsonObject> message = message("someKey", new ExampleJsonObject("banana"));
    final PutRecordsResponse successResponse = PutRecordsResponse.builder()
            .failedRecordCount(0)
            .records(PutRecordsResultEntry.builder().build())
            .build();
    when(kinesisClient.putRecords(any(PutRecordsRequest.class)))
            .thenReturn(completedFuture(successResponse));

    // when
    kinesisMessageSender.send(message).join();

    // then: exactly one record went to stream "test", keyed by the message key
    verify(kinesisClient).putRecords(putRecordsRequestCaptor.capture());
    final PutRecordsRequest sentRequest = putRecordsRequestCaptor.getValue();
    assertThat(sentRequest.streamName(), is("test"));
    assertThat(sentRequest.records(), hasSize(1));
    assertThat(sentRequest.records().get(0).partitionKey(), is("someKey"));

    // and: the record body is JSON carrying the v2 message-format marker
    final JsonNode body = currentObjectMapper().readTree(
            new ByteBufferBackedInputStream(sentRequest.records().get(0).data().asByteBuffer()));
    assertThat(body.get(MessageFormat.SYNAPSE_MSG_FORMAT).asText(), is("v2"));
}
@Test
public void shouldSendMessageUsingPartitionKey() throws Exception {
    // given: a message whose key has distinct partition and compaction parts
    final Message<ExampleJsonObject> message = message(
            Key.of("somePartitionKey", "someCompactionKey"),
            new ExampleJsonObject("banana"));
    final PutRecordsResponse successResponse = PutRecordsResponse.builder()
            .failedRecordCount(0)
            .records(PutRecordsResultEntry.builder().build())
            .build();
    when(kinesisClient.putRecords(any(PutRecordsRequest.class)))
            .thenReturn(completedFuture(successResponse));

    // when
    kinesisMessageSender.send(message).join();

    // then: the Kinesis partition key is the partition part of the message key
    verify(kinesisClient).putRecords(putRecordsRequestCaptor.capture());
    final PutRecordsRequest sentRequest = putRecordsRequestCaptor.getValue();
    assertThat(sentRequest.records().get(0).partitionKey(), is("somePartitionKey"));

    // and: both key parts are present in the serialized record body
    final JsonNode body = currentObjectMapper().readTree(
            new ByteBufferBackedInputStream(sentRequest.records().get(0).data().asByteBuffer()));
    final JsonNode keyNode = body.get(MessageFormat.SYNAPSE_MSG_KEY);
    assertThat(keyNode.get(MessageFormat.SYNAPSE_MSG_PARTITIONKEY).textValue(), is("somePartitionKey"));
    assertThat(keyNode.get(MessageFormat.SYNAPSE_MSG_COMPACTIONKEY).textValue(), is("someCompactionKey"));
}
@Test
public void shouldSendDeleteMessage() throws IOException {
    // given: a Kinesis client stub reporting zero failed records
    final PutRecordsResponse successResponse = PutRecordsResponse.builder()
            .failedRecordCount(0)
            .records(PutRecordsResultEntry.builder().build())
            .build();
    when(kinesisClient.putRecords(any(PutRecordsRequest.class)))
            .thenReturn(completedFuture(successResponse));

    // when: a message with a null payload is sent
    kinesisMessageSender.send(message("someKey", null)).join();

    // then: the payload field of the serialized record has no text value
    verify(kinesisClient).putRecords(putRecordsRequestCaptor.capture());
    final PutRecordsRequest sentRequest = putRecordsRequestCaptor.getValue();
    final JsonNode body = currentObjectMapper().readTree(
            new ByteBufferBackedInputStream(sentRequest.records().get(0).data().asByteBuffer()));
    assertThat(body.get(SYNAPSE_MSG_PAYLOAD).textValue(), is(nullValue()));
}
@Test
public void shouldSendEvent() throws Exception {
    // given: a keyed message and a Kinesis client stub reporting zero failed records
    final Message<ExampleJsonObject> message = message("someKey", new ExampleJsonObject("banana"));
    final PutRecordsResponse successResponse = PutRecordsResponse.builder()
            .failedRecordCount(0)
            .records(PutRecordsResultEntry.builder().build())
            .build();
    when(kinesisClient.putRecords(any(PutRecordsRequest.class)))
            .thenReturn(completedFuture(successResponse));

    // when
    kinesisMessageSender.send(message).join();

    // then: exactly one record went to stream "test", keyed by the message key
    verify(kinesisClient).putRecords(putRecordsRequestCaptor.capture());
    final PutRecordsRequest sentRequest = putRecordsRequestCaptor.getValue();
    assertThat(sentRequest.streamName(), is("test"));
    assertThat(sentRequest.records(), hasSize(1));
    assertThat(sentRequest.records().get(0).partitionKey(), is("someKey"));

    // and: the record body deserializes directly into the payload object
    ExampleJsonObject decoded = currentObjectMapper().readValue(
            new ByteBufferBackedInputStream(sentRequest.records().get(0).data().asByteBuffer()),
            ExampleJsonObject.class);
    assertThat(decoded.value, is("banana"));
}
/**
 * Parses the CSV content of a credential report into report entries.
 *
 * <p>The report format must equal {@code Textcsv}, and the CSV header must contain the
 * columns {@code user}, {@code arn}, {@code password_enabled}, {@code mfa_active},
 * {@code access_key_1_active} and {@code access_key_2_active}; otherwise
 * {@code Assert.state} rejects the report.
 *
 * @param report the credential report to parse
 * @return one entry per CSV record, excluding records that {@code toCSVReportEntry} maps to {@code null}
 * @throws RuntimeException wrapping the {@link IOException} if the report cannot be read
 */
@Override
public List<CSVReportEntry> apply(final GetCredentialReportResult report) {
    Assert.state(Textcsv.toString().equals(report.getReportFormat()), "unknown credential report format: " + report.getReportFormat());
    // Decode with an explicit charset so parsing does not depend on the JVM's platform default.
    // NOTE(review): UTF-8 is assumed for the report content — confirm against the service documentation.
    // The parser is a resource too, so it is closed deterministically together with the reader.
    try (final Reader r = new BufferedReader(new InputStreamReader(
                 new ByteBufferBackedInputStream(report.getContent()), java.nio.charset.StandardCharsets.UTF_8));
         final CSVParser parser = new CSVParser(r, CSV_FORMAT)) {
        final Map<String, Integer> headers = parser.getHeaderMap();
        Assert.state(headers.containsKey("user"), "Header 'user' not found in CSV");
        Assert.state(headers.containsKey("arn"), "Header 'arn' not found in CSV");
        Assert.state(headers.containsKey("password_enabled"), "Header 'password_enabled' not found in CSV");
        Assert.state(headers.containsKey("mfa_active"), "Header 'mfa_active' not found in CSV");
        Assert.state(headers.containsKey("access_key_1_active"), "Header 'access_key_1_active' not found in CSV");
        Assert.state(headers.containsKey("access_key_2_active"), "Header 'access_key_2_active' not found in CSV");
        // Collect eagerly: the records must be consumed while the parser is still open.
        return stream(parser.spliterator(), false).map(this::toCSVReportEntry).filter(Objects::nonNull).collect(toList());
    } catch (final IOException e) {
        throw new RuntimeException("Could not read csv report", e);
    }
}
/**
 * Spools the buffer into a temporary file (gzip-compressed when compression is enabled in
 * the config), uploads that file under the given bucket and key using the retry policy,
 * and finally deletes the temporary file.
 *
 * @param bucket     target bucket passed to {@code uploadData}
 * @param key        target key passed to {@code uploadData}
 * @param dataBuffer data to upload
 * @throws IOException if the temporary file cannot be created or written
 */
public void send(String bucket, String key, ByteBuffer dataBuffer)
        throws IOException
{
    File tempFile = File.createTempFile("tmp-fluency-", ".tmp");
    try {
        // Step 1: spool the buffer to disk, optionally through gzip.
        try (InputStream source = new ByteBufferBackedInputStream(dataBuffer);
                OutputStream fileSink = Files.newOutputStream(tempFile.toPath(), StandardOpenOption.WRITE);
                OutputStream sink = config.isCompressionEnabled() ? new GZIPOutputStream(fileSink) : fileSink) {
            copyStreams(source, sink);
        }

        // Step 2: upload the spooled file, retrying according to the configured policy.
        Failsafe.with(retryPolicy).run(() -> uploadData(bucket, key, tempFile));
    }
    finally {
        // Clean up regardless of the outcome; a failed delete is only logged.
        boolean deleted = tempFile.delete();
        if (!deleted) {
            LOG.warn("Failed to delete a temp file: {}", tempFile.getAbsolutePath());
        }
    }
}
/**
 * Deserializes the raw payload into the configured message class and, when the action
 * requests it, validates the result.
 *
 * @param action the web socket action; consulted for whether the payload should be
 *               validated and, on validation failure, for the handler method and the
 *               payload parameter index
 * @return the deserialized message, or {@code null} when no message class is configured
 *         or the deserializer returned {@code null}
 * @throws Exception anything raised by the deserializer; a
 *                   {@link MethodArgumentNotValidException} when validation reports errors
 */
public T deserialize(WebSocketAction action) throws Exception {
    // Nothing to deserialize into.
    if (messageClass == null) {
        return null;
    }

    // NOTE(review): the stream wraps rawData itself, not a copy — confirm whether this
    // method is expected to be callable more than once on the same instance.
    final T payload = serDe.deserialize(new ByteBufferBackedInputStream(rawData), messageClass);

    // Validation is skipped for absent payloads and for actions that do not request it.
    if (payload == null || !action.shouldValidatePayload()) {
        return payload;
    }

    final SpringValidatorAdapter validator = new SpringValidatorAdapter(messageValidator);
    final BeanPropertyBindingResult bindingResult = new BeanPropertyBindingResult(payload, messageClass.getName());
    validator.validate(payload, bindingResult);
    if (bindingResult.hasErrors()) {
        final MethodParameter payloadParameter =
                new MethodParameter(action.getMethod(), action.getPayloadParameterIndex());
        throw new MethodArgumentNotValidException(payloadParameter, bindingResult);
    }
    return payload;
}
@Before
public void setUpCloudEventsTest() throws Exception {
    // Registers a pre-processor that turns ByteBuffer-valued envelopes holding a JSON
    // object into LiiklusCloudEvent-valued envelopes; every other envelope passes through.
    processorPluginMock.getPreProcessors().add(envelope -> {
        var rawValue = envelope.getRawValue();
        if (!(rawValue instanceof ByteBuffer)) {
            return CompletableFuture.completedFuture(envelope);
        }

        try {
            // Read from a duplicate so the envelope's own buffer position stays untouched.
            var jsonStream = new ByteBufferBackedInputStream(((ByteBuffer) rawValue).duplicate());
            var fields = MAPPER.readValue(jsonStream, Map.class);

            // "eventType" and "eventId" move into the event attributes; the rest becomes the data.
            var eventType = (String) fields.remove("eventType");
            var eventId = (String) fields.remove("eventId");
            var cloudEventType = "com.example.legacy." + eventType.replace("/", ".").toLowerCase();
            var data = ByteBuffer.wrap(MAPPER.writeValueAsBytes(fields)).asReadOnlyBuffer();

            var cloudEvent = new LiiklusCloudEvent(
                    eventId,
                    cloudEventType,
                    "/tests/upcaster",
                    "application/json",
                    null,
                    data,
                    Collections.emptyMap()
            );
            return CompletableFuture.completedFuture(envelope.withValue(cloudEvent, LiiklusCloudEvent::asJson));
        } catch (IOException e) {
            return CompletableFuture.failedFuture(e);
        }
    });
}
@Test
public void shouldSendMessageHeaders() throws Exception {
    // given: a payload-less message carrying a single header attribute
    final Message<ExampleJsonObject> message = message(
            "someKey",
            Header.of(of("attr-of", "attr-value")),
            null);
    final PutRecordsResponse successResponse = PutRecordsResponse.builder()
            .failedRecordCount(0)
            .records(PutRecordsResultEntry.builder().build())
            .build();
    when(kinesisClient.putRecords(any(PutRecordsRequest.class)))
            .thenReturn(completedFuture(successResponse));

    // when
    kinesisMessageSender.send(message).join();

    // then: exactly one record went to stream "test", keyed by the message key
    verify(kinesisClient).putRecords(putRecordsRequestCaptor.capture());
    final PutRecordsRequest sentRequest = putRecordsRequestCaptor.getValue();
    assertThat(sentRequest.streamName(), is("test"));
    assertThat(sentRequest.records(), hasSize(1));
    assertThat(sentRequest.records().get(0).partitionKey(), is("someKey"));

    // and: the header attribute is present in the serialized record body
    final JsonNode body = currentObjectMapper().readTree(
            new ByteBufferBackedInputStream(sentRequest.records().get(0).data().asByteBuffer()));
    assertThat(currentObjectMapper().convertValue(body.get(SYNAPSE_MSG_HEADERS), Map.class), is(ImmutableMap.of("attr-of", "attr-value")));
}
@Test
public void shouldSendMessagePayload() throws Exception {
    // given: a keyed message and a Kinesis client stub reporting zero failed records
    final Message<ExampleJsonObject> message = message("someKey", new ExampleJsonObject("banana"));
    final PutRecordsResponse successResponse = PutRecordsResponse.builder()
            .failedRecordCount(0)
            .records(PutRecordsResultEntry.builder().build())
            .build();
    when(kinesisClient.putRecords(any(PutRecordsRequest.class)))
            .thenReturn(completedFuture(successResponse));

    // when
    kinesisMessageSender.send(message).join();

    // then: exactly one record went to stream "test", keyed by the message key
    verify(kinesisClient).putRecords(putRecordsRequestCaptor.capture());
    final PutRecordsRequest sentRequest = putRecordsRequestCaptor.getValue();
    assertThat(sentRequest.streamName(), is("test"));
    assertThat(sentRequest.records(), hasSize(1));
    assertThat(sentRequest.records().get(0).partitionKey(), is("someKey"));

    // and: the payload field of the record body converts back into the original object
    final JsonNode body = currentObjectMapper().readTree(
            new ByteBufferBackedInputStream(sentRequest.records().get(0).data().asByteBuffer()));
    final ExampleJsonObject decodedPayload =
            currentObjectMapper().convertValue(body.get(SYNAPSE_MSG_PAYLOAD), ExampleJsonObject.class);
    assertThat(decodedPayload.value, is("banana"));
}
@Test
public void shouldSendBatch() throws Exception {
    // given: a Kinesis client stub reporting zero failed records
    final PutRecordsResponse successResponse = PutRecordsResponse.builder()
            .failedRecordCount(0)
            .records(PutRecordsResultEntry.builder().build())
            .build();
    when(kinesisClient.putRecords(any(PutRecordsRequest.class)))
            .thenReturn(completedFuture(successResponse));

    // when: two messages are sent as one batch
    kinesisMessageSender.sendBatch(Stream.of(
            message("b", new ExampleJsonObject("banana")),
            message("a", new ExampleJsonObject("apple"))
    ));

    // then: both records arrive in a single request to stream "test", in send order
    verify(kinesisClient).putRecords(putRecordsRequestCaptor.capture());
    final PutRecordsRequest sentRequest = putRecordsRequestCaptor.getValue();
    assertThat(sentRequest.streamName(), is("test"));
    assertThat(sentRequest.records(), hasSize(2));
    final PutRecordsRequestEntry bananaRecord = sentRequest.records().get(0);
    final PutRecordsRequestEntry appleRecord = sentRequest.records().get(1);

    assertThat(bananaRecord.partitionKey(), is("b"));
    assertThat(
            currentObjectMapper()
                    .readValue(new ByteBufferBackedInputStream(bananaRecord.data().asByteBuffer()), Map.class)
                    .get(SYNAPSE_MSG_PAYLOAD),
            is(singletonMap("value", "banana")));

    assertThat(appleRecord.partitionKey(), is("a"));
    assertThat(
            currentObjectMapper()
                    .readValue(new ByteBufferBackedInputStream(appleRecord.data().asByteBuffer()), Map.class)
                    .get(SYNAPSE_MSG_PAYLOAD),
            is(singletonMap("value", "apple")));
}
@Test
public void shouldInterceptMessagesInBatch() throws Exception {
    // given: a Kinesis client stub reporting zero failed records
    final PutRecordsResponse successResponse = PutRecordsResponse.builder()
            .failedRecordCount(0)
            .records(PutRecordsResultEntry.builder().build())
            .build();
    when(kinesisClient.putRecords(any(PutRecordsRequest.class)))
            .thenReturn(completedFuture(successResponse));
    // and: an interceptor that replaces every outgoing payload with a fixed JSON document
    interceptorRegistry.register(senderChannelsWith(
            intercepted -> TextMessage.of(intercepted.getKey(), "{\"m\":\"Lovely day for a Guinness\"}")));

    // when: two messages are sent as one batch
    kinesisMessageSender.sendBatch(Stream.of(
            message("b", new ExampleJsonObject("x")),
            message("a", new ExampleJsonObject("x"))
    ));

    // then: both records arrive in send order and carry the interceptor's payload
    verify(kinesisClient).putRecords(putRecordsRequestCaptor.capture());
    final PutRecordsRequest sentRequest = putRecordsRequestCaptor.getValue();
    assertThat(sentRequest.streamName(), is("test"));
    assertThat(sentRequest.records(), hasSize(2));
    final PutRecordsRequestEntry recordB = sentRequest.records().get(0);
    final PutRecordsRequestEntry recordA = sentRequest.records().get(1);

    assertThat(recordB.partitionKey(), is("b"));
    assertThat(
            currentObjectMapper()
                    .readValue(new ByteBufferBackedInputStream(recordB.data().asByteBuffer()), Map.class)
                    .get(SYNAPSE_MSG_PAYLOAD),
            is(singletonMap("m", "Lovely day for a Guinness")));

    assertThat(recordA.partitionKey(), is("a"));
    assertThat(
            currentObjectMapper()
                    .readValue(new ByteBufferBackedInputStream(recordA.data().asByteBuffer()), Map.class)
                    .get(SYNAPSE_MSG_PAYLOAD),
            is(singletonMap("m", "Lovely day for a Guinness")));
}
@Test
public void shouldInterceptMessages() throws IOException {
    // given: a keyed message and a Kinesis client stub reporting zero failed records
    final Message<ExampleJsonObject> message = message("someKey", new ExampleJsonObject("banana"));
    final PutRecordsResponse successResponse = PutRecordsResponse.builder()
            .failedRecordCount(0)
            .records(PutRecordsResultEntry.builder().build())
            .build();
    when(kinesisClient.putRecords(any(PutRecordsRequest.class)))
            .thenReturn(completedFuture(successResponse));
    // and especially: an interceptor on channel "test" that swaps the payload for "apple"
    interceptorRegistry.register(matchingSenderChannelsWith(
            "test",
            (intercepted) -> TextMessage.of(intercepted.getKey(), intercepted.getHeader(), "{\"value\" : \"apple\"}")));

    // when
    kinesisMessageSender.send(message).join();

    // then: the record that reached Kinesis carries the interceptor's payload
    verify(kinesisClient).putRecords(putRecordsRequestCaptor.capture());
    final PutRecordsRequest sentRequest = putRecordsRequestCaptor.getValue();
    ExampleJsonObject decoded = currentObjectMapper().readValue(
            new ByteBufferBackedInputStream(sentRequest.records().get(0).data().asByteBuffer()),
            ExampleJsonObject.class);
    assertThat(decoded.value, is("apple"));
}
// NOTE(review): despite its name, this test registers no interceptor and asserts the
// original payloads — confirm the method name is intended.
@Test
public void shouldInterceptMessagesInBatch() throws Exception {
    // given: a Kinesis client stub reporting zero failed records
    final PutRecordsResponse successResponse = PutRecordsResponse.builder()
            .failedRecordCount(0)
            .records(PutRecordsResultEntry.builder().build())
            .build();
    when(kinesisClient.putRecords(any(PutRecordsRequest.class)))
            .thenReturn(completedFuture(successResponse));

    // when: two messages are sent as one batch
    kinesisMessageSender.sendBatch(Stream.of(
            message("b", new ExampleJsonObject("banana")),
            message("a", new ExampleJsonObject("apple"))
    ));

    // then: both records arrive in a single request to stream "test", in send order
    verify(kinesisClient).putRecords(putRecordsRequestCaptor.capture());
    final PutRecordsRequest sentRequest = putRecordsRequestCaptor.getValue();
    assertThat(sentRequest.streamName(), is("test"));
    assertThat(sentRequest.records(), hasSize(2));
    final PutRecordsRequestEntry bananaRecord = sentRequest.records().get(0);
    final PutRecordsRequestEntry appleRecord = sentRequest.records().get(1);

    assertThat(bananaRecord.partitionKey(), is("b"));
    assertThat(
            currentObjectMapper()
                    .readValue(new ByteBufferBackedInputStream(bananaRecord.data().asByteBuffer()), ExampleJsonObject.class)
                    .value,
            is("banana"));

    assertThat(appleRecord.partitionKey(), is("a"));
    assertThat(
            currentObjectMapper()
                    .readValue(new ByteBufferBackedInputStream(appleRecord.data().asByteBuffer()), ExampleJsonObject.class)
                    .value,
            is("apple"));
}
/**
 * Spools the buffer into a gzip-compressed temporary file, imports that file into the
 * database and table named by the tag using the retry policy, and finally deletes the
 * temporary file.
 *
 * @param dbAndTableTag tag of the form {@code <database>.<table>}; segments after the
 *                      second are ignored
 * @param dataBuffer    data to import
 * @throws IOException              if the temporary file cannot be created or written
 * @throws IllegalArgumentException if the tag does not contain a non-empty database and
 *                                  table name separated by a dot
 */
public void send(String dbAndTableTag, ByteBuffer dataBuffer)
        throws IOException
{
    String[] dbAndTable = dbAndTableTag.split("\\.");
    // Validate before touching the file system: a tag without both parts used to fail
    // with a bare ArrayIndexOutOfBoundsException that did not mention the offending tag.
    if (dbAndTable.length < 2 || dbAndTable[0].isEmpty() || dbAndTable[1].isEmpty()) {
        throw new IllegalArgumentException(
                "Tag must be in the form '<database>.<table>': " + dbAndTableTag);
    }
    String database = dbAndTable[0];
    String table = dbAndTable[1];

    File file = File.createTempFile("tmp-fluency-", ".msgpack.gz");
    try {
        // The file stream is its own resource so it is closed even if constructing the
        // gzip wrapper throws.
        try (InputStream in = new ByteBufferBackedInputStream(dataBuffer);
                OutputStream fout = Files.newOutputStream(file.toPath(), StandardOpenOption.WRITE);
                OutputStream out = new GZIPOutputStream(fout)) {
            copyStreams(in, out);
        }

        String uniqueId = UUID.randomUUID().toString();
        Failsafe.with(retryPolicy).run(() -> importData(database, table, uniqueId, file));
    }
    finally {
        // Clean up regardless of the outcome; a failed delete is only logged.
        if (!file.delete()) {
            LOG.warn("Failed to delete a temp file: {}", file.getAbsolutePath());
        }
    }
}
/**
 * Decodes the JSON content of the buffer into an instance of the configured object class.
 *
 * @param bytes buffer holding the JSON document
 * @return the decoded object
 * @throws DecodeException if reading or mapping the JSON fails; carries the buffer and the cause
 */
@Override
public T decode(ByteBuffer bytes) throws DecodeException {
    final ByteBufferBackedInputStream jsonStream = new ByteBufferBackedInputStream(bytes);
    try {
        return mapper.readValue(jsonStream, objectClass);
    } catch (IOException cause) {
        throw new DecodeException(bytes, "decode json error", cause);
    }
}
/**
 * Reports whether the buffer content can be read as a JSON tree.
 *
 * <p>The check runs on a duplicate of the buffer, so the caller's position and limit are
 * left untouched and the same buffer can afterwards be handed to a decode call.
 *
 * @param bytes buffer holding the candidate JSON document
 * @return {@code true} if the content was read without an {@link IOException},
 *         {@code false} otherwise (the failure is logged as a warning)
 */
@Override
public boolean willDecode(ByteBuffer bytes) {
    try {
        // duplicate() shares the content but has an independent position; reading the
        // original here would leave it exhausted for whoever decodes it next.
        mapper.readTree(new ByteBufferBackedInputStream(bytes.duplicate()));
        return true;
    } catch (IOException e) {
        logger.warn("invalidate json", e);
        return false;
    }
}
// NOTE(review): despite its name, this test registers an interceptor and asserts the
// intercepted payloads — confirm the method name is intended.
@Test
public void shouldSendBatch() throws Exception {
    // given: a Kinesis client stub reporting zero failed records
    final PutRecordsResponse successResponse = PutRecordsResponse.builder()
            .failedRecordCount(0)
            .records(PutRecordsResultEntry.builder().build())
            .build();
    when(kinesisClient.putRecords(any(PutRecordsRequest.class)))
            .thenReturn(completedFuture(successResponse));
    // and especially: an interceptor on channel "test" that replaces every payload
    interceptorRegistry.register(matchingSenderChannelsWith(
            "test",
            (intercepted) -> TextMessage.of(intercepted.getKey(), intercepted.getHeader(), "{\"value\" : \"Lovely day for a Guinness\"}")));

    // when: two messages are sent as one batch
    kinesisMessageSender.sendBatch(Stream.of(
            message("b", new ExampleJsonObject("banana")),
            message("a", new ExampleJsonObject("apple"))
    ));

    // then: both records arrive in send order and carry the interceptor's payload
    verify(kinesisClient).putRecords(putRecordsRequestCaptor.capture());
    final PutRecordsRequest sentRequest = putRecordsRequestCaptor.getValue();
    assertThat(sentRequest.streamName(), is("test"));
    assertThat(sentRequest.records(), hasSize(2));
    final PutRecordsRequestEntry recordB = sentRequest.records().get(0);
    final PutRecordsRequestEntry recordA = sentRequest.records().get(1);

    assertThat(recordB.partitionKey(), is("b"));
    assertThat(
            currentObjectMapper()
                    .readValue(new ByteBufferBackedInputStream(recordB.data().asByteBuffer()), ExampleJsonObject.class)
                    .value,
            is("Lovely day for a Guinness"));

    assertThat(recordA.partitionKey(), is("a"));
    assertThat(
            currentObjectMapper()
                    .readValue(new ByteBufferBackedInputStream(recordA.data().asByteBuffer()), ExampleJsonObject.class)
                    .value,
            is("Lovely day for a Guinness"));
}