下面列出了怎么用org.apache.kafka.common.header.internals.RecordHeader的API类实例代码及写法,或者点击链接到github查看源代码。
private static Header[] getHeadersFromMetadata(List<KeyValue> properties) {
Header[] headers = new Header[properties.size()];
if (log.isDebugEnabled()) {
log.debug("getHeadersFromMetadata. Header size: {}",
properties.size());
}
int index = 0;
for (KeyValue kv: properties) {
headers[index] = new RecordHeader(kv.getKey(), kv.getValue().getBytes(UTF_8));
if (log.isDebugEnabled()) {
log.debug("index: {} kv.getKey: {}. kv.getValue: {}",
index, kv.getKey(), kv.getValue());
}
index++;
}
return headers;
}
@Test
public void typical() throws Exception {
when(config.getTopic()).thenReturn(topic);
when(consumer.partitionsFor(topic)).thenReturn(List.of(partitionInfo));
when(consumer.beginningOffsets(topicPartitions)).thenReturn(Map.of(topicPartition, 0L));
when(consumer.endOffsets(topicPartitions)).thenReturn(Map.of(topicPartition, 0L));
when(consumer.poll(Duration.ofMillis(100))).thenReturn(new ConsumerRecords<>(Map.of(topicPartition, List.of(record))));
when(record.key()).thenReturn(avroKey);
when(record.value()).thenReturn(avroValue);
when(converter.toModel(avroKey, avroValue)).thenReturn(event);
when(record.headers()).thenReturn(new RecordHeaders(List.of(new RecordHeader(CORRELATION_ID, "foo".getBytes(UTF_8)))));
underTest.receive(listener);
Thread.sleep(100L);
underTest.close();
var inOrder = Mockito.inOrder(consumer, listener, correlator);
inOrder.verify(consumer).assign(topicPartitions);
inOrder.verify(consumer).seekToBeginning(topicPartitions);
inOrder.verify(listener).onEvent(LOAD_COMPLETE);
inOrder.verify(listener).onEvent(event);
inOrder.verify(correlator).received("foo");
}
@Test
public void listenerThrowsException() throws Exception {
when(config.getTopic()).thenReturn(topic);
when(consumer.partitionsFor(topic)).thenReturn(List.of(partitionInfo));
when(consumer.beginningOffsets(topicPartitions)).thenReturn(Map.of(topicPartition, 0L));
when(consumer.endOffsets(topicPartitions)).thenReturn(Map.of(topicPartition, 0L));
when(consumer.poll(Duration.ofMillis(100))).thenReturn(new ConsumerRecords<>(Map.of(topicPartition, List.of(record))));
when(record.key()).thenReturn(avroKey);
when(record.value()).thenReturn(avroValue);
when(converter.toModel(avroKey, avroValue)).thenReturn(event);
when(record.headers()).thenReturn(new RecordHeaders(List.of(new RecordHeader(CORRELATION_ID, "foo".getBytes(UTF_8)))));
doThrow(new RuntimeException("listener error")).when(listener).onEvent(event);
underTest.receive(listener);
Thread.sleep(100L);
underTest.close();
var inOrder = Mockito.inOrder(consumer, listener, correlator);
inOrder.verify(consumer).assign(topicPartitions);
inOrder.verify(consumer).seekToBeginning(topicPartitions);
inOrder.verify(listener).onEvent(LOAD_COMPLETE);
inOrder.verify(listener).onEvent(event);
inOrder.verify(correlator).received("foo");
}
public IncomingKafkaRecordMetadata(KafkaConsumerRecord<K, T> record) {
this.record = record;
this.recordKey = record.key();
this.topic = record.topic();
this.partition = record.partition();
this.timestamp = Instant.ofEpochMilli(record.timestamp());
this.timestampType = record.timestampType();
this.offset = record.offset();
if (record.headers() == null) {
this.headers = new RecordHeaders();
} else {
this.headers = new RecordHeaders(record.headers().stream()
.map(kh -> new RecordHeader(kh.key(), kh.value().getBytes())).collect(
Collectors.toList()));
}
}
public void scheduleTaskWithCronExpression() {
Flux.just(new File(imagesDirectory).listFiles()).filter(File::isFile).subscribe(
f -> {
Flux.just(new Dimension(800, 600), new Dimension(180, 180), new Dimension(1200, 630)).subscribe(d -> {
try {
ImageResizeRequest imageResizeRequest = new ImageResizeRequest((int) d.getWidth(), (int) d.getHeight(), f.getAbsolutePath());
ProducerRecord<String, String> record = new ProducerRecord<>("asyncRequests", objectMapper.writeValueAsString(imageResizeRequest));
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "asyncReplies".getBytes()));
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
ConsumerRecord<String, String> consumerRecord = replyFuture.get();
} catch (Exception e) {
LOGGER.error("Error while sending message", e);
}
},
e -> LOGGER.error("Error while running lambda"),
() -> f.renameTo(new File(f.getParent() + "/Done", f.getName())));
}
);
}
private ConsumerRecords<byte[], byte[]> createTestRecordsWithHeaders() {
RecordHeader header = new RecordHeader("testHeader", new byte[0]);
RecordHeaders headers = new RecordHeaders();
headers.add(header);
TimestampType timestampType = TimestampType.NO_TIMESTAMP_TYPE;
byte testByte = 0;
byte[] testKey = { testByte };
byte[] testValue = { testByte };
ConnectHeaders destinationHeaders = new ConnectHeaders();
destinationHeaders.add(header.key(), header.value(), Schema.OPTIONAL_BYTES_SCHEMA);
ConsumerRecord<byte[], byte[]> testConsumerRecord = new ConsumerRecord<byte[], byte[]>(FIRST_TOPIC, FIRST_PARTITION,
FIRST_OFFSET, System.currentTimeMillis(), timestampType, 0L, 0, 0, testKey, testValue, headers);
TopicPartition topicPartition = new TopicPartition(FIRST_TOPIC, FIRST_PARTITION);
List<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
consumerRecords.add(testConsumerRecord);
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumerRecordMap = new HashMap<>(1);
consumerRecordMap.put(topicPartition, consumerRecords);
ConsumerRecords<byte[], byte[]> testRecords = new ConsumerRecords<>(consumerRecordMap);
return testRecords;
}
@Test
public void testDefaultRecordMapping() {
final MockKafka<Object, Object> mockKafka = new MockKafka<>();
final RecordHeaders recordHeaders = new RecordHeaders(Collections.singleton(
new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8))));
final RecordMetadata recordMetadata = new RecordMetadata(new TopicPartition("topic", 0),
0, 0, 0, -1L, -1, -1);
final ProducerRecord<Object, Object> producerRecord =
new ProducerRecord<>("topic", 0, "key", "value", recordHeaders);
final ConsumerRecord<Object, Object> consumerRecord = mockKafka.defaultRecordMapping(producerRecord, recordMetadata);
assertEquals(producerRecord.topic(), consumerRecord.topic());
assertEquals(producerRecord.partition().intValue(), consumerRecord.partition());
assertEquals(producerRecord.key(), consumerRecord.key());
assertEquals(producerRecord.value(), consumerRecord.value());
assertEquals(producerRecord.headers(), consumerRecord.headers());
}
@Test
public void shouldDecodeCompoundKeys() {
final KafkaDecoder decoder = new KafkaDecoder();
final ConsumerRecord<String,String> record = new ConsumerRecord<>(
"ch01",
0,
42L,
1234L, TimestampType.CREATE_TIME,
-1L, -1, -1,
"key-1234",
null,
new RecordHeaders(asList(
new RecordHeader("_synapse_msg_partitionKey", "1234".getBytes(UTF_8)),
new RecordHeader("_synapse_msg_compactionKey", "key-1234".getBytes(UTF_8))
))
);
// when
final TextMessage decodedMessage = decoder.apply(record);
// then
assertThat(decodedMessage.getKey().isCompoundKey(), is(true));
assertThat(decodedMessage.getKey().compactionKey(), is("key-1234"));
assertThat(decodedMessage.getKey().partitionKey(), is("1234"));
}
@Test
public void shouldDecodeBrokenCompoundKeysAsMessageKey() {
final KafkaDecoder decoder = new KafkaDecoder();
final ConsumerRecord<String,String> record = new ConsumerRecord<>(
"ch01",
0,
42L,
1234L, TimestampType.CREATE_TIME,
-1L, -1, -1,
"record-key",
null,
new RecordHeaders(asList(
new RecordHeader("_synapse_msg_partitionKey", "1234".getBytes(UTF_8)),
new RecordHeader("_synapse_msg_compactionKey", "key-1234".getBytes(UTF_8))
))
);
// when
final TextMessage decodedMessage = decoder.apply(record);
// then
assertThat(decodedMessage.getKey().isCompoundKey(), is(false));
assertThat(decodedMessage.getKey().compactionKey(), is("record-key"));
}
@Test
public void shouldEncodeMessage() {
// given
final KafkaEncoder encoder = new KafkaEncoder("test", 1);
final TextMessage message = TextMessage.of("someKey", "payload");
// when
ProducerRecord<String, String> record = encoder.apply(message);
// then
assertThat(record.key(), is("someKey"));
assertThat(record.value(), is("payload"));
assertThat(record.headers(), containsInAnyOrder(
new RecordHeader("_synapse_msg_partitionKey", "someKey".getBytes(UTF_8)),
new RecordHeader("_synapse_msg_compactionKey", "someKey".getBytes(UTF_8))
));
assertThat(record.topic(), is("test"));
assertThat(record.partition(), is(nullValue()));
}
@Test
public void shouldEncodeMessageHeaders() {
// given
final KafkaEncoder encoder = new KafkaEncoder("test", 1);
final TextMessage message = TextMessage.of(
"someKey",
Header.builder()
.withAttribute("foo", "bar")
.withAttribute("foobar", Instant.ofEpochMilli(42)).build(),
null
);
// when
final ProducerRecord<String, String> record = encoder.apply(message);
// then
assertThat(record.headers(), containsInAnyOrder(
new RecordHeader("_synapse_msg_partitionKey", "someKey".getBytes(UTF_8)),
new RecordHeader("_synapse_msg_compactionKey", "someKey".getBytes(UTF_8)),
new RecordHeader("foo", "bar".getBytes(UTF_8)),
new RecordHeader("foobar", "1970-01-01T00:00:00.042Z".getBytes(UTF_8))
));
}
@Test
public void shouldPartitionMessage() {
// given
final KafkaEncoder encoder = new KafkaEncoder("test", 2);
final TextMessage first = TextMessage.of(Key.of("0", "someKeyForPartition0"), null);
final TextMessage second = TextMessage.of(Key.of("1", "someKeyForPartition1"), null);
// when
ProducerRecord<String, String> firstRecord = encoder.apply(first);
ProducerRecord<String, String> secondRecord = encoder.apply(second);
// then
assertThat(firstRecord.key(), is("someKeyForPartition0"));
assertThat(firstRecord.headers(), containsInAnyOrder(
new RecordHeader("_synapse_msg_partitionKey", "0".getBytes(UTF_8)),
new RecordHeader("_synapse_msg_compactionKey", "someKeyForPartition0".getBytes(UTF_8))
));
assertThat(firstRecord.partition(), is(0));
// and
assertThat(secondRecord.key(), is("someKeyForPartition1"));
assertThat(secondRecord.headers(), containsInAnyOrder(
new RecordHeader("_synapse_msg_partitionKey", "1".getBytes(UTF_8)),
new RecordHeader("_synapse_msg_compactionKey", "someKeyForPartition1".getBytes(UTF_8))
));
assertThat(secondRecord.partition(), is(1));
}
@Test
public void shouldSendEvent() {
// given
final Message<ExampleJsonObject> message = message("someKey", new ExampleJsonObject("banana"));
try (final Consumer<String, String> consumer = getKafkaConsumer("someTestGroup")) {
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KAFKA_TOPIC);
// when
messageSender.send(message).join();
// then
final ConsumerRecord<String, String> record = getSingleRecord(consumer, KAFKA_TOPIC, 250L);
assertThat(record.key(), is("someKey"));
assertThat(record.value(), is("{\"value\":\"banana\"}"));
assertThat(record.headers(), containsInAnyOrder(
new RecordHeader("_synapse_msg_partitionKey", "someKey".getBytes(UTF_8)),
new RecordHeader("_synapse_msg_compactionKey", "someKey".getBytes(UTF_8))
));
assertThat(record.topic(), is(KAFKA_TOPIC));
assertThat(record.partition(), is(0));
}
}
@Override
public ProducerRecord<K, V> record() {
if (headers.isEmpty()) {
return new ProducerRecord<>(topic, partition, timestamp, key, value);
} else {
return new ProducerRecord<>(
topic,
partition,
timestamp,
key,
value,
headers.stream()
.map(header -> new RecordHeader(header.key(), header.value().getBytes()))
.collect(Collectors.toList()));
}
}
@Test
public void testConsumeWithHeader(TestContext ctx) {
MockConsumer<String, String> mock = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
KafkaReadStream<String, String> consumer = createConsumer(vertx, mock);
Async doneLatch = ctx.async();
consumer.handler(record -> {
ctx.assertEquals("the_topic", record.topic());
ctx.assertEquals(0, record.partition());
ctx.assertEquals("abc", record.key());
ctx.assertEquals("def", record.value());
Header[] headers = record.headers().toArray();
ctx.assertEquals(1, headers.length);
Header header = headers[0];
ctx.assertEquals("header_key", header.key());
ctx.assertEquals("header_value", new String(header.value()));
consumer.close(v -> doneLatch.complete());
});
consumer.subscribe(Collections.singleton("the_topic"), v -> {
mock.schedulePollTask(() -> {
mock.rebalance(Collections.singletonList(new TopicPartition("the_topic", 0)));
mock.addRecord(new ConsumerRecord<>("the_topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0L, 0, 0, "abc", "def",
new RecordHeaders(Collections.singletonList(new RecordHeader("header_key", "header_value".getBytes())))));
mock.seek(new TopicPartition("the_topic", 0), 0L);
});
});
}
public static void main(String[] args)
throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", brokerList);
KafkaProducer<String, String> producer =
new KafkaProducer<>(properties);
ProducerRecord<String, String> record1 =
new ProducerRecord<>(topic, 0, System.currentTimeMillis(),
null, "msg_ttl_1", new RecordHeaders().add(new RecordHeader("ttl",
BytesUtils.longToBytes(20))));
ProducerRecord<String, String> record2 = //超时的消息
new ProducerRecord<>(topic, 0, System.currentTimeMillis() - 5 * 1000,
null, "msg_ttl_2", new RecordHeaders().add(new RecordHeader("ttl",
BytesUtils.longToBytes(5))));
ProducerRecord<String, String> record3 =
new ProducerRecord<>(topic, 0, System.currentTimeMillis(),
null, "msg_ttl_3", new RecordHeaders().add(new RecordHeader("ttl",
BytesUtils.longToBytes(30))));
producer.send(record1).get();
producer.send(record2).get();
producer.send(record3).get();
}
private WorkerRecord<byte[], byte[]> emptyWorkerRecordWithHeaders(String[] headers) {
RecordHeaders recordHeaders = new RecordHeaders();
for (String headerStr: headers) {
String[] split = headerStr.split(":");
recordHeaders.add(new RecordHeader(split[0], split[1].getBytes(ISO_8859_1)));
}
ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(EMPTY_TOPIC, SOME_PARTITION, SOME_OFFSET,
ConsumerRecord.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, (long) ConsumerRecord.NULL_CHECKSUM,
0, 0,
new byte[0], new byte[0],
recordHeaders);
return new WorkerRecord<>(consumerRecord, SOME_SUBPARTITION);
}
@Incoming("data")
@Outgoing("output-2")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public Message<Integer> process(Message<Integer> input) {
List<RecordHeader> list = Arrays.asList(
new RecordHeader("hello", "clement".getBytes()),
new RecordHeader("count", Integer.toString(counter.incrementAndGet()).getBytes()));
return Message.of(
input.getPayload() + 1,
Metadata.of(OutgoingKafkaRecordMetadata.builder().withKey(Integer.toString(input.getPayload()))
.withHeaders(list).build()),
input::ack);
}
public static void main(String[] args)
throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", brokerList);
KafkaProducer<String, String> producer =
new KafkaProducer<>(properties);
ProducerRecord<String, String> record1 =
new ProducerRecord<>(topic, 0, System.currentTimeMillis(),
null, "msg_ttl_1", new RecordHeaders().add(new RecordHeader("ttl",
BytesUtils.longToBytes(20))));
ProducerRecord<String, String> record2 = //超时的消息
new ProducerRecord<>(topic, 0, System.currentTimeMillis() - 5 * 1000,
null, "msg_ttl_2", new RecordHeaders().add(new RecordHeader("ttl",
BytesUtils.longToBytes(5))));
ProducerRecord<String, String> record3 =
new ProducerRecord<>(topic, 0, System.currentTimeMillis(),
null, "msg_ttl_3", new RecordHeaders().add(new RecordHeader("ttl",
BytesUtils.longToBytes(30))));
producer.send(record1).get();
producer.send(record2).get();
producer.send(record3).get();
}
private Headers buildHeaders(Map<String, String> attributesMap) {
if (attributesMap == null || attributesMap.isEmpty()) {
return null;
}
return new RecordHeaders(
attributesMap
.entrySet()
.parallelStream()
.map(attribute -> new RecordHeader(attribute.getKey(), attribute.getValue().getBytes()))
.collect(Collectors.toList()));
}
@Test
public void pull_withHeader() {
int partitions = 1;
int recordsPerPartition = 3;
List<Header> headers = new ArrayList<>();
headers.add(new RecordHeader("key1", "value1".getBytes()));
headers.add(new RecordHeader("key2", "value2".getBytes()));
generateTestRecordsForConsumers(partitions, recordsPerPartition, headers);
// Each response should pull from a different partition
List<String> messageIds = new ArrayList<>();
List<String> messages = new ArrayList<>();
List<Map<String, String>> attributes = new ArrayList<>();
List<PubsubMessage> response = subscriptionManager.pull(10, false);
for (PubsubMessage message : response) {
messageIds.add(message.getMessageId());
messages.add(message.getData().toStringUtf8());
attributes.add(message.getAttributesMap());
}
assertThat(messageIds, Matchers.contains("0-0", "0-1", "0-2"));
assertThat(messages, Matchers.contains("message-0000", "message-0001", "message-0002"));
ImmutableMap<String, String> expectedAttributes =
new Builder<String, String>().put("key1", "value1").put("key2", "value2").build();
assertThat(
attributes,
Matchers.equalTo(
Arrays.asList(expectedAttributes, expectedAttributes, expectedAttributes)));
assertThat(subscriptionManager.pull(10, false), Matchers.empty());
}
@Test
public void publish_withAttributes() {
int messages = 3;
PublishRequest request =
PublishRequest.newBuilder()
.setTopic("projects/project-1/topics/topic-2")
.addAllMessages(generatePubsubMessagesWithHeader(messages))
.build();
MockProducer<String, ByteBuffer> producer = startPublishExecutor(messages);
PublishResponse response = blockingStub.publish(request);
assertThat(response.getMessageIdsList(), Matchers.contains("0-0", "0-1", "0-2"));
List<Headers> headers =
producer.history().stream().map(ProducerRecord::headers).collect(Collectors.toList());
assertThat(
headers,
Matchers.contains(
new RecordHeaders(
Collections.singletonList(
new RecordHeader("some-key", "some-value".getBytes(UTF_8)))),
new RecordHeaders(
Collections.singletonList(
new RecordHeader("some-key", "some-value".getBytes(UTF_8)))),
new RecordHeaders(
Collections.singletonList(
new RecordHeader("some-key", "some-value".getBytes(UTF_8))))));
verify(statisticsManager, times(3))
.computePublish(
eq("projects/project-1/topics/topic-2"),
argThat(message -> message.toStringUtf8().matches(MESSAGE_CONTENT_REGEX)),
anyLong());
verify(statisticsManager, never()).computePublishError(anyString());
}
static RecordHeaders kafkaHeaders(RecordHeader... headers) {
RecordHeaders hs = new RecordHeaders();
for (RecordHeader h : headers) {
hs.add(h);
}
return hs;
}
@ParameterizedTest
@MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
void testRequestWithStructured(CloudEvent event) {
String expectedContentType = CSVFormat.INSTANCE.serializedContentType();
byte[] expectedBuffer = CSVFormat.INSTANCE.serialize(event);
String topic = "test";
Integer partition = 10;
Long timestamp = System.currentTimeMillis();
String key = "aaa";
ProducerRecord<String, byte[]> producerRecord = StructuredMessageReader
.from(event, CSVFormat.INSTANCE)
.read(KafkaMessageFactory.createWriter(topic, partition, timestamp, key));
assertThat(producerRecord.topic())
.isEqualTo(topic);
assertThat(producerRecord.partition())
.isEqualTo(partition);
assertThat(producerRecord.timestamp())
.isEqualTo(timestamp);
assertThat(producerRecord.key())
.isEqualTo(key);
assertThat(producerRecord.headers())
.containsExactly(new RecordHeader(KafkaHeaders.CONTENT_TYPE, expectedContentType.getBytes()));
assertThat(producerRecord.value())
.isEqualTo(expectedBuffer);
}
/**
* Generates Kafka {@link Headers} based on an {@link EventMessage} and {@link SerializedObject}, using the given
* {@code headerValueMapper} to correctly map the values to byte arrays.
*
* @param eventMessage the {@link EventMessage} to create headers for
* @param serializedObject the serialized payload of the given {@code eventMessage}
* @param headerValueMapper function for converting {@code values} to bytes. Since {@link RecordHeader} can handle
* only bytes this function needs to define the logic how to convert a given value to
* bytes. See {@link HeaderUtils#byteMapper()} for sample implementation
* @return the generated Kafka {@link Headers} based on an {@link EventMessage} and {@link SerializedObject}
*/
public static Headers toHeaders(EventMessage<?> eventMessage,
SerializedObject<byte[]> serializedObject,
BiFunction<String, Object, RecordHeader> headerValueMapper) {
notNull(eventMessage, () -> "EventMessage may not be null");
notNull(serializedObject, () -> "SerializedObject may not be null");
notNull(headerValueMapper, () -> "Header key-value mapper function may not be null");
RecordHeaders headers = new RecordHeaders();
eventMessage.getMetaData()
.forEach((k, v) -> ((Headers) headers).add(headerValueMapper.apply(generateMetadataKey(k), v)));
defaultHeaders(eventMessage, serializedObject).forEach((k, v) -> addHeader(headers, k, v));
return headers;
}
@Test
public void testByteMapperNullValueShouldBeAbleToHandle() {
BiFunction<String, Object, RecordHeader> fxn = byteMapper();
RecordHeader header = fxn.apply("abc", null);
assertThat(header.value()).isNull();
}
@Test
public void testGeneratingHeadersWithByteMapperShouldGenerateCorrectHeaders() {
BiFunction<String, Object, RecordHeader> fxn = byteMapper();
String expectedKey = "abc";
String expectedValue = "xyz";
RecordHeader header = fxn.apply(expectedKey, expectedValue);
assertThat(header.key()).isEqualTo(expectedKey);
assertThat(new String(header.value())).isEqualTo(expectedValue);
}
@Test
public void testGeneratingHeadersWithCustomMapperShouldGeneratedCorrectHeaders() {
String metaKey = "someHeaderKey";
String expectedMetaDataValue = "evt:someValue";
Headers header = toHeaders(
asEventMessage("SomePayload").withMetaData(MetaData.with(metaKey, "someValue")),
serializedObject(),
(key, value) -> new RecordHeader(key, ("evt:" + value.toString()).getBytes())
);
assertThat(valueAsString(header, generateMetadataKey(metaKey))).isEqualTo(expectedMetaDataValue);
}
private void reconsumeLater(ConsumerRecord<String, byte[]> consumeRecord) throws InterruptedException, ExecutionException {
// add all header to headList except RETRY_COUNT
Headers headers = consumeRecord.headers();
List<Header> headerList = new ArrayList<Header>(8);
Iterator<Header> iterator = headers.iterator();
Integer retryCount = -1;
boolean hasOrignalHeader = false;
while (iterator.hasNext()) {
Header next = iterator.next();
if (next.key().equals(RETRY_COUNT_KEY)) {
retryCount = serializer.deserialize(next.value());
continue;
}
if(next.key().equals(ORGINAL_TOPIC)){
hasOrignalHeader = true;
}
headerList.add(next);
}
// add RETRY_COUNT to header
retryCount++;
headerList.add(new RecordHeader(RETRY_COUNT_KEY, serializer.serialization(retryCount)));
if(!hasOrignalHeader){
headerList.add(new RecordHeader(ORGINAL_TOPIC, serializer.serialization(consumeRecord.topic())));
}
// send message to corresponding queue according to retry times
String retryTopic = calcRetryTopic(consumeRecord.topic(), retryCount);
ProducerRecord<String, byte[]> record = new ProducerRecord<>(retryTopic,
consumeRecord.partition() % retryQueuePartitionCount.get(retryTopic), null, consumeRecord.key(),
consumeRecord.value(), headerList);
Future<RecordMetadata> publishKafkaMessage = retryQueueMsgProducer.publishKafkaMessage(record);
publishKafkaMessage.get();
}
@Override
public EasyTransMsgPublishResult publish(String topic, String tag, String key, Map<String,Object> header, byte[] msgByte) {
String kafkaTopic = QueueKafkaHelper.getKafkaTopic(topic, tag);
//calculate partition
TransactionId trxId = (TransactionId) header.get(EasytransConstant.CallHeadKeys.PARENT_TRX_ID_KEY);
int partition = calcMessagePartition(kafkaTopic, trxId);
List<Header> kafkaHeaderList = new ArrayList<>(header.size());
for(Entry<String, Object> entry:header.entrySet()){
kafkaHeaderList.add(new RecordHeader(entry.getKey(),serializer.serialization(entry.getValue())));
}
ProducerRecord<String, byte[]> record = new ProducerRecord<>(kafkaTopic, partition, null, key, msgByte, kafkaHeaderList);
Future<RecordMetadata> sendResultFuture = kafkaProducer.send(record);
try {
RecordMetadata recordMetadata = sendResultFuture.get();
log.info("message sent:" + recordMetadata);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("message sent error",e);
}
EasyTransMsgPublishResult easyTransMsgPublishResult = new EasyTransMsgPublishResult();
easyTransMsgPublishResult.setTopic(topic);
easyTransMsgPublishResult.setMessageId(key);
return easyTransMsgPublishResult;
}