下面列出了怎么用org.apache.kafka.common.header.Headers的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Gets the message type from the headers. Throws if not found.
*
* @param headers the headers
*/
@SuppressWarnings("unchecked")
protected Class<T> getMessageType(Headers headers) {
Header header = headers.lastHeader(JsonSchemaSerDeConstants.HEADER_MSG_TYPE);
if (header == null) {
throw new RuntimeException("Message Type not found in headers.");
}
String msgTypeName = IoUtil.toString(header.value());
try {
return (Class<T>) Thread.currentThread().getContextClassLoader().loadClass(msgTypeName);
} catch (ClassNotFoundException ignored) {
}
try {
return (Class<T>) Class.forName(msgTypeName);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* <p>Constructor for the test input topic.</p>
*
* @param testDriver Kafka's {@link TopologyTestDriver} used in this test.
* @param topic Name of input topic.
* @param keySerde Serde for key type in topic.
* @param valueSerde Serde for value type in topic.
*/
protected TestInput(final TopologyTestDriver testDriver, final String topic, final Serde<K> keySerde,
final Serde<V> valueSerde) {
this.testDriver = testDriver;
this.topic = topic;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.consumerFactory = new ConsumerRecordFactory<>(topic,
keySerde == null ? new UnspecifiedSerializer<K>() : keySerde.serializer(),
valueSerde == null ? new UnspecifiedSerializer<V>() : valueSerde.serializer()) {
@Override
public ConsumerRecord<byte[], byte[]> create(final String topicName, final K key, final V value,
final Headers headers, final long timestampMs) {
final ConsumerRecord<byte[], byte[]> record = super.create(topicName, key, value, headers, timestampMs);
testDriver.pipeInput(record);
return record;
}
};
}
@SuppressWarnings("unchecked")
@Nullable
private Map<String, String> decodeJsonTypes(Headers source) {
Map<String, String> types = null;
Header jsonTypes = source.lastHeader(JSON_TYPES);
if (jsonTypes != null) {
ObjectMapper headerObjectMapper = getObjectMapper();
try {
types = headerObjectMapper.readValue(jsonTypes.value(), Map.class);
}
catch (IOException e) {
logger.error(e, () -> "Could not decode json types: " + new String(jsonTypes.value()));
}
}
return types;
}
/**
* Special header keys have a "_" prefix and are managed internally by the clients.
* @param headers kafka headers object
* @return any "special" headers container in the argument map
*/
public static Map<String, byte[]> fetchSpecialHeaders(Headers headers) {
Map<String, byte[]> map = new HashMap<>();
for (Header header : headers) {
if (!header.key().startsWith("_")) {
// skip any non special header
continue;
}
if (map.containsKey(header.key())) {
throw new IllegalStateException("Duplicate special header found " + header.key());
}
map.put(header.key(), header.value());
}
return map;
}
/**
* Creates a new outgoing Kafka Message with a header added to the header list.
*
* @param key the header key
* @param content the header key, must not be {@code null}
* @return the updated Kafka Message.
*/
public OutgoingKafkaRecord<K, T> withHeader(String key, byte[] content) {
Headers headers = getHeaders();
Headers copy = new RecordHeaders(headers);
copy.add(new Header() {
@Override
public String key() {
return key;
}
@Override
public byte[] value() {
return content;
}
});
return new OutgoingKafkaRecord<>(getTopic(), getKey(), getPayload(), getTimestamp(), getPartition(),
copy, getAck(), getNack(), getMetadata());
}
/**
* Creates a new outgoing Kafka Message with a header added to the header list.
*
* @param key the header key
* @param content the header key, must not be {@code null}
* @return the updated Kafka Message.
*/
public OutgoingKafkaRecord<K, T> withHeader(String key, String content) {
Headers headers = getHeaders();
Headers copy = new RecordHeaders(headers);
copy.add(new Header() {
@Override
public String key() {
return key;
}
@Override
public byte[] value() {
return content.getBytes();
}
});
return new OutgoingKafkaRecord<>(getTopic(), getKey(), getPayload(), getTimestamp(), getPartition(),
copy, getAck(), getNack(), getMetadata());
}
/**
* Creates a new outgoing Kafka Message with a header added to the header list.
*
* @param key the header key
* @param content the header key, must not be {@code null}
* @param enc the encoding, must not be {@code null}
* @return the updated Kafka Message.
*/
public OutgoingKafkaRecord<K, T> withHeader(String key, String content, Charset enc) {
Headers headers = getHeaders();
Headers copy = new RecordHeaders(headers);
copy.add(new Header() {
@Override
public String key() {
return key;
}
@Override
public byte[] value() {
return content.getBytes(enc);
}
});
return new OutgoingKafkaRecord<>(getTopic(), getKey(), getPayload(), getTimestamp(), getPartition(),
copy, getAck(), getNack(), getMetadata());
}
@Test
public void testSpecificSerializedGenericDeserialized() {
Map<String, Object> config = new HashMap<>();
config.put(AvroSnapshotDeserializer.SPECIFIC_AVRO_READER, false);
KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
kafkaAvroDeserializer.configure(config, false);
KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient);
kafkaAvroSerializer.configure(config, false);
TestRecord record = new TestRecord();
record.setField1("some value");
record.setField1("some other value");
byte[] bytes = kafkaAvroSerializer.serialize(topic, record);
Object o = kafkaAvroDeserializer.deserialize(topic, bytes);
checkSpecificSerializedGenericDeserializedEquals(record, o);
Headers headers = new RecordHeaders();
bytes = kafkaAvroSerializer.serialize(topic, headers, record);
o = kafkaAvroDeserializer.deserialize(topic, headers, bytes);
checkSpecificSerializedGenericDeserializedEquals(record, o);
}
@Test
public void testToggleStoringSchemaInHeader() {
TestRecord record = new TestRecord();
record.setField1("Hello");
record.setField2("World");
String keySchemaHeaderName = KafkaAvroSerde.DEFAULT_KEY_SCHEMA_VERSION_ID;
for (Boolean storeScheamIdInHeader : Arrays.asList(true, false)) {
Map<String, Object> configs = new HashMap<>();
configs.put(KafkaAvroSerializer.STORE_SCHEMA_VERSION_ID_IN_HEADER, storeScheamIdInHeader.toString());
configs.put(AbstractAvroSnapshotDeserializer.SPECIFIC_AVRO_READER, true);
KafkaAvroSerde serde = new KafkaAvroSerde(schemaRegistryClient);
final Serializer<Object> serializer = serde.serializer();
serializer.configure(configs, true);
Headers headers = new RecordHeaders();
final byte[] bytes = serializer.serialize(topic, headers, record);
Assert.assertEquals(storeScheamIdInHeader, headers.lastHeader(keySchemaHeaderName) != null);
final Deserializer<Object> deserializer = serde.deserializer();
deserializer.configure(configs, true);
final TestRecord actual = (TestRecord) deserializer.deserialize(topic, headers, bytes);
Assert.assertEquals(record, actual);
}
}
public KafkaRecord(
String topic,
int partition,
long offset,
long timestamp,
KafkaTimestampType timestampType,
@Nullable Headers headers,
KV<K, V> kv) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.timestamp = timestamp;
this.timestampType = timestampType;
this.headers = headers;
this.kv = kv;
}
@Test
public void inject_two_contexts_and_extract() {
MockSpan span = mockTracer.buildSpan("first").start();
Headers headers = new RecordHeaders();
assertEquals(0, headers.toArray().length);
// inject first
TracingKafkaUtils.inject(span.context(), headers, mockTracer);
int headersLength = headers.toArray().length;
assertTrue(headersLength > 0);
// inject second
MockSpan span2 = mockTracer.buildSpan("second").asChildOf(span.context()).start();
TracingKafkaUtils.inject(span2.context(), headers, mockTracer);
assertTrue(headers.toArray().length > headersLength);
// check first
MockSpan.MockContext spanContext = (MockSpan.MockContext) TracingKafkaUtils
.extractSpanContext(headers, mockTracer);
assertEquals(span2.context().spanId(), spanContext.spanId());
assertEquals(span2.context().traceId(), spanContext.traceId());
}
@Test
public void serializerShouldWork() {
String topic = "test";
CloudEvent event = Data.V1_WITH_JSON_DATA;
CloudEventMessageSerializer serializer = new CloudEventMessageSerializer();
Headers headers = new RecordHeaders();
MockBinaryMessageWriter inMessage = new MockBinaryMessageWriter();
CloudEventUtils.toVisitable(event).read(inMessage);
byte[] payload = serializer.serialize(topic, headers, inMessage);
MessageReader outMessage = KafkaMessageFactory.createReader(headers, payload);
assertThat(outMessage.getEncoding())
.isEqualTo(Encoding.BINARY);
assertThat(outMessage.toEvent())
.isEqualTo(event);
}
private ProducerRecord<String, String> verifySerialization(
Headers headers, Integer partition, Long timestamp) throws IOException {
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>("topic", partition, timestamp, "key", "value", headers);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ProducerRecordCoder producerRecordCoder =
ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
producerRecordCoder.encode(producerRecord, outputStream);
ProducerRecord<String, String> decodedRecord =
producerRecordCoder.decode(new ByteArrayInputStream(outputStream.toByteArray()));
assertEquals(producerRecord, decodedRecord);
return decodedRecord;
}
@ParameterizedTest
@MethodSource("binaryTestArguments")
void testRequestWithBinary(CloudEvent event, Headers expectedHeaders, byte[] expectedBody) {
String topic = "test";
Integer partition = 10;
Long timestamp = System.currentTimeMillis();
String key = "aaa";
ProducerRecord<String, byte[]> producerRecord = KafkaMessageFactory
.createWriter(topic, partition, timestamp, key)
.writeBinary(event);
assertThat(producerRecord.topic())
.isEqualTo(topic);
assertThat(producerRecord.partition())
.isEqualTo(partition);
assertThat(producerRecord.timestamp())
.isEqualTo(timestamp);
assertThat(producerRecord.key())
.isEqualTo(key);
assertThat(producerRecord.headers())
.containsExactlyInAnyOrder(expectedHeaders.toArray());
assertThat(producerRecord.value())
.isEqualTo(expectedBody);
}
/**
* @see org.apache.kafka.common.serialization.Serializer#serialize(java.lang.String, org.apache.kafka.common.header.Headers, java.lang.Object)
*/
@Override
public byte[] serialize(String topic, Headers headers, T data) {
if (data == null) {
return null;
}
// Now serialize the data
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
JsonGenerator generator = mapper.getFactory().createGenerator(baos);
if (isValidationEnabled()) {
String artifactId = getArtifactId(topic, data);
long globalId = getGlobalId(artifactId, topic, data);
addSchemaHeaders(headers, artifactId, globalId);
SchemaValidator schemaValidator = getSchemaCache().getSchema(globalId);
generator = api.decorateJsonGenerator(schemaValidator, generator);
}
addTypeHeaders(headers, data);
mapper.writeValue(generator, data);
return baos.toByteArray();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/**
* Adds appropriate information to the Headers so that the deserializer can function properly.
*
* @param headers msg headers
* @param artifactId artifact id
* @param globalId global id
*/
protected void addSchemaHeaders(Headers headers, String artifactId, long globalId) {
// we never actually set this requirement for the globalId to be non-negative ... but it mostly is ...
if (globalId >= 0) {
ByteBuffer buff = ByteBuffer.allocate(8);
buff.putLong(globalId);
headers.add(JsonSchemaSerDeConstants.HEADER_GLOBAL_ID, buff.array());
} else {
headers.add(JsonSchemaSerDeConstants.HEADER_ARTIFACT_ID, IoUtil.toBytes(artifactId));
}
}
void clearHeaders(Headers headers) {
// Headers::remove creates and consumes an iterator each time. This does one loop instead.
for (Iterator<Header> i = headers.iterator(); i.hasNext(); ) {
Header next = i.next();
if (propagationKeys.contains(next.key())) i.remove();
}
}
/**
* @see org.apache.kafka.common.serialization.Deserializer#deserialize(java.lang.String, org.apache.kafka.common.header.Headers, byte[])
*/
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
if (data == null) {
return null;
}
try {
JsonParser parser = mapper.getFactory().createParser(data);
if (isValidationEnabled()) {
Long globalId = getGlobalId(headers);
// If no globalId is provided, check the alternative - which is to check for artifactId and
// (optionally) version. If these are found, then convert that info to globalId.
if (globalId == null) {
String artifactId = getArtifactId(headers);
Integer version = getVersion(headers);
globalId = toGlobalId(artifactId, version);
}
SchemaValidator schema = getSchemaCache().getSchema(globalId);
parser = api.decorateJsonParser(schema, parser);
}
Class<T> messageType = getMessageType(headers);
return mapper.readValue(parser, messageType);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private void linkContext(Headers headers) {
headers.add(MessageHeaders.HEADER_CLIENT, Strings.bytes(LogManager.APP_NAME));
ActionLog actionLog = LogManager.CURRENT_ACTION_LOG.get();
if (actionLog == null) return; // publisher may be used without action log context
headers.add(MessageHeaders.HEADER_CORRELATION_ID, Strings.bytes(actionLog.correlationId()));
if (actionLog.trace) headers.add(MessageHeaders.HEADER_TRACE, Strings.bytes("true"));
headers.add(MessageHeaders.HEADER_REF_ID, Strings.bytes(actionLog.id));
}
/**
* Gets the artifact id from the headers. Throws if not found.
*
* @param headers the headers
*/
protected String getArtifactId(Headers headers) {
Header header = headers.lastHeader(JsonSchemaSerDeConstants.HEADER_ARTIFACT_ID);
if (header == null) {
throw new RuntimeException("ArtifactId not found in headers.");
}
return IoUtil.toString(header.value());
}
@Override
public Headers addHeaders(Headers headers, LogMessage logMessage) {
try {
headers.add(HEADER_KEY, SER.serialize(logMessage.getLoggingAuditHeaders()));
} catch (TException e) {
Stats.incr(SingerMetrics.NUMBER_OF_SERIALIZING_HEADERS_ERRORS);
LOG.warn("Exception thrown while serializing loggingAuditHeaders", e);
}
return headers;
}
public HeadersMapExtractAdapter(Headers headers) {
for (Header header : headers) {
byte[] headerValue = header.value();
map.put(header.key(),
headerValue == null ? null : new String(headerValue, StandardCharsets.UTF_8));
}
}
@Test
public void testDeser_headers() {
Headers headers = new RecordHeaders();
headers.add("headerKey1", "headerValue1".getBytes());
headers.add("headerKey2", "headerValue2".getBytes());
ProducerRecord producerRecord = new ProducerRecord("topic2", null, "key-123", "Hello", headers);
String jsonBack = gson.toJson(producerRecord);
JSONAssert.assertEquals("{\"topic\":\"topic2\",\"headers\":{\"headerKey1\":\"headerValue1\",\"headerKey2\":\"headerValue2\"},\"key\":\"key-123\",\"value\":\"Hello\"}", jsonBack, LENIENT);
}
private boolean headersEqualityByValue(Headers a, Headers b) {
// This is an alternative implementation of ConnectHeaders::equals that use proper Header equality by value
if (a == b) {
return true;
}
// Note, similar to ConnectHeaders::equals, it requires headers to have the same order
// (although, that is probably not what we want in most cases)
Iterator<Header> aIter = a.iterator();
Iterator<Header> bIter = b.iterator();
while (aIter.hasNext() && bIter.hasNext()) {
if (!headerEqualityByValue(aIter.next(), bIter.next()))
return false;
}
return !aIter.hasNext() && !bIter.hasNext();
}
private long weigh(Headers headers) {
long size = RECORD_HEADERS_INSTANCE_SIZE;
for (Header header : headers) {
size += RECORD_HEADER_INSTANCE_SIZE
+ StringWeigher.INSTANCE.weigh(header.key())
// calling header.value() here may have impact on memory usage and performance as
// it replaces ByteBuffer with byte[] inside RecordHeader
+ ByteArrayWeigher.INSTANCE.weigh(header.value());
}
return size;
}
/**
* serialize data with encryption (if needed). Key reference will be looked for exclusively in the Kafka Header
* {@link KafkaCryptoConstants#KEY_REF_HEADER}
*
* @param topic
* @param headers
* @param data
* @return
*/
@Override
public byte[] serialize(String topic, Headers headers, T data) {
byte[] serializedData = rawSerializer.serialize(topic, headers, data);
if (serializedData == null) {
return null;
}
Header keyReferenceHeader = headers.lastHeader(KEY_REF_HEADER);
return encrypt(serializedData, keyReferenceHeader == null ? null : keyReferenceHeader.value());
}
/**
* Fetch value of special timestamp header (_t)
* @param headers ConsumerRecord headers
* @return Returns null if _t does not exist otherwise returns the long value
*/
public static Long fetchTimestampHeader(Headers headers) {
Map<String, byte[]> specialHeaders = fetchSpecialHeaders(headers);
return specialHeaders.containsKey(Constants.TIMESTAMP_HEADER)
? PrimitiveEncoderDecoder.decodeLong(specialHeaders.get(Constants.TIMESTAMP_HEADER), 0)
: null;
}
@Test
public void testDeserializeWhenNoEncryptionStructure() {
Headers headers = new RecordHeaders();
byte[] clearValue = "clearValue".getBytes(StandardCharsets.UTF_8);
given(rawDeserializer.deserialize("topic1", headers, clearValue)).willReturn("deserialized value");
String value = cryptoDeserializer.deserialize("topic1", headers, clearValue);
assertThat(value).isEqualTo("deserialized value");
assertThat(headers.lastHeader(KafkaCryptoConstants.KEY_REF_HEADER).value()).isNull();
verifyZeroInteractions(decryptor);
}
@Test
public void testSerializeWhenKeyRefHeaderIsSet() {
final String keyRef = "org1";
final int keyRefSize = toByteArray(keyRef).length;
final String encoded = "encodedValue";
final int encodedSize = toByteArray(encoded).length;
Headers headers = new RecordHeaders().add(KafkaCryptoConstants.KEY_REF_HEADER, toByteArray(keyRef));
given(rawSerializer.serialize("topic1", headers, "final value"))
.willReturn("clear serialized value".getBytes(StandardCharsets.UTF_8));
given(encryptor.encrypt("clear serialized value".getBytes(StandardCharsets.UTF_8), toByteArray(keyRef)))
.willReturn(toByteArray(encoded));
byte[] result = cryptoSerializer.serialize("topic1", headers, "final value");
assertThat(result).hasSize(ENCRYPTED_PREFIX.length + Integer.BYTES + keyRefSize + encodedSize);
ByteBuffer byteBuffer = ByteBuffer.wrap(result);
byte[] prefix = new byte[ENCRYPTED_PREFIX.length];
byteBuffer.get(prefix);
assertThat(prefix).isEqualTo(ENCRYPTED_PREFIX);
assertThat(byteBuffer.getInt()).isEqualTo(keyRefSize);
byte[] resultKeyRef = new byte[keyRefSize];
byteBuffer.get(resultKeyRef);
assertThat(resultKeyRef).isEqualTo(toByteArray(keyRef));
byte[] resultPayload = new byte[encodedSize];
byteBuffer.get(resultPayload);
assertThat(resultPayload).isEqualTo(toByteArray(encoded));
}
/**
* Generates Kafka {@link Headers} based on an {@link EventMessage} and {@link SerializedObject}, using the given
* {@code headerValueMapper} to correctly map the values to byte arrays.
*
* @param eventMessage the {@link EventMessage} to create headers for
* @param serializedObject the serialized payload of the given {@code eventMessage}
* @param headerValueMapper function for converting {@code values} to bytes. Since {@link RecordHeader} can handle
* only bytes this function needs to define the logic how to convert a given value to
* bytes. See {@link HeaderUtils#byteMapper()} for sample implementation
* @return the generated Kafka {@link Headers} based on an {@link EventMessage} and {@link SerializedObject}
*/
public static Headers toHeaders(EventMessage<?> eventMessage,
SerializedObject<byte[]> serializedObject,
BiFunction<String, Object, RecordHeader> headerValueMapper) {
notNull(eventMessage, () -> "EventMessage may not be null");
notNull(serializedObject, () -> "SerializedObject may not be null");
notNull(headerValueMapper, () -> "Header key-value mapper function may not be null");
RecordHeaders headers = new RecordHeaders();
eventMessage.getMetaData()
.forEach((k, v) -> ((Headers) headers).add(headerValueMapper.apply(generateMetadataKey(k), v)));
defaultHeaders(eventMessage, serializedObject).forEach((k, v) -> addHeader(headers, k, v));
return headers;
}