org.apache.kafka.clients.producer.Producer#send() Code Examples

Listed below are example usages of org.apache.kafka.clients.producer.Producer#send(), drawn from open-source projects on GitHub.
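Every snippet below calls send() in one of the three styles the Kafka client API supports: fire-and-forget, synchronous (blocking on the returned Future), and asynchronous with a callback. As a minimal, self-contained sketch (the broker address and topic name are illustrative):

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SendPatternsSketch {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // illustrative address
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // 1. Fire-and-forget: send() returns immediately; delivery is not confirmed.
                producer.send(new ProducerRecord<>("demo-topic", "k", "v"));

                // 2. Synchronous: block on the Future to confirm delivery or surface the error.
                RecordMetadata md = producer.send(new ProducerRecord<>("demo-topic", "k", "v")).get();
                System.out.println("acknowledged at offset " + md.offset());

                // 3. Asynchronous: the callback fires when the broker acknowledges or the send fails.
                producer.send(new ProducerRecord<>("demo-topic", "k", "v"), (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                });
            }
        }
    }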

Example 1  Project: pulsar   File: ProducerExample.java
public static void main(String[] args) {
    String topic = "persistent://public/default/test";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<Integer, String>(topic, i, Integer.toString(i)));
        log.info("Message {} sent successfully", i);
    }

    producer.flush();
    producer.close();
}
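Note that send() is asynchronous, so the "sent successfully" log above only confirms that the record was queued locally, not that it was delivered. If confirmation matters, one option is to block on the returned Future (a sketch reusing the loop variables above; get() throws InterruptedException and ExecutionException, which the surrounding method would have to handle):

    // Sketch: blocking on the Future makes the log line reflect actual delivery.
    RecordMetadata metadata =
            producer.send(new ProducerRecord<Integer, String>(topic, i, Integer.toString(i))).get();
    log.info("Message {} acknowledged at offset {}", i, metadata.offset());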
 
Example 2  Project: components   File: KafkaDatasetOtherDelimTestIT.java
@Before
public void init() throws TimeoutException {
    // there may be other topics besides the built-in ones (configured in pom.xml); ignore them

    // ----------------- Send sample data to TOPIC_IN start --------------------
    String testID = "sampleTest" + new Random().nextInt();

    List<Person> expectedPersons = Person.genRandomList(testID, 10);

    Properties props = new Properties();
    props.put("bootstrap.servers", BOOTSTRAP_HOST);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<Void, String> producer = new KafkaProducer<>(props);
    for (Person person : expectedPersons) {
        ProducerRecord<Void, String> message = new ProducerRecord<>(TOPIC_IN, person.toCSV(fieldDelimiter));
        producer.send(message);
    }
    producer.close();
    // ----------------- Send sample data to TOPIC_IN end --------------------
}
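Because the two-argument ProducerRecord constructor is used, the key is null (hence the Void key type), and the default partitioner spreads keyless records across partitions (round-robin in older clients, sticky batching in newer ones) rather than hashing a key. For reference, the ProducerRecord constructors seen throughout these examples:

    new ProducerRecord<>(topic, value);                             // null key
    new ProducerRecord<>(topic, key, value);                        // partition chosen from the key
    new ProducerRecord<>(topic, partition, key, value);             // explicit partition
    new ProducerRecord<>(topic, partition, timestamp, key, value);  // explicit partition and timestamp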
 
Example 3  Project: javatech   File: ProducerInTransaction.java
/**
 * A transaction that contains only message-production operations.
 */
public static void onlyProduceInTransaction() {
	Producer<String, String> producer = buildProducer();

	// 1. Initialize transactions
	producer.initTransactions();

	// 2. Begin the transaction
	producer.beginTransaction();

	try {
		// 3. The set of Kafka write operations
		// 3.1 Do business logic

		// 3.2 Send messages
		producer.send(new ProducerRecord<String, String>("test", "transaction-data-1"));
		producer.send(new ProducerRecord<String, String>("test", "transaction-data-2"));

		// 3.3 Do other business logic; messages to other topics can also be sent here.

		// 4. Commit the transaction
		producer.commitTransaction();
	} catch (Exception e) {
		// 5. Abort the transaction
		producer.abortTransaction();
	}
}
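buildProducer() is not shown in this snippet. For initTransactions() to succeed, the producer must be created with a transactional.id, which also implicitly enables idempotence; a sketch of what such a factory could look like (broker address and id are illustrative):

    private static Producer<String, String> buildProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");       // illustrative address
        props.put("transactional.id", "my-transactional-id");   // required before initTransactions()
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

Also note the catch block: per the KafkaProducer javadoc, abortTransaction() is the right response to recoverable KafkaExceptions, while fatal errors such as ProducerFencedException require closing the producer instead.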
 
Example 4  Project: hawkular-alerts   File: KafkaPlugin.java
protected void writeAlert(Action a) throws Exception {
    Properties props = initKafkaProperties(a.getProperties());

    Producer<String, String> producer = new KafkaProducer<>(props);
    String topic = a.getProperties().getOrDefault(PROP_TOPIC,
            HawkularProperties.getProperty(KAFKA_TOPIC, KAFKA_TOPIC_ENV, KAFKA_TOPIC_DEFAULT));

    producer.send(new ProducerRecord<>(topic, a.getActionId(), transform(a)));
    producer.close();
}
 
Example 5  Project: kbear   File: ProducerTest.java
protected void produceMessagesWithoutCallback(Producer<String, String> producer, String topic)
        throws InterruptedException {
    List<Future<RecordMetadata>> futures = new ArrayList<>();
    for (int i = 0; i < messageCount; i++) {
        String v = String.valueOf(i);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, v, v);
        Future<RecordMetadata> future = producer.send(record);
        futures.add(future);
        System.out.printf("\nrecord: %s, sent\n", record);

        if (sendInterval > 0)
            Thread.sleep(sendInterval);
    }

    long now = System.currentTimeMillis();
    futures.forEach(f -> {
        try {
            long timeout = _waitTimeout - (System.currentTimeMillis() - now);
            if (timeout <= 0)
                timeout = 1;

            RecordMetadata recordMetadata = f.get(timeout, TimeUnit.MILLISECONDS);
            Assert.assertNotNull(recordMetadata);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    });
}
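A callback-based counterpart would replace the Future bookkeeping with a latch. As a sketch (the method name is hypothetical; messageCount and _waitTimeout are the same fields used above):

    protected void produceMessagesWithCallback(Producer<String, String> producer, String topic)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            String v = String.valueOf(i);
            // The callback runs once the broker acknowledges the record (or the send fails).
            producer.send(new ProducerRecord<>(topic, v, v), (metadata, exception) -> {
                if (exception == null)
                    latch.countDown();
            });
        }
        Assert.assertTrue(latch.await(_waitTimeout, TimeUnit.MILLISECONDS));
    }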
 
Example 6  Project: java-kafka-client   File: TracingKafkaTest.java
@Test
public void testConsumerBuilderWithoutDecorators() throws InterruptedException {
  Producer<Integer, String> producer = createTracingProducer();
  producer.send(new ProducerRecord<>("messages", 1, "test"));

  producer.close();

  assertEquals(1, mockTracer.finishedSpans().size());

  ExecutorService executorService = Executors.newSingleThreadExecutor();
  final CountDownLatch latch = new CountDownLatch(1);

  executorService.execute(() -> {
    Consumer<Integer, String> consumer = createConsumerWithDecorators(new ArrayList<>());

    while (latch.getCount() > 0) {
      ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord<Integer, String> record : records) {
        SpanContext spanContext = TracingKafkaUtils
            .extractSpanContext(record.headers(), mockTracer);
        assertNotNull(spanContext);
        assertEquals("test", record.value());
        assertEquals((Integer) 1, record.key());

        consumer.commitSync();
        latch.countDown();
      }
    }
    consumer.close();
  });

  assertTrue(latch.await(30, TimeUnit.SECONDS));

  List<MockSpan> mockSpans = mockTracer.finishedSpans();

  MockSpan span = mockSpans.get(1);
  assertEquals("consumer", span.tags().get("span.kind"));
}
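createTracingProducer() is not shown here. With the opentracing-kafka-client library it would typically wrap a plain KafkaProducer in the tracing decorator, roughly like this (an assumption about the helper, not the project's actual code):

    private Producer<Integer, String> createTracingProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        props.put("key.serializer", IntegerSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // TracingKafkaProducer intercepts send() and finishes one span per record.
        return new TracingKafkaProducer<>(new KafkaProducer<>(props), mockTracer);
    }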
 
Example 7  Project: netty-pubsub   File: KafkaPublisherTest.java
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "127.0.0.1:9092");
    properties.put("acks", "all");
    properties.put("retries", 0);
    properties.put("batch.size", 16384);
    properties.put("linger.ms", 1);
    properties.put("buffer.memory", 33554432);
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // the default is string serialization
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // to use a custom serializer instead:
    // properties.put("value.serializer", "client.KafkaSerialization");
    Producer<String, String> producer = null;

    try {
        producer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 100; i++) {
            //TestMsg testMsg = new TestMsg(i, "hello".concat(i+""));
            String msg = "Message " + i;
            producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
            System.out.println("Sent:" + msg);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // guard against NPE if the KafkaProducer constructor threw before assignment
        if (producer != null) {
            producer.close();
        }
    }
}
 
Example 8  Project: pulsar   File: KafkaApiTest.java
@Test
public void testSimpleProducer() throws Exception {
    String topic = "testSimpleProducer";

    @Cleanup
    PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
    org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer().topic(topic)
            .subscriptionName("my-subscription")
            .subscribe();

    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());

    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
    }

    producer.flush();
    producer.close();

    for (int i = 0; i < 10; i++) {
        Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
        assertEquals(new String(msg.getData()), "hello-" + i);
        pulsarConsumer.acknowledge(msg);
    }
}
 
Example 9  Project: javatech   File: ProducerDemo.java
public static void main(String[] args) {
	// 1. Specify the producer configuration
	Properties properties = new Properties();
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
	properties.put(ProducerConfig.ACKS_CONFIG, "all");
	properties.put(ProducerConfig.RETRIES_CONFIG, 0);
	properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
	properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
	properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
		"org.apache.kafka.common.serialization.StringSerializer");
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
		"org.apache.kafka.common.serialization.StringSerializer");

	// 2. Initialize the Kafka producer with the configuration
	Producer<String, String> producer = new KafkaProducer<>(properties);

	try {
		// 3. Send messages asynchronously with send()
		for (int i = 0; i < 100; i++) {
			String msg = "Message " + i;
			producer.send(new ProducerRecord<>("HelloWorld", msg));
			System.out.println("Sent:" + msg);
		}
	} catch (Exception e) {
		e.printStackTrace();
	} finally {
		// 4. Close the producer
		producer.close();
	}
}
 
Example 10  Project: DBus   File: BoltCommandHandlerHelper.java
public static void onEncodeError(final DbusMessage dbusMessage, final MetaVersion version, EncodeColumn column, Exception e) {
    Producer<String, String> producer = null;
    try {
        logger.error("");
        producer = createProducer();
        TopicProvider provider = new DataOutputTopicProvider();
        List<String> topics = provider.provideTopics(version.getSchema(), version.getTable());
        String errorTopic = topics.get(0) + ".error";

        // Write the data that failed masking to Kafka
        ProducerRecord<String, String> record = new ProducerRecord<>(errorTopic, buildKey(dbusMessage), dbusMessage.toString());
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                logger.error("Send message to 'error topic' error.{}", dbusMessage.toString(), exception);
            }
        });

        // Email the administrators
        String content =
                "Hello:\n" +
                "  Alert type: data masking exception\n" +
                "  Data source: " + Utils.getDatasource().getDsName() + "\n" +
                "  Database: " + version.getSchema() + "\n" +
                "  Table: " + version.getTable() + "\n" +
                "  Masked column: " + column.getFieldName() + "\n" +
                "  Exception: " + e.getMessage() + "\n" +
                "Please handle it promptly.\n";
        writeEmailMessage("Dbus data masking exception", content, version.getSchema(), producer);
    } catch (Exception e1) {
        logger.error("exception data process error.", e1);
    } finally {
        if (producer != null) producer.close();
    }
}
 
Example 11  Project: hawkular-alerts   File: KafkaPluginTest.java
@Ignore
@Test
public void simpleProducer() throws Exception {
    Producer<String, String> producer = new KafkaProducer<>(producerProps);
    for(int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    }
    producer.close();
}
 
Example 12  Project: javabase   File: AvroProducerExample.java
private static void sendRecords(Producer<String, byte[]> producer) throws IOException, InterruptedException {
    String topic = "avro-topic";
    int partition = 0;
    while (true) {
        for (int i = 1; i < 100; i++)
            producer.send(new ProducerRecord<String, byte[]>(topic, partition, Integer.toString(0), record(i + "")));

        Thread.sleep(1000);
    }
}
 
Example 13
public static void main(String[] args) {
	Properties props = new Properties();
	props.put("bootstrap.servers", "192.168.99.100:9092");
	props.put("key.serializer", StringSerializer.class.getName());
	props.put("value.serializer", StringSerializer.class.getName());
	Producer<String, String> producer = new KafkaProducer<>(props);

	List<GeoLocation> geolocations = Arrays.asList(
			new GeoLocation(38.6270, 90.1994),
			new GeoLocation(93.9879, 76.9876), // invalid lat
			new GeoLocation(41.8034, -88.1440),
			new GeoLocation(40.9879, -200.9876), // invalid long
			new GeoLocation(-93.9879, 76.9876), // invalid lat
			new GeoLocation(9.5680, 77.9624),
			new GeoLocation(13.0827, 80.2707),
			new GeoLocation(40.9879, 200.9876), // invalid long
			new GeoLocation(9.9252, 78.1198));

	for(GeoLocation geolocation : geolocations) {
		System.out.println("Sending geolocation [" + geolocation.toString() + "]");
		ProducerRecord<String, String> record = new ProducerRecord<>(
				"geolocationJob", 
				geolocation.toString());
		producer.send(record);
	}

	producer.close();
}
 
Example 14  Project: ranger   File: KafkaRangerAuthorizerTest.java
@Test
public void testUnauthorizedWrite() throws Exception {
    // Create the Producer
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + port);
    producerProps.put("acks", "all");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, clientKeystorePath);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "cspass");
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "ckpass");
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
    
    final Producer<String, String> producer = new KafkaProducer<>(producerProps);
    
    // Send a message
    Future<RecordMetadata> record =
        producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
    producer.flush();
    record.get();
    
    try {
        record = producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
        producer.flush();
        record.get();
    } catch (Exception ex) {
        Assert.assertTrue(ex.getMessage().contains("Not authorized to access topics"));
    }
    
    producer.close();
}
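Note where the failure surfaces: the unauthorized send() itself does not throw; the error arrives when record.get() is called, wrapped in an ExecutionException. A sketch of a stricter assertion (assuming the Ranger policy denies writes to "dev"):

    try {
        producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue")).get();
        Assert.fail("Expected the write to be rejected");
    } catch (ExecutionException ex) {
        // The broker-side error is carried as the cause of the ExecutionException.
        Assert.assertTrue(ex.getCause() instanceof TopicAuthorizationException);
    }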
 
Example 15  Project: DBus   File: BoltCommandHandlerHelper.java
public static void onBuildMessageError(String errId, MetaVersion version, Exception e) {
    Producer<String, String> producer = null;
    try {
        // Change the table status to ABORT
        BoltCommandHandlerHelper.changeDataTableStatus(version.getSchema(), version.getTable(), DataTable.STATUS_ABORT);
        producer = createProducer();
        // Send a control message telling all appender threads to reload and synchronize state
        ControlMessage message = new ControlMessage(System.currentTimeMillis(), ControlType.APPENDER_RELOAD_CONFIG.name(), BoltCommandHandlerHelper.class.getName());
        ProducerRecord<String, String> record = new ProducerRecord<>(Utils.getDatasource().getControlTopic(), message.getType(), message.toJSONString());
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                logger.error("Send control message error.{}", message.toJSONString(), exception);
            }
        });

        // Send an alert email
        ControlMessage gm = new ControlMessage(System.currentTimeMillis(), ControlType.COMMON_EMAIL_MESSAGE.toString(), BoltCommandHandlerHelper.class.getName());
        StringWriter sw = new StringWriter();
        PrintWriter writer = new PrintWriter(sw);
        e.printStackTrace(writer);

        gm.addPayload("subject", "DBus message generation exception alert");
        gm.addPayload("contents", String.format("[%s] dbus-stream failed to build a dbus message: %s/%s/%s, cause: %s", errId, Utils.getDatasource().getDsName(), version.getSchema(), version.getTable(), sw));
        gm.addPayload("datasource_schema", Utils.getDatasource().getDsName() + "/" + version.getSchema());

        String topic = PropertiesHolder.getProperties(Constants.Properties.CONFIGURE, Constants.ConfigureKey.GLOBAL_EVENT_TOPIC);
        record = new ProducerRecord<>(topic, gm.getType(), gm.toJSONString());
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                logger.error("Send global event error.{}", exception.getMessage());
            }
        });
    } catch (Exception e1) {
        logger.error("exception data process error.", e1);
    } finally {
        if (producer != null) producer.close();
    }
}
 
Example 16  Project: pulsar   File: ProducerAvroExample.java
public static void main(String[] args) {
    String topic = "persistent://public/default/test-avro";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");

    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
    AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    Bar bar = new Bar();
    bar.setField1(true);

    Foo foo = new Foo();
    foo.setField1("field1");
    foo.setField2("field2");
    foo.setField3(3);


    Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);

    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));
        log.info("Message {} sent successfully", i);
    }

    producer.flush();
    producer.close();
}
 
Example 17  Project: secor   File: TestLogMessageProducer.java
public void run() {
    Properties properties = new Properties();
    if (mMetadataBrokerList == null || mMetadataBrokerList.isEmpty()) {
        properties.put("bootstrap.servers", "localhost:9092");
    } else {
        properties.put("bootstrap.servers", mMetadataBrokerList);
    }
    properties.put("value.serializer", ByteArraySerializer.class);
    properties.put("key.serializer", StringSerializer.class);
    properties.put("acks", "1");

    Producer<String, byte[]> producer = new KafkaProducer<>(properties);

    TProtocolFactory protocol = null;
    if(mType.equals("json")) {
        protocol = new TSimpleJSONProtocol.Factory();
    } else if (mType.equals("binary")) {
        protocol = new TBinaryProtocol.Factory();
    } else {
        throw new RuntimeException("Undefined message encoding type: " + mType);
    }

    TSerializer serializer = new TSerializer(protocol);
    for (int i = 0; i < mNumMessages; ++i) {
        long time = (System.currentTimeMillis() - mTimeshift * 1000L) * 1000000L + i;
        TestMessage testMessage = new TestMessage(time,
                                                  "some_value_" + i);
        if (i % 2 == 0) {
            testMessage.setEnumField(TestEnum.SOME_VALUE);
        } else {
            testMessage.setEnumField(TestEnum.SOME_OTHER_VALUE);
        }
        byte[] bytes;
        try {
            bytes = serializer.serialize(testMessage);
        } catch(TException e) {
            throw new RuntimeException("Failed to serialize message " + testMessage, e);
        }
        ProducerRecord<String, byte[]> data = new ProducerRecord<>(mTopic, Integer.toString(i), bytes);
        producer.send(data);
    }
    producer.close();
}
 
Example 18  Project: pulsar   File: KafkaApiTest.java
@Test
public void testProducerConsumerMixedSchemaWithPulsarKafkaClient() throws Exception {
    String topic = "testProducerConsumerMixedSchemaWithPulsarKafkaClient";

    Schema<String> keySchema = new PulsarKafkaSchema<>(new StringSerializer(), new StringDeserializer());
    JSONSchema<Foo> valueSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "false");
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    @Cleanup
    Consumer<String, Foo> consumer = new KafkaConsumer<>(props, keySchema, valueSchema);
    consumer.subscribe(Arrays.asList(topic));

    Producer<String, Foo> producer = new KafkaProducer<>(props, keySchema, valueSchema);

    for (int i = 0; i < 10; i++) {
        Foo foo = new Foo();
        foo.setField1("field1");
        foo.setField2("field2");
        foo.setField3(i);
        producer.send(new ProducerRecord<>(topic, "hello" + i, foo));
    }
    producer.flush();
    producer.close();

    AtomicInteger received = new AtomicInteger();
    while (received.get() < 10) {
        ConsumerRecords<String, Foo> records = consumer.poll(100);
        if (!records.isEmpty()) {
            records.forEach(record -> {
                String key = record.key();
                Assert.assertEquals(key, "hello" + received.get());
                Foo value = record.value();
                Assert.assertEquals(value.getField1(), "field1");
                Assert.assertEquals(value.getField2(), "field2");
                Assert.assertEquals(value.getField3(), received.get());
                received.incrementAndGet();
            });

            consumer.commitSync();
        }
    }
}
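One small note: consumer.poll(long) as used above is deprecated in Kafka clients 2.0 and later in favor of the Duration overload, which example 6 already uses:

    // Duration-based overload (java.time.Duration); behavior is otherwise the same here.
    ConsumerRecords<String, Foo> records = consumer.poll(Duration.ofMillis(100));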
 
Example 19
public static void main(String[] args) throws InterruptedException {

    Random r = new Random();
    long counter = 0;

    String brokers = System.getenv("BROKERS");
    String zookeepers = System.getenv("ZK");
    String schemaregistry = System.getenv("SCHEMA_REGISTRY");

    // Producer myProducer = getAvroProducer("192.168.99.100:9092", "http://192.168.99.100:8081");
    // String zookeepers = "192.168.99.100:2181";
    Producer myProducer = getStringAvroProducer(brokers, schemaregistry);
    String topicName = "SalesStringExample";
    Schema valueSchema = getSchema(VALUE_SCHEMA);
    KafkaTools.createTopic(zookeepers, topicName, 3, 1);

    while (true) {

      long randomInt = r.nextInt(10);

      GenericRecord valueRecord = new GenericData.Record(valueSchema);
      valueRecord.put("itemID", randomInt);
      valueRecord.put("storeCode", "store-code-" + randomInt);
      valueRecord.put("count", randomInt);

      ProducerRecord<String, GenericRecord> newRecord = new ProducerRecord<>(topicName, "Key-" + randomInt, valueRecord);

      Integer dice = r.nextInt(1 + (int) counter % 100);
      if ((dice < 20) && (dice % 2 == 1)) {
        // noop
      } else {
        myProducer.send(newRecord);
      }
      myProducer.flush();
      Thread.sleep(randomInt);
      counter++;
      // Log out every 1K messages
      if (counter % 1000 == 0) {
        System.out.print(" . " + (counter / 1000) + "K");
      }

    }
  }
 
Example 20  Project: ranger   File: KafkaRangerAuthorizerTest.java
@Test
public void testAuthorizedReadUsingTagPolicy() throws Exception {
    // Create the Producer
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + port);
    producerProps.put("acks", "all");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");

    final Producer<String, String> producer = new KafkaProducer<>(producerProps);

    // Create the Consumer
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:" + port);
    consumerProps.put("group.id", "test");
    consumerProps.put("enable.auto.commit", "true");
    consumerProps.put("auto.offset.reset", "earliest");
    consumerProps.put("auto.commit.interval.ms", "1000");
    consumerProps.put("session.timeout.ms", "30000");
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    consumerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    consumerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, clientKeystorePath);
    consumerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "cspass");
    consumerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "ckpass");
    consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");

    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Arrays.asList("messages"));

    // Send a message
    producer.send(new ProducerRecord<String, String>("messages", "somekey", "somevalue"));
    producer.flush();

    // Poll until we consume it

    ConsumerRecord<String, String> record = null;
    for (int i = 0; i < 1000; i++) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (records.count() > 0) {
            record = records.iterator().next();
            break;
        }
        Thread.sleep(1000);
    }

    Assert.assertNotNull(record);
    Assert.assertEquals("somevalue", record.value());

    producer.close();
    consumer.close();
}