下面列出了 org.apache.kafka.common.record.TimestampType 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Checks that one record handed back by the stubbed Kafka consumer comes out of
 * {@code underTest.poll()} as exactly one {@code Record} equal to the expected one.
 */
@Test
public void poll() throws Exception {
    // Arrange: the stubbed consumer yields a single record for the test partition.
    Payload<byte[]> rawPayload = new Payload<>((byte) 0, 1, "{}".getBytes(UTF_8));
    ConsumerRecord<Void, Payload<byte[]>> kafkaRecord = new ConsumerRecord<>(
        topicName, 0, 1L, 2L, TimestampType.CREATE_TIME,
        ConsumerRecord.NULL_CHECKSUM, ConsumerRecord.NULL_SIZE, ConsumerRecord.NULL_SIZE,
        null, rawPayload);
    Map<TopicPartition, List<ConsumerRecord<Void, Payload<byte[]>>>> byPartition =
        singletonMap(topicPartition, singletonList(kafkaRecord));
    ConsumerRecords<Void, Payload<byte[]>> stubbedBatch = new ConsumerRecords<>(byPartition);
    when(consumer.poll(100)).thenReturn(stubbedBatch);
    when(payloadDecoder.decode(any(), any())).thenReturn(mapper.createObjectNode());

    Record expected = new Record(0, 1L, 2L, new Payload<JsonNode>((byte) 0, 1, mapper.createObjectNode()));

    // Act
    underTest.init(1L, rebalanceListener);
    Iterable<Record> polled = underTest.poll();

    // Assert
    assertThat(Iterables.size(polled), is(1));
    assertThat(Iterables.get(polled, 0), is(expected));
}
/**
 * Converts a consumed record into a {@code MessageAndOffset}.
 *
 * <p>A {@code MessageAndOffsetWithTimestamp} is produced only when {@code isEnabled} is true, the
 * record's timestamp type is not {@code NO_TIMESTAMP_TYPE}, and its timestamp is positive;
 * otherwise a plain {@code MessageAndOffset} without timestamp information is returned.
 *
 * @param message the consumed record
 * @param isEnabled whether timestamp information may be attached
 * @return the wrapped record
 */
@Override
MessageAndOffset getMessageAndOffset(ConsumerRecord message, boolean isEnabled) {
    boolean carriesTimestamp = message.timestampType() != TimestampType.NO_TIMESTAMP_TYPE
        && message.timestamp() > 0;
    if (!(carriesTimestamp && isEnabled)) {
        return new MessageAndOffset(message.key(), message.value(), message.offset(), message.partition());
    }
    return new MessageAndOffsetWithTimestamp(
        message.key(),
        message.value(),
        message.offset(),
        message.partition(),
        message.timestamp(),
        message.timestampType().toString());
}
/**
 * Builds a single record batch from {@code records} in a newly allocated buffer.
 *
 * <p>The buffer is sized from the estimate for the current magic value and the offsets-topic
 * compression type taken from {@code offsetConfig}. The batch is stamped with
 * {@code CREATE_TIME} and the current system time.
 *
 * @param records records appended to the batch, in list order
 * @param producerId producer id forwarded to the builder
 * @param producerEpoch producer epoch forwarded to the builder
 * @param isTxnOffsetCommit forwarded to the builder (presumably its transactional flag; confirm
 *     against the {@code MemoryRecords.builder} overload in use)
 * @return the buffer of the built records
 */
private ByteBuffer newMemoryRecordsBuffer(List<SimpleRecord> records,
                                          long producerId,
                                          short producerEpoch,
                                          boolean isTxnOffsetCommit) {
    long nowMs = Time.SYSTEM.milliseconds();
    int estimatedSize = AbstractRecords.estimateSizeInBytes(
        RecordBatch.CURRENT_MAGIC_VALUE, offsetConfig.offsetsTopicCompressionType(), records);
    ByteBuffer target = ByteBuffer.allocate(estimatedSize);

    MemoryRecordsBuilder batchBuilder = MemoryRecords.builder(
        target,
        RecordBatch.CURRENT_MAGIC_VALUE,
        offsetConfig.offsetsTopicCompressionType(),
        TimestampType.CREATE_TIME,
        0L,
        nowMs,
        producerId,
        producerEpoch,
        0,
        isTxnOffsetCommit,
        RecordBatch.NO_PARTITION_LEADER_EPOCH);

    for (SimpleRecord simpleRecord : records) {
        batchBuilder.append(simpleRecord);
    }
    return batchBuilder.build().buffer();
}
/**
 * Appends one end-transaction control marker (COMMIT or ABORT) to {@code buffer}.
 *
 * @param buffer destination buffer handed to the records builder
 * @param producerId producer id forwarded to the builder
 * @param producerEpoch producer epoch forwarded to the builder
 * @param baseOffset base offset of the batch holding the marker
 * @param isCommit {@code true} to write a COMMIT marker, {@code false} for ABORT
 * @return always {@code 1}
 */
private int completeTransactionalOffsetCommit(ByteBuffer buffer,
                                              long producerId,
                                              short producerEpoch,
                                              long baseOffset,
                                              boolean isCommit) {
    MemoryRecordsBuilder markerBuilder = MemoryRecords.builder(
        buffer,
        RecordBatch.MAGIC_VALUE_V2,
        CompressionType.NONE,
        TimestampType.LOG_APPEND_TIME,
        baseOffset,
        Time.SYSTEM.milliseconds(),
        producerId,
        producerEpoch,
        0,
        true,
        true,
        RecordBatch.NO_PARTITION_LEADER_EPOCH);

    ControlRecordType markerType = isCommit ? ControlRecordType.COMMIT : ControlRecordType.ABORT;
    markerBuilder.appendEndTxnMarker(Time.SYSTEM.milliseconds(), new EndTransactionMarker(markerType, 0));
    // build() is invoked for its effect on the builder; the resulting records object is not used here.
    markerBuilder.build();
    return 1;
}
/**
 * Creates a {@code SinkRecord} whose key and value are the UTF-8 bytes of the given strings,
 * both declared with {@code Schema.BYTES_SCHEMA}, using {@code CREATE_TIME} as timestamp type.
 */
private SinkRecord createRecord(final String topic,
                                final int partition,
                                final String key,
                                final String value,
                                final int offset,
                                final long timestamp) {
    final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
    return new SinkRecord(
        topic, partition,
        Schema.BYTES_SCHEMA, keyBytes,
        Schema.BYTES_SCHEMA, valueBytes,
        offset, timestamp, TimestampType.CREATE_TIME);
}
/**
 * Creates a {@code SinkRecord} with a string key ({@code Schema.OPTIONAL_STRING_SCHEMA}) and a
 * value made of the UTF-8 bytes of {@code value} ({@code Schema.BYTES_SCHEMA}), using
 * {@code CREATE_TIME} as timestamp type.
 */
private SinkRecord createRecordStringKey(final String topic,
                                         final int partition,
                                         final String key,
                                         final String value,
                                         final int offset,
                                         final long timestamp) {
    final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
    return new SinkRecord(
        topic, partition,
        Schema.OPTIONAL_STRING_SCHEMA, key,
        Schema.BYTES_SCHEMA, valueBytes,
        offset, timestamp, TimestampType.CREATE_TIME);
}
/**
 * Writes four records through a {@code PartitionWriter}, then checks that a
 * {@code PartitionReader} returns them unchanged, both on a full read and after seeking.
 */
@Test
public void simpleRoundtripTest() throws Exception {
    int partition = 0;

    List<Record> written = new ArrayList<>();
    written.add(new Record(TOPIC, partition, KEY_BYTES, VALUE_BYTES, 0));
    written.add(new Record(TOPIC, partition, null, null, 1));
    written.add(new Record(TOPIC, partition, new byte[0], new byte[0], 2));
    written.add(new Record(TOPIC, partition, KEY_BYTES, VALUE_BYTES, 3, null, TimestampType.NO_TIMESTAMP_TYPE, HEADERS));

    PartitionWriter writer = new PartitionWriter(TOPIC, partition, TEMP_DIR, 50);
    for (int i = 0; i < 4; i++) {
        writer.append(written.get(i));
    }
    writer.close();

    PartitionReader reader = new PartitionReader(TOPIC, partition, TEMP_DIR);

    // A full read returns everything and leaves nothing behind.
    assertEquals(written, reader.readFully());
    assertFalse(reader.hasMoreData());

    // Seeking to a middle offset yields the record at that offset.
    reader.seek(1);
    assertEquals(written.get(1), reader.read());

    // Seeking to the last offset yields the last record and exhausts the reader.
    reader.seek(3);
    assertEquals(written.get(3), reader.read());
    assertFalse(reader.hasMoreData());
}
/**
 * Writes a segment, deletes its index file, rebuilds the index with
 * {@code SegmentIndexRestore}, and checks that the rebuilt index equals the original one.
 */
@Test
public void restoreTest() throws Exception {
    int partition = 1;

    List<Record> toWrite = new ArrayList<>();
    toWrite.add(new Record(TOPIC, partition, KEY_BYTES, VALUE_BYTES, 0));
    toWrite.add(new Record(TOPIC, partition, null, null, 1));
    toWrite.add(new Record(TOPIC, partition, new byte[0], new byte[0], 2));
    toWrite.add(new Record(TOPIC, partition, KEY_BYTES, VALUE_BYTES, 3, null, TimestampType.NO_TIMESTAMP_TYPE));

    SegmentWriter writer = new SegmentWriter(TOPIC, partition, 0, TEMP_DIR);
    for (int i = 0; i < toWrite.size(); i++) {
        writer.append(toWrite.get(i));
    }
    writer.close();

    // Load the index produced by the writer before removing it from disk.
    Path indexPath = SegmentUtils.indexFile(TEMP_DIR, partition, 0);
    SegmentIndex originalIndex = new SegmentIndex(indexPath);
    Files.delete(indexPath);

    // Rebuild the index from the records file alone.
    SegmentIndexRestore indexRestore = new SegmentIndexRestore(SegmentUtils.recordsFile(TEMP_DIR, partition, 0));
    indexRestore.restore();

    SegmentIndex restoredIndex = new SegmentIndex(indexPath);
    assertEquals(originalIndex.index(), restoredIndex.index());
}
/**
 * Utility function to be run once when the format on disk changes to be able to stay backwards-compatible.
 * <p>
 * Call it manually once when the format changes. It writes a fixed set of records as segment 0 of
 * partition 0 into the versioned test-assets directory.
 *
 * @throws Exception if the directory cannot be created or writing/closing the segment fails
 */
private static void writeTestSegmentsToFile() throws Exception {
    int partition = 0;
    Path directory = Paths.get("src/test/assets/v1/segments"); // CHANGEME WHEN CHANGING DATA FORMAT!
    Files.createDirectories(directory);

    List<Record> records = new ArrayList<>();
    records.add(new Record(TOPIC, partition, KEY_BYTES, VALUE_BYTES, 0));
    records.add(new Record(TOPIC, partition, null, null, 1));
    records.add(new Record(TOPIC, partition, KEY_BYTES, VALUE_BYTES, 3, null, TimestampType.NO_TIMESTAMP_TYPE, HEADERS));
    records.add(new Record(TOPIC, partition, KEY_BYTES, VALUE_BYTES, 10));

    SegmentWriter segmentWriter = new SegmentWriter(TOPIC, partition, 0, directory);
    try {
        for (Record record : records) {
            segmentWriter.append(record);
        }
    } finally {
        // Always close the writer so the generated asset files are complete and no handle leaks,
        // matching how the other tests in this file finish with a SegmentWriter.
        segmentWriter.close();
    }
}
/**
 * Wraps a consumed record for downstream use.
 *
 * <p>Timestamp information is attached (via {@code MessageAndOffsetWithTimestamp}) only if
 * {@code isEnabled} is set, the record has a timestamp type other than
 * {@code NO_TIMESTAMP_TYPE}, and the timestamp is greater than zero. In every other case a plain
 * {@code MessageAndOffset} is returned.
 *
 * @param message the consumed record
 * @param isEnabled whether timestamps should be included
 * @return the wrapped record
 */
@Override
MessageAndOffset getMessageAndOffset(ConsumerRecord message, boolean isEnabled) {
    final boolean typed = message.timestampType() != TimestampType.NO_TIMESTAMP_TYPE;
    if (typed && message.timestamp() > 0 && isEnabled) {
        return new MessageAndOffsetWithTimestamp(
            message.key(), message.value(), message.offset(), message.partition(),
            message.timestamp(), message.timestampType().toString());
    } else {
        return new MessageAndOffset(
            message.key(), message.value(), message.offset(), message.partition());
    }
}
/**
 * Feeds 100 single-record batches into a {@code ConsumerCollector} and checks that the reported
 * stats contain the messages-per-second metric and a total message count of 100.
 */
@Test
public void shouldDisplayRateThroughput() throws Exception {
    ConsumerCollector collector = new ConsumerCollector();
    collector.configure(new Metrics(), "group", new SystemTime());

    for (int offset = 0; offset < 100; offset++) {
        ConsumerRecord<Object, Object> single = new ConsumerRecord<>(
            TEST_TOPIC, 1, offset, 1L, TimestampType.CREATE_TIME, 1L, 10, 10, "key", "1234567890");
        List<ConsumerRecord<Object, Object>> batch = Arrays.asList(single);
        Map<TopicPartition, List<ConsumerRecord<Object, Object>>> byPartition =
            ImmutableMap.of(new TopicPartition(TEST_TOPIC, 1), batch);
        collector.onConsume(new ConsumerRecords<>(byPartition));
    }

    Collection<TopicSensors.Stat> stats = collector.stats(TEST_TOPIC, false);
    assertNotNull(stats);
    assertThat(stats.toString(), containsString("name=consumer-messages-per-sec,"));
    assertThat(stats.toString(), containsString("total-messages, value=100.0"));
}
/**
 * Has two collectors consume the same 500-record batch and checks the aggregated consumption
 * rate reported by {@code MetricCollectors}.
 */
@Test
public void shouldAggregateStatsAcrossAllConsumers() throws Exception {
    ConsumerCollector firstCollector = new ConsumerCollector();
    firstCollector.configure(ImmutableMap.of(ConsumerConfig.CLIENT_ID_CONFIG, "client1"));
    ConsumerCollector secondCollector = new ConsumerCollector();
    secondCollector.configure(ImmutableMap.of(ConsumerConfig.CLIENT_ID_CONFIG, "client2"));

    Map<TopicPartition, List<ConsumerRecord<Object, Object>>> byPartition = new HashMap<>();
    List<ConsumerRecord<Object, Object>> batch = new ArrayList<>();
    int remaining = 500;
    while (remaining-- > 0) {
        batch.add(new ConsumerRecord<>(
            TEST_TOPIC, 1, 1, 1L, TimestampType.CREATE_TIME, 1L, 10, 10, "key", "1234567890"));
    }
    byPartition.put(new TopicPartition(TEST_TOPIC, 1), batch);

    ConsumerRecords<Object, Object> consumed = new ConsumerRecords<>(byPartition);
    firstCollector.onConsume(consumed);
    secondCollector.onConsume(consumed);

    // As the original author notes, the Kafka `Rate` measurable stat reports a tenth of the
    // real rate here because not all of its samples have been filled out yet.
    assertEquals(10, Math.floor(MetricCollectors.currentConsumptionRate()), 0);
}
/**
 * Has two collectors consume the same 10-record batch and checks that the total message
 * consumption reported by {@code MetricCollectors} is 20.
 */
@Test
public void shouldAggregateTotalMessageConsumptionAcrossAllConsumers() throws Exception {
    ConsumerCollector firstCollector = new ConsumerCollector();
    firstCollector.configure(ImmutableMap.of(ConsumerConfig.CLIENT_ID_CONFIG, "client1"));
    ConsumerCollector secondCollector = new ConsumerCollector();
    secondCollector.configure(ImmutableMap.of(ConsumerConfig.CLIENT_ID_CONFIG, "client2"));

    Map<TopicPartition, List<ConsumerRecord<Object, Object>>> byPartition = new HashMap<>();
    List<ConsumerRecord<Object, Object>> batch = new ArrayList<>();
    int remaining = 10;
    while (remaining-- > 0) {
        batch.add(new ConsumerRecord<>(
            TEST_TOPIC, 1, 1, 1L, TimestampType.CREATE_TIME, 1L, 10, 10, "key", "1234567890"));
    }
    byPartition.put(new TopicPartition(TEST_TOPIC, 1), batch);

    ConsumerRecords<Object, Object> consumed = new ConsumerRecords<>(byPartition);
    firstCollector.onConsume(consumed);
    secondCollector.onConsume(consumed);

    assertEquals(20, MetricCollectors.totalMessageConsumption(), 0);
}
/**
 * Has two collectors consume the same batch of ten records with growing serialized key/value
 * sizes and checks that the reported total bytes equal twice the summed sizes of the batch.
 */
@Test
public void shouldAggregateTotalBytesConsumptionAcrossAllConsumers() throws Exception {
    ConsumerCollector firstCollector = new ConsumerCollector();
    firstCollector.configure(ImmutableMap.of(ConsumerConfig.CLIENT_ID_CONFIG, "client1"));
    ConsumerCollector secondCollector = new ConsumerCollector();
    secondCollector.configure(ImmutableMap.of(ConsumerConfig.CLIENT_ID_CONFIG, "client2"));

    Map<TopicPartition, List<ConsumerRecord<Object, Object>>> byPartition = new HashMap<>();
    List<ConsumerRecord<Object, Object>> batch = new ArrayList<>();
    int expectedBatchBytes = 0;
    for (int step = 0; step < 10; step++) {
        int keySize = 5 + step;
        int valueSize = 10 + step;
        batch.add(new ConsumerRecord<>(
            TEST_TOPIC, 1, 1, 1L, TimestampType.CREATE_TIME, 1L, keySize, valueSize, "key", "1234567890"));
        // Same amount as keySize + valueSize for this record.
        expectedBatchBytes += 15 + 2 * step;
    }
    byPartition.put(new TopicPartition(TEST_TOPIC, 1), batch);

    ConsumerRecords<Object, Object> consumed = new ConsumerRecords<>(byPartition);
    firstCollector.onConsume(consumed);
    secondCollector.onConsume(consumed);

    assertEquals(2 * expectedBatchBytes, MetricCollectors.totalBytesConsumption(), 0);
}
/**
 * Builds a {@code ConsumerRecord} with fixed key {@code "test-key"} and value
 * {@code "test-value"} (UTF-8 bytes), checksum 1234, {@code CREATE_TIME} timestamp type, and
 * serialized sizes equal to the byte lengths of key and value.
 */
private ConsumerRecord<byte[], byte[]> newConsumerRecord(
    String topic, int partition, int offset, Long timestamp, Headers headers) {
    final byte[] keyBytes = "test-key".getBytes(StandardCharsets.UTF_8);
    final byte[] valueBytes = "test-value".getBytes(StandardCharsets.UTF_8);
    final Long checksum = 1234L;
    return new ConsumerRecord<>(
        topic, partition, offset,
        timestamp, TimestampType.CREATE_TIME,
        checksum, keyBytes.length, valueBytes.length,
        keyBytes, valueBytes,
        headers);
}
/**
 * Builds a {@code ConsumerRecords} holding a single record on
 * {@code FIRST_TOPIC}/{@code FIRST_PARTITION} at {@code FIRST_OFFSET}. The record has a one-byte
 * key and value (both {@code 0}), no timestamp type, and one header named {@code "testHeader"}
 * with an empty value.
 */
private ConsumerRecords<byte[], byte[]> createTestRecordsWithHeaders() {
    RecordHeader testHeader = new RecordHeader("testHeader", new byte[0]);
    RecordHeaders sourceHeaders = new RecordHeaders();
    sourceHeaders.add(testHeader);

    byte zero = 0;
    byte[] keyBytes = new byte[] {zero};
    byte[] valueBytes = new byte[] {zero};

    // NOTE(review): this Connect-side header set is populated but never read in this method.
    ConnectHeaders connectHeaders = new ConnectHeaders();
    connectHeaders.add(testHeader.key(), testHeader.value(), Schema.OPTIONAL_BYTES_SCHEMA);

    ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
        FIRST_TOPIC, FIRST_PARTITION, FIRST_OFFSET,
        System.currentTimeMillis(), TimestampType.NO_TIMESTAMP_TYPE,
        0L, 0, 0, keyBytes, valueBytes, sourceHeaders);

    List<ConsumerRecord<byte[], byte[]>> partitionRecords = new ArrayList<>();
    partitionRecords.add(record);
    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> byPartition = new HashMap<>(1);
    byPartition.put(new TopicPartition(FIRST_TOPIC, FIRST_PARTITION), partitionRecords);

    return new ConsumerRecords<>(byPartition);
}
/**
 * Constructs a record as received from a given topic and partition.
 *
 * @param topic               topic the record was received from; must not be {@code null}
 * @param partition           partition of the topic the record was received from
 * @param offset              offset of the record within its Kafka partition
 * @param timestamp           timestamp of the record
 * @param timestampType       type of the timestamp
 * @param checksum            checksum (CRC32) of the full record
 * @param serializedKeySize   length of the serialized key
 * @param serializedValueSize length of the serialized value
 * @param key                 key of the record, if any ({@code null} is allowed)
 * @param value               contents of the record
 * @throws IllegalArgumentException if {@code topic} is {@code null}
 */
public DBusConsumerRecord(String topic,
                          int partition,
                          long offset,
                          long timestamp,
                          TimestampType timestampType,
                          long checksum,
                          int serializedKeySize,
                          int serializedValueSize,
                          K key,
                          V value) {
    if (null == topic) {
        throw new IllegalArgumentException("Topic cannot be null");
    }

    // Where the record came from.
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;

    // Timing and integrity metadata.
    this.timestamp = timestamp;
    this.timestampType = timestampType;
    this.checksum = checksum;

    // Payload and its serialized sizes.
    this.serializedKeySize = serializedKeySize;
    this.serializedValueSize = serializedValueSize;
    this.key = key;
    this.value = value;
}
/**
 * Checks that applying an empty batch leaves the per-partition "seconds behind" values untouched:
 * partition 0 stays at 100 and partition 1 stays at 9223372036854775.
 */
@Test
public void shouldNotChangeDurationBehindOnNoRecords() {
    // given
    final KafkaRecordsConsumer consumer = someKafkaRecordsConsumer(fromHorizon());
    final TopicPartition partitionZero = new TopicPartition("", 0);
    final TopicPartition partitionOne = new TopicPartition("", 1);
    // Value asserted for the partition that receives no record (equals Long.MAX_VALUE / 1000).
    final long untouched = 9223372036854775L;

    durationBehindHandler.onPartitionsAssigned(asList(partitionZero, partitionOne));

    ConsumerRecord<String, String> hundredSecondsOld = new ConsumerRecord<>(
        "", 0, 23, now().minusSeconds(100).toEpochMilli(), TimestampType.CREATE_TIME, 0, 0, 0, "key", "value");
    consumer.apply(new ConsumerRecords<>(ImmutableMap.of(partitionZero, singletonList(hundredSecondsOld))));

    assertThat(getSecondsBehind("0"), is(100L));
    assertThat(getSecondsBehind("1"), is(untouched));

    // when
    consumer.apply(ConsumerRecords.empty());

    // then
    assertThat(getSecondsBehind("0"), is(100L));
    assertThat(getSecondsBehind("1"), is(untouched));
}
/**
 * Checks that a record whose key equals the compaction-key header is decoded into a compound key
 * carrying the partition key and compaction key from the headers.
 */
@Test
public void shouldDecodeCompoundKeys() {
    // given
    final KafkaDecoder decoder = new KafkaDecoder();
    final RecordHeaders keyHeaders = new RecordHeaders(asList(
        new RecordHeader("_synapse_msg_partitionKey", "1234".getBytes(UTF_8)),
        new RecordHeader("_synapse_msg_compactionKey", "key-1234".getBytes(UTF_8))));
    final ConsumerRecord<String, String> record = new ConsumerRecord<>(
        "ch01", 0, 42L,
        1234L, TimestampType.CREATE_TIME,
        -1L, -1, -1,
        "key-1234", null,
        keyHeaders);

    // when
    final TextMessage decoded = decoder.apply(record);

    // then
    assertThat(decoded.getKey().isCompoundKey(), is(true));
    assertThat(decoded.getKey().compactionKey(), is("key-1234"));
    assertThat(decoded.getKey().partitionKey(), is("1234"));
}
/**
 * Checks that when the record key ({@code "record-key"}) differs from the compaction-key header,
 * the decoded key is not a compound key and uses the record key as compaction key.
 */
@Test
public void shouldDecodeBrokenCompoundKeysAsMessageKey() {
    // given
    final KafkaDecoder decoder = new KafkaDecoder();
    final RecordHeaders keyHeaders = new RecordHeaders(asList(
        new RecordHeader("_synapse_msg_partitionKey", "1234".getBytes(UTF_8)),
        new RecordHeader("_synapse_msg_compactionKey", "key-1234".getBytes(UTF_8))));
    final ConsumerRecord<String, String> record = new ConsumerRecord<>(
        "ch01", 0, 42L,
        1234L, TimestampType.CREATE_TIME,
        -1L, -1, -1,
        "record-key", null,
        keyHeaders);

    // when
    final TextMessage decoded = decoder.apply(record);

    // then
    assertThat(decoded.getKey().isCompoundKey(), is(false));
    assertThat(decoded.getKey().compactionKey(), is("record-key"));
}
/**
 * Checks that {@code SetNull.Key} clears both the key and the key schema of a record.
 */
@Test
public void test() {
    final SinkRecord input = new SinkRecord(
        "test",
        1,
        Schema.STRING_SCHEMA,
        "key",
        null,
        "",
        1234123L,
        12341312L,
        TimestampType.NO_TIMESTAMP_TYPE
    );
    // The unused expectedTimestamp local was dropped: nothing in this test reads it.
    SetNull<SinkRecord> transform = new SetNull.Key<>();
    final SinkRecord actual = transform.apply(input);
    assertNull(actual.key(), "key should be null.");
    assertNull(actual.keySchema(), "keySchema should be null.");
}
/**
 * Checks that {@code TimestampNow} stamps the record with the value returned by its clock and
 * queries that clock exactly once.
 */
@Test
public void test() {
    final Long expectedTimestamp = 1537808219123L;
    final SinkRecord input = new SinkRecord(
        "test", 1,
        null, "",
        null, "",
        1234123L, 12341312L, TimestampType.NO_TIMESTAMP_TYPE);

    // Swap the transform's clock for a mock that always reports the expected time.
    TimestampNow<SinkRecord> transform = new TimestampNow<>();
    transform.time = mock(Time.class);
    when(transform.time.milliseconds()).thenReturn(expectedTimestamp);

    final SinkRecord stamped = transform.apply(input);

    assertEquals(expectedTimestamp, stamped.timestamp(), "Timestamp should match.");
    verify(transform.time, times(1)).milliseconds();
}
/**
 * Creates a {@code SinkRecord} representing a delete: the given key with a {@code null} value
 * and {@code null} value schema.
 *
 * @param topic topic of the record; checked with {@code Preconditions.checkNotNull}
 * @param key key schema and value; neither {@code key} nor {@code key.value()} may be null
 * @return the delete record
 * @throws DataException if {@code key} or its value is {@code null}
 */
public static SinkRecord delete(String topic, SchemaAndValue key) {
    Preconditions.checkNotNull(topic, "topic cannot be null");
    // A missing key and a key without a value are rejected with the same message.
    if (null == key || null == key.value()) {
        throw new DataException("key cannot be null.");
    }
    return new SinkRecord(
        topic, PARTITION,
        key.schema(), key.value(),
        null, null,
        OFFSET, TIMESTAMP, TimestampType.CREATE_TIME);
}
/**
 * Creates a {@code SinkRecord} representing a write of the given key and value.
 *
 * <p>All arguments, as well as {@code key.value()} and {@code value.value()}, are checked with
 * {@code Preconditions.checkNotNull} before the record is built.
 *
 * @param topic topic of the record
 * @param key key schema and value
 * @param value value schema and value
 * @return the write record, using {@code CREATE_TIME} as timestamp type
 */
public static SinkRecord write(String topic, SchemaAndValue key, SchemaAndValue value) {
    // Validate in a fixed order so the first missing piece determines the error message.
    Preconditions.checkNotNull(topic, "topic cannot be null");

    Preconditions.checkNotNull(key, "key cannot be null.");
    Preconditions.checkNotNull(key.value(), "key cannot be null.");

    Preconditions.checkNotNull(value, "value cannot be null.");
    Preconditions.checkNotNull(value.value(), "value cannot be null.");

    return new SinkRecord(
        topic, PARTITION,
        key.schema(), key.value(),
        value.schema(), value.value(),
        OFFSET, TIMESTAMP, TimestampType.CREATE_TIME);
}
/**
 * Checks that a record added to a {@code MockConsumer} with one header reaches the stream
 * handler with its topic, partition, key, value, and header intact.
 */
@Test
public void testConsumeWithHeader(TestContext ctx) {
    MockConsumer<String, String> mock = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    KafkaReadStream<String, String> consumer = createConsumer(vertx, mock);
    Async doneLatch = ctx.async();

    // Assertions run when the record is delivered; the test completes once the consumer is closed.
    consumer.handler(received -> {
        ctx.assertEquals("the_topic", received.topic());
        ctx.assertEquals(0, received.partition());
        ctx.assertEquals("abc", received.key());
        ctx.assertEquals("def", received.value());

        Header[] receivedHeaders = received.headers().toArray();
        ctx.assertEquals(1, receivedHeaders.length);
        Header only = receivedHeaders[0];
        ctx.assertEquals("header_key", only.key());
        ctx.assertEquals("header_value", new String(only.value()));

        consumer.close(v -> doneLatch.complete());
    });

    // After subscribing, schedule a poll task that assigns the partition and injects the record.
    consumer.subscribe(Collections.singleton("the_topic"), v -> mock.schedulePollTask(() -> {
        mock.rebalance(Collections.singletonList(new TopicPartition("the_topic", 0)));
        mock.addRecord(new ConsumerRecord<>(
            "the_topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0L, 0, 0, "abc", "def",
            new RecordHeaders(Collections.singletonList(
                new RecordHeader("header_key", "header_value".getBytes())))));
        mock.seek(new TopicPartition("the_topic", 0), 0L);
    }));
}
/**
 * Builds a {@code MapTestCase} whose record value is a schemaless map with four entries
 * (first name, last name, email, age) on topic {@code "testing"}, partition 1.
 */
static MapTestCase map() {
    MapTestCase fixture = new MapTestCase();

    fixture.map = ImmutableMap.of(
        "firstName", "example",
        "lastName", "user",
        "email", "[email protected]",
        "age", 27);

    // No key and no schemas: only the map value is set.
    fixture.record = new SinkRecord(
        "testing", 1,
        null, null,
        null, fixture.map,
        1L, 1484897702123L, TimestampType.CREATE_TIME);

    return fixture;
}
/**
 * Wraps each test message in a {@code ConsumerRecord} and converts the batch with a
 * {@code ConsumerRecordConverter}.
 *
 * <p>The i-th message is placed on partition {@code partitionStart + i} at offset
 * {@code offsetStart + i}, with {@code now.getEpochSecond()} passed as the record timestamp.
 *
 * @param columnMapping mapping handed to the {@code RowMapper}
 * @param now instant supplying the record timestamp
 * @param topicName topic of every generated record
 * @param partitionStart partition of the first message
 * @param offsetStart offset of the first message
 * @param clock clock handed to the converter
 * @param testMessages messages to wrap, in order
 * @return the converted records
 */
protected List<Record> getKafkaConsumerRecords(ColumnMapping columnMapping, Instant now, String topicName,
        int partitionStart, long offsetStart, Clock clock, TestMessage... testMessages) throws InvalidProtocolBufferException {
    ProtoParser parser = new ProtoParser(StencilClientFactory.getClient(), TestMessage.class.getName());
    ConsumerRecordConverter converter = new ConsumerRecordConverter(new RowMapper(columnMapping), parser, clock);

    List<ConsumerRecord<byte[], byte[]>> wrapped = new ArrayList<>();
    int index = 0;
    for (TestMessage testMessage : testMessages) {
        wrapped.add(new ConsumerRecord<>(
            topicName, partitionStart + index, offsetStart + index,
            now.getEpochSecond(), TimestampType.CREATE_TIME,
            0, 0, 1, null, testMessage.toByteArray()));
        index++;
    }
    return converter.convert(wrapped);
}
/**
 * Converts a message holding two nested repeated messages and pushes it through a {@code BqSink}
 * targeting {@code bqsinktest.nested_messages}, expecting a successful push. Currently ignored.
 */
@Ignore
@Test
public void shouldPushTestNestedRepeatedMessages() throws InvalidProtocolBufferException {
    Instant now = Instant.now();
    long epochSecond = now.getEpochSecond();

    ProtoParser parser = new ProtoParser(StencilClientFactory.getClient(), TestNestedRepeatedMessage.class.getName());
    TestNestedRepeatedMessage nestedMessage = TestNestedRepeatedMessage.newBuilder()
        .addRepeatedMessage(ProtoUtil.generateTestMessage(now))
        .addRepeatedMessage(ProtoUtil.generateTestMessage(now))
        .build();

    TableId table = TableId.of("bqsinktest", "nested_messages");
    BqSink sink = new BqSink(authenticatedBQ(), table, new BQResponseParser(), gcsSinkHandler, bqRow);

    // Field "2" of the outer message maps to a nested record with its own column mapping.
    ColumnMapping outerMapping = new ColumnMapping();
    ColumnMapping nestedMapping = new ColumnMapping();
    nestedMapping.put("record_name", "messsages");
    nestedMapping.put("1", "order_number");
    nestedMapping.put("2", "order_url");
    outerMapping.put("2", nestedMapping);

    ConsumerRecordConverter converter = new ConsumerRecordConverter(new RowMapper(outerMapping), parser, clock);
    ConsumerRecord<byte[], byte[]> kafkaRecord = new ConsumerRecord<>(
        "topic", 1, 1, epochSecond, TimestampType.CREATE_TIME,
        0, 0, 1, null, nestedMessage.toByteArray());

    List<Record> converted = converter.convert(Collections.singleton(kafkaRecord));
    Status result = sink.push(new Records(converted));

    assertTrue(result.isSuccess());
}
/**
 * Builds a {@code ConsumerRecord} whose key is a serialized {@code TestKey} and whose value is a
 * serialized {@code TestMessage}, both populated from the arguments.
 *
 * <p>The record uses the current {@code offset} field value, which is incremented afterwards, so
 * consecutive calls produce consecutive offsets.
 *
 * @param orderNumber order number set on both key and message
 * @param orderUrl order URL set on both key and message
 * @param orderDetails order details set on the message only
 * @return the new record
 */
public ConsumerRecord<byte[], byte[]> createConsumerRecord(String orderNumber, String orderUrl, String orderDetails) {
    TestKey.Builder keyBuilder = TestKey.newBuilder();
    keyBuilder.setOrderNumber(orderNumber);
    keyBuilder.setOrderUrl(orderUrl);
    TestKey key = keyBuilder.build();

    TestMessage.Builder messageBuilder = TestMessage.newBuilder();
    messageBuilder.setOrderNumber(orderNumber);
    messageBuilder.setOrderUrl(orderUrl);
    messageBuilder.setOrderDetails(orderDetails);
    TestMessage message = messageBuilder.build();

    return new ConsumerRecord<>(
        topic, partition, offset++, timestamp, TimestampType.CREATE_TIME,
        0, 0, 0, key.toByteArray(), message.toByteArray());
}
/**
 * Creates a {@code SinkRecord} on the fixed test topic, partition, offset, and timestamp with the
 * given key/value and their schemas, and no timestamp type.
 */
private SinkRecord createSinkRecord(Schema keySchema, Object key, Schema valueSchema, Object value) {
    final SinkRecord sinkRecord = new SinkRecord(
        TEST_TOPIC, TEST_PARTITION,
        keySchema, key,
        valueSchema, value,
        TEST_OFFSET, TEST_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
    return sinkRecord;
}