org.apache.kafka.clients.producer.Producer#close() Code Examples

Listed below are code examples of org.apache.kafka.clients.producer.Producer#close(); you can also follow the links to view the full sources on GitHub.
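Before the individual examples, here is a minimal sketch of the pattern they all share: build a producer from Properties, send records, and release its network and memory resources with close() in a finally block. The broker address and topic name below are placeholders, and close(Duration) assumes kafka-clients 2.0 or later:

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerCloseDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        } finally {
            // close() waits indefinitely for buffered records to be delivered;
            // close(Duration) bounds that wait and then force-closes, which is
            // safer on shutdown paths that must not hang.
            producer.close(Duration.ofSeconds(5));
        }
    }
}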

Example 1  Project: flink  File: FlinkKafkaInternalProducerITCase.java
@Test(timeout = 30000L)
public void testHappyPath() throws IOException {
	String topicName = "flink-kafka-producer-happy-path";

	Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
	try {
		kafkaProducer.initTransactions();
		kafkaProducer.beginTransaction();
		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
		kafkaProducer.commitTransaction();
	} finally {
		kafkaProducer.close(Duration.ofSeconds(5));
	}
	assertRecord(topicName, "42", "42");
	deleteTestTopic(topicName);
}
 
Example 2  Project: kafka-serializer-example  File: Example.java
public static void runProducer(Properties properties, String topic) throws Exception {
    properties.put("acks", "all");
    properties.put("retries", 0);
    properties.put("batch.size", 16384);
    properties.put("linger.ms", 1);
    properties.put("buffer.memory", 33554432);
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    System.out.printf("Running producer with serializer %s on topic %s\n", properties.getProperty("value.serializer"), topic);

    Producer<String, SensorReading> producer = new KafkaProducer<>(properties);

    for(int i = 0; i < Integer.MAX_VALUE; i++) {
        producer.send(new ProducerRecord<>(topic, Integer.toString(i), randomReading()));
        Thread.sleep(500);
    }

    producer.close();
}
 
Example 3  Project: datacollector  File: KafkaNewConsumerITBase.java
protected void produce(String topic, String bootstrapServers, String message, Integer partition) {
  Properties props = new Properties();
  props.put("bootstrap.servers", bootstrapServers);
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put(KafkaConstants.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(KafkaConstants.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

  Producer<String, String> producer = new KafkaProducer<>(props);
  for(int i = 0; i < NUM_MESSAGES; i++) {
    ProducerRecord<String, String> record;
    if (partition != null) {
      record = new ProducerRecord<>(topic, partition, Integer.toString(0), message + i);
    } else {
      record = new ProducerRecord<>(topic, Integer.toString(0), message + i);
    }
    producer.send(record);
  }
  producer.close();

}
 
Example 4  Project: KafkaExample  File: ItemProducer.java
public static void main(String[] args) throws Exception {
	Properties props = new Properties();
	props.put("bootstrap.servers", "kafka0:19092");
	props.put("acks", "all");
	props.put("retries", 3);
	props.put("batch.size", 16384);
	props.put("linger.ms", 1);
	props.put("buffer.memory", 33554432);
	props.put("key.serializer", StringSerializer.class.getName());
	props.put("value.serializer", GenericSerializer.class.getName());
	props.put("value.serializer.type", Item.class.getName());
	props.put("partitioner.class", HashPartitioner.class.getName());

	Producer<String, Item> producer = new KafkaProducer<String, Item>(props);
	List<Item> items = readItem();
	items.forEach((Item item) -> producer.send(new ProducerRecord<String, Item>("items", item.getItemName(), item)));
	producer.close();
}
 
Example 5
@Test
void testSendingMessagesUsingMultipleProducers() throws ExecutionException, InterruptedException {
    ProducerFactory<String, String> producerFactory = producerFactory(kafkaBroker);
    List<Producer<String, String>> testProducers = new ArrayList<>();
    String testTopic = "testSendingMessagesUsingMultipleProducers";

    List<Future<RecordMetadata>> results = new ArrayList<>();
    // The reason we are looping 12 times is a bug we used to have where the (producerCacheSize + 2)-th send failed because the producer was closed.
    // To avoid regression, we keep the test like this.
    for (int i = 0; i < 12; i++) {
        Producer<String, String> producer = producerFactory.createProducer();
        results.add(send(producer, testTopic, "foo" + i));
        producer.close();
        testProducers.add(producer);
    }
    assertOffsets(results);

    cleanup(producerFactory, testProducers);
}
 
Example 6  Project: java-kafka-client  File: TracingKafkaTest.java
@Test
public void testNotTracedProducer() throws Exception {
  Producer<Integer, String> producer = createProducer();

  // Send 1
  producer.send(new ProducerRecord<>("messages", 1, "test"));

  // Send 2
  producer.send(new ProducerRecord<>("messages", 1, "test"),
      (metadata, exception) -> assertEquals("messages", metadata.topic()));

  final CountDownLatch latch = new CountDownLatch(2);
  createConsumer(latch, 1, false, null);

  producer.close();

  List<MockSpan> mockSpans = mockTracer.finishedSpans();
  assertEquals(2, mockSpans.size());
  checkSpans(mockSpans);
  assertNull(mockTracer.activeSpan());
}
 
Example 7  Project: flink  File: FlinkKafkaInternalProducerITCase.java
@Test(timeout = 30000L)
public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
	String topic = "flink-kafka-producer-txn-coordinator-changed";
	createTestTopic(topic, 1, 2);
	Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
	try {
		kafkaProducer.initTransactions();
		kafkaProducer.beginTransaction();
		restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
		kafkaProducer.flush();
		kafkaProducer.commitTransaction();
	} finally {
		kafkaProducer.close(Duration.ofSeconds(5));
	}
	deleteTestTopic(topic);
}
 
Example 8  Project: DBus  File: ControlMessageUtils.java
public static void sendControlMessage(ControlMessage controlMessage) {
    Producer<String, String> producer = null;
    try {
        // Publish the control message to the global event topic.
        String topic = PropertiesHolder.getProperties(Constants.Properties.CONFIGURE, Constants.ConfigureKey.GLOBAL_EVENT_TOPIC);
        // As listed, the producer is never assigned, so send() would throw a
        // NullPointerException; it presumably comes from a factory, e.g. a
        // (hypothetical) createProducer() helper.
        producer = createProducer();
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, controlMessage.getType(), controlMessage.toJSONString());
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                logger.error("Send global event error.{}", exception.getMessage());
            }
        });
    } catch (Exception e1) {
        logger.error("exception data process error.", e1);
    } finally {
        if (producer != null) producer.close();
    }
}
 
Example 9  Project: skywalking  File: CaseController.java
private static void wrapProducer(Consumer<Producer<String, String>> consFunc, String bootstrapServers) {
    Properties producerProperties = new Properties();
    producerProperties.put("bootstrap.servers", bootstrapServers);
    producerProperties.put("acks", "all");
    producerProperties.put("retries", 0);
    producerProperties.put("batch.size", 16384);
    producerProperties.put("linger.ms", 1);
    producerProperties.put("buffer.memory", 33554432);
    producerProperties.put("auto.create.topics.enable", "true");
    producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(producerProperties);
    try {
        consFunc.accept(producer);
    } finally {
        producer.close();
    }
}
 
Example 10  Project: brooklyn-library  File: KafkaSupport.java
/**
 * Send a message to the {@link KafkaCluster} on the given topic.
 */
public void sendMessage(String topic, String message) {
    Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
            Predicates.instanceOf(KafkaBroker.class),
            EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
    if (anyBrokerNodeInCluster.isPresent()) {
        KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get();

        Properties props = new Properties();

        props.put("metadata.broker.list", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
        props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);

        ProducerRecord<String, String> data = new ProducerRecord<>(topic, message);
        producer.send(data);
        producer.close();
    } else {
        throw new InvalidParameterException("No kafka broker node found");
    }
}
 
Example 11
private void floodTopic(String topicName, String key) {
  Producer<String, String> producer = createProducer(kafkaConfig.getBootstrapServers());
  for (int i = 0; i < 10; i++)
    producer.send(new ProducerRecord<>(topicName, key, Integer.toString(i)));

  producer.close();
}
 
Example 12  Project: uavstack  File: DoTestkafkaHookProxy.java
private static void kafkaAsycnSendTest() {
    Producer<String, String> producer = createProducer();
    for (int i = 0; i <= 3; i++) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String testMsg = "this is producer send test msg,date:" + simpleDateFormat.format(new Date());
        String key = "key_" + i;
        String topic = "test";
        if (i % 3 == 0) {
            topic = "test1";
        }
        producer.send(new ProducerRecord<String, String>(topic, key, testMsg), new Callback() {

            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();
                    System.out.println("find send exception:" + exception);
                }

                System.out.println(
                        "send to partition(" + metadata.partition() + ")," + "offset(" + metadata.offset() + ")");

            }
        });

        try {
            TimeUnit.SECONDS.sleep(1);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
    System.out.println("send message over.");
    producer.close(100, TimeUnit.MILLISECONDS);
}
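A note on the close call above: the close(long, TimeUnit) overload was deprecated in Kafka 2.0.0 in favor of close(java.time.Duration). On current clients, the same bounded shutdown would read as follows (a sketch reusing the producer variable from the example):

import java.time.Duration;

// Wait up to 100 ms for buffered records to be delivered, then force-close.
producer.close(Duration.ofMillis(100));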
 
Example 13  Project: pulsar  File: ProducerAvroExample.java
public static void main(String[] args) {
    String topic = "persistent://public/default/test-avro";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");

    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
    AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    Bar bar = new Bar();
    bar.setField1(true);

    Foo foo = new Foo();
    foo.setField1("field1");
    foo.setField2("field2");
    foo.setField3(3);


    Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);

    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));
        log.info("Message {} sent successfully", i);
    }

    producer.flush();
    producer.close();
}
 
Example 14  Project: javatech  File: ProducerDemo.java
public static void main(String[] args) {
	// 1. Specify the producer configuration
	Properties properties = new Properties();
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
	properties.put(ProducerConfig.ACKS_CONFIG, "all");
	properties.put(ProducerConfig.RETRIES_CONFIG, 0);
	properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
	properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
	properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
		"org.apache.kafka.common.serialization.StringSerializer");
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
		"org.apache.kafka.common.serialization.StringSerializer");

	// 2. Initialize the Kafka producer with this configuration
	Producer<String, String> producer = new KafkaProducer<>(properties);

	try {
		// 3. Send messages asynchronously via send()
		for (int i = 0; i < 100; i++) {
			String msg = "Message " + i;
			producer.send(new ProducerRecord<>("HelloWorld", msg));
			System.out.println("Sent:" + msg);
		}
	} catch (Exception e) {
		e.printStackTrace();
	} finally {
		// 4. Close the producer
		producer.close();
	}
}
 
Example 15  Project: ranger  File: KafkaRangerAuthorizerSASLSSLTest.java
@Test
public void testAuthorizedWrite() throws Exception {
    // Create the Producer
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + port);
    producerProps.put("acks", "all");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    producerProps.put("sasl.mechanism", "PLAIN");
    
    producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
    
    final Producer<String, String> producer = new KafkaProducer<>(producerProps);
    
    // Send a message
    Future<RecordMetadata> record = 
        producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
    producer.flush();
    record.get();

    producer.close();
}
 
Example 16  Project: java-kafka-client  File: TracingKafkaTest.java
@Test
public void with_parent() throws Exception {
  Producer<Integer, String> producer = createTracingProducer();

  final MockSpan parent = mockTracer.buildSpan("parent").start();
  try (Scope ignored = mockTracer.activateSpan(parent)) {
    producer.send(new ProducerRecord<>("messages", 1, "test"));
  }
  parent.finish();

  final CountDownLatch latch = new CountDownLatch(1);
  createConsumer(latch, 1, false, null);

  producer.close();

  List<MockSpan> mockSpans = mockTracer.finishedSpans();
  assertEquals(3, mockSpans.size());

  assertNotNull(parent);

  for (MockSpan span : mockSpans) {
    assertEquals(parent.context().traceId(), span.context().traceId());
  }

  MockSpan sendSpan = getByOperationName(mockSpans, TracingKafkaUtils.TO_PREFIX + "messages");
  assertNotNull(sendSpan);

  MockSpan receiveSpan = getByOperationName(mockSpans,
      TracingKafkaUtils.FROM_PREFIX + "messages");
  assertNotNull(receiveSpan);

  assertEquals(sendSpan.context().spanId(), receiveSpan.parentId());
  assertEquals(parent.context().spanId(), sendSpan.parentId());

  assertNull(mockTracer.activeSpan());
}
 
Example 17
@Test
public void test() {
  Map<String, Object> senderProps = KafkaTestUtils
      .producerProps(embeddedKafka.getEmbeddedKafka());

  Properties config = new Properties();
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app");
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, senderProps.get("bootstrap.servers"));
  config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
  config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

  Producer<Integer, String> producer = createProducer();
  ProducerRecord<Integer, String> record = new ProducerRecord<>("stream-test", 1, "test");
  producer.send(record);

  final Serde<String> stringSerde = Serdes.String();
  final Serde<Integer> intSerde = Serdes.Integer();

  StreamsBuilder builder = new StreamsBuilder();
  KStream<Integer, String> kStream = builder.stream("stream-test");

  kStream.map((key, value) -> new KeyValue<>(key, value + "map"))
      .to("stream-out", Produced.with(intSerde, stringSerde));

  KafkaStreams streams = new KafkaStreams(builder.build(), config,
      new TracingKafkaClientSupplier(mockTracer));
  streams.start();

  await().atMost(15, TimeUnit.SECONDS).until(reportedSpansSize(), equalTo(3));

  streams.close();
  producer.close();

  List<MockSpan> spans = mockTracer.finishedSpans();
  assertEquals(3, spans.size());
  checkSpans(spans);

  assertNull(mockTracer.activeSpan());
}
 
Example 18  Project: glowroot  File: KafkaPluginIT.java
@Override
public void executeApp() throws Exception {
    Producer<Long, String> producer = SendRecord.createProducer();
    ProducerRecord<Long, String> record =
            new ProducerRecord<Long, String>("demo", "message");
    producer.send(record).get();
    producer.close();

    consumer = createConsumer();
    transactionMarker();
    consumer.close();
}
 
Example 19  Project: pulsar  File: KafkaApiTest.java
@Test
public void testProducerConsumerMixedSchemaWithPulsarKafkaClient() throws Exception {
    String topic = "testProducerConsumerMixedSchemaWithPulsarKafkaClient";

    Schema<String> keySchema = new PulsarKafkaSchema<>(new StringSerializer(), new StringDeserializer());
    JSONSchema<Foo> valueSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "false");
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    @Cleanup
    Consumer<String, Foo> consumer = new KafkaConsumer<>(props, keySchema, valueSchema);
    consumer.subscribe(Arrays.asList(topic));

    Producer<String, Foo> producer = new KafkaProducer<>(props, keySchema, valueSchema);

    for (int i = 0; i < 10; i++) {
        Foo foo = new Foo();
        foo.setField1("field1");
        foo.setField2("field2");
        foo.setField3(i);
        producer.send(new ProducerRecord<>(topic, "hello" + i, foo));
    }
    producer.flush();
    producer.close();

    AtomicInteger received = new AtomicInteger();
    while (received.get() < 10) {
        ConsumerRecords<String, Foo> records = consumer.poll(100);
        if (!records.isEmpty()) {
            records.forEach(record -> {
                String key = record.key();
                Assert.assertEquals(key, "hello" + received.get());
                Foo value = record.value();
                Assert.assertEquals(value.getField1(), "field1");
                Assert.assertEquals(value.getField2(), "field2");
                Assert.assertEquals(value.getField3(), received.get());
                received.incrementAndGet();
            });

            consumer.commitSync();
        }
    }
}
 
Example 20  Project: secor  File: TestLogMessageProducer.java
public void run() {
    Properties properties = new Properties();
    if (mMetadataBrokerList == null || mMetadataBrokerList.isEmpty()) {
        properties.put("bootstrap.servers", "localhost:9092");
    } else {
        properties.put("bootstrap.severs", mMetadataBrokerList);
    }
    properties.put("value.serializer", ByteArraySerializer.class);
    properties.put("key.serializer", StringSerializer.class);
    properties.put("acks", "1");

    Producer<String, byte[]> producer = new KafkaProducer<>(properties);

    TProtocolFactory protocol = null;
    if(mType.equals("json")) {
        protocol = new TSimpleJSONProtocol.Factory();
    } else if (mType.equals("binary")) {
        protocol = new TBinaryProtocol.Factory();
    } else {
        throw new RuntimeException("Undefined message encoding type: " + mType);
    }

    TSerializer serializer = new TSerializer(protocol);
    for (int i = 0; i < mNumMessages; ++i) {
        long time = (System.currentTimeMillis() - mTimeshift * 1000L) * 1000000L + i;
        TestMessage testMessage = new TestMessage(time,
                                                  "some_value_" + i);
        if (i % 2 == 0) {
            testMessage.setEnumField(TestEnum.SOME_VALUE);
        } else {
            testMessage.setEnumField(TestEnum.SOME_OTHER_VALUE);
        }
        byte[] bytes;
        try {
            bytes = serializer.serialize(testMessage);
        } catch(TException e) {
            throw new RuntimeException("Failed to serialize message " + testMessage, e);
        }
        ProducerRecord<String, byte[]> data = new ProducerRecord<>(mTopic, Integer.toString(i), bytes);
        producer.send(data);
    }
    producer.close();
}