类org.apache.kafka.common.errors.ProducerFencedException源码实例Demo

下面列出了怎么用org.apache.kafka.common.errors.ProducerFencedException的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: nifi   文件: TestPublisherLease.java
@Test
public void testPoisonOnExceptionCreatingTransaction() {
    final PoisonCountingLease lease = new PoisonCountingLease();

    final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
    // Need a size grater than zero to make the lease reads the InputStream.
    Mockito.when(flowFile.getSize()).thenReturn(1L);
    doAnswer(new Answer<Object>() {
        @Override
        public Object answer(final InvocationOnMock invocationOnMock) {
            throw new ProducerFencedException("Intenitional exception thrown from unit test");
        }
    }).when(producer).beginTransaction();

    try {
        lease.beginTransaction();
        Assert.fail("Expected ProducerFencedException");
    } catch (final ProducerFencedException pfe) {
        // expected
    }

    assertEquals(1, lease.getPoisonCount());
}
 
源代码2 项目: Flink-CEPplus   文件: FlinkKafkaProducer011.java
@Override
protected void recoverAndCommit(KafkaTransactionState transaction) {
	if (transaction.isTransactional()) {
		try (
			FlinkKafkaProducer<byte[], byte[]> producer =
				initTransactionalProducer(transaction.transactionalId, false)) {
			producer.resumeTransaction(transaction.producerId, transaction.epoch);
			producer.commitTransaction();
		} catch (InvalidTxnStateException | ProducerFencedException ex) {
			// That means we have committed this transaction before.
			LOG.warn("Encountered error {} while recovering transaction {}. " +
					"Presumably this transaction has been already committed before",
				ex,
				transaction);
		}
	}
}
 
源代码3 项目: nifi   文件: TestPublishKafka_2_0.java
@Test
public void testPublisherPoisonedIfFencedDuringTransactionCreation() {
    runner.enqueue("hello world");
    runner.enqueue("Hello World");

    doAnswer(new Answer<Object>() {
        @Override
        public Object answer(final InvocationOnMock invocationOnMock) {
            throw new ProducerFencedException("Intentional ProducedFencedException for unit test");
        }
    }).when(mockLease).beginTransaction();

    runner.run();
    runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 2);

    verify(mockLease, times(1)).poison();
    verify(mockLease, times(1)).close();
}
 
源代码4 项目: flink   文件: FlinkKafkaProducer.java
@Override
protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
	if (transaction.isTransactional()) {
		try (
			FlinkKafkaInternalProducer<byte[], byte[]> producer =
				initTransactionalProducer(transaction.transactionalId, false)) {
			producer.resumeTransaction(transaction.producerId, transaction.epoch);
			producer.commitTransaction();
		} catch (InvalidTxnStateException | ProducerFencedException ex) {
			// That means we have committed this transaction before.
			LOG.warn("Encountered error {} while recovering transaction {}. " +
					"Presumably this transaction has been already committed before",
				ex,
				transaction);
		}
	}
}
 
源代码5 项目: flink   文件: FlinkKafkaProducer011.java
@Override
protected void recoverAndCommit(KafkaTransactionState transaction) {
	if (transaction.isTransactional()) {
		try (
			FlinkKafkaProducer<byte[], byte[]> producer =
				initTransactionalProducer(transaction.transactionalId, false)) {
			producer.resumeTransaction(transaction.producerId, transaction.epoch);
			producer.commitTransaction();
		} catch (InvalidTxnStateException | ProducerFencedException ex) {
			// That means we have committed this transaction before.
			LOG.warn("Encountered error {} while recovering transaction {}. " +
					"Presumably this transaction has been already committed before",
				ex,
				transaction);
		}
	}
}
 
源代码6 项目: nifi   文件: TestPublisherLease.java
@Test
public void testPoisonOnExceptionCreatingTransaction() {
    final PoisonCountingLease lease = new PoisonCountingLease();

    final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
    // Need a size grater than zero to make the lease reads the InputStream.
    Mockito.when(flowFile.getSize()).thenReturn(1L);
    doAnswer(new Answer<Object>() {
        @Override
        public Object answer(final InvocationOnMock invocationOnMock) {
            throw new ProducerFencedException("Intenitional exception thrown from unit test");
        }
    }).when(producer).beginTransaction();

    try {
        lease.beginTransaction();
        Assert.fail("Expected ProducerFencedException");
    } catch (final ProducerFencedException pfe) {
        // expected
    }

    assertEquals(1, lease.getPoisonCount());
}
 
源代码7 项目: nifi   文件: TestPublishKafkaRecord_2_0.java
@Test
public void testFailureWhenCreationgTransaction() {
    runner.enqueue("John Doe, 48");

    doAnswer(new Answer<Object>() {
        @Override
        public Object answer(final InvocationOnMock invocationOnMock) {
            throw new ProducerFencedException("Intentional ProducedFencedException for unit test");
        }
    }).when(mockLease).beginTransaction();

    runner.run();
    runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 1);

    verify(mockLease, times(1)).poison();
    verify(mockLease, times(1)).close();
}
 
源代码8 项目: flink   文件: FlinkKafkaProducer.java
@Override
protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
	if (transaction.isTransactional()) {
		FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
		try {
			producer =
				initTransactionalProducer(transaction.transactionalId, false);
			producer.resumeTransaction(transaction.producerId, transaction.epoch);
			producer.commitTransaction();
		} catch (InvalidTxnStateException | ProducerFencedException ex) {
			// That means we have committed this transaction before.
			LOG.warn("Encountered error {} while recovering transaction {}. " +
					"Presumably this transaction has been already committed before",
				ex,
				transaction);
		} finally {
			if (producer != null) {
				producer.close(0, TimeUnit.SECONDS);
			}
		}
	}
}
 
源代码9 项目: flink   文件: FlinkKafkaProducer011.java
@Override
protected void recoverAndCommit(KafkaTransactionState transaction) {
	if (transaction.isTransactional()) {
		try (
			FlinkKafkaProducer<byte[], byte[]> producer =
				initTransactionalProducer(transaction.transactionalId, false)) {
			producer.resumeTransaction(transaction.producerId, transaction.epoch);
			producer.commitTransaction();
		} catch (InvalidTxnStateException | ProducerFencedException ex) {
			// That means we have committed this transaction before.
			LOG.warn("Encountered error {} while recovering transaction {}. " +
					"Presumably this transaction has been already committed before",
				ex,
				transaction);
		}
	}
}
 
源代码10 项目: Flink-CEPplus   文件: FlinkKafkaProducerITCase.java
private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
	try {
		autoCloseable.close();
	}
	catch (Exception ex) {
		if (!(ex.getCause() instanceof ProducerFencedException)) {
			throw ex;
		}
	}
}
 
private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
	try {
		autoCloseable.close();
	}
	catch (Exception ex) {
		if (!(ex.getCause() instanceof ProducerFencedException)) {
			throw ex;
		}
	}
}
 
源代码12 项目: kafka_book_demo   文件: TransactionOnlySend.java
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    producer.initTransactions();
    producer.beginTransaction();

    try {
        //处理业务逻辑并创建ProducerRecord
        ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1");
        producer.send(record1);
        ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2");
        producer.send(record2);
        ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3");
        producer.send(record3);
        //处理一些其它逻辑
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        producer.abortTransaction();
    }
    producer.close();
}
 
源代码13 项目: flink   文件: FlinkKafkaInternalProducer.java
@Override
public void beginTransaction() throws ProducerFencedException {
	synchronized (producerClosingLock) {
		ensureNotClosed();
		kafkaProducer.beginTransaction();
	}
}
 
源代码14 项目: flink   文件: FlinkKafkaInternalProducer.java
@Override
public void commitTransaction() throws ProducerFencedException {
	synchronized (producerClosingLock) {
		ensureNotClosed();
		kafkaProducer.commitTransaction();
	}
}
 
源代码15 项目: flink   文件: FlinkKafkaInternalProducer.java
@Override
public void abortTransaction() throws ProducerFencedException {
	synchronized (producerClosingLock) {
		ensureNotClosed();
		kafkaProducer.abortTransaction();
	}
}
 
源代码16 项目: flink   文件: FlinkKafkaInternalProducer.java
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
	synchronized (producerClosingLock) {
		ensureNotClosed();
		kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
	}
}
 
源代码17 项目: flink   文件: FlinkKafkaProducerITCase.java
private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
	try {
		autoCloseable.close();
	}
	catch (Exception ex) {
		if (!(ex.getCause() instanceof ProducerFencedException)) {
			throw ex;
		}
	}
}
 
源代码18 项目: flink   文件: FlinkKafkaProducer.java
@Override
public void beginTransaction() throws ProducerFencedException {
	synchronized (producerClosingLock) {
		ensureNotClosed();
		kafkaProducer.beginTransaction();
	}
}
 
源代码19 项目: flink   文件: FlinkKafkaProducer.java
@Override
public void commitTransaction() throws ProducerFencedException {
	synchronized (producerClosingLock) {
		ensureNotClosed();
		kafkaProducer.commitTransaction();
	}
}
 
源代码20 项目: flink   文件: FlinkKafkaProducer.java
@Override
public void abortTransaction() throws ProducerFencedException {
	synchronized (producerClosingLock) {
		ensureNotClosed();
		kafkaProducer.abortTransaction();
	}
}
 
源代码21 项目: flink   文件: FlinkKafkaProducer.java
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
	synchronized (producerClosingLock) {
		ensureNotClosed();
		kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
	}
}
 
源代码22 项目: flink   文件: FlinkKafkaProducer011ITCase.java
private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
	try {
		autoCloseable.close();
	}
	catch (Exception ex) {
		if (!(ex.getCause() instanceof ProducerFencedException)) {
			throw ex;
		}
	}
}
 
public static void main(String[] args) {
    //初始化生产者和消费者
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties());
    consumer.subscribe(Collections.singletonList("topic-source"));
    KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProperties());
    //初始化事务
    producer.initTransactions();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        if (!records.isEmpty()) {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            //开启事务
            producer.beginTransaction();
            try {
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        //do some logical processing.
                        ProducerRecord<String, String> producerRecord =
                                new ProducerRecord<>("topic-sink", record.key(), record.value());
                        //消费-生产模型
                        producer.send(producerRecord);
                    }
                    long lastConsumedOffset = partitionRecords.
                            get(partitionRecords.size() - 1).offset();
                    offsets.put(partition, new OffsetAndMetadata(lastConsumedOffset + 1));
                }
                //提交消费位移
                producer.sendOffsetsToTransaction(offsets, "groupId");
                //提交事务
                producer.commitTransaction();
            } catch (ProducerFencedException e) {
                //log the exception
                //中止事务
                producer.abortTransaction();
            }
        }
    }
}
 
源代码24 项目: BigData-In-Practice   文件: TransactionOnlySend.java
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,                StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,                StringSerializer.class.getName());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    producer.initTransactions();
    producer.beginTransaction();

    try {
        //处理业务逻辑并创建ProducerRecord
        ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1");
        producer.send(record1);
        ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2");
        producer.send(record2);
        ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3");
        producer.send(record3);
        //处理一些其它逻辑
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        producer.abortTransaction();
    }
    producer.close();
}
 
源代码25 项目: extension-kafka   文件: KafkaPublisher.java
private void tryBeginTxn(Producer<?, ?> producer) {
    try {
        producer.beginTransaction();
    } catch (ProducerFencedException e) {
        logger.warn("Unable to begin transaction", e);
        throw new EventPublicationFailedException(
                "Event publication failed, exception occurred while starting Kafka transaction", e
        );
    }
}
 
源代码26 项目: extension-kafka   文件: KafkaPublisher.java
private void tryCommit(Producer<?, ?> producer, MonitorCallback monitorCallback) {
    try {
        producer.commitTransaction();
        monitorCallback.reportSuccess();
    } catch (ProducerFencedException e) {
        logger.warn("Unable to commit transaction", e);
        monitorCallback.reportFailure(e);
        throw new EventPublicationFailedException(
                "Event publication failed, exception occurred while committing Kafka transaction", e
        );
    }
}
 
源代码27 项目: extension-kafka   文件: KafkaPublisherTest.java
@SuppressWarnings("unchecked")
private static DefaultProducerFactory<String, byte[]> producerFactoryWithFencedExceptionOnBeginTransaction() {
    DefaultProducerFactory<String, byte[]> producerFactory =
            mock(DefaultProducerFactory.class, "FactoryForExceptionOnBeginTx");
    Producer<String, byte[]> producer = mock(Producer.class, "ExceptionOnBeginTxMock");
    when(producerFactory.confirmationMode()).thenReturn(ConfirmationMode.TRANSACTIONAL);
    when(producerFactory.createProducer()).thenReturn(producer);
    doThrow(ProducerFencedException.class).when(producer).beginTransaction();
    return producerFactory;
}
 
源代码28 项目: extension-kafka   文件: KafkaPublisherTest.java
@SuppressWarnings("unchecked")
private static DefaultProducerFactory<String, byte[]> producerFactoryWithFencedExceptionOnCommit() {
    DefaultProducerFactory<String, byte[]> producerFactory = mock(DefaultProducerFactory.class);
    Producer<String, byte[]> producer = mock(Producer.class, "ExceptionOnCommitTxMock");
    when(producerFactory.confirmationMode()).thenReturn(ConfirmationMode.TRANSACTIONAL);
    when(producerFactory.createProducer()).thenReturn(producer);
    doThrow(ProducerFencedException.class).when(producer).commitTransaction();
    return producerFactory;
}
 
@Override
public void beginTransaction() throws ProducerFencedException {
  verifyOpen();

  delegateLock.readLock().lock();
  try {
    delegate.beginTransaction();
  } finally {
    delegateLock.readLock().unlock();
  }
}
 
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
  verifyOpen();

  delegateLock.readLock().lock();
  try {
    delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
  } finally {
    delegateLock.readLock().unlock();
  }
}
 
 类所在包
 类方法
 同包方法