下面列出了 org.apache.kafka.common.errors.ProducerFencedException 的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that a {@link ProducerFencedException} raised while starting a transaction
 * propagates out of {@code lease.beginTransaction()} and leaves the lease with a poison
 * count of exactly one.
 */
@Test
public void testPoisonOnExceptionCreatingTransaction() {
    final PoisonCountingLease lease = new PoisonCountingLease();

    // Make the mocked producer fail as soon as a transaction is started.
    // NOTE(review): presumably PoisonCountingLease delegates to this producer - confirm.
    doAnswer(invocation -> {
        throw new ProducerFencedException("Intenitional exception thrown from unit test");
    }).when(producer).beginTransaction();

    try {
        lease.beginTransaction();
        Assert.fail("Expected ProducerFencedException");
    } catch (final ProducerFencedException pfe) {
        // expected
    }

    assertEquals(1, lease.getPoisonCount());
}
/**
 * Resumes the given transaction on a freshly initialised transactional producer and
 * commits it. Non-transactional states are ignored.
 *
 * <p>{@link InvalidTxnStateException} and {@link ProducerFencedException} are logged as
 * warnings and not rethrown, on the assumption that the transaction was already committed.
 *
 * @param transaction state holding the transactional id, producer id and epoch to resume
 */
@Override
protected void recoverAndCommit(KafkaTransactionState transaction) {
    if (!transaction.isTransactional()) {
        return;
    }

    try (FlinkKafkaProducer<byte[], byte[]> recovered =
            initTransactionalProducer(transaction.transactionalId, false)) {
        recovered.resumeTransaction(transaction.producerId, transaction.epoch);
        recovered.commitTransaction();
    } catch (InvalidTxnStateException | ProducerFencedException recoveryFailure) {
        // Treated as "already committed earlier", so only a warning is emitted.
        LOG.warn("Encountered error {} while recovering transaction {}. " +
                "Presumably this transaction has been already committed before",
            recoveryFailure,
            transaction);
    }
}
/**
 * Enqueues two FlowFiles, makes the mocked lease throw {@link ProducerFencedException}
 * from {@code beginTransaction()}, and verifies that both FlowFiles are routed to
 * failure while the lease is poisoned and closed exactly once.
 */
@Test
public void testPublisherPoisonedIfFencedDuringTransactionCreation() {
    runner.enqueue("hello world");
    runner.enqueue("Hello World");

    final Answer<Object> fenceOnBegin = invocation -> {
        throw new ProducerFencedException("Intentional ProducedFencedException for unit test");
    };
    doAnswer(fenceOnBegin).when(mockLease).beginTransaction();

    runner.run();

    runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 2);
    verify(mockLease, times(1)).poison();
    verify(mockLease, times(1)).close();
}
/**
 * Resumes the given transaction on a freshly initialised internal producer and commits
 * it. Non-transactional states are ignored.
 *
 * <p>{@link InvalidTxnStateException} and {@link ProducerFencedException} are logged as
 * warnings and not rethrown, on the assumption that the transaction was already committed.
 *
 * @param transaction state holding the transactional id, producer id and epoch to resume
 */
@Override
protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
    if (!transaction.isTransactional()) {
        return;
    }

    try (FlinkKafkaInternalProducer<byte[], byte[]> recovered =
            initTransactionalProducer(transaction.transactionalId, false)) {
        recovered.resumeTransaction(transaction.producerId, transaction.epoch);
        recovered.commitTransaction();
    } catch (InvalidTxnStateException | ProducerFencedException recoveryFailure) {
        // Treated as "already committed earlier", so only a warning is emitted.
        LOG.warn("Encountered error {} while recovering transaction {}. " +
                "Presumably this transaction has been already committed before",
            recoveryFailure,
            transaction);
    }
}
/**
 * Commits a transaction that was started earlier by resuming it on a new transactional
 * producer. Does nothing when the state is not transactional.
 *
 * <p>An {@link InvalidTxnStateException} or {@link ProducerFencedException} is only
 * logged: the code assumes it means the transaction had been committed before.
 *
 * @param transaction state holding the transactional id, producer id and epoch to resume
 */
@Override
protected void recoverAndCommit(KafkaTransactionState transaction) {
    final boolean transactional = transaction.isTransactional();
    if (!transactional) {
        return;
    }

    try (FlinkKafkaProducer<byte[], byte[]> resumedProducer =
            initTransactionalProducer(transaction.transactionalId, false)) {
        resumedProducer.resumeTransaction(transaction.producerId, transaction.epoch);
        resumedProducer.commitTransaction();
    } catch (InvalidTxnStateException | ProducerFencedException alreadyCommitted) {
        // Presumed duplicate commit; log and carry on.
        LOG.warn("Encountered error {} while recovering transaction {}. " +
                "Presumably this transaction has been already committed before",
            alreadyCommitted,
            transaction);
    }
}
/**
 * Checks that a {@link ProducerFencedException} raised while starting a transaction
 * propagates out of {@code lease.beginTransaction()} and leaves the lease with a poison
 * count of exactly one.
 */
@Test
public void testPoisonOnExceptionCreatingTransaction() {
    final PoisonCountingLease lease = new PoisonCountingLease();

    // Make the mocked producer fail as soon as a transaction is started.
    // NOTE(review): presumably PoisonCountingLease delegates to this producer - confirm.
    doAnswer(invocation -> {
        throw new ProducerFencedException("Intenitional exception thrown from unit test");
    }).when(producer).beginTransaction();

    try {
        lease.beginTransaction();
        Assert.fail("Expected ProducerFencedException");
    } catch (final ProducerFencedException pfe) {
        // expected
    }

    assertEquals(1, lease.getPoisonCount());
}
/**
 * Enqueues one record, makes the mocked lease throw {@link ProducerFencedException}
 * from {@code beginTransaction()}, and verifies that the FlowFile is routed to failure
 * while the lease is poisoned and closed exactly once.
 */
@Test
public void testFailureWhenCreationgTransaction() {
    runner.enqueue("John Doe, 48");

    final Answer<Object> fenceOnBegin = invocation -> {
        throw new ProducerFencedException("Intentional ProducedFencedException for unit test");
    };
    doAnswer(fenceOnBegin).when(mockLease).beginTransaction();

    runner.run();

    runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 1);
    verify(mockLease, times(1)).poison();
    verify(mockLease, times(1)).close();
}
/**
 * Resumes the given transaction on a freshly initialised internal producer and commits
 * it, then closes that producer with a zero-second timeout. Non-transactional states
 * are ignored.
 *
 * <p>{@link InvalidTxnStateException} and {@link ProducerFencedException} are logged as
 * warnings and not rethrown, on the assumption that the transaction was already committed.
 *
 * @param transaction state holding the transactional id, producer id and epoch to resume
 */
@Override
protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
    if (!transaction.isTransactional()) {
        return;
    }

    FlinkKafkaInternalProducer<byte[], byte[]> recovered = null;
    try {
        recovered = initTransactionalProducer(transaction.transactionalId, false);
        recovered.resumeTransaction(transaction.producerId, transaction.epoch);
        recovered.commitTransaction();
    } catch (InvalidTxnStateException | ProducerFencedException recoveryFailure) {
        // Treated as "already committed earlier", so only a warning is emitted.
        LOG.warn("Encountered error {} while recovering transaction {}. " +
                "Presumably this transaction has been already committed before",
            recoveryFailure,
            transaction);
    } finally {
        // The producer may still be null if initialisation itself failed.
        if (recovered != null) {
            recovered.close(0, TimeUnit.SECONDS);
        }
    }
}
/**
 * Commits a transaction that was started earlier by resuming it on a new transactional
 * producer. Does nothing when the state is not transactional.
 *
 * <p>An {@link InvalidTxnStateException} or {@link ProducerFencedException} is only
 * logged: the code assumes it means the transaction had been committed before.
 *
 * @param transaction state holding the transactional id, producer id and epoch to resume
 */
@Override
protected void recoverAndCommit(KafkaTransactionState transaction) {
    if (!transaction.isTransactional()) {
        return;
    }

    try (FlinkKafkaProducer<byte[], byte[]> resumedProducer =
            initTransactionalProducer(transaction.transactionalId, false)) {
        resumedProducer.resumeTransaction(transaction.producerId, transaction.epoch);
        resumedProducer.commitTransaction();
    } catch (InvalidTxnStateException | ProducerFencedException alreadyCommitted) {
        // Presumed duplicate commit; log and carry on.
        LOG.warn("Encountered error {} while recovering transaction {}. " +
                "Presumably this transaction has been already committed before",
            alreadyCommitted,
            transaction);
    }
}
/**
 * Closes the given resource, suppressing a failure only when its direct cause is a
 * {@link ProducerFencedException}.
 *
 * @param autoCloseable the resource to close
 * @throws Exception any close failure whose cause is not a {@code ProducerFencedException}
 */
private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
    try {
        autoCloseable.close();
    } catch (Exception closeFailure) {
        if (closeFailure.getCause() instanceof ProducerFencedException) {
            // Fencing during close is deliberately ignored.
            return;
        }
        throw closeFailure;
    }
}
/**
 * Closes the given resource and rethrows any failure unless the failure's direct cause
 * is a {@link ProducerFencedException}, in which case it is dropped.
 *
 * @param autoCloseable the resource to close
 * @throws Exception any close failure whose cause is not a {@code ProducerFencedException}
 */
private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
    try {
        autoCloseable.close();
    } catch (Exception closeFailure) {
        final boolean causedByFencing =
                closeFailure.getCause() instanceof ProducerFencedException;
        if (!causedByFencing) {
            throw closeFailure;
        }
        // Otherwise the fenced-producer failure is intentionally ignored.
    }
}
/**
 * Demonstrates a transactional producer: three messages are sent to {@code topic}
 * inside a single transaction which is then committed.
 *
 * <p>The producer is managed by try-with-resources so it is closed on every path.
 * A {@link ProducerFencedException} is fatal for the producer instance, and calling
 * {@code abortTransaction()} in that state fails as well, so it is rethrown and the
 * producer is simply closed. Any other Kafka failure aborts the open transaction
 * before being rethrown.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
        producer.initTransactions();
        producer.beginTransaction();
        try {
            // Run business logic and create the ProducerRecords.
            ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1");
            producer.send(record1);
            ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2");
            producer.send(record2);
            ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3");
            producer.send(record3);
            // Run any other processing before committing.
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // Fatal: the transaction cannot be aborted from a fenced producer.
            // Propagate; try-with-resources closes the producer.
            throw e;
        } catch (org.apache.kafka.common.KafkaException e) {
            // Recoverable from the producer's point of view: roll back, then report.
            producer.abortTransaction();
            throw e;
        }
    }
}
/**
 * Begins a transaction on the wrapped {@code kafkaProducer}.
 *
 * <p>Runs while holding {@code producerClosingLock} and calls {@code ensureNotClosed()}
 * before delegating.
 *
 * @throws ProducerFencedException declared by this method; raised by the delegate
 */
@Override
public void beginTransaction() throws ProducerFencedException {
synchronized (producerClosingLock) {
// NOTE(review): presumably rejects use after close - confirm in ensureNotClosed().
ensureNotClosed();
kafkaProducer.beginTransaction();
}
}
/**
 * Commits the current transaction on the wrapped {@code kafkaProducer}.
 *
 * <p>Runs while holding {@code producerClosingLock} and calls {@code ensureNotClosed()}
 * before delegating.
 *
 * @throws ProducerFencedException declared by this method; raised by the delegate
 */
@Override
public void commitTransaction() throws ProducerFencedException {
synchronized (producerClosingLock) {
// NOTE(review): presumably rejects use after close - confirm in ensureNotClosed().
ensureNotClosed();
kafkaProducer.commitTransaction();
}
}
/**
 * Aborts the current transaction on the wrapped {@code kafkaProducer}.
 *
 * <p>Runs while holding {@code producerClosingLock} and calls {@code ensureNotClosed()}
 * before delegating.
 *
 * @throws ProducerFencedException declared by this method; raised by the delegate
 */
@Override
public void abortTransaction() throws ProducerFencedException {
synchronized (producerClosingLock) {
// NOTE(review): presumably rejects use after close - confirm in ensureNotClosed().
ensureNotClosed();
kafkaProducer.abortTransaction();
}
}
/**
 * Forwards the given consumer offsets to the wrapped {@code kafkaProducer} so they
 * become part of the current transaction.
 *
 * <p>Runs while holding {@code producerClosingLock} and calls {@code ensureNotClosed()}
 * before delegating.
 *
 * @param offsets offsets per topic partition, passed through unchanged
 * @param consumerGroupId consumer group id, passed through unchanged
 * @throws ProducerFencedException declared by this method; raised by the delegate
 */
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
synchronized (producerClosingLock) {
// NOTE(review): presumably rejects use after close - confirm in ensureNotClosed().
ensureNotClosed();
kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
}
}
/**
 * Closes the given resource, suppressing a failure only when its direct cause is a
 * {@link ProducerFencedException}.
 *
 * @param autoCloseable the resource to close
 * @throws Exception any close failure whose cause is not a {@code ProducerFencedException}
 */
private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
    try {
        autoCloseable.close();
    } catch (Exception closeFailure) {
        if (closeFailure.getCause() instanceof ProducerFencedException) {
            // Fencing during close is deliberately ignored.
            return;
        }
        throw closeFailure;
    }
}
/**
 * Begins a transaction on the wrapped {@code kafkaProducer}.
 *
 * <p>Runs while holding {@code producerClosingLock} and calls {@code ensureNotClosed()}
 * before delegating.
 *
 * @throws ProducerFencedException declared by this method; raised by the delegate
 */
@Override
public void beginTransaction() throws ProducerFencedException {
synchronized (producerClosingLock) {
// NOTE(review): presumably rejects use after close - confirm in ensureNotClosed().
ensureNotClosed();
kafkaProducer.beginTransaction();
}
}
/**
 * Commits the current transaction on the wrapped {@code kafkaProducer}.
 *
 * <p>Runs while holding {@code producerClosingLock} and calls {@code ensureNotClosed()}
 * before delegating.
 *
 * @throws ProducerFencedException declared by this method; raised by the delegate
 */
@Override
public void commitTransaction() throws ProducerFencedException {
synchronized (producerClosingLock) {
// NOTE(review): presumably rejects use after close - confirm in ensureNotClosed().
ensureNotClosed();
kafkaProducer.commitTransaction();
}
}
/**
 * Aborts the current transaction on the wrapped {@code kafkaProducer}.
 *
 * <p>Runs while holding {@code producerClosingLock} and calls {@code ensureNotClosed()}
 * before delegating.
 *
 * @throws ProducerFencedException declared by this method; raised by the delegate
 */
@Override
public void abortTransaction() throws ProducerFencedException {
synchronized (producerClosingLock) {
// NOTE(review): presumably rejects use after close - confirm in ensureNotClosed().
ensureNotClosed();
kafkaProducer.abortTransaction();
}
}
/**
 * Forwards the given consumer offsets to the wrapped {@code kafkaProducer} so they
 * become part of the current transaction.
 *
 * <p>Runs while holding {@code producerClosingLock} and calls {@code ensureNotClosed()}
 * before delegating.
 *
 * @param offsets offsets per topic partition, passed through unchanged
 * @param consumerGroupId consumer group id, passed through unchanged
 * @throws ProducerFencedException declared by this method; raised by the delegate
 */
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
synchronized (producerClosingLock) {
// NOTE(review): presumably rejects use after close - confirm in ensureNotClosed().
ensureNotClosed();
kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
}
}
/**
 * Closes the given resource and rethrows any failure unless the failure's direct cause
 * is a {@link ProducerFencedException}, in which case it is dropped.
 *
 * @param autoCloseable the resource to close
 * @throws Exception any close failure whose cause is not a {@code ProducerFencedException}
 */
private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
    try {
        autoCloseable.close();
    } catch (Exception closeFailure) {
        final boolean causedByFencing =
                closeFailure.getCause() instanceof ProducerFencedException;
        if (!causedByFencing) {
            throw closeFailure;
        }
        // Otherwise the fenced-producer failure is intentionally ignored.
    }
}
/**
 * Demonstrates the consume-transform-produce pattern: records polled from
 * {@code topic-source} are forwarded to {@code topic-sink}, and the consumed offsets are
 * committed as part of the same producer transaction.
 *
 * <p>Consumer and producer are managed by try-with-resources so both are closed when
 * the loop is left by an exception. A {@link ProducerFencedException} is fatal for the
 * producer instance, and calling {@code abortTransaction()} in that state fails as
 * well, so it is rethrown. Any other Kafka failure aborts the open transaction before
 * being rethrown.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Initialise the consumer and the producer.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties());
         KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProperties())) {
        consumer.subscribe(Collections.singletonList("topic-source"));
        // Initialise transactions.
        producer.initTransactions();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            if (records.isEmpty()) {
                continue;
            }
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            // Begin the transaction.
            producer.beginTransaction();
            try {
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        // do some logical processing.
                        ProducerRecord<String, String> producerRecord =
                                new ProducerRecord<>("topic-sink", record.key(), record.value());
                        // Consume-produce model: forward the record.
                        producer.send(producerRecord);
                    }
                    long lastConsumedOffset = partitionRecords
                            .get(partitionRecords.size() - 1).offset();
                    // The committed offset is the next offset to read, hence +1.
                    offsets.put(partition, new OffsetAndMetadata(lastConsumedOffset + 1));
                }
                // Commit the consumed offsets within the transaction.
                // NOTE(review): "groupId" presumably has to match the consumer's group.id - confirm.
                producer.sendOffsetsToTransaction(offsets, "groupId");
                // Commit the transaction.
                producer.commitTransaction();
            } catch (ProducerFencedException e) {
                // Fatal: the transaction cannot be aborted from a fenced producer.
                // Propagate; try-with-resources closes consumer and producer.
                throw e;
            } catch (org.apache.kafka.common.KafkaException e) {
                // Roll back the open transaction, then report the failure.
                producer.abortTransaction();
                throw e;
            }
        }
    }
}
/**
 * Demonstrates a transactional producer: three messages are sent to {@code topic}
 * inside a single transaction which is then committed.
 *
 * <p>The producer is managed by try-with-resources so it is closed on every path.
 * A {@link ProducerFencedException} is fatal for the producer instance, and calling
 * {@code abortTransaction()} in that state fails as well, so it is rethrown and the
 * producer is simply closed. Any other Kafka failure aborts the open transaction
 * before being rethrown.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
        producer.initTransactions();
        producer.beginTransaction();
        try {
            // Run business logic and create the ProducerRecords.
            ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1");
            producer.send(record1);
            ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2");
            producer.send(record2);
            ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3");
            producer.send(record3);
            // Run any other processing before committing.
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // Fatal: the transaction cannot be aborted from a fenced producer.
            // Propagate; try-with-resources closes the producer.
            throw e;
        } catch (org.apache.kafka.common.KafkaException e) {
            // Recoverable from the producer's point of view: roll back, then report.
            producer.abortTransaction();
            throw e;
        }
    }
}
/**
 * Starts a transaction on the given producer, translating a
 * {@link ProducerFencedException} into an {@code EventPublicationFailedException}
 * after logging it at warn level.
 *
 * @param producer the producer on which to begin the transaction
 */
private void tryBeginTxn(Producer<?, ?> producer) {
    try {
        producer.beginTransaction();
    } catch (ProducerFencedException fenced) {
        logger.warn("Unable to begin transaction", fenced);
        final String failureMessage =
                "Event publication failed, exception occurred while starting Kafka transaction";
        throw new EventPublicationFailedException(failureMessage, fenced);
    }
}
/**
 * Commits the producer's current transaction and reports the outcome to the monitor
 * callback. A {@link ProducerFencedException} is logged, reported as a failure, and
 * rethrown wrapped in an {@code EventPublicationFailedException}.
 *
 * @param producer the producer whose transaction is committed
 * @param monitorCallback callback notified of success or failure
 */
private void tryCommit(Producer<?, ?> producer, MonitorCallback monitorCallback) {
    try {
        producer.commitTransaction();
        monitorCallback.reportSuccess();
    } catch (ProducerFencedException fenced) {
        logger.warn("Unable to commit transaction", fenced);
        monitorCallback.reportFailure(fenced);
        final String failureMessage =
                "Event publication failed, exception occurred while committing Kafka transaction";
        throw new EventPublicationFailedException(failureMessage, fenced);
    }
}
/**
 * Builds a mocked transactional producer factory whose producer throws
 * {@link ProducerFencedException} from {@code beginTransaction()}.
 *
 * @return the mocked factory
 */
@SuppressWarnings("unchecked")
private static DefaultProducerFactory<String, byte[]> producerFactoryWithFencedExceptionOnBeginTransaction() {
    // Producer that fails as soon as a transaction is started.
    final Producer<String, byte[]> fencedProducer = mock(Producer.class, "ExceptionOnBeginTxMock");
    doThrow(ProducerFencedException.class).when(fencedProducer).beginTransaction();

    // Factory in transactional mode that hands out the failing producer.
    final DefaultProducerFactory<String, byte[]> factory =
            mock(DefaultProducerFactory.class, "FactoryForExceptionOnBeginTx");
    when(factory.confirmationMode()).thenReturn(ConfirmationMode.TRANSACTIONAL);
    when(factory.createProducer()).thenReturn(fencedProducer);
    return factory;
}
/**
 * Builds a mocked transactional producer factory whose producer throws
 * {@link ProducerFencedException} from {@code commitTransaction()}.
 *
 * @return the mocked factory
 */
@SuppressWarnings("unchecked")
private static DefaultProducerFactory<String, byte[]> producerFactoryWithFencedExceptionOnCommit() {
    // Producer that fails when the transaction is committed.
    final Producer<String, byte[]> fencedProducer = mock(Producer.class, "ExceptionOnCommitTxMock");
    doThrow(ProducerFencedException.class).when(fencedProducer).commitTransaction();

    // Factory in transactional mode that hands out the failing producer.
    final DefaultProducerFactory<String, byte[]> factory = mock(DefaultProducerFactory.class);
    when(factory.confirmationMode()).thenReturn(ConfirmationMode.TRANSACTIONAL);
    when(factory.createProducer()).thenReturn(fencedProducer);
    return factory;
}
/**
 * Begins a transaction on {@code delegate}.
 *
 * <p>{@code verifyOpen()} is called first, outside the lock. The read lock of
 * {@code delegateLock} is then held for the duration of the delegated call and released
 * in {@code finally}.
 *
 * @throws ProducerFencedException declared by this method; raised by the delegate
 */
@Override
public void beginTransaction() throws ProducerFencedException {
// NOTE(review): presumably rejects use after close - confirm in verifyOpen().
verifyOpen();
delegateLock.readLock().lock();
try {
delegate.beginTransaction();
} finally {
delegateLock.readLock().unlock();
}
}
/**
 * Forwards the given consumer offsets to {@code delegate} so they become part of the
 * current transaction.
 *
 * <p>{@code verifyOpen()} is called first, outside the lock. The read lock of
 * {@code delegateLock} is then held for the duration of the delegated call and released
 * in {@code finally}.
 *
 * @param offsets offsets per topic partition, passed through unchanged
 * @param consumerGroupId consumer group id, passed through unchanged
 * @throws ProducerFencedException declared by this method; raised by the delegate
 */
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
// NOTE(review): presumably rejects use after close - confirm in verifyOpen().
verifyOpen();
delegateLock.readLock().lock();
try {
delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
} finally {
delegateLock.readLock().unlock();
}
}