下面列出了org.apache.kafka.clients.producer.BufferExhaustedException#org.apache.kafka.clients.producer.RecordMetadata 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Sends the record through the producer returned by {@code getProducer()} and
 * exposes the outcome as a {@link CompletableFuture}.
 *
 * @param record the record to send
 * @return the future handed to the producer as its callback, or an already-failed
 *         future when the failure happened before that future could be created
 */
@Override
public CompletableFuture<RecordMetadata> apply(ProducerRecord<K, V> record) {
    CFC pending = null;
    try {
        KafkaProducer<K, V> kafkaProducer = getProducer();
        pending = new CFC(kafkaProducer);
        kafkaProducer.send(record, pending);
        return pending;
    } catch (Exception e) {
        if (pending == null) {
            // Nothing was handed to the producer yet: report through a plain failed future.
            CompletableFuture<RecordMetadata> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
        // send() itself threw: route the error through the callback path.
        pending.onCompletion(null, e);
        return pending;
    }
}
/**
 * Sends stat info to the statistic topic. The outcome is only logged; the
 * caller is never told whether the send succeeded.
 *
 * @param message the statistics message; its data source, schema, table, type and
 *                transaction time form the record key, its JSON form is the value
 */
private void sendTableStatInfo(StatMessage message) {
    String key = String.format("%s.%s.%s.%s.%s", message.getDsName(), message.getSchemaName(), message.getTableName(),
            message.getType(), message.getTxTimeMS());
    String value = message.toJSONString();
    producer.send(new ProducerRecord<>(statTopic, key, value), new Callback() {
        @Override
        public void onCompletion(RecordMetadata ignored, Exception e) {
            if (e != null) {
                // Pass the exception along so the cause of the failure is not lost.
                logger.error(String.format("Send statistic FAIL: toTopic=%s, key=%s", statTopic, key), e);
            } else {
                logger.info(String.format("  Send statistic successful: toTopic=%s, key=(%s)", statTopic, key));
            }
        }
    });
}
/**
 * Producer callback for one audit event: records the ack on success, otherwise
 * delegates to {@code checkAndEnqueueWhenSendFailed()}. Anything thrown while
 * handling the result is logged and counted, never propagated.
 *
 * @param recordMetadata metadata of the sent record; read only when {@code e} is null
 * @param e              the send failure, or null when the send succeeded
 */
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  try {
    if (e != null) {
      checkAndEnqueueWhenSendFailed();
      return;
    }
    OpenTsdbMetricConverter.incr(
        LoggingAuditClientMetrics.AUDIT_CLIENT_SENDER_KAFKA_EVENTS_ACKED, 1,
        "host=" + host,
        "stage=" + stage.toString(),
        "logName=" + event.getLoggingAuditHeaders().getLogName());
    // The send succeeded, so forget any retry bookkeeping for this event;
    // LoggingAuditHeaders can uniquely identify an event.
    eventTriedCount.remove(event.getLoggingAuditHeaders());
    // The partition accepted a write, so it is no longer considered bad.
    badPartitions.remove(recordMetadata.partition());
  } catch (Throwable t) {
    LOG.warn("Exception throws in the callback. Drop this event {}", event, t);
    OpenTsdbMetricConverter.incr(
        LoggingAuditClientMetrics.AUDIT_CLIENT_SENDER_KAFKA_CALLBACK_EXCEPTION, 1,
        "host=" + host,
        "stage=" + stage.toString(),
        "topic=" + topic);
  }
}
/**
 * Sends the record with a span built by {@code TracingKafkaUtils.buildAndInjectSpan}
 * and wraps the caller's callback in a {@code TracingCallback}.
 *
 * @param record   the record to send
 * @param callback the caller's callback, wrapped before being handed to the producer
 * @param parent   span context passed to the span builder
 * @return the future returned by the underlying producer
 */
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback,
    SpanContext parent) {
  // A copy of the record was once created here because headers can be read-only
  // when a record is sent a second time; that copy is currently disabled.
  Span producerSpan = TracingKafkaUtils.buildAndInjectSpan(
      record, tracer, producerSpanNameProvider, parent, spanDecorators);

  // The span is active only for the duration of the send call itself.
  try (Scope ignored = tracer.activateSpan(producerSpan)) {
    return producer.send(record,
        new TracingCallback(callback, producerSpan, tracer, spanDecorators));
  }
}
/**
 * Converts the interface-level context model into its message-queue form, attaches
 * the tracking entity stored in the HTTP session, and publishes it through a
 * {@code DefaultProducerHandler}.
 *
 * @param contextModelForIf the incoming context model
 * @param request           the HTTP request whose session holds the "tracking" attribute
 * @return the converted context model that was handed to the producer
 */
private ContextModelForMQ processContextModel(ContextModelForIf2 contextModelForIf, HttpServletRequest request) {
    log.debug("input:ContextModelForIf: {}", contextModelForIf);

    // ContextModelForIf --> ContextModelForMQ (the message published to the queue)
    ContextModelForMQ contextModelForMQ = ContextModelMapper2.toContextModelForMQ(contextModelForIf);

    // tracking
    TrackingEntity trackingEntity = (TrackingEntity) request.getSession().getAttribute("tracking");
    trackingEntity.setSimulatorType(contextModelForIf.getSimulatorType());  // set the simulator type
    contextModelForMQ.setTrackingEntity(trackingEntity);

    contextModelForMQ.addState(Const.CONTEXTMODEL_ID, contextModelForIf.getContextId());
    contextModelForMQ.addState(Const.RESULT_CM_VALUE, contextModelForIf.getResultCmValue());
    log.debug("converted:ContextModelForMQ: {}", contextModelForMQ);

    // object to json (only used for the debug log below)
    String contextModelForMqString = ContextModelMapper2.writeJsonString(contextModelForMQ);
    log.debug("generated:ContextModelForMQ {}", contextModelForMqString);

    // context model producer handler
    DefaultProducerHandler producerHandler = new DefaultProducerHandler(0, "contextmodel");
    Future<RecordMetadata> future;
    try {
        future = producerHandler.send(contextModelForMQ);
    } finally {
        // Always release the handler, even when send() throws.
        producerHandler.close();
    }
    log.debug("producer.send result: {}", future);

    return contextModelForMQ;
}
/**
 * Send a CruiseControlMetric to the Kafka topic. A failed send is logged and
 * counted in {@code _numMetricSendFailure}; nothing is reported to the caller.
 *
 * @param ccm the Cruise Control metric to send.
 */
public void sendCruiseControlMetric(CruiseControlMetric ccm) {
  // Use topic name as key if existing so that the same sampler will be able to collect all the information
  // of a topic.
  String key = ccm.metricClassId() == CruiseControlMetric.MetricClassId.TOPIC_METRIC ?
      ((TopicMetric) ccm).topic() : Integer.toString(ccm.brokerId());
  ProducerRecord<String, CruiseControlMetric> producerRecord =
      new ProducerRecord<>(_cruiseControlMetricsTopic, null, ccm.time(), key, ccm);
  LOG.debug("Sending Cruise Control metric {}.", ccm);
  _producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
      if (e != null) {
        // Include the exception so the reason for the failure shows up in the log.
        LOG.warn("Failed to send Cruise Control metric {}", ccm, e);
        _numMetricSendFailure++;
      }
    }
  });
}
/**
 * Exports BindingSets to Kafka.  The BindingSet and topic are extracted from
 * the indicated BindingSetRecord and the BindingSet is then exported to the topic.
 * The call blocks for up to five seconds waiting for the send to be confirmed.
 *
 * @param record the record carrying the BindingSet and the destination topic
 * @throws BindingSetRecordExportException if the export fails for any reason,
 *         including a timeout or the calling thread being interrupted
 */
@Override
public void exportNotification(final BindingSetRecord record) throws BindingSetRecordExportException {
    try {
        log.info("Exporting {} records to Kafka to topic: {}", record.getBindingSet().size(), record.getTopic());
        final String bindingName = IncrementalUpdateConstants.PERIODIC_BIN_ID;

        final BindingSet bindingSet = record.getBindingSet();
        final String topic = record.getTopic();
        final long binId = ((Literal) bindingSet.getValue(bindingName)).longValue();

        final Future<RecordMetadata> future = producer
            .send(new ProducerRecord<String, BindingSet>(topic, Long.toString(binId), bindingSet));
        //wait for confirmation that results have been received
        future.get(5, TimeUnit.SECONDS);
    } catch (final InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new BindingSetRecordExportException(e.getMessage(), e);
    } catch (final Exception e) {  // catch all possible exceptional behavior and throw as our checked exception.
        throw new BindingSetRecordExportException(e.getMessage(), e);
    }
}
/**
 * Publishes a control message to the topology's control topic
 * ({@code <topologyCode>_ctrl}) using a producer created for this one call.
 * The send result is not inspected; the producer is always closed.
 *
 * @param topologyCode code of the topology, used to derive the control topic name
 * @param ctrlMsg      control message; sent as the bytes of {@code ctrlMsg.getBytes()}
 * @throws RuntimeException wrapping any exception raised while configuring or sending
 */
public void rerunTopology(String topologyCode, String ctrlMsg) {
    KafkaProducer<String, byte[]> producer = null;
    try {
        // Producer settings come from ZooKeeper; bootstrap servers are taken from the global conf.
        Properties producerProps = zkService.getProperties(KeeperConstants.KEEPER_CTLMSG_PRODUCER_CONF);
        Properties globalProps = zkService.getProperties(KeeperConstants.GLOBAL_CONF);
        producerProps.setProperty(GLOBAL_CONF_KEY_BOOTSTRAP_SERVERS,
                globalProps.getProperty(GLOBAL_CONF_KEY_BOOTSTRAP_SERVERS));

        boolean securityEnabled = StringUtils.equals(
                SecurityConfProvider.getSecurityConf(zkService), Constants.SECURITY_CONFIG_TRUE_VALUE);
        if (securityEnabled) {
            producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        }

        String ctrlTopic = StringUtils.joinWith("_", topologyCode, "ctrl");
        producer = new KafkaProducer<>(producerProps);
        // The completion callback intentionally does nothing.
        producer.send(new ProducerRecord<String, byte[]>(ctrlTopic, ctrlMsg.getBytes()),
                (metadata, exception) -> { });
    } catch (Exception e) {
        throw new RuntimeException(e);
    } finally {
        if (producer != null) {
            producer.close();
        }
    }
}
/**
 * Wraps the appender message in a {@code G_MAAS_APPENDER_EVENT} control message and
 * publishes it to the global event topic, waiting up to 10 seconds for the send to
 * complete. Failures are logged, not thrown.
 *
 * @param maasAppenderMessage the message whose {@code toString()} output is parsed
 *                            as JSON and used as the control message payload
 */
public void sendMaasAppenderMessage(MaasAppenderMessage maasAppenderMessage) {
    ControlMessage message = new ControlMessage(System.currentTimeMillis(), ControlType.G_MAAS_APPENDER_EVENT.toString(), "dbus-appender");
    message.setPayload(JSONObject.parseObject(maasAppenderMessage.toString()));

    String topic = PropertiesHolder.getProperties(Constants.Properties.CONFIGURE, Constants.ConfigureKey.GLOBAL_EVENT_TOPIC);
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message.getType(), message.toJSONString());
    Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            // Trailing throwable argument keeps the stack trace in the log.
            logger.error("Send global event error.{}", exception.getMessage(), exception);
        }
    });
    try {
        future.get(10000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        // Preserve the interruption for the caller instead of silently clearing it.
        Thread.currentThread().interrupt();
        logger.error(e.getMessage(), e);
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
/**
 * Adapts a Kafka {@link RecordMetadata} to a {@code WriteResponse}.
 *
 * @param recordMetadata the metadata to expose through the response
 * @return a response backed by {@code recordMetadata}
 */
@Override
public WriteResponse wrap(final RecordMetadata recordMetadata) {
  return new WriteResponse<RecordMetadata>() {
    /** Always -1: the number of bytes written is not known here. */
    @Override
    public long bytesWritten() {
      return -1;
    }

    /** String form of the wrapped metadata. */
    @Override
    public String getStringResponse() {
      return recordMetadata.toString();
    }

    /** The wrapped metadata itself. */
    @Override
    public RecordMetadata getRawResponse() {
      return recordMetadata;
    }
  };
}
/**
 * Send-completion handler. A null {@code metadata} is treated as a failed send:
 * the broker is flagged as down, the error is logged and this object is added to
 * {@code output.failedMessages}. Otherwise the output's statistics are updated.
 *
 * @param metadata  metadata of the sent record, or null when the send failed
 * @param exception the failure passed to the error log
 */
public void onCompletion(RecordMetadata metadata, Exception exception) {
  if (metadata == null) {
    output.isKafkaBrokerUp = false;
    String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_ASYNC_ERROR";
    LogFeederUtil.logErrorMessageByInterval(logKeyMessage,
        "Error sending message to Kafka. Async Callback", exception, logger, Level.ERROR);
    output.failedMessages.add(this);
    return;
  }

  // First successful write after the broker was considered down.
  if (!output.isKafkaBrokerUp) {
    logger.info("Started writing to kafka. " + output.getShortDescription());
    output.isKafkaBrokerUp = true;
  }
  output.incrementStat(1);
  output.writeBytesMetric.value += message.length();
}
/**
 * Sends one message to topic "dev" over SSL with the service keystore and waits
 * for the send to complete; the test passes when no exception is thrown.
 */
@Test
public void testAuthorizedWrite() throws Exception {
    // Create the Producer
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + port);
    producerProps.put("acks", "all");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");

    // try-with-resources closes the producer even when the send or the wait fails.
    try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
        // Send a message
        Future<RecordMetadata> record =
            producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
        producer.flush();
        record.get();
    }
}
/**
 * Forwards the record to {@code OUTPUT_TOPIC}, keeping its partition, timestamp,
 * key and value, and reports the outcome to the observer exactly once.
 *
 * @param record   the record to forward
 * @param observer notified with {@code onSuccess()} once the send completes, or
 *                 {@code onFailure(...)} if the send fails or the wait is interrupted
 */
@Override
public void process(WorkerRecord<String, String> record, RecordStatusObserver observer) {
    logger.info("process(partition: {}, timestamp: {})", record.partition(), record.timestamp());

    Future<RecordMetadata> future = taskProducer.send(new ProducerRecord<>(
            OUTPUT_TOPIC,
            record.partition(),
            record.timestamp(),
            record.key(),
            record.value()));

    try {
        future.get();
    } catch (InterruptedException e) {
        // Keep the interrupt visible to the worker thread's owner.
        Thread.currentThread().interrupt();
        observer.onFailure(new ProcessingFailureException("could not send " + record, e));
        return;
    } catch (ExecutionException e) {
        observer.onFailure(new ProcessingFailureException("could not send " + record, e));
        // Do not fall through: a failed record must not also be reported as successful.
        return;
    }

    observer.onSuccess();
}
/**
 * With a correlation strategy in place, the returned future is registered with the
 * correlator, the record is handed to the producer, and the future is still not
 * done after the producer callback reports success.
 */
@Test
public void correlatorSuccess() {
  var correlator = mock(EventCorrelator.class);
  var strategy = new CorrelationStrategyImpl(correlator);
  var sender = new KafkaEventSender(config, strategy, converter, producer);
  when(correlator.register(any())).thenReturn("correlationId");

  var future = sender.send(event);

  // The future is registered with the correlator and the record reaches the producer.
  verify(correlator).register(future);
  verify(producer).send(recordCaptor.capture(), callbackCaptor.capture());

  var sentRecord = recordCaptor.getValue();
  assertThat(sentRecord.topic(), is("topic"));
  assertThat(sentRecord.key(), is(avroKey));
  assertThat(sentRecord.value(), is(avroValue));
  assertThat(sentRecord.headers().toArray().length, is(1));

  // The future is pending both before and after the producer callback fires.
  assertThat(future.isDone(), is(false));
  callbackCaptor.getValue().onCompletion(mock(RecordMetadata.class), null);
  assertThat(future.isDone(), is(false));
}
/**
 * Publishes the given rows to {@code topicName} one at a time, waiting for each
 * send to be acknowledged. {@code createTopic(topicName)} is called first.
 *
 * @param topicName        destination topic
 * @param recordsToPublish rows to publish, keyed by record key
 * @param serializer       value serializer used by the producer
 * @param timestamp        timestamp passed to {@code buildRecord} for every record
 * @return the metadata of each published record, keyed by record key
 * @throws InterruptedException if interrupted while waiting for a send
 * @throws TimeoutException     if a send is not acknowledged within
 *                              {@code TEST_RECORD_FUTURE_TIMEOUT_MS}
 * @throws ExecutionException   if a send fails
 */
public Map<String, RecordMetadata> produceData(String topicName,
                                               Map<String, GenericRow> recordsToPublish,
                                               Serializer<GenericRow> serializer,
                                               Long timestamp)
    throws InterruptedException, TimeoutException, ExecutionException {

  createTopic(topicName);

  Properties producerConfig = properties();
  Map<String, RecordMetadata> result = new HashMap<>();
  // try-with-resources closes the producer even when a send fails or times out.
  try (KafkaProducer<String, GenericRow> producer =
           new KafkaProducer<>(producerConfig, new StringSerializer(), serializer)) {
    for (Map.Entry<String, GenericRow> recordEntry : recordsToPublish.entrySet()) {
      String key = recordEntry.getKey();
      Future<RecordMetadata> recordMetadataFuture
          = producer.send(buildRecord(topicName, timestamp, recordEntry, key));
      result.put(key,
                 recordMetadataFuture.get(TEST_RECORD_FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS));
    }
  }

  return result;
}
/**
 * Sends the tuple to the topic returned by {@code getTopic()} unless
 * {@code alreadyInKafka(tuple)} reports that it is already there.
 *
 * @param tuple the tuple to send
 */
protected void sendTuple(T tuple)
{
  if (alreadyInKafka(tuple)) {
    return;
  }

  getProducer().send(new ProducerRecord<>(getTopic(), key, tuple), new Callback()
  {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e)
    {
      if (e != null) {
        // Pass the message as a logging argument; concatenating it after "{}" printed
        // the literal placeholder instead of substituting the value.
        logger.info("Wrting to Kafka failed with an exception {}", e.getMessage());
        throw new RuntimeException(e);
      }
    }
  });
}
/**
 * Publishes the given rows to {@code topicName} as JSON, one at a time, waiting
 * for each send to be acknowledged.
 * NOTE(review): this method does not create the topic itself; presumably it relies
 * on broker-side auto-creation — confirm.
 *
 * @param topicName        destination topic
 * @param recordsToPublish rows to publish, keyed by record key
 * @param schema           schema handed to the {@code KsqlJsonSerializer}
 * @return the metadata of each published record, keyed by record key
 * @throws InterruptedException if interrupted while waiting for a send
 * @throws TimeoutException     if a send is not acknowledged within
 *                              {@code TEST_RECORD_FUTURE_TIMEOUT_MS}
 * @throws ExecutionException   if a send fails
 */
public Map<String, RecordMetadata> produceInputData(String topicName, Map<String, GenericRow> recordsToPublish, Schema schema)
    throws InterruptedException, TimeoutException, ExecutionException {

  Map<String, RecordMetadata> result = new HashMap<>();
  // try-with-resources closes the producer even when a send fails or times out.
  try (KafkaProducer<String, GenericRow> producer =
           new KafkaProducer<>(producerConfig, new StringSerializer(), new KsqlJsonSerializer(schema))) {
    for (Map.Entry<String, GenericRow> recordEntry : recordsToPublish.entrySet()) {
      String key = recordEntry.getKey();
      ProducerRecord<String, GenericRow> producerRecord = new ProducerRecord<>(topicName, key, recordEntry.getValue());
      Future<RecordMetadata> recordMetadataFuture = producer.send(producerRecord);
      result.put(key, recordMetadataFuture.get(TEST_RECORD_FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS));
    }
  }

  return result;
}
/**
 * Publishes an incrementing {@code Counter}, serialized with the
 * "avsc/Counter.avsc" schema, to {@code topic} every five seconds until the
 * running thread is interrupted.
 */
public void run() {
    int count = 0;
    DataSerializer<Counter> counterSerializer = new DataSerializer<>();
    while (!Thread.currentThread().isInterrupted()) {
        Counter counter = new Counter(count);
        byte[] payload = counterSerializer.serializeMessage(counter, "avsc/Counter.avsc");

        // Fire-and-forget: the send result is not inspected.
        producer.send(new ProducerRecord<Integer, byte[]>(topic, count, payload));
        System.out.println("Kafka enviado : "+count);
        count++;
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Interruption is the only way to stop this loop: restore the flag and exit.
            Thread.currentThread().interrupt();
            return;
        }
    }
}
/**
 * Re-publishes a consumed record to the retry topic chosen by
 * {@code calcRetryTopic}, carrying over its headers, incrementing the retry
 * count header and recording the original topic if it is not already present.
 * Blocks until the publish completes.
 *
 * @param consumeRecord the record to schedule for another attempt
 * @throws InterruptedException if interrupted while waiting for the publish
 * @throws ExecutionException   if the publish fails
 */
private void reconsumeLater(ConsumerRecord<String, byte[]> consumeRecord) throws InterruptedException, ExecutionException {
    // Copy every header except RETRY_COUNT, which is rewritten below.
    List<Header> forwardedHeaders = new ArrayList<Header>(8);
    Integer retryCount = -1;
    boolean originalTopicPresent = false;
    for (Header header : consumeRecord.headers()) {
        String headerKey = header.key();
        if (headerKey.equals(RETRY_COUNT_KEY)) {
            retryCount = serializer.deserialize(header.value());
            continue;
        }
        if (headerKey.equals(ORGINAL_TOPIC)) {
            originalTopicPresent = true;
        }
        forwardedHeaders.add(header);
    }

    // A record without a retry header starts from -1, so its first retry is number 0.
    retryCount++;
    forwardedHeaders.add(new RecordHeader(RETRY_COUNT_KEY, serializer.serialization(retryCount)));
    if (!originalTopicPresent) {
        forwardedHeaders.add(new RecordHeader(ORGINAL_TOPIC, serializer.serialization(consumeRecord.topic())));
    }

    // The retry topic depends on the retry count; the partition is folded into that topic's partition count.
    String retryTopic = calcRetryTopic(consumeRecord.topic(), retryCount);
    int targetPartition = consumeRecord.partition() % retryQueuePartitionCount.get(retryTopic);
    ProducerRecord<String, byte[]> retryRecord = new ProducerRecord<>(retryTopic,
            targetPartition, null, consumeRecord.key(),
            consumeRecord.value(), forwardedHeaders);

    Future<RecordMetadata> pendingPublish = retryQueueMsgProducer.publishKafkaMessage(retryRecord);
    pendingPublish.get();
}
/**
 * When the producer callback is completed with an exception,
 * {@code publishEvents} returns false.
 */
@Test
public void sendFailsReturnsFalse() {
  KafkaProducer mockProducer = mock(KafkaProducer.class);
  publisher.realProducer = mockProducer;

  RecordMetadata metadata = new RecordMetadata(null, 0, 0,
      0, Long.valueOf(0), 0, 0);
  ArgumentCaptor<Callback> callbackCaptor = ArgumentCaptor.forClass(Callback.class);

  // Every send immediately completes its callback with a timeout error.
  when(mockProducer.send(any(), callbackCaptor.capture())).then(
      invocation -> {
        callbackCaptor.getValue().onCompletion(metadata, new TimeoutException("error"));
        return new CompletableFuture();
      });

  String[] events = { "test" };
  boolean published = publisher.publishEvents(false, null, events);
  assertThat(published).isFalse();
}
/**
 * Writes a {@code StorageSnapshot} of the current state to the snapshot topic,
 * keyed by the given timestamp, and logs the outcome when the send completes.
 *
 * @param timestamp used as the record key
 * @return the future of the snapshot send, with outcome logging attached
 */
private CompletableFuture<RecordMetadata> makeSnapshot(long timestamp) {
    CompletableFuture<RecordMetadata> snapshotFuture = snapshotProducer.apply(
        new ProducerRecord<>(
            snapshotTopic,
            timestamp,
            new StorageSnapshot(storage, global, artifactRules, globalRules, offset)
        )
    );
    return snapshotFuture.whenComplete((recordMeta, exception) -> {
        if (exception == null) {
            log.info("Dumped automatic snapshot to {} ({} bytes)", recordMeta, recordMeta.serializedValueSize());
        } else {
            log.warn("Exception dumping automatic snapshot: ", exception);
        }
    });
}
/**
 * Sends the encoded event to the road's topic and updates metrics from the
 * producer callback.
 *
 * @param event         the encoded key and message
 * @param schemaVersion not used by this implementation
 * @return a future that yields {@code true} once the send has completed
 */
@Override
protected Future<Boolean> sendEncodedEvent(Event<byte[], byte[]> event, SchemaVersion schemaVersion)
  throws InvalidKeyException {
  Future<RecordMetadata> pendingSend = kafkaProducer.send(
      new ProducerRecord<byte[], byte[]>(getRoad().getTopicName(), event.getKey(), event.getMessage()),
      (metadata, failure) -> updateMetrics(metadata, failure));
  // The metadata itself is of no interest to callers; they only need completion.
  return Futures.lazyTransform(pendingSend, metadata -> true);
}
/**
 * Producer callback: marks a failure metric for the road when the send failed,
 * otherwise marks a success metric with the serialized value size.
 *
 * @param metadata metadata of the sent record; read only when {@code e} is null
 * @param e        the send failure, or null on success
 */
private void updateMetrics(RecordMetadata metadata, Exception e) {
  if (e != null) {
    metrics.markFailureMetrics(getRoad().getName());
    return;
  }
  metrics.markSuccessMetrics(getRoad().getName(), metadata.serializedValueSize());
}
/**
 * Completing the tracing callback without an error forwards the call to the
 * delegate and leaves the first reported span with a finish timestamp.
 */
@Test public void on_completion_should_forward_then_finish_span() {
  Callback delegate = mock(Callback.class);
  Span span = tracing.tracer().nextSpan().start();
  Callback tracingCallback = TracingCallback.create(delegate, span, currentTraceContext);

  RecordMetadata metadata = createRecordMetadata();
  tracingCallback.onCompletion(metadata, null);

  verify(delegate).onCompletion(metadata, null);
  assertThat(spans.get(0).finishTimestamp()).isNotZero();
}
/**
 * Flushes the producer and waits for every pending send in {@code futureList}.
 * Records rejected as too large are collected and reported together; any other
 * failure aborts immediately. {@code futureList} is cleared whatever the outcome.
 *
 * @param context used to create the response records
 * @return one response record (offset, partition, topic) per written record when
 *         {@code sendWriteResponse} is set, otherwise an empty list
 * @throws StageException with {@code KAFKA_69} listing the indices and causes of
 *         records that were too large, or the exception produced by
 *         {@code createWriteException} for any other failure
 */
@Override
public List<Record> write(Stage.Context context) throws StageException {
  // force all records in the buffer to be written out
  producer.flush();
  // make sure each record was written and handle exception if any
  List<Integer> failedRecordIndices = new ArrayList<>();
  List<Exception> failedRecordExceptions = new ArrayList<>();
  List<Record> responseRecords = new ArrayList<>();
  try {
    for (int i = 0; i < futureList.size(); i++) {
      Future<RecordMetadata> f = futureList.get(i);
      try {
        RecordMetadata recordMetadata = f.get();
        if (sendWriteResponse) {
          Record record = context.createRecord("responseRecord");
          LinkedHashMap<String, Field> recordMetadataVal = new LinkedHashMap<>();
          recordMetadataVal.put("offset", Field.create(recordMetadata.offset()));
          recordMetadataVal.put("partition", Field.create(recordMetadata.partition()));
          recordMetadataVal.put("topic", Field.create(recordMetadata.topic()));
          record.set(Field.createListMap(recordMetadataVal));
          responseRecords.add(record);
        }
      } catch (InterruptedException e) {
        // Restore the interrupt flag before reporting the failure.
        Thread.currentThread().interrupt();
        throw createWriteException(e);
      } catch (ExecutionException e) {
        Throwable actualCause = e.getCause();
        if (actualCause instanceof RecordTooLargeException) {
          // Oversized records are reported together after all futures were checked.
          failedRecordIndices.add(i);
          failedRecordExceptions.add((Exception) actualCause);
        } else {
          throw createWriteException(e);
        }
      }
    }
  } finally {
    // Drop the futures even on failure so a later write does not re-examine stale ones.
    futureList.clear();
  }
  if (!failedRecordIndices.isEmpty()) {
    throw new StageException(KafkaErrors.KAFKA_69, failedRecordIndices, failedRecordExceptions);
  }
  return responseRecords;
}
/**
 * Serializes the value to indented JSON (null properties omitted) and sends it
 * to this handler's topic.
 *
 * @param value the object to serialize and send
 * @return the future of the send, or {@code null} when the value could not be
 *         serialized (the stack trace is printed in that case)
 */
public Future<RecordMetadata> send(T value) {
    ObjectMapper jsonMapper = new ObjectMapper();
    jsonMapper.configure(SerializationFeature.INDENT_OUTPUT, true);
    jsonMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
    try {
        String json = jsonMapper.writeValueAsString(value);
        return this.send(this.topic, json);
    } catch (JsonProcessingException e) {
        e.printStackTrace();
        return null;
    }
}
/**
 * Builds a fixed {@link RecordMetadata} fixture for partition 0 of topic "foo".
 *
 * @return metadata with offsets of -1, timestamp 2340234, checksum 908923,
 *         key size 3 and value size 5
 */
RecordMetadata createRecordMetadata() {
  final TopicPartition topicPartition = new TopicPartition("foo", 0);
  // Kept as a boxed Long so the same constructor overload is selected.
  final Long checksum = Long.valueOf(908923L);
  return new RecordMetadata(topicPartition, -1L, -1L, 2340234L, checksum, 3, 5);
}
/**
 * Waits for one pending send to complete and keeps a running count of emitted
 * records, logging and resetting it every 10,000 records.
 *
 * @param recordEntry the record together with the future of its send
 * @return {@code true} if the send completed, {@code false} if it failed or the
 *         wait was interrupted
 */
private boolean flush(Map.Entry<Record, Future<RecordMetadata>> recordEntry) {
    try {
        recordEntry.getValue().get(); // wait
        if (++emitCount % 10_000 == 0) {
            LOGGER.info("Emitted {} records to Kafka Broker", emitCount);
            emitCount = 0;
        }
        return true;
    }
    catch (InterruptedException e) {
        // Restore the interrupt flag so the caller can react to the interruption.
        Thread.currentThread().interrupt();
        LOGGER.error("Failed to emit record {}", recordEntry.getKey(), e);
        return false;
    }
    catch (ExecutionException e) {
        LOGGER.error("Failed to emit record {}", recordEntry.getKey(), e);
        return false;
    }
}
/**
 * Marks the request as a full-pull-finish command, wraps it in a JSON envelope and
 * publishes it to the {@code <dsName>_callback} topic, blocking until the send
 * completes so the resulting offset can be logged.
 *
 * @param reqJson the request; its "type" entry is overwritten by this method
 * @throws Exception if the send fails or the wait is interrupted
 */
private void sendFinishMsgToSpout(JSONObject reqJson) throws Exception {
    reqJson.put("type", FullPullConstants.COMMAND_FULL_PULL_FINISH_REQ);

    JSONObject wrapperJson = new JSONObject();
    wrapperJson.put(FullPullConstants.FULLPULL_REQ_PARAM, reqJson.toJSONString());

    String callbackTopic = dsName + "_callback";
    byte[] payload = wrapperJson.toString().getBytes();
    ProducerRecord record = new ProducerRecord<>(callbackTopic, FullPullConstants.COMMAND_FULL_PULL_FINISH_REQ, payload);

    Future<RecordMetadata> future = byteProducer.send(record);
    long offset = future.get().offset();
    logger.info("send full pull finish msg to pull spout offset is {}", offset);
}
/**
 * Publishes the data to the message queue through a
 * {@code DefaultProducerHandler} created for this call.
 *
 * @param data the data to send
 * @return the future returned by the handler's {@code send}
 */
private Future<RecordMetadata> publishToMq(String data) {
    DefaultProducerHandler producerHandler = new DefaultProducerHandler(0, "devicecontrol");
    try {
        return producerHandler.send(data);
    } finally {
        // Always release the handler, even when send() throws.
        producerHandler.close();
    }
}