org.apache.kafka.clients.producer.RecordMetadata Source Code Examples (Demo)

Listed below are code examples that use org.apache.kafka.clients.producer.RecordMetadata, each taken from an open-source project; you can also follow the links through to GitHub to view the full source files.
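
Before the project examples, here is a minimal self-contained sketch of the two ways the code below obtains a RecordMetadata: synchronously, by blocking on the Future<RecordMetadata> returned by KafkaProducer.send(), and asynchronously, through a Callback. The broker address and topic name are placeholder assumptions, not taken from any of the listed projects.

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class RecordMetadataDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Synchronous: block on the Future until the broker acknowledges the record.
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            RecordMetadata md = future.get();
            System.out.printf("sync: topic=%s partition=%d offset=%d%n",
                    md.topic(), md.partition(), md.offset());

            // Asynchronous: exactly one of (metadata, exception) is non-null in the callback.
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.printf("async: partition=%d offset=%d%n",
                                    metadata.partition(), metadata.offset());
                        }
                    });
            producer.flush(); // ensure the callback fires before the producer closes
        }
    }
}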

Example 1 (Project: apicurio-registry, File: AsyncProducer.java)
@Override
public CompletableFuture<RecordMetadata> apply(ProducerRecord<K, V> record) {
    CompletableFuture<RecordMetadata> result = null;
    try {
        KafkaProducer<K, V> producer = getProducer();
        result = new CFC(producer);
        producer.send(record, (CFC) result);
    } catch (Exception e) {
        if (result != null) {
            ((CFC) result).onCompletion(null, e);
        } else {
            result = new CompletableFuture<>();
            result.completeExceptionally(e);
        }
    }
    return result;
}
 
Example 2 (Project: DBus, File: MessageProcessor.java)
/**
 * Send stat info to the statistic topic; whether it succeeds is not critical.
 * @param message the stat message to send
 */
private void sendTableStatInfo(StatMessage message) {

    String key = String.format("%s.%s.%s.%s.%s", message.getDsName(), message.getSchemaName(), message.getTableName(),
            message.getType(), message.getTxTimeMS());
    String value = message.toJSONString();

    Callback callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata ignored, Exception e) {
            if (e != null) {
                logger.error(String.format("Send statistic FAIL: toTopic=%s, key=%s", statTopic, key));
            } else {
                logger.info(String.format("  Send statistic successful: toTopic=%s, key=(%s)", statTopic, key));
            }
        }
    };

    Future<RecordMetadata> result = producer.send(new ProducerRecord<>(statTopic, key, value), callback);
}
 
Example 3 (Project: singer, File: AuditEventKafkaSender.java)
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  try {
    if (e == null) {
      OpenTsdbMetricConverter
          .incr(LoggingAuditClientMetrics.AUDIT_CLIENT_SENDER_KAFKA_EVENTS_ACKED, 1,
              "host=" + host, "stage=" + stage.toString(),
              "logName=" + event.getLoggingAuditHeaders().getLogName());

      // If the send succeeded, remove the event from eventTriedCount if it was added there;
      // LoggingAuditHeaders uniquely identifies an event.
      eventTriedCount.remove(event.getLoggingAuditHeaders());
      // Likewise remove the partition from badPartitions if it was added there.
      badPartitions.remove(recordMetadata.partition());
    } else {
      checkAndEnqueueWhenSendFailed();
    }
  } catch (Throwable t) {
    LOG.warn("Exception throws in the callback. Drop this event {}", event, t);
    OpenTsdbMetricConverter
        .incr(LoggingAuditClientMetrics.AUDIT_CLIENT_SENDER_KAFKA_CALLBACK_EXCEPTION, 1,
            "host=" + host, "stage=" + stage.toString(), "topic=" + topic);
  }
}
 
Example 4 (Project: java-kafka-client, File: TracingKafkaProducer.java)
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback,
    SpanContext parent) {
  /*
  // Create wrappedRecord because headers can be read only in record (if record is sent second time)
  ProducerRecord<K, V> wrappedRecord = new ProducerRecord<>(record.topic(),
      record.partition(),
      record.timestamp(),
      record.key(),
      record.value(),
      record.headers());
  */

  Span span = TracingKafkaUtils
      .buildAndInjectSpan(record, tracer, producerSpanNameProvider, parent, spanDecorators);
  try (Scope ignored = tracer.activateSpan(span)) {
    Callback wrappedCallback = new TracingCallback(callback, span, tracer, spanDecorators);
    return producer.send(record, wrappedCallback);
  }
}
 
Example 5 (Project: SO, File: ContextModelController.java)
private ContextModelForMQ processContextModel(ContextModelForIf2 contextModelForIf, HttpServletRequest request) {
    log.debug("input:ContextModelForIf: {}", contextModelForIf);
    // create a message From ContextModelForMQ for messageQueue, publish to message queue
    // ContextModelForIf --> ContextModelForMQ
    ContextModelForMQ contextModelForMQ = ContextModelMapper2.toContextModelForMQ(contextModelForIf);

    // tracking
    TrackingEntity trackingEntity = (TrackingEntity) request.getSession().getAttribute("tracking");
    trackingEntity.setSimulatorType(contextModelForIf.getSimulatorType());  // set the simulator type
    contextModelForMQ.setTrackingEntity(trackingEntity);

    contextModelForMQ.addState(Const.CONTEXTMODEL_ID, contextModelForIf.getContextId());
    contextModelForMQ.addState(Const.RESULT_CM_VALUE, contextModelForIf.getResultCmValue());
    
    log.debug("converted:ContextModelForMQ: {}", contextModelForMQ);
    //object to json
    String contextModelForMqString = ContextModelMapper2.writeJsonString(contextModelForMQ);
    log.debug("generated:ContextModelForMQ {}", contextModelForMqString);
    //context model producer handler
    DefaultProducerHandler producerHandler = new DefaultProducerHandler(0, "contextmodel");
    Future<RecordMetadata> future = producerHandler.send(contextModelForMQ);
    producerHandler.close();
    log.debug("producer.send result: {}", future);

    return contextModelForMQ;
}
 
Example 6

/**
 * Send a CruiseControlMetric to the Kafka topic.
 * @param ccm the Cruise Control metric to send.
 */
public void sendCruiseControlMetric(CruiseControlMetric ccm) {
  // Use topic name as key if existing so that the same sampler will be able to collect all the information
  // of a topic.
  String key = ccm.metricClassId() == CruiseControlMetric.MetricClassId.TOPIC_METRIC ?
      ((TopicMetric) ccm).topic() : Integer.toString(ccm.brokerId());
  ProducerRecord<String, CruiseControlMetric> producerRecord =
      new ProducerRecord<>(_cruiseControlMetricsTopic, null, ccm.time(), key, ccm);
  LOG.debug("Sending Cruise Control metric {}.", ccm);
  _producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
      if (e != null) {
        LOG.warn("Failed to send Cruise Control metric {}", ccm);
        _numMetricSendFailure++;
      }
    }
  });
}
 
Example 7 (Project: rya, File: KafkaPeriodicBindingSetExporter.java)
/**
 * Exports BindingSets to Kafka.  The BindingSet and topic are extracted from
 * the indicated BindingSetRecord and the BindingSet is then exported to the topic.
 */
@Override
public void exportNotification(final BindingSetRecord record) throws BindingSetRecordExportException {
    try {
        log.info("Exporting {} records to Kafka to topic: {}", record.getBindingSet().size(), record.getTopic());
        final String bindingName = IncrementalUpdateConstants.PERIODIC_BIN_ID;

        final BindingSet bindingSet = record.getBindingSet();
        final String topic = record.getTopic();
        final long binId = ((Literal) bindingSet.getValue(bindingName)).longValue();

        final Future<RecordMetadata> future = producer
            .send(new ProducerRecord<String, BindingSet>(topic, Long.toString(binId), bindingSet));
        //wait for confirmation that results have been received
        future.get(5, TimeUnit.SECONDS);
    } catch (final Exception e) {  // catch all possible exceptional behavior and throw as our checked exception.
        throw new BindingSetRecordExportException(e.getMessage(), e);
    }
}
 
Example 8 (Project: DBus, File: ProjectTopologyService.java)
public void rerunTopology(String topologyCode, String ctrlMsg) {
    KafkaProducer<String, byte[]> producer = null;
    try {
        String topic = StringUtils.joinWith("_", topologyCode, "ctrl");
        Properties props = zkService.getProperties(KeeperConstants.KEEPER_CTLMSG_PRODUCER_CONF);
        Properties globalConf = zkService.getProperties(KeeperConstants.GLOBAL_CONF);
        props.setProperty(GLOBAL_CONF_KEY_BOOTSTRAP_SERVERS, globalConf.getProperty(GLOBAL_CONF_KEY_BOOTSTRAP_SERVERS));
        if (StringUtils.equals(SecurityConfProvider.getSecurityConf(zkService), Constants.SECURITY_CONFIG_TRUE_VALUE)) {
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        }
        producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, byte[]>(topic, ctrlMsg.getBytes()), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
            }
        });
    } catch (Exception e) {
        throw new RuntimeException(e);
    } finally {
        if (producer != null) producer.close();
    }
}
 
Example 9 (Project: DBus, File: MetaEventWarningSender.java)
public void sendMaasAppenderMessage(MaasAppenderMessage maasAppenderMessage) {
    ControlMessage message = new ControlMessage(System.currentTimeMillis(), ControlType.G_MAAS_APPENDER_EVENT.toString(), "dbus-appender");

    message.setPayload(JSONObject.parseObject(maasAppenderMessage.toString()));

    String topic = PropertiesHolder.getProperties(Constants.Properties.CONFIGURE, Constants.ConfigureKey.GLOBAL_EVENT_TOPIC);
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message.getType(), message.toJSONString());
    Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            logger.error("Send global event error.{}", exception.getMessage());
        }
    });
    try {
        future.get(10000, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
 
Example 10 (Project: incubator-gobblin, File: Kafka08DataWriter.java)
@Override
public WriteResponse wrap(final RecordMetadata recordMetadata) {
  return new WriteResponse<RecordMetadata>() {
    @Override
    public RecordMetadata getRawResponse() {
      return recordMetadata;
    }

    @Override
    public String getStringResponse() {
      return recordMetadata.toString();
    }

    @Override
    public long bytesWritten() {
      // Don't know how many bytes were written
      return -1;
    }
  };
}
 
Example 11 (Project: ambari-logsearch, File: OutputKafka.java)
public void onCompletion(RecordMetadata metadata, Exception exception) {
  if (metadata != null) {
    if (!output.isKafkaBrokerUp) {
      logger.info("Started writing to kafka. " + output.getShortDescription());
      output.isKafkaBrokerUp = true;
    }
    output.incrementStat(1);
    output.writeBytesMetric.value += message.length();
  } else {
    output.isKafkaBrokerUp = false;
    String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_ASYNC_ERROR";
    LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "Error sending message to Kafka. Async Callback", exception, logger,
        Level.ERROR);

    output.failedMessages.add(this);
  }
}
 
Example 12 (Project: ranger, File: KafkaRangerAuthorizerTest.java)
@Test
public void testAuthorizedWrite() throws Exception {
    // Create the Producer
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + port);
    producerProps.put("acks", "all");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
    
    final Producer<String, String> producer = new KafkaProducer<>(producerProps);
    // Send a message
    Future<RecordMetadata> record = 
        producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
    producer.flush();
    record.get();

    producer.close();
}
 
Example 13 (Project: kafka-workers, File: MergerTest.java)
@Override
public void process(WorkerRecord<String, String> record, RecordStatusObserver observer) {
    logger.info("process(partition: {}, timestamp: {})", record.partition(), record.timestamp());

    Future<RecordMetadata> future = taskProducer.send(new ProducerRecord<>(
            OUTPUT_TOPIC,
            record.partition(),
            record.timestamp(),
            record.key(),
            record.value()));

    try {
        future.get();
    } catch (InterruptedException | ExecutionException e) {
        observer.onFailure(new ProcessingFailureException("could not send " + record, e));
    }

    observer.onSuccess();
}
 
Example 14 (Project: stream-registry, File: KafkaEventSenderTest.java)
@Test
public void correlatorSuccess() {
  var correlator = mock(EventCorrelator.class);
  var correlationStrategy = new CorrelationStrategyImpl(correlator);

  var underTest = new KafkaEventSender(config, correlationStrategy, converter, producer);

  when(correlator.register(any())).thenReturn("correlationId");

  var result = underTest.send(event);

  verify(correlator).register(result);
  verify(producer).send(recordCaptor.capture(), callbackCaptor.capture());

  var record = recordCaptor.getValue();
  assertThat(record.topic(), is("topic"));
  assertThat(record.key(), is(avroKey));
  assertThat(record.value(), is(avroValue));
  assertThat(record.headers().toArray().length, is(1));

  var callback = callbackCaptor.getValue();
  assertThat(result.isDone(), is(false));
  var recordMetadata = mock(RecordMetadata.class);
  callback.onCompletion(recordMetadata, null);
  assertThat(result.isDone(), is(false));
}
 
Example 15

/**
 * Topic topicName will be automatically created if it doesn't exist.
 * @param topicName
 * @param recordsToPublish
 * @param timestamp
 * @return
 * @throws InterruptedException
 * @throws TimeoutException
 * @throws ExecutionException
 */
public Map<String, RecordMetadata> produceData(String topicName,
                                               Map<String, GenericRow> recordsToPublish,
                                               Serializer<GenericRow> serializer,
                                               Long timestamp)
        throws InterruptedException, TimeoutException, ExecutionException {

  createTopic(topicName);

  Properties producerConfig = properties();
  KafkaProducer<String, GenericRow> producer =
          new KafkaProducer<>(producerConfig, new StringSerializer(), serializer);

  Map<String, RecordMetadata> result = new HashMap<>();
  for (Map.Entry<String, GenericRow> recordEntry : recordsToPublish.entrySet()) {
    String key = recordEntry.getKey();
    Future<RecordMetadata> recordMetadataFuture
        = producer.send(buildRecord(topicName, timestamp, recordEntry, key));
    result.put(key,
               recordMetadataFuture.get(TEST_RECORD_FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS));
  }
  producer.close();

  return result;
}
 
Example 16

protected void sendTuple(T tuple)
{
  if (alreadyInKafka(tuple)) {
    return;
  }

  getProducer().send(new ProducerRecord<>(getTopic(), key, tuple), new Callback()
  {
    public void onCompletion(RecordMetadata metadata, Exception e)
    {
      if (e != null) {
        logger.info("Wrting to Kafka failed with an exception {}" + e.getMessage());
        throw new RuntimeException(e);
      }
    }
  });
}
 
Example 17

/**
 * Topic topicName will be automatically created if it doesn't exist.
 * @param topicName
 * @param recordsToPublish
 * @param schema
 * @return
 * @throws InterruptedException
 * @throws TimeoutException
 * @throws ExecutionException
 */
public Map<String, RecordMetadata> produceInputData(String topicName, Map<String, GenericRow> recordsToPublish, Schema schema)
    throws InterruptedException, TimeoutException, ExecutionException {

  KafkaProducer<String, GenericRow> producer =
      new KafkaProducer<>(producerConfig, new StringSerializer(), new KsqlJsonSerializer(schema));

  Map<String, RecordMetadata> result = new HashMap<>();
  for (Map.Entry<String, GenericRow> recordEntry : recordsToPublish.entrySet()) {
    String key = recordEntry.getKey();
    ProducerRecord<String, GenericRow> producerRecord = new ProducerRecord<>(topicName, key, recordEntry.getValue());
    Future<RecordMetadata> recordMetadataFuture = producer.send(producerRecord);
    result.put(key, recordMetadataFuture.get(TEST_RECORD_FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS));
  }
  producer.close();

  return result;
}
 
Example 18 (Project: jMetalSP, File: CounterProviderAVRO.java)
public void run() {
    int count = 0;
    long startTime = System.currentTimeMillis();
    DataSerializer<Counter> counterSerializer = new DataSerializer<>();
    while (true) {
        Counter counter = new Counter(count);
        byte[] aux = counterSerializer.serializeMessage(counter, "avsc/Counter.avsc");
        Future<RecordMetadata> send =
                producer.send(new ProducerRecord<Integer, byte[]>(topic, count, aux));
        System.out.println("Kafka sent: " + count);
        count++;

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
 
Example 19

private void reconsumeLater(ConsumerRecord<String, byte[]> consumeRecord) throws InterruptedException, ExecutionException {

    // add all headers to headerList except RETRY_COUNT
    Headers headers = consumeRecord.headers();
    List<Header> headerList = new ArrayList<Header>(8);
    Iterator<Header> iterator = headers.iterator();
    Integer retryCount = -1;
    boolean hasOrignalHeader = false;
    while (iterator.hasNext()) {
        Header next = iterator.next();
        if (next.key().equals(RETRY_COUNT_KEY)) {
            retryCount = serializer.deserialize(next.value());
            continue;
        }

        if (next.key().equals(ORGINAL_TOPIC)) {
            hasOrignalHeader = true;
        }
        headerList.add(next);
    }

    // increment RETRY_COUNT and add it back to the headers
    retryCount++;
    headerList.add(new RecordHeader(RETRY_COUNT_KEY, serializer.serialization(retryCount)));

    if (!hasOrignalHeader) {
        headerList.add(new RecordHeader(ORGINAL_TOPIC, serializer.serialization(consumeRecord.topic())));
    }

    // send the message to the retry queue that matches the retry count
    String retryTopic = calcRetryTopic(consumeRecord.topic(), retryCount);

    ProducerRecord<String, byte[]> record = new ProducerRecord<>(retryTopic,
            consumeRecord.partition() % retryQueuePartitionCount.get(retryTopic), null, consumeRecord.key(),
            consumeRecord.value(), headerList);
    Future<RecordMetadata> publishKafkaMessage = retryQueueMsgProducer.publishKafkaMessage(record);
    publishKafkaMessage.get();
}
 
Example 20 (Project: ja-micro, File: KafkaPublisherTest.java)
@Test
public void sendFailsReturnsFalse() {
    KafkaProducer producer = mock(KafkaProducer.class);
    publisher.realProducer = producer;
    RecordMetadata metadata = new RecordMetadata(null, 0, 0,
            0, Long.valueOf(0), 0, 0);
    ArgumentCaptor<Callback> captor = ArgumentCaptor.forClass(Callback.class);
    when(producer.send(any(), captor.capture())).then(
        invocation -> {
            captor.getValue().onCompletion(metadata, new TimeoutException("error"));
            return new CompletableFuture();
        });
    String[] events = { "test" };
    assertThat(publisher.publishEvents(false, null, events)).isFalse();
}
 
Example 21 (Project: apicurio-registry, File: KafkaRegistryStorage.java)
private CompletableFuture<RecordMetadata> makeSnapshot(long timestamp) {
    return snapshotProducer.apply(new ProducerRecord<>(
        snapshotTopic,
        timestamp,
        new StorageSnapshot(storage, global, artifactRules, globalRules, offset)
    )).whenComplete((recordMeta, exception) -> {
        if (exception != null) {
            log.warn("Exception dumping automatic snapshot: ", exception);
        } else {
            log.info("Dumped automatic snapshot to {} ({} bytes)", recordMeta, recordMeta.serializedValueSize());
        }
    });
}
 
Example 22 (Project: data-highway, File: OnrampImpl.java)
@Override
protected Future<Boolean> sendEncodedEvent(Event<byte[], byte[]> event, SchemaVersion schemaVersion)
  throws InvalidKeyException {
  ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(getRoad().getTopicName(), event.getKey(),
      event.getMessage());
  Future<RecordMetadata> future = kafkaProducer.send(record, this::updateMetrics);
  return Futures.lazyTransform(future, metadata -> true);
}
 
Example 23 (Project: data-highway, File: OnrampImpl.java)
private void updateMetrics(RecordMetadata metadata, Exception e) {
  if (e == null) {
    metrics.markSuccessMetrics(getRoad().getName(), metadata.serializedValueSize());
  } else {
    metrics.markFailureMetrics(getRoad().getName());
  }
}
 
Example 24 (Project: brave, File: TracingCallbackTest.java)
@Test public void on_completion_should_forward_then_finish_span() {
  Span span = tracing.tracer().nextSpan().start();

  Callback delegate = mock(Callback.class);
  Callback tracingCallback = TracingCallback.create(delegate, span, currentTraceContext);
  RecordMetadata md = createRecordMetadata();
  tracingCallback.onCompletion(md, null);

  verify(delegate).onCompletion(md, null);

  assertThat(spans.get(0).finishTimestamp()).isNotZero();
}
 
Example 25 (Project: datacollector, File: BaseKafkaProducer09.java)
@Override
public List<Record> write(Stage.Context context) throws StageException {
  // force all records in the buffer to be written out
  producer.flush();
  // make sure each record was written and handle exception if any
  List<Integer> failedRecordIndices = new ArrayList<Integer>();
  List<Exception> failedRecordExceptions = new ArrayList<Exception>();
  List<Record> responseRecords = new ArrayList<>();
  for (int i = 0; i < futureList.size(); i++) {
    Future<RecordMetadata> f = futureList.get(i);
    try {
      RecordMetadata recordMetadata = f.get();
      if (sendWriteResponse) {
        Record record = context.createRecord("responseRecord");
        LinkedHashMap<String, Field> recordMetadataVal = new LinkedHashMap<>();
        recordMetadataVal.put("offset", Field.create(recordMetadata.offset()));
        recordMetadataVal.put("partition", Field.create(recordMetadata.partition()));
        recordMetadataVal.put("topic", Field.create(recordMetadata.topic()));
        record.set(Field.createListMap(recordMetadataVal));
        responseRecords.add(record);
      }
    } catch (InterruptedException | ExecutionException e) {
      Throwable actualCause = e.getCause();
      if (actualCause != null && actualCause instanceof RecordTooLargeException) {
        failedRecordIndices.add(i);
        failedRecordExceptions.add((Exception)actualCause);
      } else {
        throw createWriteException(e);
      }
    }
  }
  futureList.clear();
  if (!failedRecordIndices.isEmpty()) {
    throw new StageException(KafkaErrors.KAFKA_69, failedRecordIndices, failedRecordExceptions);
  }
  return responseRecords;
}
 
Example 26 (Project: SO, File: AGenericProducerHandler.java)
public Future<RecordMetadata> send(T value) {
    try {
        ObjectMapper objectMapper = new ObjectMapper().configure(SerializationFeature.INDENT_OUTPUT, true).setSerializationInclusion(JsonInclude.Include.NON_NULL);
        return this.send(this.topic/*getTopic()*/, objectMapper.writeValueAsString(value));
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
    return null;
}
 
Example 27 (Project: brave, File: KafkaTest.java)
RecordMetadata createRecordMetadata() {
  TopicPartition tp = new TopicPartition("foo", 0);
  long timestamp = 2340234L;
  int keySize = 3;
  int valueSize = 5;
  Long checksum = 908923L;
  return new RecordMetadata(tp, -1L, -1L, timestamp, checksum, keySize, valueSize);
}
 
Example 28 (Project: debezium-incubator, File: KafkaRecordEmitter.java)
private boolean flush(Map.Entry<Record, Future<RecordMetadata>> recordEntry) {
    try {
        recordEntry.getValue().get(); // wait
        if (++emitCount % 10_000 == 0) {
            LOGGER.info("Emitted {} records to Kafka Broker", emitCount);
            emitCount = 0;
        }
        return true;
    }
    catch (ExecutionException | InterruptedException e) {
        LOGGER.error("Failed to emit record {}", recordEntry.getKey(), e);
        return false;
    }
}
 
Example 29 (Project: DBus, File: ProgressBolt.java)
private void sendFinishMsgToSpout(JSONObject reqJson) throws Exception {
    reqJson.put("type", FullPullConstants.COMMAND_FULL_PULL_FINISH_REQ);
    JSONObject wrapperJson = new JSONObject();
    wrapperJson.put(FullPullConstants.FULLPULL_REQ_PARAM, reqJson.toJSONString());
    ProducerRecord record = new ProducerRecord<>(dsName + "_callback", FullPullConstants.COMMAND_FULL_PULL_FINISH_REQ, wrapperJson.toString().getBytes());
    Future<RecordMetadata> future = byteProducer.send(record);
    logger.info("send full pull finish msg to pull spout offset is {}", future.get().offset());
}
 
Example 30 (Project: SO, File: VirtualObjectHandler.java)

/**
 * Publish data to the message queue.
 * @param data data
 * @return result
 */
private Future<RecordMetadata> publishToMq(String data) {
    DefaultProducerHandler producerHandler = new DefaultProducerHandler(0, "devicecontrol");
    Future<RecordMetadata> result = producerHandler.send(data);
    producerHandler.close();
    return result;
}