org.apache.kafka.clients.producer.RecordMetadata#offset() 源码实例 Demo

下面列出了 org.apache.kafka.clients.producer.RecordMetadata#offset() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: kafka-monitor   文件: ProduceService.java
/**
 * Sends one record to {@code _partition} of {@code _topic} and updates the produce sensors.
 *
 * <p>On success the produce delay and record counters are recorded, the per-partition
 * "error in last send" flag is cleared, and the partition's next index is advanced. If the
 * stored index was {@code -1} and the send was synchronous, the next index is taken from
 * the offset in the returned metadata; otherwise it is incremented by one.
 *
 * <p>Any exception raised along the way is counted in the error sensors, flags the
 * partition as failed on its last send, and is logged as a warning; it is not rethrown.
 */
public void run() {
  try {
    final long indexToSend = _nextIndexPerPartition.get(_partition).get();
    final long sendStartMs = System.currentTimeMillis();
    final String payload = Utils.jsonFromFields(_topic, indexToSend, sendStartMs, _producerId, _recordSize);
    final BaseProducerRecord outgoing = new BaseProducerRecord(_topic, _partition, _key, payload);
    final RecordMetadata sendResult = _producer.send(outgoing, _sync);

    // Sensors are updated only after send() has returned without throwing.
    final long elapsedMs = System.currentTimeMillis() - sendStartMs;
    _sensors._produceDelay.record(elapsedMs);
    _sensors._recordsProduced.record();
    _sensors._recordsProducedPerPartition.get(_partition).record();
    _sensors._produceErrorInLastSendPerPartition.put(_partition, false);

    // An index of -1 combined with a synchronous send means: adopt the offset from the metadata.
    final boolean adoptReturnedOffset = _sync && indexToSend == -1;
    final long followingIndex = adoptReturnedOffset ? sendResult.offset() : indexToSend + 1;
    _nextIndexPerPartition.get(_partition).set(followingIndex);
  } catch (Exception e) {
    _sensors._produceError.record();
    _sensors._produceErrorPerPartition.get(_partition).record();
    _sensors._produceErrorInLastSendPerPartition.put(_partition, true);
    LOG.warn(_name + " failed to send message", e);
  }
}
 
源代码2 项目: kafka-backup   文件: BackupSourceTask.java
/**
 * Records the mapping between a record's source offset and the offset it was written to
 * in the target topic partition.
 *
 * @param record   the source record that was handed to the producer; its source offset map
 *                 must contain a {@code Long} under {@code SOURCE_OFFSET_OFFSET}
 * @param metadata the broker acknowledgement for the record, or {@code null} when the record
 *                 was never written to the broker
 */
@Override
public void commitRecord(SourceRecord record, RecordMetadata metadata) {
    // Kafka Connect invokes this callback with null metadata for records that were dropped
    // before reaching the broker (e.g. filtered by a transformation). There is no target
    // offset to sync in that case, so skip instead of failing with a NullPointerException.
    if (metadata == null) {
        return;
    }
    TopicPartition topicPartition = new TopicPartition(metadata.topic(), metadata.partition());
    long sourceOffset = (Long) record.sourceOffset().get(SOURCE_OFFSET_OFFSET);
    long targetOffset = metadata.offset();
    offsetSource.syncGroupForOffset(topicPartition, sourceOffset, targetOffset);
}
 
源代码3 项目: hermes   文件: KafkaFuture.java
/**
 * Waits for the underlying metadata future to complete and converts its result.
 *
 * @return a {@code KafkaSendResult} built from the topic, partition and offset of the
 *         completed record metadata
 * @throws InterruptedException if thrown by the underlying future's {@code get()}
 * @throws ExecutionException   if thrown by the underlying future's {@code get()}
 */
@Override
public KafkaSendResult get() throws InterruptedException, ExecutionException {
	final RecordMetadata meta = this.m_recordMetadata.get();
	return new KafkaSendResult(meta.topic(), meta.partition(), meta.offset());
}
 
源代码4 项目: hermes   文件: KafkaFuture.java
/**
 * Waits up to the given timeout for the underlying metadata future and converts its result.
 *
 * @param timeout maximum time to wait, passed through to the underlying future
 * @param unit    unit of {@code timeout}
 * @return a {@code KafkaSendResult} built from the topic, partition and offset of the
 *         completed record metadata
 * @throws InterruptedException if thrown by the underlying future's {@code get(timeout, unit)}
 * @throws ExecutionException   if thrown by the underlying future's {@code get(timeout, unit)}
 * @throws TimeoutException     if thrown by the underlying future's {@code get(timeout, unit)}
 */
@Override
public KafkaSendResult get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
	final RecordMetadata meta = this.m_recordMetadata.get(timeout, unit);
	return new KafkaSendResult(meta.topic(), meta.partition(), meta.offset());
}
 
源代码5 项目: DBus   文件: ControlMessageSender.java
/**
 * Sends a control message to the given topic, waits for the send to complete, and then
 * closes the producer.
 *
 * <p>The message type is used as the record key and the message's JSON form, encoded as
 * UTF-8, is used as the record value. The producer is closed whether or not the send
 * succeeds.
 *
 * @param topic the topic to send to
 * @param msg   the control message to send
 * @return the offset reported in the metadata of the completed send
 * @throws Exception if the send fails or the wait for its result is interrupted
 */
public long send(String topic, ControlMessage msg) throws Exception {
    String key = msg.getType();
    String jsonMessage = msg.toJSONString();
    // Encode explicitly as UTF-8: the no-arg getBytes() depends on the platform default
    // charset, which would make the bytes on the wire vary between hosts.
    byte[] message = jsonMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8);

    boolean interrupted = false;
    try {
        Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, message), null);
        RecordMetadata recordMetadata = result.get();
        return recordMetadata.offset();
    } catch (InterruptedException e) {
        interrupted = true;
        throw e;
    } finally {
        try {
            producer.close();
        } finally {
            // Restore the interrupt flag only after close() has run, so that closing the
            // producer is not itself cut short by the pending interrupt.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
 
源代码6 项目: kafka-junit   文件: ProducedKafkaRecord.java
/**
 * Factory that combines a produced record with the metadata returned for it.
 *
 * @param recordMetadata Metadata about the produced record; supplies topic, partition and offset.
 * @param producerRecord The original record that was produced; supplies key and value.
 * @param <K> Type of key
 * @param <V> Type of message
 * @return A ProducedKafkaRecord holding the topic, partition and offset from the metadata
 *         together with the key and value of the original record.
 */
static <K,V> ProducedKafkaRecord<K,V> newInstance(
    final RecordMetadata recordMetadata,
    final ProducerRecord<K,V> producerRecord) {
    // Where the record ended up, as reported by the metadata.
    final String topic = recordMetadata.topic();
    final int partition = recordMetadata.partition();
    final long offset = recordMetadata.offset();

    // What was sent.
    final K key = producerRecord.key();
    final V value = producerRecord.value();

    return new ProducedKafkaRecord<>(topic, partition, offset, key, value);
}
 
源代码7 项目: cerberus-source   文件: KafkaService.java
/**
 * Produces a single event to a Kafka topic, waits for the send to complete, and reports
 * the outcome.
 *
 * <p>Four headers ({@code bootstrap.servers}, {@code enable.idempotence},
 * {@code key.serializer}, {@code value.serializer}) are appended to {@code serviceHeader};
 * every active header is then copied into the producer configuration. A new producer is
 * opened for the call and closed before returning.
 *
 * <p>Failures while opening the producer or sending the record are not thrown: they are
 * reported through the result message, and the partition and offset stay at {@code -1}.
 *
 * @param topic            the topic to produce to
 * @param key              the record key
 * @param eventMessage     the record value
 * @param bootstrapServers the value used for the {@code bootstrap.servers} setting
 * @param serviceHeader    headers used as producer configuration; this list is modified
 *                         (the four headers above are appended to it)
 * @return an answer whose item describes the call (topic, key, request, headers, response
 *         partition and offset) and whose result message describes success or failure
 * @throws InterruptedException declared by the interface; interruption during the send is
 *                              reported through the result message and the thread's
 *                              interrupt flag is restored
 * @throws ExecutionException   declared by the interface; send failures are reported through
 *                              the result message
 */
@Override
public AnswerItem<AppService> produceEvent(String topic, String key, String eventMessage,
        String bootstrapServers,
        List<AppServiceHeader> serviceHeader) throws InterruptedException, ExecutionException {

    MessageEvent message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_PRODUCEKAFKA);
    AnswerItem<AppService> result = new AnswerItem<>();
    AppService serviceREST = factoryAppService.create("", AppService.TYPE_KAFKA, AppService.METHOD_KAFKAPRODUCE, "", "", "", "", "", "", "", "", "", "", "",
            "", null, "", null, null);

    Properties props = new Properties();
    serviceHeader.add(factoryAppServiceHeader.create(null, "bootstrap.servers", bootstrapServers, "Y", 0, "", "", null, "", null));
    serviceHeader.add(factoryAppServiceHeader.create(null, "enable.idempotence", "true", "Y", 0, "", "", null, "", null));
    serviceHeader.add(factoryAppServiceHeader.create(null, "key.serializer", "org.apache.kafka.common.serialization.StringSerializer", "Y", 0, "", "", null, "", null));
    serviceHeader.add(factoryAppServiceHeader.create(null, "value.serializer", "org.apache.kafka.common.serialization.StringSerializer", "Y", 0, "", "", null, "", null));

    // Only active headers become producer configuration.
    for (AppServiceHeader object : serviceHeader) {
        if (StringUtil.parseBoolean(object.getActive())) {
            props.put(object.getKey(), object.getValue());
        }
    }

    serviceREST.setServicePath(bootstrapServers);
    serviceREST.setKafkaTopic(topic);
    serviceREST.setKafkaKey(key);
    serviceREST.setServiceRequest(eventMessage);
    serviceREST.setHeaderList(serviceHeader);

    LOG.info("Open Producer : " + getKafkaConsumerKey(topic, bootstrapServers));
    KafkaProducer<String, String> producer = null;
    int partition = -1;
    long offset = -1;
    boolean interrupted = false;
    try {
        // Constructed inside the try so that a configuration/connection failure is reported
        // as a FAILED message like any other produce error instead of escaping the method.
        producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, eventMessage);
        LOG.debug("Producing Kafka message - topic : " + topic + " key : " + key + " message : " + eventMessage);
        RecordMetadata metadata = producer.send(record).get(); //Wait for a responses
        partition = metadata.partition();
        offset = metadata.offset();
        LOG.debug("Produced Kafka message - topic : " + topic + " key : " + key + " partition : " + partition + " offset : " + offset);
        message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_PRODUCEKAFKA);
    } catch (Exception ex) {
        if (ex instanceof InterruptedException) {
            // Remember the interruption; the flag is restored at the end of the method so
            // that flushing and closing the producer below are not cut short by it.
            interrupted = true;
        }
        message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_PRODUCEKAFKA);
        message.setDescription(message.getDescription().replace("%EX%", ex.toString()));
        LOG.debug(ex, ex);
    } finally {
        if (producer != null) {
            try {
                producer.flush();
            } finally {
                // Always release the producer, even when flush() throws.
                producer.close();
            }
            LOG.info("Closed Producer : " + getKafkaConsumerKey(topic, bootstrapServers));
        } else {
            LOG.info("Producer not opened : " + getKafkaConsumerKey(topic, bootstrapServers));
        }
    }

    serviceREST.setKafkaResponseOffset(offset);
    serviceREST.setKafkaResponsePartition(partition);

    serviceREST.setResponseHTTPBodyContentType(appServiceService.guessContentType(serviceREST, AppService.RESPONSEHTTPBODYCONTENTTYPE_JSON));

    result.setItem(serviceREST);
    message.setDescription(message.getDescription().replace("%SERVICEMETHOD%", AppService.METHOD_KAFKAPRODUCE));
    message.setDescription(message.getDescription().replace("%TOPIC%", topic));
    message.setDescription(message.getDescription().replace("%PART%", String.valueOf(partition)));
    message.setDescription(message.getDescription().replace("%OFFSET%", String.valueOf(offset)));
    result.setResultMessage(message);

    if (interrupted) {
        // Restore the interrupt status that was consumed by the caught InterruptedException.
        Thread.currentThread().interrupt();
    }
    return result;
}