下面列出了 org.apache.kafka.clients.producer.Producer#close() 的实例代码，您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Runs one complete transaction (init, begin, send, commit) through a
 * {@code FlinkKafkaInternalProducer} and then checks that the record is visible on the topic.
 *
 * <p>The producer is always closed with a five second timeout, and the topic is deleted
 * once the record has been asserted.
 */
@Test(timeout = 30000L)
public void testHappyPath() throws IOException {
    final String topic = "flink-kafka-producer-happy-path";
    final String keyAndValue = "42";

    final Producer<String, String> producer = new FlinkKafkaInternalProducer<>(extraProperties);
    try {
        producer.initTransactions();
        producer.beginTransaction();
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, keyAndValue, keyAndValue);
        producer.send(record);
        producer.commitTransaction();
    } finally {
        producer.close(Duration.ofSeconds(5));
    }

    assertRecord(topic, keyAndValue, keyAndValue);
    deleteTestTopic(topic);
}
/**
 * Continuously publishes random sensor readings to {@code topic}, one every 500 ms.
 *
 * <p>The supplied {@code properties} object is modified in place: the producer tuning
 * settings and the key serializer are written into it. The value serializer must already
 * be present in {@code properties}; it is only read here for the start-up message.
 *
 * @param properties base producer configuration; mutated by this method
 * @param topic      topic to publish to
 * @throws Exception if sending fails or the thread is interrupted while sleeping
 */
public static void runProducer(Properties properties, String topic) throws Exception {
    properties.put("acks", "all");
    properties.put("retries", 0);
    properties.put("batch.size", 16384);
    properties.put("linger.ms", 1);
    properties.put("buffer.memory", 33554432);
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    System.out.printf("Running producer with serializer %s on topic %s\n", properties.getProperty("value.serializer"), topic);

    Producer<String, SensorReading> producer = new KafkaProducer<>(properties);
    try {
        // The loop is effectively endless; it normally ends through an exception
        // (for example an interrupt during sleep).
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            producer.send(new ProducerRecord<>(topic, Integer.toString(i), randomReading()));
            Thread.sleep(500);
        }
    } finally {
        // Close on every exit path so the producer is not leaked when the loop is aborted.
        producer.close();
    }
}
/**
 * Sends {@code NUM_MESSAGES} string records to {@code topic}.
 *
 * <p>Every record uses the key {@code "0"}; the value is {@code message} followed by the
 * record index. When {@code partition} is non-null the records are sent to that partition,
 * otherwise the partition is left to the producer.
 *
 * @param topic            destination topic
 * @param bootstrapServers value for the {@code bootstrap.servers} setting
 * @param message          value prefix
 * @param partition        explicit partition, or {@code null} to let the producer choose
 */
protected void produce(String topic, String bootstrapServers, String message, Integer partition) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put(KafkaConstants.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(KafkaConstants.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    Producer<String, String> producer = new KafkaProducer<>(props);
    try {
        for (int i = 0; i < NUM_MESSAGES; i++) {
            ProducerRecord<String, String> record;
            if (partition != null) {
                record = new ProducerRecord<>(topic, partition, Integer.toString(0), message + i);
            } else {
                record = new ProducerRecord<>(topic, Integer.toString(0), message + i);
            }
            producer.send(record);
        }
    } finally {
        // Close even if a send fails, so the producer is never leaked.
        producer.close();
    }
}
/**
 * Reads the items returned by {@code readItem()} and publishes each one to the
 * {@code "items"} topic, keyed by item name.
 *
 * @param args unused
 * @throws Exception if reading the items fails
 */
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka0:19092");
    props.put("acks", "all");
    props.put("retries", 3);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", GenericSerializer.class.getName());
    props.put("value.serializer.type", Item.class.getName());
    props.put("partitioner.class", HashPartitioner.class.getName());

    Producer<String, Item> producer = new KafkaProducer<String, Item>(props);
    try {
        List<Item> items = readItem();
        items.forEach((Item item) -> producer.send(new ProducerRecord<String, Item>("items", item.getItemName(), item)));
    } finally {
        // Close even when reading or sending fails, so the producer is never leaked.
        producer.close();
    }
}
/**
 * Creates twelve producers from one factory, sends a single message through each, closes it
 * right away, and finally asserts the offsets of all sends.
 */
@Test
void testSendingMessagesUsingMultipleProducers() throws ExecutionException, InterruptedException {
    final ProducerFactory<String, String> factory = producerFactory(kafkaBroker);
    final List<Producer<String, String>> usedProducers = new ArrayList<>();
    final String topic = "testSendingMessagesUsingMultipleProducers";
    final List<Future<RecordMetadata>> pendingSends = new ArrayList<>();

    // Twelve rounds on purpose: there used to be a bug where the (producerCacheSize + 2)-th
    // send failed because the producer had been closed. The count is kept as a regression guard.
    int round = 0;
    while (round < 12) {
        Producer<String, String> current = factory.createProducer();
        pendingSends.add(send(current, topic, "foo" + round));
        current.close();
        usedProducers.add(current);
        round++;
    }

    assertOffsets(pendingSends);
    cleanup(factory, usedProducers);
}
/**
 * Sends two records to the "messages" topic through the producer returned by
 * {@code createProducer()}, starts a consumer, and then inspects the spans recorded by
 * {@code mockTracer}.
 *
 * <p>The test expects exactly two finished spans and no active span at the end.
 * NOTE(review): going by the test name the producer is presumably untraced, so the two spans
 * would come from the consuming side — confirm against createProducer()/createConsumer().
 */
@Test
public void testNotTracedProducer() throws Exception {
Producer<Integer, String> producer = createProducer();
// Send 1
producer.send(new ProducerRecord<>("messages", 1, "test"));
// Send 2: the completion callback asserts that the record went to the "messages" topic.
// NOTE(review): the callback reads metadata without looking at the exception argument.
producer.send(new ProducerRecord<>("messages", 1, "test"),
(metadata, exception) -> assertEquals("messages", metadata.topic()));
// Latch sized for the two records sent above; it is handed to createConsumer.
// Presumably the helper counts it down per record and waits on it — TODO confirm.
final CountDownLatch latch = new CountDownLatch(2);
createConsumer(latch, 1, false, null);
producer.close();
List<MockSpan> mockSpans = mockTracer.finishedSpans();
assertEquals(2, mockSpans.size());
checkSpans(mockSpans);
assertNull(mockTracer.activeSpan());
}
/**
 * Begins a transaction, restarts the broker reported as leader for
 * {@code __transaction_state}, and then flushes and commits the (empty) transaction.
 *
 * <p>The test passes when no exception is thrown within the timeout. The producer is always
 * closed with a five second timeout and the topic is deleted afterwards.
 */
@Test(timeout = 30000L)
public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
    final String topicName = "flink-kafka-producer-txn-coordinator-changed";
    createTestTopic(topicName, 1, 2);

    final Producer<String, String> producer = new FlinkKafkaInternalProducer<>(extraProperties);
    try {
        producer.initTransactions();
        producer.beginTransaction();
        // Restart the broker while the transaction is open and nothing has been sent.
        restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
        producer.flush();
        producer.commitTransaction();
    } finally {
        producer.close(Duration.ofSeconds(5));
    }

    deleteTestTopic(topicName);
}
/**
 * Publishes a control message to the global event topic.
 *
 * <p>The topic name is read through {@code PropertiesHolder} (CONFIGURE / GLOBAL_EVENT_TOPIC).
 * The record key is {@code controlMessage.getType()} and the value is
 * {@code controlMessage.toJSONString()}. Any exception raised here is caught and logged;
 * nothing is propagated to the caller.
 *
 * <p>NOTE(review): in the visible code {@code producer} is initialised to {@code null} and is
 * never assigned before {@code producer.send(...)}. The call therefore throws a
 * NullPointerException, which the catch block logs, and no message is sent. The step that
 * creates the producer appears to be missing and needs to be restored.
 *
 * @param controlMessage message to publish
 */
public static void sendControlMessage(ControlMessage controlMessage) {
Producer<String, String> producer = null;
try {
// Original comment read "send mail"; the code below builds and sends a Kafka record.
String topic = PropertiesHolder.getProperties(Constants.Properties.CONFIGURE, Constants.ConfigureKey.GLOBAL_EVENT_TOPIC);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, controlMessage.getType(), controlMessage.toJSONString());
// Asynchronous send: a failure is only reported through the callback, and only its message is logged.
producer.send(record, (metadata, exception) -> {
if (exception != null) {
logger.error("Send global event error.{}", exception.getMessage());
}
});
} catch (Exception e1) {
logger.error("exception data process error.", e1);
} finally {
// The null check keeps the cleanup safe when no producer was created.
if (producer != null) producer.close();
}
}
/**
 * Creates a String/String producer for {@code bootstrapServers}, passes it to
 * {@code consFunc}, and closes it afterwards regardless of how the callback ends.
 *
 * @param consFunc         action that uses the producer
 * @param bootstrapServers value for the {@code bootstrap.servers} setting
 */
private static void wrapProducer(Consumer<Producer<String, String>> consFunc, String bootstrapServers) {
    final Properties config = new Properties();

    // Connection and delivery settings.
    config.put("bootstrap.servers", bootstrapServers);
    config.put("acks", "all");
    config.put("retries", 0);

    // Batching and buffering.
    config.put("batch.size", 16384);
    config.put("linger.ms", 1);
    config.put("buffer.memory", 33554432);
    config.put("auto.create.topics.enable", "true");

    // Keys and values are both plain strings.
    config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    final Producer<String, String> stringProducer = new KafkaProducer<>(config);
    try {
        consFunc.accept(stringProducer);
    } finally {
        stringProducer.close();
    }
}
/**
 * Send a message to the {@link KafkaCluster} on the given topic.
 *
 * <p>Picks any child of the cluster that is a {@link KafkaBroker} with {@code SERVICE_UP}
 * set to true, creates the topic through the cluster's ZooKeeper entity, and sends the
 * message through a short-lived producer connected to that broker.
 *
 * @param topic   topic to create and publish to
 * @param message record value to send
 * @throws InvalidParameterException if no running broker is found in the cluster
 */
public void sendMessage(String topic, String message) {
    Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
            Predicates.instanceOf(KafkaBroker.class),
            EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
    if (!anyBrokerNodeInCluster.isPresent()) {
        throw new InvalidParameterException("No kafka broker node found");
    }

    KafkaBroker broker = (KafkaBroker) anyBrokerNodeInCluster.get();
    // Both settings point at the same broker, so the address is built once.
    String brokerAddress = format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort());

    Properties props = new Properties();
    props.put("metadata.broker.list", brokerAddress);
    props.put("bootstrap.servers", brokerAddress);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    try {
        ((KafkaZooKeeper) cluster.getZooKeeper()).createTopic(topic);
        ProducerRecord<String, String> data = new ProducerRecord<>(topic, message);
        producer.send(data);
    } finally {
        // Close even when topic creation or the send fails, so the producer is never leaked.
        producer.close();
    }
}
/**
 * Sends ten records with the same key to {@code topicName}; the values are "0" through "9".
 *
 * @param topicName destination topic
 * @param key       key used for every record
 */
private void floodTopic(String topicName, String key) {
    Producer<String, String> producer = createProducer(kafkaConfig.getBootstrapServers());
    try {
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>(topicName, key, Integer.toString(i)));
        }
    } finally {
        // Close even if a send fails, so the producer is never leaked.
        producer.close();
    }
}
/**
 * Demonstrates asynchronous sends with a completion callback.
 *
 * <p>Each record carries a timestamped test message. Indices divisible by three go to
 * topic "test1", all others to "test". The method pauses one second after each send and
 * closes the producer with a 100 ms timeout when it is done.
 *
 * <p>If the thread is interrupted while pausing, the loop stops early and the interrupt
 * flag is restored after the producer has been closed.
 */
private static void kafkaAsycnSendTest() {
    Producer<String, String> producer = createProducer();
    // Only used from this thread, so a single formatter instance is enough.
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    boolean interrupted = false;
    try {
        for (int i = 0; i <= 3; i++) {
            String testMsg = "this is producer send test msg,date:" + simpleDateFormat.format(new Date());
            String key = "key_" + i;
            String topic = "test";
            if (i % 3 == 0) {
                topic = "test1";
            }
            producer.send(new ProducerRecord<String, String>(topic, key, testMsg), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();
                        System.out.println("find send exception:" + exception);
                        // On failure the metadata is null or carries no real position
                        // (depending on the client version), so it is not reported.
                        return;
                    }
                    System.out.println(
                            "send to partition(" + metadata.partition() + ")," + "offset(" + metadata.offset() + ")");
                }
            });
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Stop sending; the flag is restored after close() so that close is not disturbed.
                interrupted = true;
                break;
            }
            // NOTE(review): this extra increment, on top of the one in the for header, means only
            // i = 0 and i = 2 are sent. It is kept to preserve behavior — confirm it is intended.
            i++;
        }
        System.out.println("send message over.");
    } finally {
        try {
            producer.close(100, TimeUnit.MILLISECONDS);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
/**
 * Sends ten records with an Avro-encoded {@code Foo} key and {@code Bar} value to
 * {@code persistent://public/default/test-avro}, using the record index as the partition.
 *
 * @param args unused
 */
public static void main(String[] args) {
    String topic = "persistent://public/default/test-avro";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
    AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    Bar bar = new Bar();
    bar.setField1(true);

    Foo foo = new Foo();
    foo.setField1("field1");
    foo.setField2("field2");
    foo.setField3(3);

    Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);
    try {
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));
            log.info("Message {} sent successfully", i);
        }
        producer.flush();
    } finally {
        // Close even if a send or the flush fails, so the producer is never leaked.
        producer.close();
    }
}
/**
 * Minimal producer example: sends one hundred "Message N" records to the "HelloWorld"
 * topic and prints each message after handing it to the producer.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // 1. Build the producer configuration.
    final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
    final Properties config = new Properties();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.RETRIES_CONFIG, 0);
    config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    config.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);

    // 2. Create the Kafka producer from that configuration.
    final Producer<String, String> kafkaProducer = new KafkaProducer<>(config);
    try {
        // 3. Send the messages asynchronously with send().
        int sent = 0;
        while (sent < 100) {
            String payload = "Message " + sent;
            kafkaProducer.send(new ProducerRecord<>("HelloWorld", payload));
            System.out.println("Sent:" + payload);
            sent++;
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // 4. Close the producer.
        kafkaProducer.close();
    }
}
/**
 * Writes one record to the "dev" topic over SASL_SSL (PLAIN mechanism) and waits for the
 * send to complete. The test passes when {@code record.get()} returns without throwing.
 */
@Test
public void testAuthorizedWrite() throws Exception {
    // Create the Producer
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + port);
    producerProps.put("acks", "all");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    producerProps.put("sasl.mechanism", "PLAIN");

    producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");

    final Producer<String, String> producer = new KafkaProducer<>(producerProps);
    try {
        // Send a message
        Future<RecordMetadata> record =
                producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
        producer.flush();
        record.get();
    } finally {
        // Close even when the write fails, so a failing test does not leak the producer.
        producer.close();
    }
}
/**
 * Sends one record while a "parent" span is active and checks how the recorded spans relate.
 *
 * <p>Expected outcome: three finished spans that all share the parent's trace id; the send
 * span is a child of the parent, and the receive span is a child of the send span.
 */
@Test
public void with_parent() throws Exception {
Producer<Integer, String> producer = createTracingProducer();
final MockSpan parent = mockTracer.buildSpan("parent").start();
// The parent span is active only for the duration of the send call.
try (Scope ignored = mockTracer.activateSpan(parent)) {
producer.send(new ProducerRecord<>("messages", 1, "test"));
}
parent.finish();
// Latch for the single record sent above; it is handed to createConsumer.
// Presumably the helper waits on it until the record is consumed — TODO confirm.
final CountDownLatch latch = new CountDownLatch(1);
createConsumer(latch, 1, false, null);
producer.close();
List<MockSpan> mockSpans = mockTracer.finishedSpans();
assertEquals(3, mockSpans.size());
// parent was already dereferenced above (parent.finish()), so this check cannot fail here.
assertNotNull(parent);
// Every finished span must belong to the parent's trace.
for (MockSpan span : mockSpans) {
assertEquals(parent.context().traceId(), span.context().traceId());
}
// Spans are looked up by operation name: TO_PREFIX / FROM_PREFIX followed by the topic.
MockSpan sendSpan = getByOperationName(mockSpans, TracingKafkaUtils.TO_PREFIX + "messages");
assertNotNull(sendSpan);
MockSpan receiveSpan = getByOperationName(mockSpans,
TracingKafkaUtils.FROM_PREFIX + "messages");
assertNotNull(receiveSpan);
// Parent/child chain: parent -> send -> receive.
assertEquals(sendSpan.context().spanId(), receiveSpan.parentId());
assertEquals(parent.context().spanId(), sendSpan.parentId());
assertNull(mockTracer.activeSpan());
}
/**
 * Sends one record to "stream-test" and runs a Kafka Streams topology that appends "map" to
 * every value and writes the result to "stream-out".
 *
 * <p>The streams instance is built with a {@code TracingKafkaClientSupplier} backed by
 * {@code mockTracer}. The test expects exactly three finished spans and no active span.
 */
@Test
public void test() {
Map<String, Object> senderProps = KafkaTestUtils
.producerProps(embeddedKafka.getEmbeddedKafka());
// The streams configuration reuses the bootstrap servers from the embedded broker's producer properties.
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, senderProps.get("bootstrap.servers"));
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Put one input record on the source topic before the topology is started.
Producer<Integer, String> producer = createProducer();
ProducerRecord<Integer, String> record = new ProducerRecord<>("stream-test", 1, "test");
producer.send(record);
final Serde<String> stringSerde = Serdes.String();
final Serde<Integer> intSerde = Serdes.Integer();
// Topology: stream-test -> append "map" to the value -> stream-out.
StreamsBuilder builder = new StreamsBuilder();
KStream<Integer, String> kStream = builder.stream("stream-test");
kStream.map((key, value) -> new KeyValue<>(key, value + "map"))
.to("stream-out", Produced.with(intSerde, stringSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), config,
new TracingKafkaClientSupplier(mockTracer));
streams.start();
// Wait up to 15 seconds until reportedSpansSize() reports three spans.
await().atMost(15, TimeUnit.SECONDS).until(reportedSpansSize(), equalTo(3));
streams.close();
producer.close();
List<MockSpan> spans = mockTracer.finishedSpans();
assertEquals(3, spans.size());
checkSpans(spans);
assertNull(mockTracer.activeSpan());
}
/**
 * Sends one record to the "demo" topic and waits for the send to complete, then creates the
 * consumer, runs {@code transactionMarker()}, and closes the consumer.
 *
 * <p>Producer and consumer are each closed on every exit path.
 *
 * @throws Exception if the send fails or {@code transactionMarker()} throws
 */
@Override
public void executeApp() throws Exception {
    Producer<Long, String> producer = SendRecord.createProducer();
    try {
        ProducerRecord<Long, String> record =
                new ProducerRecord<Long, String>("demo", "message");
        // Block until the send has completed before moving on to the consumer.
        producer.send(record).get();
    } finally {
        producer.close();
    }

    consumer = createConsumer();
    try {
        transactionMarker();
    } finally {
        consumer.close();
    }
}
/**
 * Produces ten records (String key, JSON-encoded {@code Foo} value) through the Pulsar Kafka
 * client and consumes them again, checking key, fields, and arrival order of every record.
 *
 * <p>The consume loop keeps polling until ten records have been received.
 */
@Test
public void testProducerConsumerMixedSchemaWithPulsarKafkaClient() throws Exception {
    final String topicName = "testProducerConsumerMixedSchemaWithPulsarKafkaClient";

    final Schema<String> keySchema = new PulsarKafkaSchema<>(new StringSerializer(), new StringDeserializer());
    final JSONSchema<Foo> valueSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    final Properties clientProps = new Properties();
    clientProps.put("bootstrap.servers", getPlainTextServiceUrl());
    clientProps.put("group.id", "my-subscription-name");
    clientProps.put("enable.auto.commit", "false");
    clientProps.put("key.serializer", IntegerSerializer.class.getName());
    clientProps.put("value.serializer", StringSerializer.class.getName());
    clientProps.put("key.deserializer", StringDeserializer.class.getName());
    clientProps.put("value.deserializer", StringDeserializer.class.getName());

    // Subscribe before producing; the consumer is closed by Lombok's @Cleanup at scope exit.
    @Cleanup
    Consumer<String, Foo> consumer = new KafkaConsumer<>(clientProps, keySchema, valueSchema);
    consumer.subscribe(Arrays.asList(topicName));

    final Producer<String, Foo> producer = new KafkaProducer<>(clientProps, keySchema, valueSchema);
    for (int seq = 0; seq < 10; seq++) {
        Foo payload = new Foo();
        payload.setField1("field1");
        payload.setField2("field2");
        payload.setField3(seq);
        producer.send(new ProducerRecord<>(topicName, "hello" + seq, payload));
    }
    producer.flush();
    producer.close();

    // The counter doubles as the expected sequence number of the next record.
    final AtomicInteger received = new AtomicInteger();
    while (received.get() < 10) {
        ConsumerRecords<String, Foo> batch = consumer.poll(100);
        if (batch.isEmpty()) {
            continue;
        }
        batch.forEach(record -> {
            int expected = received.get();
            Assert.assertEquals(record.key(), "hello" + expected);
            Foo value = record.value();
            Assert.assertEquals(value.getField1(), "field1");
            Assert.assertEquals(value.getField2(), "field2");
            Assert.assertEquals(value.getField3(), expected);
            received.incrementAndGet();
        });
        consumer.commitSync();
    }
}
/**
 * Produces {@code mNumMessages} Thrift-serialized {@code TestMessage} records to
 * {@code mTopic}.
 *
 * <p>The broker list comes from {@code mMetadataBrokerList}, falling back to
 * {@code localhost:9092} when it is null or empty. {@code mType} selects the Thrift
 * protocol: "json" or "binary". Each record is keyed by its index; the enum field
 * alternates between the two {@code TestEnum} values.
 *
 * @throws RuntimeException if {@code mType} is not a known encoding or a message cannot
 *                          be serialized
 */
public void run() {
    Properties properties = new Properties();
    if (mMetadataBrokerList == null || mMetadataBrokerList.isEmpty()) {
        properties.put("bootstrap.servers", "localhost:9092");
    } else {
        // Fixed key: this was misspelled "bootstrap.severs", so a configured broker list
        // never reached the producer.
        properties.put("bootstrap.servers", mMetadataBrokerList);
    }
    properties.put("value.serializer", ByteArraySerializer.class);
    properties.put("key.serializer", StringSerializer.class);
    properties.put("acks", "1");

    // Resolve the encoding before creating the producer, so that an invalid type does not
    // leave an open producer behind.
    TProtocolFactory protocol;
    if ("json".equals(mType)) {
        protocol = new TSimpleJSONProtocol.Factory();
    } else if ("binary".equals(mType)) {
        protocol = new TBinaryProtocol.Factory();
    } else {
        throw new RuntimeException("Undefined message encoding type: " + mType);
    }
    TSerializer serializer = new TSerializer(protocol);

    Producer<String, byte[]> producer = new KafkaProducer<>(properties);
    try {
        for (int i = 0; i < mNumMessages; ++i) {
            // Current time shifted back by mTimeshift seconds, scaled by 1e6, plus the index
            // so every message gets a distinct value.
            long time = (System.currentTimeMillis() - mTimeshift * 1000L) * 1000000L + i;
            TestMessage testMessage = new TestMessage(time,
                    "some_value_" + i);
            if (i % 2 == 0) {
                testMessage.setEnumField(TestEnum.SOME_VALUE);
            } else {
                testMessage.setEnumField(TestEnum.SOME_OTHER_VALUE);
            }
            byte[] bytes;
            try {
                bytes = serializer.serialize(testMessage);
            } catch (TException e) {
                throw new RuntimeException("Failed to serialize message " + testMessage, e);
            }
            ProducerRecord<String, byte[]> data = new ProducerRecord<>(mTopic, Integer.toString(i), bytes);
            producer.send(data);
        }
    } finally {
        // Close even when serialization or a send fails, so the producer is never leaked.
        producer.close();
    }
}