Code examples for the class org.apache.kafka.common.header.internals.RecordHeader

The examples below show how the org.apache.kafka.common.header.internals.RecordHeader API is used in real projects; you can also follow the links to view the full source code on GitHub.

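Before the project examples, here is a minimal, self-contained sketch of the core API: constructing a RecordHeader, collecting headers in RecordHeaders, attaching them to a ProducerRecord, and reading one back. The topic name, header keys, and values are illustrative assumptions, not taken from any project below.

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class RecordHeaderSketch {
    public static void main(String[] args) {
        // A RecordHeader is a simple key/value pair; the value is always a byte array.
        Header traceId = new RecordHeader("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8));

        // RecordHeaders is the mutable Headers implementation used by producer and consumer records.
        RecordHeaders headers = new RecordHeaders();
        headers.add(traceId);
        headers.add("retry-count", "0".getBytes(StandardCharsets.UTF_8));

        // Headers can be passed directly when building a ProducerRecord
        // ("demo-topic" is a placeholder topic name).
        ProducerRecord<String, String> record =
                new ProducerRecord<>("demo-topic", null, "someKey", "someValue", headers);

        // lastHeader returns the most recently added header for the given key.
        Header found = record.headers().lastHeader("trace-id");
        System.out.println(found.key() + " = " + new String(found.value(), StandardCharsets.UTF_8));
    }
}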
Example 1  Project: kop   File: MessageRecordUtils.java
private static Header[] getHeadersFromMetadata(List<KeyValue> properties) {
    Header[] headers = new Header[properties.size()];

    if (log.isDebugEnabled()) {
        log.debug("getHeadersFromMetadata. Header size: {}",
            properties.size());
    }

    int index = 0;
    for (KeyValue kv: properties) {
        headers[index] = new RecordHeader(kv.getKey(), kv.getValue().getBytes(UTF_8));

        if (log.isDebugEnabled()) {
            log.debug("index: {} kv.getKey: {}. kv.getValue: {}",
                index, kv.getKey(), kv.getValue());
        }
        index++;
    }

    return headers;
}
 
Example 2  Project: stream-registry   File: KafkaEventReceiverTest.java
@Test
public void typical() throws Exception {
  when(config.getTopic()).thenReturn(topic);
  when(consumer.partitionsFor(topic)).thenReturn(List.of(partitionInfo));
  when(consumer.beginningOffsets(topicPartitions)).thenReturn(Map.of(topicPartition, 0L));
  when(consumer.endOffsets(topicPartitions)).thenReturn(Map.of(topicPartition, 0L));
  when(consumer.poll(Duration.ofMillis(100))).thenReturn(new ConsumerRecords<>(Map.of(topicPartition, List.of(record))));
  when(record.key()).thenReturn(avroKey);
  when(record.value()).thenReturn(avroValue);
  when(converter.toModel(avroKey, avroValue)).thenReturn(event);
  when(record.headers()).thenReturn(new RecordHeaders(List.of(new RecordHeader(CORRELATION_ID, "foo".getBytes(UTF_8)))));

  underTest.receive(listener);
  Thread.sleep(100L);
  underTest.close();

  var inOrder = Mockito.inOrder(consumer, listener, correlator);
  inOrder.verify(consumer).assign(topicPartitions);
  inOrder.verify(consumer).seekToBeginning(topicPartitions);
  inOrder.verify(listener).onEvent(LOAD_COMPLETE);
  inOrder.verify(listener).onEvent(event);
  inOrder.verify(correlator).received("foo");
}
 
Example 3  Project: stream-registry   File: KafkaEventReceiverTest.java
@Test
public void listenerThrowsException() throws Exception {
  when(config.getTopic()).thenReturn(topic);
  when(consumer.partitionsFor(topic)).thenReturn(List.of(partitionInfo));
  when(consumer.beginningOffsets(topicPartitions)).thenReturn(Map.of(topicPartition, 0L));
  when(consumer.endOffsets(topicPartitions)).thenReturn(Map.of(topicPartition, 0L));
  when(consumer.poll(Duration.ofMillis(100))).thenReturn(new ConsumerRecords<>(Map.of(topicPartition, List.of(record))));
  when(record.key()).thenReturn(avroKey);
  when(record.value()).thenReturn(avroValue);
  when(converter.toModel(avroKey, avroValue)).thenReturn(event);
  when(record.headers()).thenReturn(new RecordHeaders(List.of(new RecordHeader(CORRELATION_ID, "foo".getBytes(UTF_8)))));
  doThrow(new RuntimeException("listener error")).when(listener).onEvent(event);

  underTest.receive(listener);
  Thread.sleep(100L);
  underTest.close();

  var inOrder = Mockito.inOrder(consumer, listener, correlator);
  inOrder.verify(consumer).assign(topicPartitions);
  inOrder.verify(consumer).seekToBeginning(topicPartitions);
  inOrder.verify(listener).onEvent(LOAD_COMPLETE);
  inOrder.verify(listener).onEvent(event);
  inOrder.verify(correlator).received("foo");
}
 
public IncomingKafkaRecordMetadata(KafkaConsumerRecord<K, T> record) {
    this.record = record;
    this.recordKey = record.key();
    this.topic = record.topic();
    this.partition = record.partition();
    this.timestamp = Instant.ofEpochMilli(record.timestamp());
    this.timestampType = record.timestampType();
    this.offset = record.offset();
    if (record.headers() == null) {
        this.headers = new RecordHeaders();
    } else {
        this.headers = new RecordHeaders(record.headers().stream()
                .map(kh -> new RecordHeader(kh.key(), kh.value().getBytes())).collect(
                        Collectors.toList()));
    }
}
 
public void scheduleTaskWithCronExpression() {
    Flux.just(new File(imagesDirectory).listFiles()).filter(File::isFile).subscribe(
        f -> {
            Flux.just(new Dimension(800, 600), new Dimension(180, 180), new Dimension(1200, 630)).subscribe(d -> {
                try {
                    ImageResizeRequest imageResizeRequest = new ImageResizeRequest((int) d.getWidth(), (int) d.getHeight(), f.getAbsolutePath());
                    ProducerRecord<String, String> record = new ProducerRecord<>("asyncRequests", objectMapper.writeValueAsString(imageResizeRequest));
                    record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "asyncReplies".getBytes()));
                    RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
                    ConsumerRecord<String, String> consumerRecord = replyFuture.get();
                } catch (Exception e) {
                    LOGGER.error("Error while sending message", e);
                }
            },
            e -> LOGGER.error("Error while running lambda"),
            () -> f.renameTo(new File(f.getParent() + "/Done", f.getName())));
        }
    );
}
 
private ConsumerRecords<byte[], byte[]> createTestRecordsWithHeaders() {
  RecordHeader header = new RecordHeader("testHeader", new byte[0]);
  RecordHeaders headers = new RecordHeaders();
  headers.add(header);
  TimestampType timestampType = TimestampType.NO_TIMESTAMP_TYPE;

  byte testByte = 0;
  byte[] testKey = { testByte };
  byte[] testValue = { testByte };

  ConnectHeaders destinationHeaders = new ConnectHeaders();
  destinationHeaders.add(header.key(), header.value(), Schema.OPTIONAL_BYTES_SCHEMA);
  ConsumerRecord<byte[], byte[]> testConsumerRecord = new ConsumerRecord<byte[], byte[]>(FIRST_TOPIC, FIRST_PARTITION,
      FIRST_OFFSET, System.currentTimeMillis(), timestampType, 0L, 0, 0, testKey, testValue, headers);

  TopicPartition topicPartition = new TopicPartition(FIRST_TOPIC, FIRST_PARTITION);
  List<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
  consumerRecords.add(testConsumerRecord);

  Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumerRecordMap = new HashMap<>(1);
  consumerRecordMap.put(topicPartition, consumerRecords);
  ConsumerRecords<byte[], byte[]> testRecords = new ConsumerRecords<>(consumerRecordMap);
  return testRecords;
}
 
Example 7  Project: jackdaw   File: MockKafkaTest.java
@Test
public void testDefaultRecordMapping() {
  final MockKafka<Object, Object> mockKafka = new MockKafka<>();
  final RecordHeaders recordHeaders = new RecordHeaders(Collections.singleton(
                  new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8))));
  final RecordMetadata recordMetadata = new RecordMetadata(new TopicPartition("topic", 0),
          0, 0, 0, -1L, -1, -1);
  final ProducerRecord<Object, Object> producerRecord =
          new ProducerRecord<>("topic", 0, "key", "value", recordHeaders);

  final ConsumerRecord<Object, Object> consumerRecord = mockKafka.defaultRecordMapping(producerRecord, recordMetadata);

  assertEquals(producerRecord.topic(), consumerRecord.topic());
  assertEquals(producerRecord.partition().intValue(), consumerRecord.partition());
  assertEquals(producerRecord.key(), consumerRecord.key());
  assertEquals(producerRecord.value(), consumerRecord.value());
  assertEquals(producerRecord.headers(), consumerRecord.headers());
}
 
Example 8  Project: synapse   File: KafkaDecoderTest.java
@Test
public void shouldDecodeCompoundKeys() {
    final KafkaDecoder decoder = new KafkaDecoder();
    final ConsumerRecord<String,String> record = new ConsumerRecord<>(
            "ch01",
            0,
            42L,
            1234L, TimestampType.CREATE_TIME,
            -1L, -1, -1,
            "key-1234",
            null,
            new RecordHeaders(asList(
                    new RecordHeader("_synapse_msg_partitionKey", "1234".getBytes(UTF_8)),
                    new RecordHeader("_synapse_msg_compactionKey", "key-1234".getBytes(UTF_8))
            ))
    );

    // when
    final TextMessage decodedMessage = decoder.apply(record);

    // then
    assertThat(decodedMessage.getKey().isCompoundKey(), is(true));
    assertThat(decodedMessage.getKey().compactionKey(), is("key-1234"));
    assertThat(decodedMessage.getKey().partitionKey(), is("1234"));
}
 
Example 9  Project: synapse   File: KafkaDecoderTest.java
@Test
public void shouldDecodeBrokenCompoundKeysAsMessageKey() {
    final KafkaDecoder decoder = new KafkaDecoder();
    final ConsumerRecord<String,String> record = new ConsumerRecord<>(
            "ch01",
            0,
            42L,
            1234L, TimestampType.CREATE_TIME,
            -1L, -1, -1,
            "record-key",
            null,
            new RecordHeaders(asList(
                    new RecordHeader("_synapse_msg_partitionKey", "1234".getBytes(UTF_8)),
                    new RecordHeader("_synapse_msg_compactionKey", "key-1234".getBytes(UTF_8))
            ))
    );

    // when
    final TextMessage decodedMessage = decoder.apply(record);

    // then
    assertThat(decodedMessage.getKey().isCompoundKey(), is(false));
    assertThat(decodedMessage.getKey().compactionKey(), is("record-key"));
}
 
Example 10  Project: synapse   File: KafkaEncoderTest.java
@Test
public void shouldEncodeMessage() {
    // given
    final KafkaEncoder encoder = new KafkaEncoder("test", 1);
    final TextMessage message = TextMessage.of("someKey", "payload");

    // when
    ProducerRecord<String, String> record = encoder.apply(message);

    // then
    assertThat(record.key(), is("someKey"));
    assertThat(record.value(), is("payload"));
    assertThat(record.headers(), containsInAnyOrder(
            new RecordHeader("_synapse_msg_partitionKey", "someKey".getBytes(UTF_8)),
            new RecordHeader("_synapse_msg_compactionKey", "someKey".getBytes(UTF_8))
    ));
    assertThat(record.topic(), is("test"));
    assertThat(record.partition(), is(nullValue()));
}
 
Example 11  Project: synapse   File: KafkaEncoderTest.java
@Test
public void shouldEncodeMessageHeaders() {
    // given
    final KafkaEncoder encoder = new KafkaEncoder("test", 1);
    final TextMessage message = TextMessage.of(
            "someKey",
            Header.builder()
                    .withAttribute("foo", "bar")
                    .withAttribute("foobar", Instant.ofEpochMilli(42)).build(),
            null
    );

    // when
    final ProducerRecord<String, String> record = encoder.apply(message);

    // then
    assertThat(record.headers(), containsInAnyOrder(
            new RecordHeader("_synapse_msg_partitionKey", "someKey".getBytes(UTF_8)),
            new RecordHeader("_synapse_msg_compactionKey", "someKey".getBytes(UTF_8)),
            new RecordHeader("foo", "bar".getBytes(UTF_8)),
            new RecordHeader("foobar", "1970-01-01T00:00:00.042Z".getBytes(UTF_8))
    ));
}
 
Example 12  Project: synapse   File: KafkaEncoderTest.java
@Test
public void shouldPartitionMessage() {
    // given
    final KafkaEncoder encoder = new KafkaEncoder("test", 2);
    final TextMessage first = TextMessage.of(Key.of("0", "someKeyForPartition0"), null);
    final TextMessage second = TextMessage.of(Key.of("1", "someKeyForPartition1"), null);

    // when
    ProducerRecord<String, String> firstRecord = encoder.apply(first);
    ProducerRecord<String, String> secondRecord = encoder.apply(second);

    // then
    assertThat(firstRecord.key(), is("someKeyForPartition0"));
    assertThat(firstRecord.headers(), containsInAnyOrder(
            new RecordHeader("_synapse_msg_partitionKey", "0".getBytes(UTF_8)),
            new RecordHeader("_synapse_msg_compactionKey", "someKeyForPartition0".getBytes(UTF_8))
    ));
    assertThat(firstRecord.partition(), is(0));
    // and
    assertThat(secondRecord.key(), is("someKeyForPartition1"));
    assertThat(secondRecord.headers(), containsInAnyOrder(
            new RecordHeader("_synapse_msg_partitionKey", "1".getBytes(UTF_8)),
            new RecordHeader("_synapse_msg_compactionKey", "someKeyForPartition1".getBytes(UTF_8))
    ));
    assertThat(secondRecord.partition(), is(1));
}
 
Example 13  Project: synapse   File: KafkaMessageSenderTest.java
@Test
public void shouldSendEvent() {
    // given
    final Message<ExampleJsonObject> message = message("someKey", new ExampleJsonObject("banana"));

    try (final Consumer<String, String> consumer = getKafkaConsumer("someTestGroup")) {
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KAFKA_TOPIC);

        // when
        messageSender.send(message).join();

        // then

        final ConsumerRecord<String, String> record = getSingleRecord(consumer, KAFKA_TOPIC, 250L);
        assertThat(record.key(), is("someKey"));
        assertThat(record.value(), is("{\"value\":\"banana\"}"));
        assertThat(record.headers(), containsInAnyOrder(
                new RecordHeader("_synapse_msg_partitionKey", "someKey".getBytes(UTF_8)),
                new RecordHeader("_synapse_msg_compactionKey", "someKey".getBytes(UTF_8))
        ));
        assertThat(record.topic(), is(KAFKA_TOPIC));
        assertThat(record.partition(), is(0));
    }
}
 
@Override
public ProducerRecord<K, V> record() {
  if (headers.isEmpty()) {
    return new ProducerRecord<>(topic, partition, timestamp, key, value);
  } else {
    return new ProducerRecord<>(
      topic,
      partition,
      timestamp,
      key,
      value,
      headers.stream()
        .map(header -> new RecordHeader(header.key(), header.value().getBytes()))
        .collect(Collectors.toList()));
  }
}
 
Example 15  Project: vertx-kafka-client   File: ConsumerMockTestBase.java
@Test
public void testConsumeWithHeader(TestContext ctx) {
  MockConsumer<String, String> mock = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
  KafkaReadStream<String, String> consumer = createConsumer(vertx, mock);
  Async doneLatch = ctx.async();
  consumer.handler(record -> {
    ctx.assertEquals("the_topic", record.topic());
    ctx.assertEquals(0, record.partition());
    ctx.assertEquals("abc", record.key());
    ctx.assertEquals("def", record.value());
    Header[] headers = record.headers().toArray();
    ctx.assertEquals(1, headers.length);
    Header header = headers[0];
    ctx.assertEquals("header_key", header.key());
    ctx.assertEquals("header_value", new String(header.value()));
    consumer.close(v -> doneLatch.complete());
  });
  consumer.subscribe(Collections.singleton("the_topic"), v -> {
    mock.schedulePollTask(() -> {
      mock.rebalance(Collections.singletonList(new TopicPartition("the_topic", 0)));
      mock.addRecord(new ConsumerRecord<>("the_topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0L, 0, 0, "abc", "def",
        new RecordHeaders(Collections.singletonList(new RecordHeader("header_key", "header_value".getBytes())))));
      mock.seek(new TopicPartition("the_topic", 0), 0L);
    });
  });
}
 
Example 16  Project: kafka_book_demo   File: ProducertTTL.java
public static void main(String[] args)
        throws ExecutionException, InterruptedException {
    Properties properties = new Properties();
    properties.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("bootstrap.servers", brokerList);

    KafkaProducer<String, String> producer =
            new KafkaProducer<>(properties);

    ProducerRecord<String, String> record1 =
            new ProducerRecord<>(topic, 0, System.currentTimeMillis(),
                    null, "msg_ttl_1", new RecordHeaders().add(new RecordHeader("ttl",
                    BytesUtils.longToBytes(20))));
    ProducerRecord<String, String> record2 = // message that has already exceeded its TTL
            new ProducerRecord<>(topic, 0, System.currentTimeMillis() - 5 * 1000,
                    null, "msg_ttl_2", new RecordHeaders().add(new RecordHeader("ttl",
                    BytesUtils.longToBytes(5))));
    ProducerRecord<String, String> record3 =
            new ProducerRecord<>(topic, 0, System.currentTimeMillis(),
                    null, "msg_ttl_3", new RecordHeaders().add(new RecordHeader("ttl",
                    BytesUtils.longToBytes(30))));
    producer.send(record1).get();
    producer.send(record2).get();
    producer.send(record3).get();
}
 
Example 17  Project: kafka-workers   File: BaseRecordWeigherTest.java
private WorkerRecord<byte[], byte[]> emptyWorkerRecordWithHeaders(String[] headers) {
    RecordHeaders recordHeaders = new RecordHeaders();
    for (String headerStr: headers) {
        String[] split = headerStr.split(":");
        recordHeaders.add(new RecordHeader(split[0], split[1].getBytes(ISO_8859_1)));
    }
    ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(EMPTY_TOPIC, SOME_PARTITION, SOME_OFFSET,
            ConsumerRecord.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, (long) ConsumerRecord.NULL_CHECKSUM,
            0, 0,
            new byte[0], new byte[0],
            recordHeaders);

    return new WorkerRecord<>(consumerRecord, SOME_SUBPARTITION);
}
 
@Incoming("data")
@Outgoing("output-2")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public Message<Integer> process(Message<Integer> input) {
    List<RecordHeader> list = Arrays.asList(
            new RecordHeader("hello", "clement".getBytes()),
            new RecordHeader("count", Integer.toString(counter.incrementAndGet()).getBytes()));
    return Message.of(
            input.getPayload() + 1,
            Metadata.of(OutgoingKafkaRecordMetadata.builder().withKey(Integer.toString(input.getPayload()))
                    .withHeaders(list).build()),
            input::ack);
}
 
Example 19  Project: BigData-In-Practice   File: ProducertTTL.java
public static void main(String[] args)
        throws ExecutionException, InterruptedException {
    Properties properties = new Properties();
    properties.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("bootstrap.servers", brokerList);

    KafkaProducer<String, String> producer =
            new KafkaProducer<>(properties);

    ProducerRecord<String, String> record1 =
            new ProducerRecord<>(topic, 0, System.currentTimeMillis(),
                    null, "msg_ttl_1", new RecordHeaders().add(new RecordHeader("ttl",
                    BytesUtils.longToBytes(20))));
    ProducerRecord<String, String> record2 = // message that has already exceeded its TTL
            new ProducerRecord<>(topic, 0, System.currentTimeMillis() - 5 * 1000,
                    null, "msg_ttl_2", new RecordHeaders().add(new RecordHeader("ttl",
                    BytesUtils.longToBytes(5))));
    ProducerRecord<String, String> record3 =
            new ProducerRecord<>(topic, 0, System.currentTimeMillis(),
                    null, "msg_ttl_3", new RecordHeaders().add(new RecordHeader("ttl",
                    BytesUtils.longToBytes(30))));
    producer.send(record1).get();
    producer.send(record2).get();
    producer.send(record3).get();
}
 
Example 20  Project: kafka-pubsub-emulator   File: PublisherService.java
private Headers buildHeaders(Map<String, String> attributesMap) {
  if (attributesMap == null || attributesMap.isEmpty()) {
    return null;
  }
  return new RecordHeaders(
      attributesMap
          .entrySet()
          .parallelStream()
          .map(attribute -> new RecordHeader(attribute.getKey(), attribute.getValue().getBytes()))
          .collect(Collectors.toList()));
}
 
@Test
public void pull_withHeader() {
  int partitions = 1;
  int recordsPerPartition = 3;
  List<Header> headers = new ArrayList<>();
  headers.add(new RecordHeader("key1", "value1".getBytes()));
  headers.add(new RecordHeader("key2", "value2".getBytes()));
  generateTestRecordsForConsumers(partitions, recordsPerPartition, headers);

  // Each response should pull from a different partition
  List<String> messageIds = new ArrayList<>();
  List<String> messages = new ArrayList<>();
  List<Map<String, String>> attributes = new ArrayList<>();
  List<PubsubMessage> response = subscriptionManager.pull(10, false);
  for (PubsubMessage message : response) {
    messageIds.add(message.getMessageId());
    messages.add(message.getData().toStringUtf8());
    attributes.add(message.getAttributesMap());
  }

  assertThat(messageIds, Matchers.contains("0-0", "0-1", "0-2"));
  assertThat(messages, Matchers.contains("message-0000", "message-0001", "message-0002"));
  ImmutableMap<String, String> expectedAttributes =
      new Builder<String, String>().put("key1", "value1").put("key2", "value2").build();
  assertThat(
      attributes,
      Matchers.equalTo(
          Arrays.asList(expectedAttributes, expectedAttributes, expectedAttributes)));

  assertThat(subscriptionManager.pull(10, false), Matchers.empty());
}
 
@Test
public void publish_withAttributes() {
  int messages = 3;
  PublishRequest request =
      PublishRequest.newBuilder()
          .setTopic("projects/project-1/topics/topic-2")
          .addAllMessages(generatePubsubMessagesWithHeader(messages))
          .build();

  MockProducer<String, ByteBuffer> producer = startPublishExecutor(messages);

  PublishResponse response = blockingStub.publish(request);
  assertThat(response.getMessageIdsList(), Matchers.contains("0-0", "0-1", "0-2"));

  List<Headers> headers =
      producer.history().stream().map(ProducerRecord::headers).collect(Collectors.toList());
  assertThat(
      headers,
      Matchers.contains(
          new RecordHeaders(
              Collections.singletonList(
                  new RecordHeader("some-key", "some-value".getBytes(UTF_8)))),
          new RecordHeaders(
              Collections.singletonList(
                  new RecordHeader("some-key", "some-value".getBytes(UTF_8)))),
          new RecordHeaders(
              Collections.singletonList(
                  new RecordHeader("some-key", "some-value".getBytes(UTF_8))))));

  verify(statisticsManager, times(3))
      .computePublish(
          eq("projects/project-1/topics/topic-2"),
          argThat(message -> message.toStringUtf8().matches(MESSAGE_CONTENT_REGEX)),
          anyLong());
  verify(statisticsManager, never()).computePublishError(anyString());
}
 
Example 23  Project: sdk-java   File: KafkaUtils.java
static RecordHeaders kafkaHeaders(RecordHeader... headers) {
    RecordHeaders hs = new RecordHeaders();
    for (RecordHeader h : headers) {
        hs.add(h);
    }
    return hs;
}
 
Example 24  Project: sdk-java   File: KafkaProducerMessageWriterTest.java
@ParameterizedTest
@MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
void testRequestWithStructured(CloudEvent event) {
    String expectedContentType = CSVFormat.INSTANCE.serializedContentType();
    byte[] expectedBuffer = CSVFormat.INSTANCE.serialize(event);

    String topic = "test";
    Integer partition = 10;
    Long timestamp = System.currentTimeMillis();
    String key = "aaa";

    ProducerRecord<String, byte[]> producerRecord = StructuredMessageReader
        .from(event, CSVFormat.INSTANCE)
        .read(KafkaMessageFactory.createWriter(topic, partition, timestamp, key));

    assertThat(producerRecord.topic())
        .isEqualTo(topic);
    assertThat(producerRecord.partition())
        .isEqualTo(partition);
    assertThat(producerRecord.timestamp())
        .isEqualTo(timestamp);
    assertThat(producerRecord.key())
        .isEqualTo(key);
    assertThat(producerRecord.headers())
        .containsExactly(new RecordHeader(KafkaHeaders.CONTENT_TYPE, expectedContentType.getBytes()));
    assertThat(producerRecord.value())
        .isEqualTo(expectedBuffer);
}
 
Example 25  Project: extension-kafka   File: HeaderUtils.java
/**
 * Generates Kafka {@link Headers} based on an {@link EventMessage} and {@link SerializedObject}, using the given
 * {@code headerValueMapper} to correctly map the values to byte arrays.
 *
 * @param eventMessage      the {@link EventMessage} to create headers for
 * @param serializedObject  the serialized payload of the given {@code eventMessage}
 * @param headerValueMapper function for converting {@code values} to bytes. Since {@link RecordHeader} can handle
 *                          only bytes this function needs to define the logic how to convert a given value to
 *                          bytes. See {@link HeaderUtils#byteMapper()} for sample implementation
 * @return the generated Kafka {@link Headers} based on an {@link EventMessage} and {@link SerializedObject}
 */
public static Headers toHeaders(EventMessage<?> eventMessage,
                                SerializedObject<byte[]> serializedObject,
                                BiFunction<String, Object, RecordHeader> headerValueMapper) {
    notNull(eventMessage, () -> "EventMessage may not be null");
    notNull(serializedObject, () -> "SerializedObject may not be null");
    notNull(headerValueMapper, () -> "Header key-value mapper function may not be null");

    RecordHeaders headers = new RecordHeaders();
    eventMessage.getMetaData()
                .forEach((k, v) -> ((Headers) headers).add(headerValueMapper.apply(generateMetadataKey(k), v)));
    defaultHeaders(eventMessage, serializedObject).forEach((k, v) -> addHeader(headers, k, v));
    return headers;
}
 
Example 26  Project: extension-kafka   File: HeaderUtilsTest.java
@Test
public void testByteMapperNullValueShouldBeAbleToHandle() {
    BiFunction<String, Object, RecordHeader> fxn = byteMapper();
    RecordHeader header = fxn.apply("abc", null);

    assertThat(header.value()).isNull();
}
 
Example 27  Project: extension-kafka   File: HeaderUtilsTest.java
@Test
public void testGeneratingHeadersWithByteMapperShouldGenerateCorrectHeaders() {
    BiFunction<String, Object, RecordHeader> fxn = byteMapper();
    String expectedKey = "abc";
    String expectedValue = "xyz";
    RecordHeader header = fxn.apply(expectedKey, expectedValue);

    assertThat(header.key()).isEqualTo(expectedKey);
    assertThat(new String(header.value())).isEqualTo(expectedValue);
}
 
Example 28  Project: extension-kafka   File: HeaderUtilsTest.java
@Test
public void testGeneratingHeadersWithCustomMapperShouldGeneratedCorrectHeaders() {
    String metaKey = "someHeaderKey";
    String expectedMetaDataValue = "evt:someValue";
    Headers header = toHeaders(
            asEventMessage("SomePayload").withMetaData(MetaData.with(metaKey, "someValue")),
            serializedObject(),
            (key, value) -> new RecordHeader(key, ("evt:" + value.toString()).getBytes())
    );

    assertThat(valueAsString(header, generateMetadataKey(metaKey))).isEqualTo(expectedMetaDataValue);
}
 
private void reconsumeLater(ConsumerRecord<String, byte[]> consumeRecord) throws InterruptedException, ExecutionException {

    // copy all headers into headerList except RETRY_COUNT
    Headers headers = consumeRecord.headers();
    List<Header> headerList = new ArrayList<Header>(8);
    Iterator<Header> iterator = headers.iterator();
    Integer retryCount = -1;
    boolean hasOrignalHeader = false;
    while (iterator.hasNext()) {
        Header next = iterator.next();
        if (next.key().equals(RETRY_COUNT_KEY)) {
            retryCount = serializer.deserialize(next.value());
            continue;
        }

        if (next.key().equals(ORGINAL_TOPIC)) {
            hasOrignalHeader = true;
        }
        headerList.add(next);
    }

    // increment RETRY_COUNT and add it back as a header
    retryCount++;
    headerList.add(new RecordHeader(RETRY_COUNT_KEY, serializer.serialization(retryCount)));

    if (!hasOrignalHeader) {
        headerList.add(new RecordHeader(ORGINAL_TOPIC, serializer.serialization(consumeRecord.topic())));
    }

    // route the message to the retry queue that matches its retry count
    String retryTopic = calcRetryTopic(consumeRecord.topic(), retryCount);

    ProducerRecord<String, byte[]> record = new ProducerRecord<>(retryTopic,
            consumeRecord.partition() % retryQueuePartitionCount.get(retryTopic), null, consumeRecord.key(),
            consumeRecord.value(), headerList);
    Future<RecordMetadata> publishKafkaMessage = retryQueueMsgProducer.publishKafkaMessage(record);
    publishKafkaMessage.get();
}
 
@Override
public EasyTransMsgPublishResult publish(String topic, String tag, String key, Map<String,Object> header, byte[] msgByte) {
	String kafkaTopic = QueueKafkaHelper.getKafkaTopic(topic, tag);
	
	//calculate partition
	TransactionId trxId = (TransactionId) header.get(EasytransConstant.CallHeadKeys.PARENT_TRX_ID_KEY);
	int partition = calcMessagePartition(kafkaTopic, trxId);
	
	List<Header> kafkaHeaderList = new ArrayList<>(header.size());
	for(Entry<String, Object> entry:header.entrySet()){
		kafkaHeaderList.add(new RecordHeader(entry.getKey(),serializer.serialization(entry.getValue())));
	}
	
	ProducerRecord<String, byte[]> record = new ProducerRecord<>(kafkaTopic, partition, null, key, msgByte, kafkaHeaderList);
	Future<RecordMetadata> sendResultFuture = kafkaProducer.send(record);
	try {
		RecordMetadata recordMetadata = sendResultFuture.get();
		log.info("message sent:" + recordMetadata);
	} catch (InterruptedException | ExecutionException e) {
		throw new RuntimeException("message sent error",e);
	}
	
	EasyTransMsgPublishResult easyTransMsgPublishResult = new EasyTransMsgPublishResult();
	easyTransMsgPublishResult.setTopic(topic);
	easyTransMsgPublishResult.setMessageId(key);
	return easyTransMsgPublishResult;
}
 