下面列出了 org.apache.kafka.clients.producer.Producer#initTransactions() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Happy-path transactional round trip: initializes transactions, sends one
 * record with key and value "42" inside a transaction, commits, closes the
 * producer, and then checks the record through {@code assertRecord} before
 * deleting the topic.
 */
@Test(timeout = 30000L)
public void testHappyPath() throws IOException {
    final String topicName = "flink-kafka-producer-happy-path";
    final Producer<String, String> producer = new FlinkKafkaInternalProducer<>(extraProperties);
    try {
        producer.initTransactions();
        producer.beginTransaction();
        final ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "42", "42");
        producer.send(record);
        producer.commitTransaction();
    } finally {
        // Always release the producer, bounded to five seconds.
        producer.close(Duration.ofSeconds(5));
    }

    assertRecord(topicName, "42", "42");
    deleteTestTopic(topicName);
}
/**
 * Commits an empty transaction (no records are sent) after the broker
 * returned by {@code kafkaServer.getLeaderToShutDown("__transaction_state")}
 * has been restarted while the transaction was open.
 */
@Test(timeout = 30000L)
public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
    final String topic = "flink-kafka-producer-txn-coordinator-changed";
    createTestTopic(topic, 1, 2);

    final Producer<String, String> producer = new FlinkKafkaInternalProducer<>(extraProperties);
    try {
        producer.initTransactions();
        producer.beginTransaction();
        // The restart must happen between beginTransaction() and the flush/commit below.
        restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
        producer.flush();
        producer.commitTransaction();
    } finally {
        // Always release the producer, bounded to five seconds.
        producer.close(Duration.ofSeconds(5));
    }

    deleteTestTopic(topic);
}
/**
 * Returns a transactional producer, reusing one from {@code cache} when
 * available.
 *
 * <p>When the cache is empty a new producer is created from a copy of
 * {@code configuration} with a transactional id built from
 * {@code transactionIdPrefix} and the next value of
 * {@code transactionIdSuffix}, wrapped in a {@code PoolableProducer}, and
 * initialized for transactions.
 *
 * @return a cached producer, or a freshly created and initialized one
 * @throws RuntimeException whatever {@code initTransactions()} (or producer
 *         construction) throws; in the {@code initTransactions()} case the
 *         newly created underlying producer is closed first so it does not leak
 */
private Producer<K, V> createTransactionalProducer() {
    Producer<K, V> cached = this.cache.poll();
    if (cached != null) {
        return cached;
    }
    Map<String, Object> configs = new HashMap<>(this.configuration);
    configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
            this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
    Producer<K, V> delegate = createKafkaProducer(configs);
    try {
        Producer<K, V> pooled = new PoolableProducer<>(delegate, cache, closeTimeout);
        pooled.initTransactions();
        return pooled;
    } catch (RuntimeException e) {
        // The producer was never handed out, so nobody else can release it.
        // Close the underlying producer directly: the pooled wrapper's close()
        // presumably returns it to the cache, which must not receive an
        // uninitialized producer -- TODO confirm against PoolableProducer.
        try {
            delegate.close();
        } catch (RuntimeException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Demonstrates a transaction that contains only produce operations.
 *
 * <p>Two records are sent to topic {@code "test"} and committed atomically.
 * If sending or committing fails, the failure is reported on standard error
 * and the transaction is aborted. The producer is always closed before the
 * method returns, including when {@code initTransactions()},
 * {@code beginTransaction()} or {@code abortTransaction()} throws.
 */
public static void onlyProduceInTransaction() {
    try (Producer<String, String> producer = buildProducer()) {
        // 1. Initialize transactions.
        producer.initTransactions();
        // 2. Begin the transaction.
        producer.beginTransaction();
        try {
            // 3. Kafka write operations.
            // 3.1 Business logic goes here.
            // 3.2 Send messages.
            producer.send(new ProducerRecord<String, String>("test", "transaction-data-1"));
            producer.send(new ProducerRecord<String, String>("test", "transaction-data-2"));
            // 3.3 Further business logic; messages to other topics may be sent as well.
            // 4. Commit the transaction.
            producer.commitTransaction();
        } catch (Exception e) {
            // 5. Abort the transaction. Report the cause instead of dropping it
            // silently; if the abort itself fails, that exception propagates
            // and the producer is still closed by try-with-resources.
            System.err.println("Transaction failed, aborting: " + e);
            producer.abortTransaction();
        }
    }
}
/**
 * Demonstrates a consume-transform-produce loop in which consuming and
 * producing happen inside the same transaction.
 *
 * <p>Each iteration begins a transaction, polls topic {@code "test"}, prints
 * every record, sends one new record per consumed record, attaches the
 * consumed offsets to the transaction under group {@code "group0323"}, and
 * commits. If processing or committing fails, the failure is reported on
 * standard error and the transaction is aborted.
 *
 * <p>The loop never terminates normally. When an exception escapes it (for
 * example from {@code beginTransaction()}, {@code poll} or
 * {@code abortTransaction()}), both the consumer and the producer are closed.
 */
public static void consumeTransferProduce() {
    // 1. Build the producer.
    try (Producer<String, String> producer = buildProducer()) {
        // 2. Initialize transactions (obtains the producer id); this may be
        // done only once per producer.
        producer.initTransactions();
        // 3. Build the consumer and subscribe to the topic.
        try (Consumer<String, String> consumer = buildConsumer()) {
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                // 4. Begin the transaction.
                producer.beginTransaction();
                // 5.1 Receive messages.
                ConsumerRecords<String, String> records = consumer.poll(500);
                try {
                    // 5.2 Business logic.
                    System.out.println("customer Message---");
                    Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        // 5.2.1 Read and process the message: print the offset,
                        // key and value of the consumed record.
                        System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(),
                                record.value());
                        // 5.2.2 Record the offset to commit. The committed offset
                        // is the position of the NEXT record to consume, hence
                        // +1; committing record.offset() itself would re-deliver
                        // the last processed record after a restart or rebalance.
                        commits.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                        // 6. Produce a new message, e.g. a follow-up event derived
                        // from the consumed one.
                        producer.send(new ProducerRecord<String, String>("test", "data2"));
                    }
                    // 7. Attach the consumed offsets to the transaction.
                    producer.sendOffsetsToTransaction(commits, "group0323");
                    // 8. Commit the transaction.
                    producer.commitTransaction();
                } catch (Exception e) {
                    // 9. Abort the transaction. Report the cause instead of
                    // dropping it silently; if the abort itself fails, that
                    // exception propagates and both clients are closed.
                    // NOTE(review): the consumer position is not rewound here,
                    // so records of the aborted batch are not polled again by
                    // this consumer instance -- confirm whether a seek back to
                    // the last committed offsets is required.
                    System.err.println("Transaction failed, aborting: " + e);
                    producer.abortTransaction();
                }
            }
        }
    }
}