下面列出了org.apache.kafka.clients.producer.RecordMetadata#offset ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public void run() {
try {
long nextIndex = _nextIndexPerPartition.get(_partition).get();
long currMs = System.currentTimeMillis();
String message = Utils.jsonFromFields(_topic, nextIndex, currMs, _producerId, _recordSize);
BaseProducerRecord record = new BaseProducerRecord(_topic, _partition, _key, message);
RecordMetadata metadata = _producer.send(record, _sync);
_sensors._produceDelay.record(System.currentTimeMillis() - currMs);
_sensors._recordsProduced.record();
_sensors._recordsProducedPerPartition.get(_partition).record();
_sensors._produceErrorInLastSendPerPartition.put(_partition, false);
if (nextIndex == -1 && _sync) {
nextIndex = metadata.offset();
} else {
nextIndex = nextIndex + 1;
}
_nextIndexPerPartition.get(_partition).set(nextIndex);
} catch (Exception e) {
_sensors._produceError.record();
_sensors._produceErrorPerPartition.get(_partition).record();
_sensors._produceErrorInLastSendPerPartition.put(_partition, true);
LOG.warn(_name + " failed to send message", e);
}
}
@Override
public void commitRecord(SourceRecord record, RecordMetadata metadata) {
TopicPartition topicPartition = new TopicPartition(metadata.topic(), metadata.partition());
long sourceOffset = (Long) record.sourceOffset().get(SOURCE_OFFSET_OFFSET);
long targetOffset = metadata.offset();
offsetSource.syncGroupForOffset(topicPartition, sourceOffset, targetOffset);
}
@Override
public KafkaSendResult get() throws InterruptedException, ExecutionException {
RecordMetadata recordMetadata = this.m_recordMetadata.get();
KafkaSendResult sendResult = new KafkaSendResult(recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset());
return sendResult;
}
@Override
public KafkaSendResult get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
RecordMetadata recordMetadata = this.m_recordMetadata.get(timeout, unit);
KafkaSendResult sendResult = new KafkaSendResult(recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset());
return sendResult;
}
public long send(String topic, ControlMessage msg) throws Exception {
String key = msg.getType();
String jsonMessage = msg.toJSONString();
byte[] message = jsonMessage.getBytes();
try {
Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, message), null);
RecordMetadata recordMetadata = result.get();
return recordMetadata.offset();
} finally {
producer.close();
}
}
/**
* Utility factory.
* @param recordMetadata Metadata about the produced record.
* @param producerRecord The original record that was produced.
* @param <K> Type of key
* @param <V> Type of message
* @return A ProducedKafkaRecord that represents metadata about the original record, and the results of it being published.
*/
static <K,V> ProducedKafkaRecord<K,V> newInstance(
final RecordMetadata recordMetadata,
final ProducerRecord<K,V> producerRecord) {
return new ProducedKafkaRecord<>(
recordMetadata.topic(),
recordMetadata.partition(),
recordMetadata.offset(),
producerRecord.key(),
producerRecord.value()
);
}
@Override
public AnswerItem<AppService> produceEvent(String topic, String key, String eventMessage,
String bootstrapServers,
List<AppServiceHeader> serviceHeader) throws InterruptedException, ExecutionException {
MessageEvent message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_PRODUCEKAFKA);;
AnswerItem<AppService> result = new AnswerItem<>();
AppService serviceREST = factoryAppService.create("", AppService.TYPE_KAFKA, AppService.METHOD_KAFKAPRODUCE, "", "", "", "", "", "", "", "", "", "", "",
"", null, "", null, null);
Properties props = new Properties();
serviceHeader.add(factoryAppServiceHeader.create(null, "bootstrap.servers", bootstrapServers, "Y", 0, "", "", null, "", null));
serviceHeader.add(factoryAppServiceHeader.create(null, "enable.idempotence", "true", "Y", 0, "", "", null, "", null));
serviceHeader.add(factoryAppServiceHeader.create(null, "key.serializer", "org.apache.kafka.common.serialization.StringSerializer", "Y", 0, "", "", null, "", null));
serviceHeader.add(factoryAppServiceHeader.create(null, "value.serializer", "org.apache.kafka.common.serialization.StringSerializer", "Y", 0, "", "", null, "", null));
for (AppServiceHeader object : serviceHeader) {
if (StringUtil.parseBoolean(object.getActive())) {
props.put(object.getKey(), object.getValue());
}
}
serviceREST.setServicePath(bootstrapServers);
serviceREST.setKafkaTopic(topic);
serviceREST.setKafkaKey(key);
serviceREST.setServiceRequest(eventMessage);
serviceREST.setHeaderList(serviceHeader);
LOG.info("Open Producer : " + getKafkaConsumerKey(topic, bootstrapServers));
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
int partition = -1;
long offset = -1;
try {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, eventMessage);
LOG.debug("Producing Kafka message - topic : " + topic + " key : " + key + " message : " + eventMessage);
RecordMetadata metadata = producer.send(record).get(); //Wait for a responses
partition = metadata.partition();
offset = metadata.offset();
LOG.debug("Produced Kafka message - topic : " + topic + " key : " + key + " partition : " + partition + " offset : " + offset);
message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_PRODUCEKAFKA);
} catch (Exception ex) {
message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_PRODUCEKAFKA);
message.setDescription(message.getDescription().replace("%EX%", ex.toString()));
LOG.debug(ex, ex);
} finally {
if (producer != null) {
producer.flush();
producer.close();
LOG.info("Closed Producer : " + getKafkaConsumerKey(topic, bootstrapServers));
} else {
LOG.info("Producer not opened : " + getKafkaConsumerKey(topic, bootstrapServers));
}
}
serviceREST.setKafkaResponseOffset(offset);
serviceREST.setKafkaResponsePartition(partition);
serviceREST.setResponseHTTPBodyContentType(appServiceService.guessContentType(serviceREST, AppService.RESPONSEHTTPBODYCONTENTTYPE_JSON));
result.setItem(serviceREST);
message.setDescription(message.getDescription().replace("%SERVICEMETHOD%", AppService.METHOD_KAFKAPRODUCE));
message.setDescription(message.getDescription().replace("%TOPIC%", topic));
message.setDescription(message.getDescription().replace("%PART%", String.valueOf(partition)));
message.setDescription(message.getDescription().replace("%OFFSET%", String.valueOf(offset)));
result.setResultMessage(message);
return result;
}