下面列出了 org.apache.kafka.clients.producer.Callback#onCompletion() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Drives the producer interceptor through {@code beforeMethod}, fires the callback held in
 * {@code arguments[1]} (when present) with a null metadata and a null exception, then runs
 * {@code afterMethod} and checks the recorded trace: one segment holding two spans, the first
 * of which is handed to {@code assertCallbackSpan}.
 */
@Test
public void testSendMessageAndCallBack() throws Throwable {
    producerInterceptor.beforeMethod(kafkaProducerInstance, null, arguments, argumentType, null);

    // The slot is read only after beforeMethod has run.
    final Object callbackArgument = arguments[1];
    if (callbackArgument != null) {
        ((Callback) callbackArgument).onCompletion(null, null);
    }

    producerInterceptor.afterMethod(kafkaProducerInstance, null, arguments, argumentType, null);

    final List<TraceSegment> segments = segmentStorage.getTraceSegments();
    assertThat(segments.size(), is(1));

    final List<AbstractTracingSpan> segmentSpans = SegmentHelper.getSpans(segments.get(0));
    assertThat(segmentSpans.size(), is(2));
    assertCallbackSpan(segmentSpans.get(0));
}
/**
 * Sends a record through the unit under test, captures the Kafka callback that was handed to
 * the producer, completes it with metadata and no exception, and verifies that
 * {@code onFailedDelivery} is never invoked.
 */
@Test
public void testCallbackWillNotTriggerOnFailedDeliveryOnNoException() {
    ProducerRecord<String, String> sentRecord =
        new ProducerRecord<String, String>("topic", 0, null, "msg");
    ArgumentCaptor<Callback> captor = ArgumentCaptor.forClass(Callback.class);

    unit.send(producer, sentRecord, "msg", failedDeliveryCallback);
    verify(producer).send(Mockito.refEq(sentRecord), captor.capture());

    // Complete the captured callback with metadata present and no exception.
    captor.getValue().onCompletion(recordMetadata, null);

    verify(failedDeliveryCallback, never()).onFailedDelivery(anyString(), any(Throwable.class));
}
/**
 * Sends a record through the unit under test, captures the Kafka callback that was handed to
 * the producer, completes it with an {@link IOException}, and verifies that
 * {@code onFailedDelivery} receives the original message together with that same exception.
 */
@Test
public void testCallbackWillTriggerOnFailedDeliveryOnException() {
    ProducerRecord<String, String> sentRecord =
        new ProducerRecord<String, String>("topic", 0, null, "msg");
    ArgumentCaptor<Callback> captor = ArgumentCaptor.forClass(Callback.class);

    unit.send(producer, sentRecord, "msg", failedDeliveryCallback);
    verify(producer).send(Mockito.refEq(sentRecord), captor.capture());

    // Complete the captured callback with a failure.
    IOException failure = new IOException("KABOOM");
    captor.getValue().onCompletion(recordMetadata, failure);

    verify(failedDeliveryCallback).onFailedDelivery("msg", failure);
}
/**
 * Sends {@code record}, or simulates a failed send when the error manager requests one.
 *
 * <p>When {@code errorManager.nextError(record.value())} returns {@code true}, the callback (if
 * any) is completed immediately with null metadata and a fresh {@link Exception}, and
 * {@code nullFuture} is returned without delegating. Otherwise the call is forwarded to
 * {@code super.send}.
 *
 * @param record the record to send; its value is passed to the error manager
 * @param callback completion callback; may be {@code null}, in which case a simulated failure
 *     is reported only through the returned future
 * @return {@code nullFuture} for a simulated failure, otherwise the result of {@code super.send}
 */
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, final Callback callback) {
    // Query the error manager exactly once per send. The previous code called nextError twice
    // and threw the first answer away; if nextError is stateful (its name suggests it advances
    // to the next decision -- confirm against ErrorManager), every send consumed two decisions.
    final boolean injectError = errorManager.nextError(record.value());
    if (!injectError) {
        return super.send(record, callback);
    }

    // A null callback is tolerated so a callback-less send does not fail with a
    // NullPointerException here.
    if (callback != null) {
        callback.onCompletion(null, new Exception());
    }
    return nullFuture;
}
/**
 * Stub send that never talks to a broker: it builds a {@code RecordMetadata} for partition 0 of
 * the record's topic using fixed placeholder values, completes the callback (when one is given)
 * with that metadata and no exception, and returns an already-completed future holding it.
 *
 * @param record the record whose topic is used for the metadata's topic-partition
 * @param callback completion callback; skipped when {@code null}
 * @return an immediate future containing the fabricated metadata
 */
@Override
public Future<RecordMetadata> send(ProducerRecord<String, String> record, Callback callback) {
    final RecordMetadata metadata =
        new RecordMetadata(new TopicPartition(record.topic(), 0), -1L, -1L, 1L, 2L, 3, 4);

    if (callback != null) {
        callback.onCompletion(metadata, null);
    }

    return Futures.immediateFuture(metadata);
}
/**
 * Creates a span from {@code NOT_SAMPLED} flags, wraps a delegate in a {@code TracingCallback}
 * for that span, and asserts from inside the delegate that the tracer's current span equals it
 * while the callback completes.
 */
@Test
public void onCompletion_shouldKeepContext_whenNotSampled() {
    Span unsampled = tracing.tracer().nextSpan(TraceContextOrSamplingFlags.NOT_SAMPLED);

    // The assertion runs when the tracing callback forwards to this delegate.
    Callback assertingDelegate = (metadata, exception) -> {
        assertThat(tracing.tracer().currentSpan()).isEqualTo(unsampled);
    };

    TracingCallback.create(assertingDelegate, unsampled, tracing.currentTraceContext())
        .onCompletion(null, null);
}
/**
 * Completes a delegate-less {@code TracingCallback} with record metadata and no exception, then
 * checks that the first reported span has a non-zero finish timestamp.
 */
@Test
public void on_completion_should_finish_span() {
    Span started = tracing.tracer().nextSpan().start();

    TracingCallback.create(null, started, currentTraceContext)
        .onCompletion(createRecordMetadata(), null);

    assertThat(spans.get(0).finishTimestamp()).isNotZero();
}
/**
 * Completes a delegate-less {@code TracingCallback} with null metadata and the shared
 * {@code error}, then checks that the first reported span is finished and carries that error.
 */
@Test
public void on_completion_should_tag_if_exception() {
    Span started = tracing.tracer().nextSpan().start();

    TracingCallback.create(null, started, currentTraceContext)
        .onCompletion(null, error);

    assertThat(spans.get(0).finishTimestamp()).isNotZero();
    assertThat(spans.get(0).error()).isEqualTo(error);
}
/**
 * Wraps a mocked delegate in a {@code TracingCallback}, completes it with record metadata and
 * no exception, and verifies that the delegate received the same arguments and that the first
 * reported span has a non-zero finish timestamp.
 */
@Test
public void on_completion_should_forward_then_finish_span() {
    Callback mockDelegate = mock(Callback.class);
    Span started = tracing.tracer().nextSpan().start();
    RecordMetadata metadata = createRecordMetadata();

    TracingCallback.create(mockDelegate, started, currentTraceContext)
        .onCompletion(metadata, null);

    verify(mockDelegate).onCompletion(metadata, null);
    assertThat(spans.get(0).finishTimestamp()).isNotZero();
}
/**
 * Wraps a mocked delegate in a {@code TracingCallback}, completes it with record metadata and
 * the shared {@code error}, and verifies that the delegate received the same arguments and
 * that the first reported span is finished and carries that error.
 */
@Test
public void on_completion_should_forward_then_tag_if_exception() {
    Callback mockDelegate = mock(Callback.class);
    Span started = tracing.tracer().nextSpan().start();
    RecordMetadata metadata = createRecordMetadata();

    TracingCallback.create(mockDelegate, started, currentTraceContext)
        .onCompletion(metadata, error);

    verify(mockDelegate).onCompletion(metadata, error);
    assertThat(spans.get(0).finishTimestamp()).isNotZero();
    assertThat(spans.get(0).error()).isEqualTo(error);
}
/**
 * Simulates a write failure on every send: the callback (if any) is completed with null
 * metadata and a new {@code SimulatedWriteException}, after which the call is still forwarded
 * to {@code super.send} with the same record and callback.
 *
 * <p>NOTE(review): because the callback is also passed to {@code super.send}, it may be
 * completed a second time by the superclass -- confirm that this is intended.
 *
 * @param record the record forwarded to {@code super.send}
 * @param callback completion callback; may be {@code null}, in which case no failure is
 *     reported here
 * @return whatever {@code super.send} returns
 */
@Override
public synchronized Future<RecordMetadata> send(ProducerRecord<String, String> record, Callback callback) {
    // The callback is optional for Kafka producers; guard so a callback-less send does not
    // throw a NullPointerException before delegating.
    if (callback != null) {
        callback.onCompletion(null, new SimulatedWriteException());
    }
    return super.send(record, callback);
}