org.apache.kafka.clients.producer.Producer#flush() Usage Examples

Listed below are example usages of org.apache.kafka.clients.producer.Producer#flush(), collected from open-source projects; follow the project links to view the full source code on GitHub.
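Across all of these examples the pattern is the same: send() only enqueues a record into the client's in-memory buffer, flush() blocks until every previously sent record has completed (acknowledged or failed), and close() releases the client. A minimal sketch of that pattern follows; the broker address, topic name, and class name are placeholders for illustration, not taken from any of the projects below.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerFlushSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            // send() is asynchronous: it appends the record to a buffer and returns immediately.
            producer.send(new ProducerRecord<>("demo-topic", "somekey", "somevalue"));
            // flush() blocks until all previously sent records have completed,
            // making the preceding sends effectively synchronous.
            producer.flush();
        } finally {
            // close() also flushes any remaining buffered records before releasing resources.
            producer.close();
        }
    }
}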

Example 1  Project: ranger  File: KafkaRangerAuthorizerGSSTest.java
@Test
public void testAuthorizedIdempotentWrite() throws Exception {
    // Create the Producer
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + port);
    producerProps.put("acks", "all");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    producerProps.put("sasl.mechanism", "GSSAPI");
    producerProps.put("sasl.kerberos.service.name", "kafka");
    producerProps.put("enable.idempotence", "true");

    final Producer<String, String> producer = new KafkaProducer<>(producerProps);

    // Send a message
    producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
    producer.flush();
    producer.close();
}
 
private String produceMessage(String topicName, Object msg, Boolean storeSchemaInHeader) {
    String bootstrapServers = CLUSTER.bootstrapServers();
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.putAll(SCHEMA_REGISTRY_TEST_SERVER_CLIENT_WRAPPER.exportClientConf(true));
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    config.put(KafkaAvroSerializer.STORE_SCHEMA_VERSION_ID_IN_HEADER, storeSchemaInHeader.toString());

    final Producer<String, Object> producer = new KafkaProducer<>(config);
    final Callback callback = new ProducerCallback();
    LOG.info("Sending message: [{}] to topic: [{}]", msg, topicName);
    ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topicName, getKey(msg), msg);
    producer.send(producerRecord, callback);
    producer.flush();
    LOG.info("Message successfully sent to topic: [{}]", topicName);
    producer.close(5, TimeUnit.SECONDS);

    return bootstrapServers;
}
 
Example 3  Project: beam  File: KafkaCSVTableIT.java
@SuppressWarnings("FutureReturnValueIgnored")
private void produceSomeRecordsWithDelay(int num, int delayMillis) {
  Producer<String, String> producer = new KafkaProducer<String, String>(producerProps());
  String topicName = pipeline.getOptions().as(KafkaOptions.class).getKafkaTopic();
  for (int i = 0; i < num; i++) {
    producer.send(
        new ProducerRecord<String, String>(
            topicName, "k" + i, i + "," + ((i % 3) + 1) + "," + i));
    try {
      TimeUnit.MILLISECONDS.sleep(delayMillis);
    } catch (InterruptedException e) {
      throw new RuntimeException("Could not wait for producing", e);
    }
  }
  producer.flush();
  producer.close();
}
 
Example 4  Project: ranger  File: KafkaRangerAuthorizerTest.java
@Test
public void testAuthorizedWrite() throws Exception {
    // Create the Producer
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + port);
    producerProps.put("acks", "all");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
    
    final Producer<String, String> producer = new KafkaProducer<>(producerProps);
    // Send a message
    Future<RecordMetadata> record = 
        producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
    producer.flush();
    record.get();

    producer.close();
}
 
@Test
public void testCreatedConsumerValidConfigCanCommunicateToKafka() {
    String testTopic = "testCreatedConsumer_ValidConfig_CanCommunicateToKafka";

    Producer<String, String> testProducer = producerFactory.createProducer();
    testProducer.send(new ProducerRecord<>(testTopic, 0, null, null, "foo"));
    testProducer.flush();

    ConsumerFactory<?, ?> testSubject = new DefaultConsumerFactory<>(minimal(kafkaBroker));
    testConsumer = testSubject.createConsumer(DEFAULT_GROUP_ID);
    testConsumer.subscribe(Collections.singleton(testTopic));

    assertThat(KafkaTestUtils.getRecords(testConsumer).count()).isOne();
}
 
private static void publishRecordsOnPartitions(Producer<String, String> producer,
                                               String topic,
                                               int recordsPerPartitions,
                                               int partitionsPerTopic) {
    for (int i = 0; i < recordsPerPartitions; i++) {
        for (int p = 0; p < partitionsPerTopic; p++) {
            producer.send(buildRecord(topic, p));
        }
    }
    producer.flush();
}
 
private static void publishNewRecords(Producer<String, String> producer, String topic) {
    producer.send(buildRecord(topic, 0));
    producer.send(buildRecord(topic, 1));
    producer.send(buildRecord(topic, 2));
    producer.send(buildRecord(topic, 3));
    producer.flush();
}
 
Example 8  Project: pulsar  File: KafkaApiTest.java
@Test
public void testSimpleProducer() throws Exception {
    String topic = "testSimpleProducer";

    @Cleanup
    PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
    org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer().topic(topic)
            .subscriptionName("my-subscription")
            .subscribe();

    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());

    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
    }

    producer.flush();
    producer.close();

    for (int i = 0; i < 10; i++) {
        Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
        assertEquals(new String(msg.getData()), "hello-" + i);
        pulsarConsumer.acknowledge(msg);
    }
}
 
Example 9  Project: siddhi-io-kafka  File: KafkaMultiDCSink.java
@Override
public void disconnect() {
    for (Producer producer : producers) {
        producer.flush();
        producer.close();
    }
    producers.clear();
}
 
Example 10  Project: ranger  File: KafkaRangerAuthorizerSASLSSLTest.java
@Test
public void testAuthorizedWrite() throws Exception {
    // Create the Producer
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + port);
    producerProps.put("acks", "all");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    producerProps.put("sasl.mechanism", "PLAIN");
    
    producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
    
    final Producer<String, String> producer = new KafkaProducer<>(producerProps);
    
    // Send a message
    Future<RecordMetadata> record = 
        producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
    producer.flush();
    record.get();

    producer.close();
}
 
Example 11  Project: ignite  File: TestKafkaBroker.java
/**
 * Sends messages to the Kafka broker.
 *
 * @param records List of records to send.
 */
public void sendMessages(List<ProducerRecord<String, String>> records) {
    Producer<String, String> producer = new KafkaProducer<>(getProducerConfig());

    for (ProducerRecord<String, String> rec : records)
        producer.send(rec);

    producer.flush();
    producer.close();
}
 
Example 12  Project: pulsar  File: ProducerAvroExample.java
public static void main(String[] args) {
    String topic = "persistent://public/default/test-avro";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");

    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
    AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    Bar bar = new Bar();
    bar.setField1(true);

    Foo foo = new Foo();
    foo.setField1("field1");
    foo.setField2("field2");
    foo.setField3(3);


    Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);

    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));
        log.info("Message {} sent successfully", i);
    }

    producer.flush();
    producer.close();
}
 
Example 14  Project: ranger  File: KafkaRangerAuthorizerSASLSSLTest.java
@Test
public void testAuthorizedRead() throws Exception {
    // Create the Producer
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + port);
    producerProps.put("acks", "all");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    producerProps.put("sasl.mechanism", "PLAIN");
    
    producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
    
    final Producer<String, String> producer = new KafkaProducer<>(producerProps);
    
    // Create the Consumer
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:" + port);
    consumerProps.put("group.id", "test");
    consumerProps.put("enable.auto.commit", "true");
    consumerProps.put("auto.offset.reset", "earliest");
    consumerProps.put("auto.commit.interval.ms", "1000");
    consumerProps.put("session.timeout.ms", "30000");
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    consumerProps.put("sasl.mechanism", "PLAIN");
    
    consumerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    consumerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, clientKeystorePath);
    consumerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "cspass");
    consumerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "ckpass");
    consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
    
    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Arrays.asList("test"));
    
    // Send a message
    producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
    producer.flush();
    
    // Poll until we consume it
    
    ConsumerRecord<String, String> record = null;
    for (int i = 0; i < 1000; i++) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (records.count() > 0) {
            record = records.iterator().next();
            break;
        }
        Thread.sleep(1000);
    }

    Assert.assertNotNull(record);
    Assert.assertEquals("somevalue", record.value());

    producer.close();
    consumer.close();
}
 
/**
 * testExtractor checks that the extractor code does the right thing. First it creates a topic and sets up a source pointing
 * to it; work units are generated from the source (only a single work unit should be returned). Then it writes a record to this
 * topic and reads it back through the extractor to verify the right record is returned. A second record is then written and read
 * back through the extractor to verify that poll works as expected. Finally, we test the commit API by forcing a commit and then
 * starting a new extractor to ensure we fetch data from after the commit. The commit is also verified in Kafka directly.
 * @throws IOException
 * @throws InterruptedException
 * @throws DataRecordException
 */
@Test(timeOut = 10000)
public void testExtractor()
    throws IOException, InterruptedException, DataRecordException {
  final String topic = "testSimpleStreamingExtractor";
  _kafkaTestHelper.provisionTopic(topic);

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:" + _kafkaTestHelper.getKafkaServerPort());
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
  Producer<String, byte[]> producer = new KafkaProducer<>(props);

  final byte [] record_1 = {0, 1, 3};
  final byte [] record_2 = {2, 4, 6};
  final byte [] record_3 = {5, 7, 9};

  // Write a sample record to the topic
  producer.send(new ProducerRecord<String, byte[]>(topic, topic, record_1));
  producer.flush();

  KafkaSimpleStreamingExtractor<String, byte[]> kSSE = getStreamingExtractor(topic);

  TopicPartition tP = new TopicPartition(topic, 0);
  KafkaSimpleStreamingExtractor.KafkaWatermark kwm =
      new KafkaSimpleStreamingExtractor.KafkaWatermark(tP, new LongWatermark(0));
  byte [] reuse = new byte[1];
  RecordEnvelope<byte[]> oldRecord = new RecordEnvelope<>(reuse, kwm);

  Map<String, CheckpointableWatermark> committedWatermarks = new HashMap<>();

  WatermarkStorage mockWatermarkStorage = mock(WatermarkStorage.class);
  when(mockWatermarkStorage.getCommittedWatermarks(any(Class.class), any(Iterable.class)))
      .thenReturn(committedWatermarks);

  kSSE.start(mockWatermarkStorage);


  // read and verify the record matches we just wrote
  RecordEnvelope<byte[]> record = kSSE.readRecordEnvelope();
  Assert.assertEquals(record.getRecord(), record_1);

  // write a second record.
  producer.send(new ProducerRecord<String, byte[]>(topic, topic, record_2));
  producer.flush();

  // read the second record using same extractor to verify it matches whats expected
  record = kSSE.readRecordEnvelope();
  Assert.assertEquals(record.getRecord(), record_2);

  // Commit the watermark
  committedWatermarks.put(record.getWatermark().getSource(), record.getWatermark());

  // write a third record.
  producer.send(new ProducerRecord<String, byte[]>(topic, topic, record_3));
  producer.flush();


  // recreate extractor to force a seek.
  kSSE = getStreamingExtractor(topic);

  kSSE.start(mockWatermarkStorage);
  record = kSSE.readRecordEnvelope();

  // check it matches the data written
  Assert.assertEquals(record.getRecord(), record_3);
}
 
Example 16  Project: kafka-monitor  File: KafkaConfigTest.java
public static void main(String[] args) {

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "10.10.1.104:9093");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // "all" blocks until the record is fully committed: the slowest but most durable setting.
    //properties.put("acks", "all");
    // On request failure the producer retries automatically; setting retries to 0 disables this.
    properties.put("retries", 1);
    // The producer maintains a buffer of unsent records for each partition.
    properties.put("batch.size", 16384);
    // By default records are sent immediately; this adds a delay (in milliseconds) to encourage batching.
    properties.put("linger.ms", 1);
    // Total memory available for buffering. Once exhausted, further send() calls block;
    // blocking longer than max.block.ms throws a TimeoutException.
    properties.put("buffer.memory", 34432);

    System.out.println("start...");
    Producer<String, String> producer = new KafkaProducer<String, String>(properties);

    logger.debug("config {}", properties);

    while (true) {
        for (int i = 0; i < 1000; i++) {
            System.out.println("start......");
            producer.send(new ProducerRecord<String, String>("foo", Integer.toString(i), Integer.toString(i)));
            System.out.println("send " + i);
        }

        //producer.close();
        producer.flush();

        logger.debug("---------metrics{}", producer.metrics());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
 
@Test
void shouldManageProducerAndConsumerMetrics() {
    SimpleMeterRegistry registry = new SimpleMeterRegistry();

    assertThat(registry.getMeters()).hasSize(0);

    Properties producerConfigs = new Properties();
    producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaContainer.getBootstrapServers());
    Producer<String, String> producer = new KafkaProducer<>(
            producerConfigs, new StringSerializer(), new StringSerializer());

    KafkaClientMetrics producerKafkaMetrics = new KafkaClientMetrics(producer);
    producerKafkaMetrics.bindTo(registry);

    int producerMetrics = registry.getMeters().size();
    assertThat(registry.getMeters()).hasSizeGreaterThan(0);
    assertThat(registry.getMeters())
            .extracting(m -> m.getId().getTag("kafka-version"))
            .allMatch(v -> !v.isEmpty());

    Properties consumerConfigs = new Properties();
    consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaContainer.getBootstrapServers());
    consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    Consumer<String, String> consumer = new KafkaConsumer<>(
            consumerConfigs, new StringDeserializer(), new StringDeserializer());

    KafkaClientMetrics consumerKafkaMetrics = new KafkaClientMetrics(consumer);
    consumerKafkaMetrics.bindTo(registry);

    //Printing out for discovery purposes
    out.println("Meters from producer before sending:");
    printMeters(registry);

    int producerAndConsumerMetrics = registry.getMeters().size();
    assertThat(registry.getMeters()).hasSizeGreaterThan(producerMetrics);
    assertThat(registry.getMeters())
            .extracting(m -> m.getId().getTag("kafka-version"))
            .allMatch(v -> !v.isEmpty());

    String topic = "test";
    producer.send(new ProducerRecord<>(topic, "key", "value"));
    producer.flush();

    //Printing out for discovery purposes
    out.println("Meters from producer after sending and consumer before poll:");
    printMeters(registry);

    producerKafkaMetrics.checkAndBindMetrics(registry);

    int producerAndConsumerMetricsAfterSend = registry.getMeters().size();
    assertThat(registry.getMeters()).hasSizeGreaterThan(producerAndConsumerMetrics);
    assertThat(registry.getMeters())
            .extracting(m -> m.getId().getTag("kafka-version"))
            .allMatch(v -> !v.isEmpty());

    consumer.subscribe(Collections.singletonList(topic));

    consumer.poll(Duration.ofMillis(100));

    //Printing out for discovery purposes
    out.println("Meters from producer and consumer after polling:");
    printMeters(registry);

    consumerKafkaMetrics.checkAndBindMetrics(registry);

    assertThat(registry.getMeters()).hasSizeGreaterThan(producerAndConsumerMetricsAfterSend);
    assertThat(registry.getMeters())
            .extracting(m -> m.getId().getTag("kafka-version"))
            .allMatch(v -> !v.isEmpty());

    //Printing out for discovery purposes
    out.println("All meters from producer and consumer:");
    printMeters(registry);

    producerKafkaMetrics.close();
    consumerKafkaMetrics.close();
}
 
@Test
void shouldRegisterMetricsFromDifferentClients() {
    SimpleMeterRegistry registry = new SimpleMeterRegistry();

    assertThat(registry.getMeters()).hasSize(0);

    Properties producer1Configs = new Properties();
    producer1Configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaContainer.getBootstrapServers());
    producer1Configs.put(ProducerConfig.CLIENT_ID_CONFIG, "producer1");
    Producer<String, String> producer1 = new KafkaProducer<>(
            producer1Configs, new StringSerializer(), new StringSerializer());

    KafkaClientMetrics producer1KafkaMetrics = new KafkaClientMetrics(producer1);
    producer1KafkaMetrics.bindTo(registry);

    int producer1Metrics = registry.getMeters().size();
    assertThat(producer1Metrics).isGreaterThan(0);

    producer1.send(new ProducerRecord<>("topic1", "foo"));
    producer1.flush();

    producer1KafkaMetrics.checkAndBindMetrics(registry);

    int producer1MetricsAfterSend = registry.getMeters().size();
    assertThat(producer1MetricsAfterSend).isGreaterThan(producer1Metrics);

    Properties producer2Configs = new Properties();
    producer2Configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaContainer.getBootstrapServers());
    producer2Configs.put(ProducerConfig.CLIENT_ID_CONFIG, "producer2");
    Producer<String, String> producer2 = new KafkaProducer<>(
            producer2Configs, new StringSerializer(), new StringSerializer());

    KafkaClientMetrics producer2KafkaMetrics = new KafkaClientMetrics(producer2);
    producer2KafkaMetrics.bindTo(registry);

    producer2.send(new ProducerRecord<>("topic1", "foo"));
    producer2.flush();

    producer2KafkaMetrics.checkAndBindMetrics(registry);

    int producer2MetricsAfterSend = registry.getMeters().size();
    assertThat(producer2MetricsAfterSend).isEqualTo(producer1MetricsAfterSend * 2);
}
 
Example 19  Project: ranger  File: KafkaRangerAuthorizerTest.java
@Test
public void testAuthorizedRead() throws Exception {
    // Create the Producer
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + port);
    producerProps.put("acks", "all");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
    
    final Producer<String, String> producer = new KafkaProducer<>(producerProps);
    
    // Create the Consumer
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:" + port);
    consumerProps.put("group.id", "test");
    consumerProps.put("enable.auto.commit", "true");
    consumerProps.put("auto.offset.reset", "earliest");
    consumerProps.put("auto.commit.interval.ms", "1000");
    consumerProps.put("session.timeout.ms", "30000");
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    consumerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    consumerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, clientKeystorePath);
    consumerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "cspass");
    consumerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "ckpass");
    consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
    
    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Arrays.asList("test"));
    
    // Send a message
    producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
    producer.flush();
    
    // Poll until we consume it
    
    ConsumerRecord<String, String> record = null;
    for (int i = 0; i < 1000; i++) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (records.count() > 0) {
            record = records.iterator().next();
            break;
        }
        Thread.sleep(1000);
    }

    Assert.assertNotNull(record);
    Assert.assertEquals("somevalue", record.value());

    producer.close();
    consumer.close();
}
 
Example 20  Project: pulsar  File: KafkaApiTest.java
@Test
public void testProducerConsumerMixedSchemaWithPulsarKafkaClient() throws Exception {
    String topic = "testProducerConsumerMixedSchemaWithPulsarKafkaClient";

    Schema<String> keySchema = new PulsarKafkaSchema<>(new StringSerializer(), new StringDeserializer());
    JSONSchema<Foo> valueSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "false");
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    @Cleanup
    Consumer<String, Foo> consumer = new KafkaConsumer<>(props, keySchema, valueSchema);
    consumer.subscribe(Arrays.asList(topic));

    Producer<String, Foo> producer = new KafkaProducer<>(props, keySchema, valueSchema);

    for (int i = 0; i < 10; i++) {
        Foo foo = new Foo();
        foo.setField1("field1");
        foo.setField2("field2");
        foo.setField3(i);
        producer.send(new ProducerRecord<>(topic, "hello" + i, foo));
    }
    producer.flush();
    producer.close();

    AtomicInteger received = new AtomicInteger();
    while (received.get() < 10) {
        ConsumerRecords<String, Foo> records = consumer.poll(100);
        if (!records.isEmpty()) {
            records.forEach(record -> {
                String key = record.key();
                Assert.assertEquals(key, "hello" + received.get());
                Foo value = record.value();
                Assert.assertEquals(value.getField1(), "field1");
                Assert.assertEquals(value.getField2(), "field2");
                Assert.assertEquals(value.getField3(), received.get());
                received.incrementAndGet();
            });

            consumer.commitSync();
        }
    }
}