org.apache.kafka.clients.producer.Callback#onCompletion ( )源码实例Demo

下面列出了 org.apache.kafka.clients.producer.Callback#onCompletion() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: skywalking   文件: KafkaProducerInterceptorTest.java
/**
 * Runs the producer interceptor around a send call, fires the callback argument by hand,
 * and checks that exactly one trace segment containing two spans was recorded.
 */
@Test
public void testSendMessageAndCallBack() throws Throwable {
    producerInterceptor.beforeMethod(kafkaProducerInstance, null, arguments, argumentType, null);

    // The second intercepted argument is treated as the callback; invoke it only when present.
    final Object callbackArgument = arguments[1];
    if (callbackArgument != null) {
        ((Callback) callbackArgument).onCompletion(null, null);
    }

    producerInterceptor.afterMethod(kafkaProducerInstance, null, arguments, argumentType, null);

    final List<TraceSegment> recordedSegments = segmentStorage.getTraceSegments();
    assertThat(recordedSegments.size(), is(1));

    final List<AbstractTracingSpan> recordedSpans = SegmentHelper.getSpans(recordedSegments.get(0));
    assertThat(recordedSpans.size(), is(2));

    assertCallbackSpan(recordedSpans.get(0));
}
 
/**
 * Completing the captured Kafka callback without an exception must never reach
 * {@code failedDeliveryCallback.onFailedDelivery}.
 */
@Test
public void testCallbackWillNotTriggerOnFailedDeliveryOnNoException() {
    final ProducerRecord<String,String> sentRecord =
        new ProducerRecord<String,String>("topic", 0, null, "msg");
    unit.send(producer, sentRecord, "msg", failedDeliveryCallback);

    // Grab the callback that the unit under test handed to the producer.
    final ArgumentCaptor<Callback> captor = ArgumentCaptor.forClass(Callback.class);
    verify(producer).send(Mockito.refEq(sentRecord), captor.capture());

    // Complete it with metadata and no exception.
    captor.getValue().onCompletion(recordMetadata, null);

    verify(failedDeliveryCallback, never()).onFailedDelivery(anyString(), any(Throwable.class));
}
 
/**
 * Completing the captured Kafka callback with an exception must forward the message
 * and that same exception to {@code failedDeliveryCallback.onFailedDelivery}.
 */
@Test
public void testCallbackWillTriggerOnFailedDeliveryOnException() {
    final IOException deliveryFailure = new IOException("KABOOM");
    final ProducerRecord<String,String> sentRecord =
        new ProducerRecord<String,String>("topic", 0, null, "msg");
    unit.send(producer, sentRecord, "msg", failedDeliveryCallback);

    // Grab the callback that the unit under test handed to the producer.
    final ArgumentCaptor<Callback> captor = ArgumentCaptor.forClass(Callback.class);
    verify(producer).send(Mockito.refEq(sentRecord), captor.capture());

    // Complete it with the simulated failure.
    captor.getValue().onCompletion(recordMetadata, deliveryFailure);

    verify(failedDeliveryCallback).onFailedDelivery("msg", deliveryFailure);
}
 
源代码4 项目: incubator-gobblin   文件: FlakyKafkaProducer.java
/**
 * Sends a record, or simulates a failed send when the error manager flags this record.
 *
 * <p>On a simulated failure the callback (if one was supplied) is completed with a fresh
 * {@link Exception} and {@code nullFuture} is returned; the record is not handed to the
 * parent producer. Otherwise the call is delegated to {@code super.send}.
 *
 * @param record the record to send; its value is passed to the error manager
 * @param callback completion callback; may be {@code null}
 * @return {@code nullFuture} on a simulated failure, otherwise the future from the parent
 */
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, final Callback callback) {
  // Ask the error manager exactly once per record. The previous code called nextError twice
  // and ignored the first result; presumably nextError advances internal state, so the
  // duplicate call would consume two decisions per send.
  final boolean shouldFail = errorManager.nextError(record.value());
  if (!shouldFail) {
    return super.send(record, callback);
  }

  // A null callback is allowed by the producer contract, so guard before notifying.
  if (callback != null) {
    callback.onCompletion(null, new Exception());
  }
  return nullFuture;
}
 
源代码5 项目: brave   文件: TracingProducerBenchmarks.java
/**
 * Completes immediately with fixed record metadata for partition 0 of the record's topic.
 * The callback, when supplied, is invoked with that metadata and no exception before the
 * already-completed future is returned.
 */
@Override
public Future<RecordMetadata> send(ProducerRecord<String, String> record, Callback callback) {
  RecordMetadata metadata =
    new RecordMetadata(new TopicPartition(record.topic(), 0), -1L, -1L, 1L, 2L, 3, 4);
  if (callback != null) {
    callback.onCompletion(metadata, null);
  }
  return Futures.immediateFuture(metadata);
}
 
源代码6 项目: brave   文件: TracingCallbackTest.java
/**
 * The delegate must observe the not-sampled span as the tracer's current span
 * while the tracing callback is completing.
 */
@Test public void onCompletion_shouldKeepContext_whenNotSampled() {
  Span notSampledSpan = tracing.tracer().nextSpan(TraceContextOrSamplingFlags.NOT_SAMPLED);

  // The assertion runs inside the delegate, i.e. while the wrapper is invoking it.
  Callback assertingDelegate = (metadata, exception) -> {
    assertThat(tracing.tracer().currentSpan()).isEqualTo(notSampledSpan);
  };

  TracingCallback.create(assertingDelegate, notSampledSpan, tracing.currentTraceContext())
    .onCompletion(null, null);
}
 
源代码7 项目: brave   文件: TracingCallbackTest.java
/**
 * With no delegate, completing the tracing callback successfully must leave the first
 * reported span with a non-zero finish timestamp.
 */
@Test public void on_completion_should_finish_span() {
  Span startedSpan = tracing.tracer().nextSpan().start();

  TracingCallback.create(null, startedSpan, currentTraceContext)
    .onCompletion(createRecordMetadata(), null);

  assertThat(spans.get(0).finishTimestamp()).isNotZero();
}
 
源代码8 项目: brave   文件: TracingCallbackTest.java
/**
 * With no delegate, completing the tracing callback with an error must finish the first
 * reported span and record that error on it.
 */
@Test public void on_completion_should_tag_if_exception() {
  Span startedSpan = tracing.tracer().nextSpan().start();

  TracingCallback.create(null, startedSpan, currentTraceContext)
    .onCompletion(null, error);

  assertThat(spans.get(0).finishTimestamp()).isNotZero();
  assertThat(spans.get(0).error()).isEqualTo(error);
}
 
源代码9 项目: brave   文件: TracingCallbackTest.java
/**
 * A successful completion must be forwarded to the mocked delegate with the same metadata,
 * and the first reported span must end up with a non-zero finish timestamp.
 */
@Test public void on_completion_should_forward_then_finish_span() {
  Span startedSpan = tracing.tracer().nextSpan().start();

  Callback mockDelegate = mock(Callback.class);
  Callback wrapped = TracingCallback.create(mockDelegate, startedSpan, currentTraceContext);
  RecordMetadata metadata = createRecordMetadata();

  wrapped.onCompletion(metadata, null);

  // The delegate sees exactly what was passed to the wrapper.
  verify(mockDelegate).onCompletion(metadata, null);
  assertThat(spans.get(0).finishTimestamp()).isNotZero();
}
 
源代码10 项目: brave   文件: TracingCallbackTest.java
/**
 * A failed completion must be forwarded to the mocked delegate with the same metadata and
 * error, and the first reported span must be finished and carry that error.
 */
@Test public void on_completion_should_forward_then_tag_if_exception() {
  Span startedSpan = tracing.tracer().nextSpan().start();

  Callback mockDelegate = mock(Callback.class);
  Callback wrapped = TracingCallback.create(mockDelegate, startedSpan, currentTraceContext);
  RecordMetadata metadata = createRecordMetadata();

  wrapped.onCompletion(metadata, error);

  // The delegate sees exactly what was passed to the wrapper.
  verify(mockDelegate).onCompletion(metadata, error);
  assertThat(spans.get(0).finishTimestamp()).isNotZero();
  assertThat(spans.get(0).error()).isEqualTo(error);
}
 
源代码11 项目: vertx-kafka-client   文件: ProducerMockTest.java
/**
 * Reports a simulated write failure to the callback first, then delegates the send
 * to the parent implementation with the same record and callback.
 */
@Override
public synchronized Future<RecordMetadata> send(ProducerRecord<String, String> record, Callback callback) {
  final SimulatedWriteException simulatedFailure = new SimulatedWriteException();
  callback.onCompletion(null, simulatedFailure);
  return super.send(record, callback);
}