org.apache.kafka.clients.producer.Producer#initTransactions() Source Code Examples

The following lists example code for org.apache.kafka.clients.producer.Producer#initTransactions(), taken from open-source projects; follow the project links to view the full source on GitHub.
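
Before any of these examples can call initTransactions(), the producer has to be configured with a transactional.id. Below is a minimal, self-contained sketch of that precondition; the broker address, topic, and id values are placeholder assumptions, not taken from the projects listed on this page.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class InitTransactionsSketch {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		// transactional.id is mandatory: without it, initTransactions() throws IllegalStateException
		props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "init-transactions-demo");

		Producer<String, String> producer = new KafkaProducer<>(props);
		try {
			producer.initTransactions();   // once per producer instance, before any transaction
			producer.beginTransaction();
			producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
			producer.commitTransaction();
		} finally {
			producer.close();
		}
	}
}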

Example 1  Project: flink   File: FlinkKafkaInternalProducerITCase.java
@Test(timeout = 30000L)
public void testHappyPath() throws IOException {
	String topicName = "flink-kafka-producer-happy-path";

	Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
	try {
		kafkaProducer.initTransactions();
		kafkaProducer.beginTransaction();
		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
		kafkaProducer.commitTransaction();
	} finally {
		kafkaProducer.close(Duration.ofSeconds(5));
	}
	assertRecord(topicName, "42", "42");
	deleteTestTopic(topicName);
}
 
Example 2  Project: flink   File: FlinkKafkaInternalProducerITCase.java
@Test(timeout = 30000L)
public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
	String topic = "flink-kafka-producer-txn-coordinator-changed";
	createTestTopic(topic, 1, 2);
	Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
	try {
		kafkaProducer.initTransactions();
		kafkaProducer.beginTransaction();
		// force a transaction coordinator change by restarting the leader of the __transaction_state topic
		restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
		kafkaProducer.flush();
		kafkaProducer.commitTransaction();
	} finally {
		kafkaProducer.close(Duration.ofSeconds(5));
	}
	deleteTestTopic(topic);
}
 
Example 3  Project: extension-kafka   File: DefaultProducerFactory.java
private Producer<K, V> createTransactionalProducer() {
    // Reuse an already-initialized producer from the pool when one is available
    Producer<K, V> producer = this.cache.poll();
    if (producer != null) {
        return producer;
    }
    // Otherwise create a new producer with a unique transactional.id and initialize it once
    Map<String, Object> configs = new HashMap<>(this.configuration);
    configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
    producer = new PoolableProducer<>(createKafkaProducer(configs), cache, closeTimeout);
    producer.initTransactions();
    return producer;
}
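
For context on why the factory above gives every pooled producer a distinct transactional.id: two live producers sharing the same id fence each other, and the older one fails on its next transactional request. A rough sketch of that behavior (standard Kafka client and java.util imports assumed; the broker address, topic, and id are made up):

Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "shared-transactional-id");

Producer<String, String> older = new KafkaProducer<>(configs);
older.initTransactions();

Producer<String, String> newer = new KafkaProducer<>(configs);
newer.initTransactions();   // bumps the epoch for "shared-transactional-id", fencing the older producer

older.beginTransaction();
older.send(new ProducerRecord<>("demo-topic", "key", "value"));
older.commitTransaction();  // fails with ProducerFencedException: a newer producer owns this transactional.id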
 
Example 4  Project: javatech   File: ProducerInTransaction.java
/**
 * A transaction that contains only produce operations
 */
public static void onlyProduceInTransaction() {
	Producer<String, String> producer = buildProducer();

	// 1. Initialize transactions
	producer.initTransactions();

	// 2. Begin the transaction
	producer.beginTransaction();

	try {
		// 3. Kafka write operations
		// 3.1 Do business logic

		// 3.2 Send messages
		producer.send(new ProducerRecord<String, String>("test", "transaction-data-1"));
		producer.send(new ProducerRecord<String, String>("test", "transaction-data-2"));

		// 3.3 Do other business logic; messages to other topics can also be sent here.

		// 4. Commit the transaction
		producer.commitTransaction();
	} catch (Exception e) {
		// 5. Abort the transaction
		producer.abortTransaction();
	}
}
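
The example above relies on buildProducer(), which is not shown on this page. A possible implementation consistent with the calls above (broker address, serializers, and the transactional.id value are placeholder assumptions):

private static Producer<String, String> buildProducer() {
	Properties props = new Properties();
	props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
	props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
	props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
	// Setting transactional.id enables idempotence and requires acks=all
	props.put(ProducerConfig.ACKS_CONFIG, "all");
	props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-in-transaction-demo");
	return new KafkaProducer<>(props);
}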
 
Example 5  Project: javatech   File: ProducerInTransaction.java
/**
 * Both produce and consume messages within a single transaction
 */
public static void consumeTransferProduce() {
	// 1. Build the producer
	Producer<String, String> producer = buildProducer();
	// 2. Initialize transactions (obtains the producer id); this may only be done once per producer instance
	producer.initTransactions();

	// 3. Build the consumer and subscribe to the topic
	Consumer<String, String> consumer = buildConsumer();
	consumer.subscribe(Arrays.asList("test"));
	while (true) {
		// 4. Begin the transaction
		producer.beginTransaction();

		// 5.1 Receive messages
		ConsumerRecords<String, String> records = consumer.poll(500);

		try {
			// 5.2 Do business logic
			System.out.println("consumer Message---");
			Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
			for (ConsumerRecord<String, String> record : records) {
				// 5.2.1 Read and process each message: print the offset, key and value of the consumer record
				System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(),
					record.value());

				// 5.2.2 Record the offset to commit (the committed offset is the next offset to read, hence +1)
				commits.put(new TopicPartition(record.topic(), record.partition()),
					new OffsetAndMetadata(record.offset() + 1));

				// 6. Produce new messages, e.g. for a food-delivery order: once the order succeeds, send a settlement message to the merchant or a commission message to the courier
				producer.send(new ProducerRecord<String, String>("test", "data2"));
			}

			// 7. Send the consumed offsets to the transaction
			producer.sendOffsetsToTransaction(commits, "group0323");

			// 8. Commit the transaction
			producer.commitTransaction();
		} catch (Exception e) {
			// 9. Abort the transaction on any failure
			producer.abortTransaction();
		}
	}
}
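
Likewise, buildConsumer() is not shown. For the consume-transform-produce pattern above, offsets must be committed through the producer transaction rather than by the consumer, and only committed data should be read; a sketch under those assumptions (broker address is a placeholder):

private static Consumer<String, String> buildConsumer() {
	Properties props = new Properties();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
	// Must match the group id passed to producer.sendOffsetsToTransaction(...)
	props.put(ConsumerConfig.GROUP_ID_CONFIG, "group0323");
	// Offsets are committed inside the producer transaction, so auto-commit stays off
	props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
	// Only hand out records from committed transactions
	props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
	return new KafkaConsumer<>(props);
}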