The following examples show how to use org.apache.kafka.clients.producer.Producer#commitTransaction() in practice.
@Test
void testTransactionalProducerCreation() {
    assumeFalse(
            System.getProperty("os.name").contains("Windows"),
            "Transactional producers not supported on Windows"
    );
    ProducerFactory<String, String> producerFactory = transactionalProducerFactory(kafkaBroker, "xyz");
    Producer<String, String> testProducer = producerFactory.createProducer();
    testProducer.beginTransaction();
    testProducer.commitTransaction();
    assertFalse(testProducer.metrics().isEmpty());
    cleanup(producerFactory, testProducer);
}
@Test
void testSendingMessagesUsingMultipleTransactionalProducers()
        throws ExecutionException, InterruptedException {
    ProducerFactory<String, String> producerFactory = transactionalProducerFactory(kafkaBroker, "xyz");
    List<Producer<String, String>> testProducers = new ArrayList<>();
    String testTopic = "testSendingMessagesUsingMultipleTransactionalProducers";
    List<Future<RecordMetadata>> results = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Producer<String, String> producer = producerFactory.createProducer();
        producer.beginTransaction();
        results.add(send(producer, testTopic, "foo" + i));
        producer.commitTransaction();
        testProducers.add(producer);
    }
    assertOffsets(results);
    cleanup(producerFactory, testProducers);
}
@Test
void testTransactionalProducerBehaviorOnSendingOffsetsWhenTransactionIsClosed() {
    assumeFalse(
            System.getProperty("os.name").contains("Windows"),
            "Transactional producers not supported on Windows"
    );
    ProducerFactory<String, String> producerFactory = transactionalProducerFactory(kafkaBroker, "xyz");
    Producer<String, String> testProducer = producerFactory.createProducer();
    testProducer.beginTransaction();
    testProducer.commitTransaction();
    assertThrows(KafkaException.class, () -> testProducer.sendOffsetsToTransaction(Collections.emptyMap(), "foo"));
    cleanup(producerFactory, testProducer);
}
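The three tests above rely on helpers (transactionalProducerFactory, send, assertOffsets, cleanup) that are not part of the listing. As a rough illustration only, assuming kafkaBroker is a spring-kafka EmbeddedKafkaBroker and "xyz" is used as the transaction id prefix, the factory helper could be sketched like this:

import java.util.Map;

import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

// Hypothetical helper: builds a transaction-capable ProducerFactory against the embedded broker.
static ProducerFactory<String, String> transactionalProducerFactory(EmbeddedKafkaBroker broker, String txIdPrefix) {
    Map<String, Object> props = KafkaTestUtils.producerProps(broker); // bootstrap.servers etc. from the embedded broker
    DefaultKafkaProducerFactory<String, String> factory =
            new DefaultKafkaProducerFactory<>(props, new StringSerializer(), new StringSerializer());
    // Setting a transaction id prefix makes createProducer() hand out transactional producers.
    factory.setTransactionIdPrefix(txIdPrefix);
    return factory;
}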
@Test(timeout = 30000L)
public void testHappyPath() throws IOException {
    String topicName = "flink-kafka-producer-happy-path";
    Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
    try {
        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();
        kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
        kafkaProducer.commitTransaction();
    } finally {
        kafkaProducer.close(Duration.ofSeconds(5));
    }
    assertRecord(topicName, "42", "42");
    deleteTestTopic(topicName);
}
@Test(timeout = 30000L)
public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
    String topic = "flink-kafka-producer-txn-coordinator-changed";
    createTestTopic(topic, 1, 2);
    Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
    try {
        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();
        restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
        kafkaProducer.flush();
        kafkaProducer.commitTransaction();
    } finally {
        kafkaProducer.close(Duration.ofSeconds(5));
    }
    deleteTestTopic(topic);
}
private void tryCommit(Producer<?, ?> producer, MonitorCallback monitorCallback) {
    try {
        producer.commitTransaction();
        monitorCallback.reportSuccess();
    } catch (ProducerFencedException e) {
        logger.warn("Unable to commit transaction", e);
        monitorCallback.reportFailure(e);
        throw new EventPublicationFailedException(
                "Event publication failed, exception occurred while committing Kafka transaction", e
        );
    }
}
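tryCommit only wraps the commit step and its monitoring; the caller still has to begin the transaction and abort it when publishing fails before the commit. A rough sketch of such a caller, where publishEvent, the topic name, and the payload are hypothetical placeholders:

// Hypothetical caller living next to tryCommit: opens the transaction, sends,
// aborts on a failed send, otherwise delegates the commit to tryCommit.
private void publishEvent(Producer<String, String> producer, MonitorCallback monitorCallback) {
    producer.beginTransaction();
    try {
        producer.send(new ProducerRecord<>("events", "event-payload")); // placeholder topic/value
    } catch (KafkaException e) {
        // A failed send invalidates the transaction; abort instead of committing.
        producer.abortTransaction();
        throw e;
    }
    tryCommit(producer, monitorCallback);
}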
/**
 * A transaction that only produces messages.
 */
public static void onlyProduceInTransaction() {
    Producer<String, String> producer = buildProducer();
    // 1. Initialize the transaction
    producer.initTransactions();
    // 2. Begin the transaction
    producer.beginTransaction();
    try {
        // 3. The Kafka write operations
        // 3.1 Run the business logic
        // 3.2 Send the messages
        producer.send(new ProducerRecord<String, String>("test", "transaction-data-1"));
        producer.send(new ProducerRecord<String, String>("test", "transaction-data-2"));
        // 3.3 Run further business logic; messages to other topics can also be sent here.
        // 4. Commit the transaction
        producer.commitTransaction();
    } catch (Exception e) {
        // 5. Abort the transaction
        producer.abortTransaction();
    }
}
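buildProducer() is not included in the snippet. A minimal sketch of what it could look like, with the broker address and the transactional.id as assumed placeholder values:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public static Producer<String, String> buildProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // A transactional.id must be set before initTransactions()/beginTransaction() may be called;
    // it also implicitly enables idempotence.
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-demo-producer");
    return new KafkaProducer<>(props);
}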
@Override
public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {
    // Transactions are optional here and require a Kafka version that supports them
    Producer producerTmp;
    if (!kafkaProperties.getFlatMessage()) {
        producerTmp = producer;
    } else {
        producerTmp = producer2;
    }
    if (kafkaProperties.getTransaction()) {
        producerTmp.beginTransaction();
    }
    try {
        if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
            // Dynamic topics: split the message across the topics it maps to
            Map<String, Message> messageMap = MQMessageUtils.messageTopics(message,
                canalDestination.getTopic(),
                canalDestination.getDynamicTopic());
            for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
                String topicName = entry.getKey().replace('.', '_');
                Message messageSub = entry.getValue();
                if (logger.isDebugEnabled()) {
                    logger.debug("## Send message to kafka topic: " + topicName);
                }
                send(canalDestination, topicName, messageSub);
            }
        } else {
            send(canalDestination, canalDestination.getTopic(), message);
        }
        if (kafkaProperties.getTransaction()) {
            producerTmp.commitTransaction();
        }
        callback.commit();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
        if (kafkaProperties.getTransaction()) {
            producerTmp.abortTransaction();
        }
        callback.rollback();
    }
}
/**
 * A transaction that both consumes and produces messages (consume-transform-produce).
 */
public static void consumeTransferProduce() {
    // 1. Build the producer
    Producer<String, String> producer = buildProducer();
    // 2. Initialize the transaction (allocates the producer id); this may only be done once per producer
    producer.initTransactions();
    // 3. Build the consumer and subscribe to the topic
    Consumer<String, String> consumer = buildConsumer();
    consumer.subscribe(Arrays.asList("test"));
    while (true) {
        // 4. Begin the transaction
        producer.beginTransaction();
        // 5.1 Receive messages
        ConsumerRecords<String, String> records = consumer.poll(500);
        try {
            // 5.2 Run the business logic
            System.out.println("customer Message---");
            Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                // 5.2.1 Read and process the message: print the offset, key and value of each consumer record
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(),
                    record.value());
                // 5.2.2 Record the offset to commit (the offset of the next record to read, hence + 1)
                commits.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
                // 6. Produce new messages. For a food-delivery order, for example, a successful order
                //    triggers a settlement message to the merchant or a commission message to the courier.
                producer.send(new ProducerRecord<String, String>("test", "data2"));
            }
            // 7. Send the consumed offsets to the transaction
            producer.sendOffsetsToTransaction(commits, "group0323");
            // 8. Commit the transaction
            producer.commitTransaction();
        } catch (Exception e) {
            // Abort the transaction on any failure
            producer.abortTransaction();
        }
    }
}
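buildConsumer() is likewise not shown. For the consume-transform-produce pattern above, the consumer must have auto-commit disabled (offsets are committed through sendOffsetsToTransaction) and should only read committed data. A minimal sketch under those assumptions, with the broker address as a placeholder and the group id matching the one passed to sendOffsetsToTransaction:

import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public static Consumer<String, String> buildConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
    // Must match the group id passed to sendOffsetsToTransaction ("group0323" above)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group0323");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // Offsets are committed inside the producer transaction, so auto-commit must be off
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // Only read records from committed transactions
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    return new KafkaConsumer<>(props);
}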