下面列出了org.apache.kafka.clients.producer.Producer#flush ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testAuthorizedIdempotentWrite() throws Exception {
// Create the Producer
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:" + port);
producerProps.put("acks", "all");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
producerProps.put("sasl.mechanism", "GSSAPI");
producerProps.put("sasl.kerberos.service.name", "kafka");
producerProps.put("enable.idempotence", "true");
final Producer<String, String> producer = new KafkaProducer<>(producerProps);
// Send a message
producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
producer.flush();
producer.close();
}
private String produceMessage(String topicName, Object msg, Boolean storeSchemaInHeader) {
String bootstrapServers = CLUSTER.bootstrapServers();
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.putAll(SCHEMA_REGISTRY_TEST_SERVER_CLIENT_WRAPPER.exportClientConf(true));
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
config.put(KafkaAvroSerializer.STORE_SCHEMA_VERSION_ID_IN_HEADER, storeSchemaInHeader.toString());
final Producer<String, Object> producer = new KafkaProducer<>(config);
final Callback callback = new ProducerCallback();
LOG.info("Sending message: [{}] to topic: [{}]", msg, topicName);
ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topicName, getKey(msg), msg);
producer.send(producerRecord, callback);
producer.flush();
LOG.info("Message successfully sent to topic: [{}]", topicName);
producer.close(5, TimeUnit.SECONDS);
return bootstrapServers;
}
@SuppressWarnings("FutureReturnValueIgnored")
private void produceSomeRecordsWithDelay(int num, int delayMilis) {
Producer<String, String> producer = new KafkaProducer<String, String>(producerProps());
String topicName = pipeline.getOptions().as(KafkaOptions.class).getKafkaTopic();
for (int i = 0; i < num; i++) {
producer.send(
new ProducerRecord<String, String>(
topicName, "k" + i, i + "," + ((i % 3) + 1) + "," + i));
try {
TimeUnit.MILLISECONDS.sleep(delayMilis);
} catch (InterruptedException e) {
throw new RuntimeException("Could not wait for producing", e);
}
}
producer.flush();
producer.close();
}
@Test
public void testAuthorizedWrite() throws Exception {
// Create the Producer
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:" + port);
producerProps.put("acks", "all");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
final Producer<String, String> producer = new KafkaProducer<>(producerProps);
// Send a message
Future<RecordMetadata> record =
producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
producer.flush();
record.get();
producer.close();
}
@Test
public void testCreatedConsumerValidConfigCanCommunicateToKafka() {
String testTopic = "testCreatedConsumer_ValidConfig_CanCommunicateToKafka";
Producer<String, String> testProducer = producerFactory.createProducer();
testProducer.send(new ProducerRecord<>(testTopic, 0, null, null, "foo"));
testProducer.flush();
ConsumerFactory<?, ?> testSubject = new DefaultConsumerFactory<>(minimal(kafkaBroker));
testConsumer = testSubject.createConsumer(DEFAULT_GROUP_ID);
testConsumer.subscribe(Collections.singleton(testTopic));
assertThat(KafkaTestUtils.getRecords(testConsumer).count()).isOne();
}
private static void publishRecordsOnPartitions(Producer<String, String> producer,
String topic,
int recordsPerPartitions,
int partitionsPerTopic) {
for (int i = 0; i < recordsPerPartitions; i++) {
for (int p = 0; p < partitionsPerTopic; p++) {
producer.send(buildRecord(topic, p));
}
}
producer.flush();
}
private static void publishNewRecords(Producer<String, String> producer, String topic) {
producer.send(buildRecord(topic, 0));
producer.send(buildRecord(topic, 1));
producer.send(buildRecord(topic, 2));
producer.send(buildRecord(topic, 3));
producer.flush();
}
@Test
public void testSimpleProducer() throws Exception {
String topic = "testSimpleProducer";
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("my-subscription")
.subscribe();
Properties props = new Properties();
props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer<Integer, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
}
producer.flush();
producer.close();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
assertEquals(new String(msg.getData()), "hello-" + i);
pulsarConsumer.acknowledge(msg);
}
}
@Override
public void disconnect() {
for (Producer producer : producers) {
producer.flush();
producer.close();
}
producers.clear();
}
@Test
public void testAuthorizedWrite() throws Exception {
// Create the Producer
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:" + port);
producerProps.put("acks", "all");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
producerProps.put("sasl.mechanism", "PLAIN");
producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
final Producer<String, String> producer = new KafkaProducer<>(producerProps);
// Send a message
Future<RecordMetadata> record =
producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
producer.flush();
record.get();
producer.close();
}
/**
* Sends a message to Kafka broker.
*
* @param records List of records.
* @return Producer used to send the message.
*/
public void sendMessages(List<ProducerRecord<String, String>> records) {
Producer<String, String> producer = new KafkaProducer<>(getProducerConfig());
for (ProducerRecord<String, String> rec : records)
producer.send(rec);
producer.flush();
producer.close();
}
public static void main(String[] args) {
String topic = "persistent://public/default/test-avro";
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Bar bar = new Bar();
bar.setField1(true);
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));
log.info("Message {} sent successfully", i);
}
producer.flush();
producer.close();
}
public static void main(String[] args) {
String topic = "persistent://public/default/test-avro";
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Bar bar = new Bar();
bar.setField1(true);
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));
log.info("Message {} sent successfully", i);
}
producer.flush();
producer.close();
}
@Test
public void testAuthorizedRead() throws Exception {
// Create the Producer
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:" + port);
producerProps.put("acks", "all");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
producerProps.put("sasl.mechanism", "PLAIN");
producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
final Producer<String, String> producer = new KafkaProducer<>(producerProps);
// Create the Consumer
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:" + port);
consumerProps.put("group.id", "test");
consumerProps.put("enable.auto.commit", "true");
consumerProps.put("auto.offset.reset", "earliest");
consumerProps.put("auto.commit.interval.ms", "1000");
consumerProps.put("session.timeout.ms", "30000");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
consumerProps.put("sasl.mechanism", "PLAIN");
consumerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
consumerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, clientKeystorePath);
consumerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "cspass");
consumerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "ckpass");
consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList("test"));
// Send a message
producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
producer.flush();
// Poll until we consume it
ConsumerRecord<String, String> record = null;
for (int i = 0; i < 1000; i++) {
ConsumerRecords<String, String> records = consumer.poll(100);
if (records.count() > 0) {
record = records.iterator().next();
break;
}
Thread.sleep(1000);
}
Assert.assertNotNull(record);
Assert.assertEquals("somevalue", record.value());
producer.close();
consumer.close();
}
/**
* testExtractor checks that the extractor code does the right thing. First it creates a topic, and sets up a source to point
* to it. workUnits are generated from the source (only a single wU should be returned). Then it writes a record to this topic
* and reads back from the extractor to verify the right record is returned. A second record is then written and read back
* through the extractor to verify poll works as expected. Finally we test the commit api by forcing a commit and then starting
* a new extractor to ensure we fetch data from after the commit. The commit is also verified in Kafka directly
* @throws IOException
* @throws InterruptedException
* @throws DataRecordException
*/
@Test(timeOut = 10000)
public void testExtractor()
throws IOException, InterruptedException, DataRecordException {
final String topic = "testSimpleStreamingExtractor";
_kafkaTestHelper.provisionTopic(topic);
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:" + _kafkaTestHelper.getKafkaServerPort());
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Producer<String, byte[]> producer = new KafkaProducer<>(props);
final byte [] record_1 = {0, 1, 3};
final byte [] record_2 = {2, 4, 6};
final byte [] record_3 = {5, 7, 9};
// Write a sample record to the topic
producer.send(new ProducerRecord<String, byte[]>(topic, topic, record_1));
producer.flush();
KafkaSimpleStreamingExtractor<String, byte[]> kSSE = getStreamingExtractor(topic);
TopicPartition tP = new TopicPartition(topic, 0);
KafkaSimpleStreamingExtractor.KafkaWatermark kwm =
new KafkaSimpleStreamingExtractor.KafkaWatermark(tP, new LongWatermark(0));
byte [] reuse = new byte[1];
RecordEnvelope<byte[]> oldRecord = new RecordEnvelope<>(reuse, kwm);
Map<String, CheckpointableWatermark> committedWatermarks = new HashMap<>();
WatermarkStorage mockWatermarkStorage = mock(WatermarkStorage.class);
when(mockWatermarkStorage.getCommittedWatermarks(any(Class.class), any(Iterable.class)))
.thenReturn(committedWatermarks);
kSSE.start(mockWatermarkStorage);
// read and verify the record matches we just wrote
RecordEnvelope<byte[]> record = kSSE.readRecordEnvelope();
Assert.assertEquals(record.getRecord(), record_1);
// write a second record.
producer.send(new ProducerRecord<String, byte[]>(topic, topic, record_2));
producer.flush();
// read the second record using same extractor to verify it matches whats expected
record = kSSE.readRecordEnvelope();
Assert.assertEquals(record.getRecord(), record_2);
// Commit the watermark
committedWatermarks.put(record.getWatermark().getSource(), record.getWatermark());
// write a third record.
producer.send(new ProducerRecord<String, byte[]>(topic, topic, record_3));
producer.flush();
// recreate extractor to force a seek.
kSSE = getStreamingExtractor(topic);
kSSE.start(mockWatermarkStorage);
record = kSSE.readRecordEnvelope();
// check it matches the data written
Assert.assertEquals(record.getRecord(), record_3);
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "10.10.1.104:9093");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//“所有”设置将导致记录的完整提交阻塞,最慢的,但最持久的设置。
//properties.put("acks", "all");
//如果请求失败,生产者也会自动重试,即使设置成0 the producer can automatically retry.
properties.put("retries", 1);
//The producer maintains buffers of unsent records for each partition.
properties.put("batch.size", 16384);
//默认立即发送,这里这是延时毫秒数
properties.put("linger.ms", 1);
//生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException
properties.put("buffer.memory", 34432);
System.out.println("start...");
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
logger.debug("config {}", properties);
while(true) {
for (int i = 0; i < 1000; i++) {
System.out.println("start......");
producer.send(new ProducerRecord<String, String>("foo", Integer.toString(i), Integer.toString(i)));
System.out.println("send " + i);
}
//producer.close();
producer.flush();
logger.debug("---------metrics{}", producer.metrics());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Test
void shouldManageProducerAndConsumerMetrics() {
SimpleMeterRegistry registry = new SimpleMeterRegistry();
assertThat(registry.getMeters()).hasSize(0);
Properties producerConfigs = new Properties();
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers());
Producer<String, String> producer = new KafkaProducer<>(
producerConfigs, new StringSerializer(), new StringSerializer());
KafkaClientMetrics producerKafkaMetrics = new KafkaClientMetrics(producer);
producerKafkaMetrics.bindTo(registry);
int producerMetrics = registry.getMeters().size();
assertThat(registry.getMeters()).hasSizeGreaterThan(0);
assertThat(registry.getMeters())
.extracting(m -> m.getId().getTag("kafka-version"))
.allMatch(v -> !v.isEmpty());
Properties consumerConfigs = new Properties();
consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers());
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
Consumer<String, String> consumer = new KafkaConsumer<>(
consumerConfigs, new StringDeserializer(), new StringDeserializer());
KafkaClientMetrics consumerKafkaMetrics = new KafkaClientMetrics(consumer);
consumerKafkaMetrics.bindTo(registry);
//Printing out for discovery purposes
out.println("Meters from producer before sending:");
printMeters(registry);
int producerAndConsumerMetrics = registry.getMeters().size();
assertThat(registry.getMeters()).hasSizeGreaterThan(producerMetrics);
assertThat(registry.getMeters())
.extracting(m -> m.getId().getTag("kafka-version"))
.allMatch(v -> !v.isEmpty());
String topic = "test";
producer.send(new ProducerRecord<>(topic, "key", "value"));
producer.flush();
//Printing out for discovery purposes
out.println("Meters from producer after sending and consumer before poll:");
printMeters(registry);
producerKafkaMetrics.checkAndBindMetrics(registry);
int producerAndConsumerMetricsAfterSend = registry.getMeters().size();
assertThat(registry.getMeters()).hasSizeGreaterThan(producerAndConsumerMetrics);
assertThat(registry.getMeters())
.extracting(m -> m.getId().getTag("kafka-version"))
.allMatch(v -> !v.isEmpty());
consumer.subscribe(Collections.singletonList(topic));
consumer.poll(Duration.ofMillis(100));
//Printing out for discovery purposes
out.println("Meters from producer and consumer after polling:");
printMeters(registry);
consumerKafkaMetrics.checkAndBindMetrics(registry);
assertThat(registry.getMeters()).hasSizeGreaterThan(producerAndConsumerMetricsAfterSend);
assertThat(registry.getMeters())
.extracting(m -> m.getId().getTag("kafka-version"))
.allMatch(v -> !v.isEmpty());
//Printing out for discovery purposes
out.println("All meters from producer and consumer:");
printMeters(registry);
producerKafkaMetrics.close();
consumerKafkaMetrics.close();
}
@Test void shouldRegisterMetricsFromDifferentClients() {
SimpleMeterRegistry registry = new SimpleMeterRegistry();
assertThat(registry.getMeters()).hasSize(0);
Properties producer1Configs = new Properties();
producer1Configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers());
producer1Configs.put(ProducerConfig.CLIENT_ID_CONFIG, "producer1");
Producer<String, String> producer1 = new KafkaProducer<>(
producer1Configs, new StringSerializer(), new StringSerializer());
KafkaClientMetrics producer1KafkaMetrics = new KafkaClientMetrics(producer1);
producer1KafkaMetrics.bindTo(registry);
int producer1Metrics = registry.getMeters().size();
assertThat(producer1Metrics).isGreaterThan(0);
producer1.send(new ProducerRecord<>("topic1", "foo"));
producer1.flush();
producer1KafkaMetrics.checkAndBindMetrics(registry);
int producer1MetricsAfterSend = registry.getMeters().size();
assertThat(producer1MetricsAfterSend).isGreaterThan(producer1Metrics);
Properties producer2Configs = new Properties();
producer2Configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers());
producer2Configs.put(ProducerConfig.CLIENT_ID_CONFIG, "producer2");
Producer<String, String> producer2 = new KafkaProducer<>(
producer2Configs, new StringSerializer(), new StringSerializer());
KafkaClientMetrics producer2KafkaMetrics = new KafkaClientMetrics(producer2);
producer2KafkaMetrics.bindTo(registry);
producer2.send(new ProducerRecord<>("topic1", "foo"));
producer2.flush();
producer2KafkaMetrics.checkAndBindMetrics(registry);
int producer2MetricsAfterSend = registry.getMeters().size();
assertThat(producer2MetricsAfterSend).isEqualTo(producer1MetricsAfterSend * 2);
}
@Test
public void testAuthorizedRead() throws Exception {
// Create the Producer
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:" + port);
producerProps.put("acks", "all");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
final Producer<String, String> producer = new KafkaProducer<>(producerProps);
// Create the Consumer
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:" + port);
consumerProps.put("group.id", "test");
consumerProps.put("enable.auto.commit", "true");
consumerProps.put("auto.offset.reset", "earliest");
consumerProps.put("auto.commit.interval.ms", "1000");
consumerProps.put("session.timeout.ms", "30000");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
consumerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
consumerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, clientKeystorePath);
consumerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "cspass");
consumerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "ckpass");
consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList("test"));
// Send a message
producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
producer.flush();
// Poll until we consume it
ConsumerRecord<String, String> record = null;
for (int i = 0; i < 1000; i++) {
ConsumerRecords<String, String> records = consumer.poll(100);
if (records.count() > 0) {
record = records.iterator().next();
break;
}
Thread.sleep(1000);
}
Assert.assertNotNull(record);
Assert.assertEquals("somevalue", record.value());
producer.close();
consumer.close();
}
@Test
public void testProducerConsumerMixedSchemaWithPulsarKafkaClient() throws Exception {
String topic = "testProducerConsumerMixedSchemaWithPulsarKafkaClient";
Schema<String> keySchema = new PulsarKafkaSchema<>(new StringSerializer(), new StringDeserializer());
JSONSchema<Foo> valueSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Properties props = new Properties();
props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
@Cleanup
Consumer<String, Foo> consumer = new KafkaConsumer<>(props, keySchema, valueSchema);
consumer.subscribe(Arrays.asList(topic));
Producer<String, Foo> producer = new KafkaProducer<>(props, keySchema, valueSchema);
for (int i = 0; i < 10; i++) {
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(i);
producer.send(new ProducerRecord<>(topic, "hello" + i, foo));
}
producer.flush();
producer.close();
AtomicInteger received = new AtomicInteger();
while (received.get() < 10) {
ConsumerRecords<String, Foo> records = consumer.poll(100);
if (!records.isEmpty()) {
records.forEach(record -> {
String key = record.key();
Assert.assertEquals(key, "hello" + received.get());
Foo value = record.value();
Assert.assertEquals(value.getField1(), "field1");
Assert.assertEquals(value.getField2(), "field2");
Assert.assertEquals(value.getField3(), received.get());
received.incrementAndGet();
});
consumer.commitSync();
}
}
}