org.apache.kafka.clients.producer.Producer#beginTransaction() 源码实例 Demo

下面列出了 org.apache.kafka.clients.producer.Producer#beginTransaction() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

/**
 * Verifies that a producer created by the transactional factory can begin and commit an
 * empty transaction and reports a non-empty metrics map afterwards.
 * The test is skipped when the OS name contains "Windows".
 */
@Test
void testTransactionalProducerCreation() {
    assumeFalse(
            System.getProperty("os.name").contains("Windows"),
            "Transactional producers not supported on Windows"
    );

    ProducerFactory<String, String> producerFactory = transactionalProducerFactory(kafkaBroker, "xyz");
    Producer<String, String> testProducer = producerFactory.createProducer();

    try {
        testProducer.beginTransaction();
        testProducer.commitTransaction();
        assertFalse(testProducer.metrics().isEmpty());
    } finally {
        // Run cleanup even when a transaction call or the assertion fails,
        // so the producer and factory are not leaked into later tests.
        cleanup(producerFactory, testProducer);
    }
}
 
/**
 * Creates ten producers from one transactional factory, sends a single message with each of
 * them inside its own transaction and then checks the resulting offsets via
 * {@code assertOffsets}.
 *
 * @throws ExecutionException   declared for the send/offset helpers used by this test
 * @throws InterruptedException declared for the send/offset helpers used by this test
 */
@Test
void testSendingMessagesUsingMultipleTransactionalProducers()
        throws ExecutionException, InterruptedException {
    ProducerFactory<String, String> producerFactory = transactionalProducerFactory(kafkaBroker, "xyz");
    List<Producer<String, String>> testProducers = new ArrayList<>();
    String testTopic = "testSendingMessagesUsingMultipleTransactionalProducers";

    try {
        List<Future<RecordMetadata>> results = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Producer<String, String> producer = producerFactory.createProducer();
            // Register the producer before using it so it is still cleaned up
            // if beginTransaction/send/commitTransaction throws.
            testProducers.add(producer);
            producer.beginTransaction();
            results.add(send(producer, testTopic, "foo" + i));
            producer.commitTransaction();
        }
        assertOffsets(results);
    } finally {
        // Always release every producer created so far, plus the factory.
        cleanup(producerFactory, testProducers);
    }
}
 
/**
 * Checks that committing after the transaction has been aborted raises a
 * {@code KafkaException}. Skipped when the OS name contains "Windows".
 */
@Test
void testTransactionalProducerBehaviorOnCommittingAnAbortedTransaction() {
    final boolean runningOnWindows = System.getProperty("os.name").contains("Windows");
    assumeFalse(runningOnWindows, "Transactional producers not supported on Windows");

    final String topic = "testTransactionalProducerBehaviorOnCommittingAnAbortedTransaction";
    ProducerFactory<String, String> factory = transactionalProducerFactory(kafkaBroker, "xyz");
    Producer<String, String> producer = factory.createProducer();

    try {
        // Open a transaction, publish one record, then abort it.
        producer.beginTransaction();
        send(producer, topic, "bar");
        producer.abortTransaction();

        // A commit at this point is expected to be rejected.
        assertThrows(KafkaException.class, () -> producer.commitTransaction());
    } finally {
        cleanup(factory, producer);
    }
}
 
/**
 * Checks that {@code sendOffsetsToTransaction} raises a {@code KafkaException} when it is
 * called after the transaction has already been committed.
 * Skipped when the OS name contains "Windows".
 */
@Test
void testTransactionalProducerBehaviorOnSendingOffsetsWhenTransactionIsClosed() {
    assumeFalse(
            System.getProperty("os.name").contains("Windows"),
            "Transactional producers not supported on Windows"
    );

    ProducerFactory<String, String> producerFactory = transactionalProducerFactory(kafkaBroker, "xyz");
    Producer<String, String> testProducer = producerFactory.createProducer();

    try {
        testProducer.beginTransaction();
        testProducer.commitTransaction();
        assertThrows(KafkaException.class, () -> testProducer.sendOffsetsToTransaction(Collections.emptyMap(), "foo"));
    } finally {
        // Run cleanup even when the expected exception is not thrown and the assertion fails.
        cleanup(producerFactory, testProducer);
    }
}
 
源代码5 项目: flink   文件: FlinkKafkaInternalProducerITCase.java
/**
 * Writes a single record inside one committed transaction, then checks via
 * {@code assertRecord} that it arrived and deletes the topic.
 */
@Test(timeout = 30000L)
public void testHappyPath() throws IOException {
	final String topic = "flink-kafka-producer-happy-path";
	final String key = "42";
	final String value = "42";

	Producer<String, String> producer = new FlinkKafkaInternalProducer<>(extraProperties);
	try {
		producer.initTransactions();
		producer.beginTransaction();
		ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
		producer.send(record);
		producer.commitTransaction();
	} finally {
		// Bounded close so the test cannot hang on shutdown.
		producer.close(Duration.ofSeconds(5));
	}

	assertRecord(topic, key, value);
	deleteTestTopic(topic);
}
 
源代码6 项目: flink   文件: FlinkKafkaInternalProducerITCase.java
/**
 * Begins a transaction without sending any record, restarts the broker returned by
 * {@code getLeaderToShutDown("__transaction_state")}, and then flushes and commits the
 * still-empty transaction.
 */
@Test(timeout = 30000L)
public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
	final String topicName = "flink-kafka-producer-txn-coordinator-changed";
	createTestTopic(topicName, 1, 2);

	Producer<String, String> producer = new FlinkKafkaInternalProducer<>(extraProperties);
	try {
		producer.initTransactions();
		producer.beginTransaction();

		// Restart the broker while the transaction is open and has no partitions added yet.
		restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));

		producer.flush();
		producer.commitTransaction();
	} finally {
		producer.close(Duration.ofSeconds(5));
	}

	deleteTestTopic(topicName);
}
 
源代码7 项目: extension-kafka   文件: KafkaPublisher.java
/**
 * Starts a Kafka transaction on the given producer.
 *
 * @param producer the producer on which {@code beginTransaction()} is invoked
 * @throws EventPublicationFailedException when the producer reports a
 *                                         {@code ProducerFencedException}; the original
 *                                         exception is kept as the cause
 */
private void tryBeginTxn(Producer<?, ?> producer) {
    try {
        producer.beginTransaction();
    } catch (ProducerFencedException fenced) {
        logger.warn("Unable to begin transaction", fenced);
        String failure = "Event publication failed, exception occurred while starting Kafka transaction";
        throw new EventPublicationFailedException(failure, fenced);
    }
}
 
源代码8 项目: javatech   文件: ProducerInTransaction.java
/**
 * Runs a transaction that only produces messages: two records are sent to topic
 * {@code "test"} and the transaction is committed; on any exception during send/commit the
 * transaction is aborted instead.
 *
 * <p>The producer obtained from {@code buildProducer()} is closed before returning, whether
 * the transaction was committed, aborted, or could not be started.
 * NOTE(review): this assumes {@code buildProducer()} returns a fresh producer owned by this
 * method rather than a shared instance — confirm against its definition.
 */
public static void onlyProduceInTransaction() {
	Producer producer = buildProducer();

	try {
		// 1. Initialize transactions
		producer.initTransactions();

		// 2. Begin the transaction
		producer.beginTransaction();

		try {
			// 3. Kafka write operations
			// 3.1 business logic goes here

			// 3.2 send messages
			producer.send(new ProducerRecord<String, String>("test", "transaction-data-1"));
			producer.send(new ProducerRecord<String, String>("test", "transaction-data-2"));

			// 3.3 more business logic; messages for other topics may be sent as well

			// 4. Commit the transaction
			producer.commitTransaction();
		} catch (Exception e) {
			// 5. Abort the transaction
			producer.abortTransaction();
		}
	} finally {
		// Release the producer's resources on every path, including a failed
		// initTransactions/beginTransaction or a failing abortTransaction.
		producer.close();
	}
}
 
源代码9 项目: canal-1.1.3   文件: CanalKafkaProducer.java
/**
 * Sends a canal message to Kafka and reports the outcome through {@code callback}.
 *
 * <p>When {@code kafkaProperties.getTransaction()} is true, the sends are wrapped in a Kafka
 * transaction on the selected producer. With a non-empty dynamic topic the message is split
 * by {@code MQMessageUtils.messageTopics} and each part is sent to its own topic (dots in the
 * topic name replaced by underscores); otherwise the whole message goes to the destination's
 * topic.
 *
 * <p>On success {@code callback.commit()} is invoked. On any {@code Throwable} the error is
 * logged, the transaction (if enabled) is aborted and {@code callback.rollback()} is invoked.
 * The rollback callback runs even if aborting the transaction itself throws.
 *
 * @param canalDestination destination settings (topic and optional dynamic topic)
 * @param message          the message to publish
 * @param callback         notified with commit on success or rollback on failure
 */
@Override
public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {
    // Transactions require support from the Kafka version in use.
    Producer producerTmp;
    if (!kafkaProperties.getFlatMessage()) {
        producerTmp = producer;
    } else {
        producerTmp = producer2;
    }

    // Read the flag once so begin/commit/abort always agree with each other.
    final boolean transactional = kafkaProperties.getTransaction();

    if (transactional) {
        producerTmp.beginTransaction();
    }
    try {
        if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
            // Dynamic topic
            Map<String, Message> messageMap = MQMessageUtils.messageTopics(message,
                canalDestination.getTopic(),
                canalDestination.getDynamicTopic());

            for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
                String topicName = entry.getKey().replace('.', '_');
                Message messageSub = entry.getValue();
                if (logger.isDebugEnabled()) {
                    logger.debug("## Send message to kafka topic: " + topicName);
                }
                send(canalDestination, topicName, messageSub);
            }
        } else {
            send(canalDestination, canalDestination.getTopic(), message);
        }
        if (transactional) {
            producerTmp.commitTransaction();
        }
        callback.commit();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
        try {
            if (transactional) {
                producerTmp.abortTransaction();
            }
        } finally {
            // abortTransaction can itself throw (for example when the failure happened
            // after the commit, so no transaction is open any more); the caller must
            // still be told to roll back.
            callback.rollback();
        }
    }
}
 
源代码10 项目: javatech   文件: ProducerInTransaction.java
/**
 * Consume-transform-produce loop: within one transaction, records are read from topic
 * {@code "test"}, a new message is produced for each record, and the consumed offsets are
 * committed through the producer so that output and offsets are committed together.
 *
 * <p>The loop never terminates on its own. On any exception during processing the current
 * transaction is aborted and the loop continues with the next poll.
 */
public static void consumeTransferProduce() {
	// 1. Build the producer
	Producer producer = buildProducer();
	// 2. Initialize transactions; this may only be done once per producer
	producer.initTransactions();

	// 3. Build the consumer and subscribe to the topic
	Consumer consumer = buildConsumer();
	consumer.subscribe(Arrays.asList("test"));
	while (true) {
		// 4. Begin the transaction
		producer.beginTransaction();

		// 5.1 Receive messages
		ConsumerRecords<String, String> records = consumer.poll(500);

		try {
			// 5.2 business logic
			System.out.println("customer Message---");
			Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
			for (ConsumerRecord<String, String> record : records) {
				// 5.2.1 Read and process the message: print offset, key and value
				System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(),
					record.value());

				// 5.2.2 Record the offset to commit. The committed offset must be the
				// position of the NEXT record to consume, i.e. last processed offset + 1;
				// committing record.offset() itself would re-deliver this record after a
				// restart or rebalance.
				commits.put(new TopicPartition(record.topic(), record.partition()),
					new OffsetAndMetadata(record.offset() + 1));

				// 6. Produce a new message (e.g. a follow-up event derived from the consumed one)
				producer.send(new ProducerRecord<String, String>("test", "data2"));
			}

			// 7. Send the consumed offsets as part of the transaction
			producer.sendOffsetsToTransaction(commits, "group0323");

			// 8. Commit the transaction
			producer.commitTransaction();
		} catch (Exception e) {
			// 9. Abort the transaction
			producer.abortTransaction();
		}
	}
}