Source Code Examples for the Class org.apache.kafka.common.header.Headers

The following examples show how the org.apache.kafka.common.header.Headers API is used in real projects; you can also follow the project links to view the full source on GitHub.
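As a quick orientation before the examples, here is a minimal sketch of the core Headers operations that recur below: adding a header, looking up the most recent value for a key, and iterating. It uses only the public Kafka client API (RecordHeaders is the standard in-memory implementation).

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class HeadersBasics {
    public static void main(String[] args) {
        Headers headers = new RecordHeaders();

        // add() appends; the same key may appear more than once
        headers.add("trace-id", "abc".getBytes(StandardCharsets.UTF_8));
        headers.add("trace-id", "def".getBytes(StandardCharsets.UTF_8));

        // lastHeader() returns the most recently added header for a key, or null
        Header last = headers.lastHeader("trace-id");
        System.out.println(new String(last.value(), StandardCharsets.UTF_8)); // prints "def"

        // Headers is Iterable<Header>, so the whole list can be walked directly
        for (Header header : headers) {
            System.out.println(header.key() + " -> " + header.value().length + " bytes");
        }
    }
}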

/**
 * Gets the message type from the headers.  Throws if not found.
 *
 * @param headers the headers
 * @return the message type declared in the headers
 */
@SuppressWarnings("unchecked")
protected Class<T> getMessageType(Headers headers) {
    Header header = headers.lastHeader(JsonSchemaSerDeConstants.HEADER_MSG_TYPE);
    if (header == null) {
        throw new RuntimeException("Message Type not found in headers.");
    }
    String msgTypeName = IoUtil.toString(header.value());
    
    try {
        return (Class<T>) Thread.currentThread().getContextClassLoader().loadClass(msgTypeName);
    } catch (ClassNotFoundException ignored) {
        // not visible to the context classloader; fall back to Class.forName below
    }
    try {
        return (Class<T>) Class.forName(msgTypeName);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
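For this lookup to succeed, the producer side must have written the fully qualified class name under the same key. The serializer shown later in this listing calls an addTypeHeaders(headers, data) helper; a hedged sketch of what such a helper presumably does (assuming the header value is the UTF-8 encoded class name, matching the IoUtil.toString decoding above):

// Hedged sketch, not the library's actual implementation: write the message
// type header that getMessageType reads back.
protected void addTypeHeaders(Headers headers, Object data) {
    headers.add(JsonSchemaSerDeConstants.HEADER_MSG_TYPE,
            data.getClass().getName().getBytes(StandardCharsets.UTF_8));
}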
 
Example 2  Project: fluent-kafka-streams-tests  File: TestInput.java
/**
 * <p>Constructor for the test input topic.</p>
 *
 * @param testDriver Kafka's {@link TopologyTestDriver} used in this test.
 * @param topic Name of input topic.
 * @param keySerde Serde for key type in topic.
 * @param valueSerde Serde for value type in topic.
 */
protected TestInput(final TopologyTestDriver testDriver, final String topic, final Serde<K> keySerde,
        final Serde<V> valueSerde) {
    this.testDriver = testDriver;
    this.topic = topic;
    this.keySerde = keySerde;
    this.valueSerde = valueSerde;

    this.consumerFactory = new ConsumerRecordFactory<>(topic,
            keySerde == null ? new UnspecifiedSerializer<K>() : keySerde.serializer(),
            valueSerde == null ? new UnspecifiedSerializer<V>() : valueSerde.serializer()) {
        @Override
        public ConsumerRecord<byte[], byte[]> create(final String topicName, final K key, final V value,
                final Headers headers, final long timestampMs) {
            final ConsumerRecord<byte[], byte[]> record = super.create(topicName, key, value, headers, timestampMs);
            testDriver.pipeInput(record);
            return record;
        }
    };
}
 
@SuppressWarnings("unchecked")
@Nullable
private Map<String, String> decodeJsonTypes(Headers source) {
	Map<String, String> types = null;
	Header jsonTypes = source.lastHeader(JSON_TYPES);
	if (jsonTypes != null) {
		ObjectMapper headerObjectMapper = getObjectMapper();
		try {
			types = headerObjectMapper.readValue(jsonTypes.value(), Map.class);
		}
		catch (IOException e) {
			logger.error(e, () -> "Could not decode json types: " + new String(jsonTypes.value()));
		}
	}
	return types;
}
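The writing side of this round trip stores a JSON-serialized map under the same JSON_TYPES key. A hedged sketch of the encoding counterpart, assuming the same ObjectMapper is used in both directions:

// Hedged sketch of the encoding counterpart to decodeJsonTypes.
private void encodeJsonTypes(Headers target, Map<String, String> types) {
	try {
		target.add(JSON_TYPES, getObjectMapper().writeValueAsBytes(types));
	}
	catch (JsonProcessingException e) {
		logger.error(e, () -> "Could not encode json types");
	}
}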
 
/**
 * Special header keys have a "_" prefix and are managed internally by the clients.
 * @param headers kafka headers object
 * @return any "special" headers contained in the given headers object
 */
public static Map<String, byte[]> fetchSpecialHeaders(Headers headers) {
  Map<String, byte[]> map = new HashMap<>();
  for (Header header : headers) {

    if (!header.key().startsWith("_")) {
      // skip any non special header
      continue;
    }

    if (map.containsKey(header.key())) {
      throw new IllegalStateException("Duplicate special header found " + header.key());
    }
    map.put(header.key(), header.value());
  }
  return map;
}
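A short usage sketch showing what the filter keeps and skips (header names here are illustrative, except _t, which appears as a special header later in this listing):

Headers headers = new RecordHeaders();
headers.add("_t", new byte[8]);                                   // special: "_" prefix, kept
headers.add("user-header", "x".getBytes(StandardCharsets.UTF_8)); // not special, skipped

Map<String, byte[]> special = fetchSpecialHeaders(headers);
// special now contains only the "_t" entry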
 
/**
 * Creates a new outgoing Kafka Message with a header added to the header list.
 *
 * @param key the header key
 * @param content the header value, must not be {@code null}
 * @return the updated Kafka Message.
 */
public OutgoingKafkaRecord<K, T> withHeader(String key, byte[] content) {
    Headers headers = getHeaders();
    Headers copy = new RecordHeaders(headers);
    copy.add(new Header() {
        @Override
        public String key() {
            return key;
        }

        @Override
        public byte[] value() {
            return content;
        }
    });
    return new OutgoingKafkaRecord<>(getTopic(), getKey(), getPayload(), getTimestamp(), getPartition(),
            copy, getAck(), getNack(), getMetadata());
}
 
/**
 * Creates a new outgoing Kafka Message with a header added to the header list.
 *
 * @param key the header key
 * @param content the header value, must not be {@code null}
 * @return the updated Kafka Message.
 */
public OutgoingKafkaRecord<K, T> withHeader(String key, String content) {
    Headers headers = getHeaders();
    Headers copy = new RecordHeaders(headers);
    copy.add(new Header() {
        @Override
        public String key() {
            return key;
        }

        @Override
        public byte[] value() {
            return content.getBytes();
        }
    });
    return new OutgoingKafkaRecord<>(getTopic(), getKey(), getPayload(), getTimestamp(), getPartition(),
            copy, getAck(), getNack(), getMetadata());
}
 
/**
 * Creates a new outgoing Kafka Message with a header added to the header list.
 *
 * @param key the header key
 * @param content the header value, must not be {@code null}
 * @param enc the encoding, must not be {@code null}
 * @return the updated Kafka Message.
 */
public OutgoingKafkaRecord<K, T> withHeader(String key, String content, Charset enc) {
    Headers headers = getHeaders();
    Headers copy = new RecordHeaders(headers);
    copy.add(new Header() {
        @Override
        public String key() {
            return key;
        }

        @Override
        public byte[] value() {
            return content.getBytes(enc);
        }
    });
    return new OutgoingKafkaRecord<>(getTopic(), getKey(), getPayload(), getTimestamp(), getPartition(),
            copy, getAck(), getNack(), getMetadata());
}
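Because each overload returns a new OutgoingKafkaRecord rather than mutating the original, the calls chain naturally. A usage sketch with illustrative header names:

OutgoingKafkaRecord<String, String> enriched = record
        .withHeader("trace-id", new byte[] { 1, 2, 3 })
        .withHeader("content-type", "application/json")
        .withHeader("origin", "billing", StandardCharsets.UTF_8);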
 
Example 8  Project: registry  File: KafkaAvroSerdesTest.java
@Test
public void testSpecificSerializedGenericDeserialized() {
    Map<String, Object> config = new HashMap<>();
    config.put(AvroSnapshotDeserializer.SPECIFIC_AVRO_READER, false);
    KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
    kafkaAvroDeserializer.configure(config, false);

    KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient);
    kafkaAvroSerializer.configure(config, false);

    TestRecord record = new TestRecord();
    record.setField1("some value");
    record.setField1("some other value");

    byte[] bytes = kafkaAvroSerializer.serialize(topic, record);
    Object o = kafkaAvroDeserializer.deserialize(topic, bytes);
    checkSpecificSerializedGenericDeserializedEquals(record, o);

    Headers headers = new RecordHeaders();
    bytes = kafkaAvroSerializer.serialize(topic, headers, record);
    o = kafkaAvroDeserializer.deserialize(topic, headers, bytes);
    checkSpecificSerializedGenericDeserializedEquals(record, o);
}
 
Example 9  Project: registry  File: KafkaAvroSerdesTest.java
@Test
public void testToggleStoringSchemaInHeader() {
    TestRecord record = new TestRecord();
    record.setField1("Hello");
    record.setField2("World");
    String keySchemaHeaderName = KafkaAvroSerde.DEFAULT_KEY_SCHEMA_VERSION_ID;

    for (Boolean storeSchemaIdInHeader : Arrays.asList(true, false)) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(KafkaAvroSerializer.STORE_SCHEMA_VERSION_ID_IN_HEADER, storeSchemaIdInHeader.toString());
        configs.put(AbstractAvroSnapshotDeserializer.SPECIFIC_AVRO_READER, true);

        KafkaAvroSerde serde = new KafkaAvroSerde(schemaRegistryClient);
        final Serializer<Object> serializer = serde.serializer();
        serializer.configure(configs, true);

        Headers headers = new RecordHeaders();
        final byte[] bytes = serializer.serialize(topic, headers, record);
        Assert.assertEquals(storeSchemaIdInHeader, headers.lastHeader(keySchemaHeaderName) != null);

        final Deserializer<Object> deserializer = serde.deserializer();
        deserializer.configure(configs, true);
        final TestRecord actual = (TestRecord) deserializer.deserialize(topic, headers, bytes);
        Assert.assertEquals(record, actual);
    }
}
 
Example 10  Project: beam  File: KafkaRecord.java
public KafkaRecord(
    String topic,
    int partition,
    long offset,
    long timestamp,
    KafkaTimestampType timestampType,
    @Nullable Headers headers,
    KV<K, V> kv) {
  this.topic = topic;
  this.partition = partition;
  this.offset = offset;
  this.timestamp = timestamp;
  this.timestampType = timestampType;
  this.headers = headers;
  this.kv = kv;
}
 
Example 11  Project: java-kafka-client  File: TracingKafkaUtilsTest.java
@Test
public void inject_two_contexts_and_extract() {
  MockSpan span = mockTracer.buildSpan("first").start();
  Headers headers = new RecordHeaders();
  assertEquals(0, headers.toArray().length);

  // inject first
  TracingKafkaUtils.inject(span.context(), headers, mockTracer);
  int headersLength = headers.toArray().length;
  assertTrue(headersLength > 0);

  // inject second
  MockSpan span2 = mockTracer.buildSpan("second").asChildOf(span.context()).start();
  TracingKafkaUtils.inject(span2.context(), headers, mockTracer);
  assertTrue(headers.toArray().length > headersLength);

  // extract: the most recently injected context (the second span) is returned
  MockSpan.MockContext spanContext = (MockSpan.MockContext) TracingKafkaUtils
      .extractSpanContext(headers, mockTracer);
  assertEquals(span2.context().spanId(), spanContext.spanId());
  assertEquals(span2.context().traceId(), spanContext.traceId());
}
 
Example 12  Project: sdk-java  File: CloudEventMessageSerializerTest.java
@Test
public void serializerShouldWork() {
    String topic = "test";
    CloudEvent event = Data.V1_WITH_JSON_DATA;

    CloudEventMessageSerializer serializer = new CloudEventMessageSerializer();

    Headers headers = new RecordHeaders();

    MockBinaryMessageWriter inMessage = new MockBinaryMessageWriter();
    CloudEventUtils.toVisitable(event).read(inMessage);

    byte[] payload = serializer.serialize(topic, headers, inMessage);

    MessageReader outMessage = KafkaMessageFactory.createReader(headers, payload);

    assertThat(outMessage.getEncoding())
        .isEqualTo(Encoding.BINARY);
    assertThat(outMessage.toEvent())
        .isEqualTo(event);
}
 
Example 13  Project: beam  File: ProducerRecordCoderTest.java
private ProducerRecord<String, String> verifySerialization(
    Headers headers, Integer partition, Long timestamp) throws IOException {
  ProducerRecord<String, String> producerRecord =
      new ProducerRecord<>("topic", partition, timestamp, "key", "value", headers);

  ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
  ProducerRecordCoder producerRecordCoder =
      ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  producerRecordCoder.encode(producerRecord, outputStream);
  ProducerRecord<String, String> decodedRecord =
      producerRecordCoder.decode(new ByteArrayInputStream(outputStream.toByteArray()));

  assertEquals(producerRecord, decodedRecord);

  return decodedRecord;
}
 
Example 14  Project: sdk-java  File: KafkaProducerMessageWriterTest.java
@ParameterizedTest
@MethodSource("binaryTestArguments")
void testRequestWithBinary(CloudEvent event, Headers expectedHeaders, byte[] expectedBody) {
    String topic = "test";
    Integer partition = 10;
    Long timestamp = System.currentTimeMillis();
    String key = "aaa";

    ProducerRecord<String, byte[]> producerRecord = KafkaMessageFactory
        .createWriter(topic, partition, timestamp, key)
        .writeBinary(event);

    assertThat(producerRecord.topic())
        .isEqualTo(topic);
    assertThat(producerRecord.partition())
        .isEqualTo(partition);
    assertThat(producerRecord.timestamp())
        .isEqualTo(timestamp);
    assertThat(producerRecord.key())
        .isEqualTo(key);
    assertThat(producerRecord.headers())
        .containsExactlyInAnyOrder(expectedHeaders.toArray());
    assertThat(producerRecord.value())
        .isEqualTo(expectedBody);
}
 
/**
 * @see org.apache.kafka.common.serialization.Serializer#serialize(java.lang.String, org.apache.kafka.common.header.Headers, java.lang.Object)
 */
@Override
public byte[] serialize(String topic, Headers headers, T data) {
    if (data == null) {
        return null;
    }

    // Now serialize the data
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        JsonGenerator generator = mapper.getFactory().createGenerator(baos);
        if (isValidationEnabled()) {
            String artifactId = getArtifactId(topic, data);
            long globalId = getGlobalId(artifactId, topic, data);
            addSchemaHeaders(headers, artifactId, globalId);

            SchemaValidator schemaValidator = getSchemaCache().getSchema(globalId);
            generator = api.decorateJsonGenerator(schemaValidator, generator);
        }
        addTypeHeaders(headers, data);

        mapper.writeValue(generator, data);

        return baos.toByteArray();
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}
 
/**
 * Adds appropriate information to the Headers so that the deserializer can function properly.
 *
 * @param headers    msg headers
 * @param artifactId artifact id
 * @param globalId   global id
 */
protected void addSchemaHeaders(Headers headers, String artifactId, long globalId) {
    // the contract never formally requires globalId to be non-negative, but in practice it is
    if (globalId >= 0) {
        ByteBuffer buff = ByteBuffer.allocate(8);
        buff.putLong(globalId);
        headers.add(JsonSchemaSerDeConstants.HEADER_GLOBAL_ID, buff.array());
    } else {
        headers.add(JsonSchemaSerDeConstants.HEADER_ARTIFACT_ID, IoUtil.toBytes(artifactId));
    }
}
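The deserializer reads the 8-byte value back symmetrically; a hedged sketch of what the getGlobalId(headers) helper used by the deserializer below presumably looks like:

// Hedged sketch, not the library's actual code: recover the long global id
// written by addSchemaHeaders, or null if the header is absent.
protected Long getGlobalId(Headers headers) {
    Header header = headers.lastHeader(JsonSchemaSerDeConstants.HEADER_GLOBAL_ID);
    if (header == null) {
        return null;
    }
    return ByteBuffer.wrap(header.value()).getLong();
}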
 
Example 17  Project: brave  File: KafkaStreamsTracing.java
void clearHeaders(Headers headers) {
  // Headers::remove creates and consumes an iterator each time. This does one loop instead.
  for (Iterator<Header> i = headers.iterator(); i.hasNext(); ) {
    Header next = i.next();
    if (propagationKeys.contains(next.key())) i.remove();
  }
}
 
/**
 * @see org.apache.kafka.common.serialization.Deserializer#deserialize(java.lang.String, org.apache.kafka.common.header.Headers, byte[])
 */
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
    if (data == null) {
        return null;
    }
    
    try {
        JsonParser parser = mapper.getFactory().createParser(data);
        if (isValidationEnabled()) {
            Long globalId = getGlobalId(headers);
            
            // If no globalId is provided, check the alternative - which is to check for artifactId and 
            // (optionally) version.  If these are found, then convert that info to globalId.
            if (globalId == null) {
                String artifactId = getArtifactId(headers);
                Integer version = getVersion(headers);
                globalId = toGlobalId(artifactId, version);
            }
            
            SchemaValidator schema = getSchemaCache().getSchema(globalId);
            parser = api.decorateJsonParser(schema, parser);
        }
        
        Class<T> messageType = getMessageType(headers);

        return mapper.readValue(parser, messageType);
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}
 
Example 19  Project: core-ng-project  File: MessagePublisherImpl.java
private void linkContext(Headers headers) {
    headers.add(MessageHeaders.HEADER_CLIENT, Strings.bytes(LogManager.APP_NAME));

    ActionLog actionLog = LogManager.CURRENT_ACTION_LOG.get();
    if (actionLog == null) return;      // publisher may be used without action log context

    headers.add(MessageHeaders.HEADER_CORRELATION_ID, Strings.bytes(actionLog.correlationId()));
    if (actionLog.trace) headers.add(MessageHeaders.HEADER_TRACE, Strings.bytes("true"));
    headers.add(MessageHeaders.HEADER_REF_ID, Strings.bytes(actionLog.id));
}
 
/**
 * Gets the artifact id from the headers.  Throws if not found.
 *
 * @param headers the headers
 * @return the artifact id declared in the headers
 */
protected String getArtifactId(Headers headers) {
    Header header = headers.lastHeader(JsonSchemaSerDeConstants.HEADER_ARTIFACT_ID);
    if (header == null) {
        throw new RuntimeException("ArtifactId not found in headers.");
    }
    return IoUtil.toString(header.value());
}
 
Example 21  Project: singer  File: LoggingAuditHeadersInjector.java
@Override
public Headers addHeaders(Headers headers, LogMessage logMessage) {
  try {
    headers.add(HEADER_KEY, SER.serialize(logMessage.getLoggingAuditHeaders()));
  } catch (TException e) {
    Stats.incr(SingerMetrics.NUMBER_OF_SERIALIZING_HEADERS_ERRORS);
    LOG.warn("Exception thrown while serializing loggingAuditHeaders", e);
  }
  return headers;
}
 
public HeadersMapExtractAdapter(Headers headers) {
  // map is the adapter's backing Map<String, String> field (not shown in this excerpt)
  for (Header header : headers) {
    byte[] headerValue = header.value();
    map.put(header.key(),
        headerValue == null ? null : new String(headerValue, StandardCharsets.UTF_8));
  }
}
 
Example 23  Project: zerocode  File: ProducerRawRecordsTest.java
@Test
public void testDeser_headers() {
    Headers headers = new RecordHeaders();
    headers.add("headerKey1", "headerValue1".getBytes());
    headers.add("headerKey2", "headerValue2".getBytes());
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic2", null, "key-123", "Hello", headers);
    String jsonBack = gson.toJson(producerRecord);
    JSONAssert.assertEquals("{\"topic\":\"topic2\",\"headers\":{\"headerKey1\":\"headerValue1\",\"headerKey2\":\"headerValue2\"},\"key\":\"key-123\",\"value\":\"Hello\"}", jsonBack, LENIENT);
}
 
Example 24  Project: kafka-backup  File: Record.java
private boolean headersEqualityByValue(Headers a, Headers b) {
    // This is an alternative implementation of ConnectHeaders::equals that uses proper Header equality by value
    if (a == b) {
        return true;
    }
    // Note, similar to ConnectHeaders::equals, it requires headers to have the same order
    // (although, that is probably not what we want in most cases)
    Iterator<Header> aIter = a.iterator();
    Iterator<Header> bIter = b.iterator();
    while (aIter.hasNext() && bIter.hasNext()) {
        if (!headerEqualityByValue(aIter.next(), bIter.next()))
            return false;
    }
    return !aIter.hasNext() && !bIter.hasNext();
}
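As the note above says, order-sensitive comparison is usually not what you want. A hedged sketch of an order-insensitive variant that treats the headers as a multiset of (key, value) pairs; it assumes header keys never contain the '\0' separator used for counting:

private boolean headersEqualIgnoringOrder(Headers a, Headers b) {
    // Count each (key, value) pair in a, then decrement for b; the two header
    // sets are equal iff every count returns to zero.
    Map<String, Integer> counts = new HashMap<>();
    for (Header header : a) {
        counts.merge(header.key() + '\0' + Arrays.toString(header.value()), 1, Integer::sum);
    }
    for (Header header : b) {
        String pair = header.key() + '\0' + Arrays.toString(header.value());
        Integer count = counts.get(pair);
        if (count == null) {
            return false;
        }
        if (count == 1) {
            counts.remove(pair);
        } else {
            counts.put(pair, count - 1);
        }
    }
    return counts.isEmpty();
}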
 
Example 25  Project: kafka-workers  File: BaseRecordWeigher.java
private long weigh(Headers headers) {
    long size = RECORD_HEADERS_INSTANCE_SIZE;
    for (Header header : headers) {
        size += RECORD_HEADER_INSTANCE_SIZE
                + StringWeigher.INSTANCE.weigh(header.key())
                // calling header.value() here may have impact on memory usage and performance as
                // it replaces ByteBuffer with byte[] inside RecordHeader
                + ByteArrayWeigher.INSTANCE.weigh(header.value());
    }
    return size;
}
 
Example 26  Project: kafka-encryption  File: CryptoSerializer.java
/**
 * Serializes data with encryption (if needed). The key reference is looked up exclusively in the Kafka header
 * {@link KafkaCryptoConstants#KEY_REF_HEADER}.
 *
 * @param topic   the topic the record is being sent to
 * @param headers the record headers, which may carry the key reference
 * @param data    the data to serialize
 * @return the serialized (and possibly encrypted) payload, or {@code null} if the raw serializer returns {@code null}
 */
@Override
public byte[] serialize(String topic, Headers headers, T data) {
    byte[] serializedData = rawSerializer.serialize(topic, headers, data);
    if (serializedData == null) {
        return null;
    }
    Header keyReferenceHeader = headers.lastHeader(KEY_REF_HEADER);
    return encrypt(serializedData, keyReferenceHeader == null ? null : keyReferenceHeader.value());
}
 
/**
 * Fetches the value of the special timestamp header (_t).
 * @param headers ConsumerRecord headers
 * @return {@code null} if the _t header does not exist, otherwise the decoded long value
 */
public static Long fetchTimestampHeader(Headers headers) {
  Map<String, byte[]> specialHeaders = fetchSpecialHeaders(headers);
  return specialHeaders.containsKey(Constants.TIMESTAMP_HEADER)
      ? PrimitiveEncoderDecoder.decodeLong(specialHeaders.get(Constants.TIMESTAMP_HEADER), 0)
      : null;
}
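The writing side stores the timestamp as raw bytes under the same _t key (Constants.TIMESTAMP_HEADER). A hedged sketch using ByteBuffer in place of the project's PrimitiveEncoderDecoder:

// Hedged sketch: write the special timestamp header that fetchTimestampHeader
// decodes. The big-endian 8-byte long encoding is an assumption here; the
// project's PrimitiveEncoderDecoder may use a different byte order.
public static void addTimestampHeader(Headers headers, long timestampMs) {
  ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
  buffer.putLong(timestampMs);
  headers.add(Constants.TIMESTAMP_HEADER, buffer.array());
}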
 
Example 28  Project: kafka-encryption  File: CryptoDeserializerTest.java
@Test
public void testDeserializeWhenNoEncryptionStructure() {
    Headers headers = new RecordHeaders();

    byte[] clearValue = "clearValue".getBytes(StandardCharsets.UTF_8);

    given(rawDeserializer.deserialize("topic1", headers, clearValue)).willReturn("deserialized value");

    String value = cryptoDeserializer.deserialize("topic1", headers, clearValue);

    assertThat(value).isEqualTo("deserialized value");
    assertThat(headers.lastHeader(KafkaCryptoConstants.KEY_REF_HEADER).value()).isNull();
    verifyZeroInteractions(decryptor);
}
 
Example 29  Project: kafka-encryption  File: CryptoSerializerTest.java
@Test
public void testSerializeWhenKeyRefHeaderIsSet() {
    final String keyRef = "org1";
    final int keyRefSize = toByteArray(keyRef).length;

    final String encoded = "encodedValue";
    final int encodedSize = toByteArray(encoded).length;

    Headers headers = new RecordHeaders().add(KafkaCryptoConstants.KEY_REF_HEADER, toByteArray(keyRef));
    given(rawSerializer.serialize("topic1", headers, "final value"))
            .willReturn("clear serialized value".getBytes(StandardCharsets.UTF_8));
    given(encryptor.encrypt("clear serialized value".getBytes(StandardCharsets.UTF_8), toByteArray(keyRef)))
            .willReturn(toByteArray(encoded));

    byte[] result = cryptoSerializer.serialize("topic1", headers, "final value");

    assertThat(result).hasSize(ENCRYPTED_PREFIX.length + Integer.BYTES + keyRefSize + encodedSize);
    ByteBuffer byteBuffer = ByteBuffer.wrap(result);
    byte[] prefix = new byte[ENCRYPTED_PREFIX.length];
    byteBuffer.get(prefix);
    assertThat(prefix).isEqualTo(ENCRYPTED_PREFIX);
    assertThat(byteBuffer.getInt()).isEqualTo(keyRefSize);
    byte[] resultKeyRef = new byte[keyRefSize];
    byteBuffer.get(resultKeyRef);
    assertThat(resultKeyRef).isEqualTo(toByteArray(keyRef));
    byte[] resultPayload = new byte[encodedSize];
    byteBuffer.get(resultPayload);
    assertThat(resultPayload).isEqualTo(toByteArray(encoded));
}
 
Example 30  Project: extension-kafka  File: HeaderUtils.java
/**
 * Generates Kafka {@link Headers} based on an {@link EventMessage} and {@link SerializedObject}, using the given
 * {@code headerValueMapper} to correctly map the values to byte arrays.
 *
 * @param eventMessage      the {@link EventMessage} to create headers for
 * @param serializedObject  the serialized payload of the given {@code eventMessage}
 * @param headerValueMapper function for converting {@code values} to bytes. Since {@link RecordHeader} can handle
 *                          only bytes, this function needs to define how a given value is converted to
 *                          bytes. See {@link HeaderUtils#byteMapper()} for a sample implementation
 * @return the generated Kafka {@link Headers} based on an {@link EventMessage} and {@link SerializedObject}
 */
public static Headers toHeaders(EventMessage<?> eventMessage,
                                SerializedObject<byte[]> serializedObject,
                                BiFunction<String, Object, RecordHeader> headerValueMapper) {
    notNull(eventMessage, () -> "EventMessage may not be null");
    notNull(serializedObject, () -> "SerializedObject may not be null");
    notNull(headerValueMapper, () -> "Header key-value mapper function may not be null");

    RecordHeaders headers = new RecordHeaders();
    eventMessage.getMetaData()
                .forEach((k, v) -> ((Headers) headers).add(headerValueMapper.apply(generateMetadataKey(k), v)));
    defaultHeaders(eventMessage, serializedObject).forEach((k, v) -> addHeader(headers, k, v));
    return headers;
}
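A minimal headerValueMapper one might pass in; this is a hedged stand-in for the byteMapper() referenced in the javadoc, not its actual implementation:

// Hedged sketch: convert metadata values to bytes via their string form.
BiFunction<String, Object, RecordHeader> mapper = (key, value) ->
        new RecordHeader(key, value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8));

Headers headers = HeaderUtils.toHeaders(eventMessage, serializedObject, mapper);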
 