下面列出了怎么用org.apache.kafka.common.errors.RecordTooLargeException的API类实例代码及写法,或者点击链接到github查看源代码。
private Callback callback(int superstep, K readOnlyKey, K vertex, List<Message> messages) {
return (metadata, error) -> {
if (error == null) {
try {
// Activate partition for next step
int p = vertexToPartition(vertex, serialized.keySerde().serializer(), numPartitions);
log.debug("Step {}, adding partition {} for vertex {}", superstep, p, vertex);
ZKUtils.addChild(curator, applicationId, new PregelState(State.RUNNING, superstep + 1, Stage.SEND), childPath(p));
Map<Integer, Long> endOffsets = lastWrittenOffsets.computeIfAbsent(superstep, k -> new ConcurrentHashMap<>());
endOffsets.merge(metadata.partition(), metadata.offset(), Math::max);
} catch (Exception e) {
throw toRuntimeException(e);
}
} else if (error instanceof RecordTooLargeException && messages.size() > 1) {
log.warn("Record too large, retrying with smaller messages");
for (Message message : messages) {
List<Message> singleton = Collections.singletonList(message);
Tuple3<Integer, K, List<Message>> tuple = new Tuple3<>(superstep + 1, readOnlyKey, singleton);
ProducerRecord<K, Tuple3<Integer, K, List<Message>>> record =
new ProducerRecord<>(workSetTopic, vertex, tuple);
producer.send(record, callback(superstep, readOnlyKey, vertex, singleton));
}
} else {
log.error("Failed to send record to {}: {}", workSetTopic, error);
}
};
}
@Test
public void testSynchronous_messageTooLarge() throws IOException {
kafkaAdminClient.createTopic(topic, 4, 1, new Properties());
Properties props = KafkaTests.getProps();
props.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "10000");
props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "60000");
// create a payload that is too large
StringBuilder bldr = new StringBuilder();
for(int i = 0; i < 100000; i++) {
bldr.append(i);
}
List<ProducerRecord<String, String>> records = new ArrayList<>(2);
records.add(new ProducerRecord<>(topic, "key", bldr.toString()));
records.add(new ProducerRecord<>(topic, "key2", "small"));
boolean caughtRecordTooLargeException = false;
try (KafkaProducerWrapper<String, String> producer =
new KafkaProducerWrapper<>(new KafkaProducer<>(props))) {
producer.sendSynchronously(records);
} catch (KafkaExecutionException e) {
Throwable cause = e.getCause();
assertThat(cause, instanceOf(ExecutionException.class));
Throwable cause2 = cause.getCause();
assertThat(cause2, instanceOf(RecordTooLargeException.class));
caughtRecordTooLargeException = true;
}
assertThat(caughtRecordTooLargeException, is(true));
}
@Test
public void shouldFailWhenMessageIsBiggerThanMaxSize() throws Exception {
thrown.expect(RecordTooLargeException.class);
sender.close();
sender = sender.toBuilder().messageMaxBytes(1).build();
send(CLIENT_SPAN, CLIENT_SPAN).execute();
}
@Override
public List<Record> write(Stage.Context context) throws StageException {
// force all records in the buffer to be written out
producer.flush();
// make sure each record was written and handle exception if any
List<Integer> failedRecordIndices = new ArrayList<Integer>();
List<Exception> failedRecordExceptions = new ArrayList<Exception>();
List<Record> responseRecords = new ArrayList<>();
for (int i = 0; i < futureList.size(); i++) {
Future<RecordMetadata> f = futureList.get(i);
try {
RecordMetadata recordMetadata = f.get();
if (sendWriteResponse ) {
Record record = context.createRecord("responseRecord");
LinkedHashMap<String, Field> recordMetadataVal = new LinkedHashMap<>();
recordMetadataVal.put("offset", Field.create(recordMetadata.offset()));
recordMetadataVal.put("partition", Field.create(recordMetadata.partition()));
recordMetadataVal.put("topic", Field.create(recordMetadata.topic()));
record.set(Field.createListMap(recordMetadataVal));
responseRecords.add(record);
}
} catch (InterruptedException | ExecutionException e) {
Throwable actualCause = e.getCause();
if (actualCause != null && actualCause instanceof RecordTooLargeException) {
failedRecordIndices.add(i);
failedRecordExceptions.add((Exception)actualCause);
} else {
throw createWriteException(e);
}
}
}
futureList.clear();
if (!failedRecordIndices.isEmpty()) {
throw new StageException(KafkaErrors.KAFKA_69, failedRecordIndices, failedRecordExceptions);
}
return responseRecords;
}
@Test
public void testProducerLiveConfigReload() throws Exception {
String topic = "testProducerLiveConfigReload";
createTopic(topic, 1);
EmbeddableMario mario = new EmbeddableMario(null);
Random random = new Random();
// register kafka cluster to EmbeddableMario
KafkaClusterDescriptor kafkaClusterDescriptor = new KafkaClusterDescriptor(
null,
0,
"test",
"test",
"test",
zkConnect(),
bootstrapServers(),
"test",
0L
);
mario.addKafkaCluster(kafkaClusterDescriptor).get();
Properties extra = new Properties();
extra.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 1500);
extra.setProperty(ProducerConfig.ACKS_CONFIG, "1");
extra.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
extra.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
Properties baseProducerConfig = getProducerProperties(extra);
LiKafkaInstrumentedProducerImpl<byte[], byte[]> producer = new LiKafkaInstrumentedProducerImpl<>(
baseProducerConfig,
Collections.emptyMap(),
(baseConfig, overrideConfig) -> new LiKafkaProducerImpl<>(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)),
mario::getUrl);
byte[] key = new byte[500];
byte[] value = new byte[500];
random.nextBytes(key);
random.nextBytes(value);
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, 0, key, value);
RecordMetadata recordMetadata = producer.send(record).get();
Producer<byte[], byte[]> delegate = producer.getDelegate();
key = new byte[3000];
value = new byte[3000];
random.nextBytes(key);
random.nextBytes(value);
record = new ProducerRecord<>(topic, 0, key, value);
try {
producer.send(record).get();
Assert.fail("record expected to fail");
} catch (Exception e) {
Throwable root = Throwables.getRootCause(e);
Assert.assertTrue(root instanceof RecordTooLargeException, root.getClass() + " is not a RecordTooLargeException");
}
//install a new config policy, wait for the push
mario.setConfigPolicy(new ClientConfigRules(Collections.singletonList(
new ClientConfigRule(ClientPredicates.ALL, ImmutableMap.of("max.request.size", "" + 9000)))));
KafkaTestUtils.waitUntil("delegate recreated", () -> {
Producer<byte[], byte[]> delegateNow = producer.getDelegate();
return delegateNow != delegate;
}, 1, 2, TimeUnit.MINUTES, false);
producer.send(record).get(); //should succeed this time
producer.close(Duration.ofSeconds(30));
mario.close();
}
@Test
public void testCloseFromProduceCallbackOnSenderThread() throws Exception {
String topic = "testCloseFromProduceCallbackOnSenderThread";
createTopic(topic, 1);
Random random = new Random(666);
Properties extra = new Properties();
extra.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 50000000); //~50MB (larger than broker-size setting)
extra.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
extra.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
extra.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
Properties baseProducerConfig = getProducerProperties(extra);
LiKafkaInstrumentedProducerImpl<byte[], byte[]> producer = new LiKafkaInstrumentedProducerImpl<byte[], byte[]>(
baseProducerConfig,
Collections.emptyMap(),
(baseConfig, overrideConfig) -> new LiKafkaProducerImpl<byte[], byte[]>(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)),
() -> "bogus",
10 //dont wait for a mario connection
);
byte[] key = new byte[3000];
byte[] value = new byte[49000000];
random.nextBytes(key);
random.nextBytes(value); //random data is incompressible, making sure our request is large
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, value);
AtomicReference<Throwable> issueRef = new AtomicReference<>();
Thread testThread = new Thread(new Runnable() {
@Override
public void run() {
try {
final Thread ourThread = Thread.currentThread();
Future<RecordMetadata> future = producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
//we expect a RecordTooLargeException. we also expect this to happen
//on the same thread.
if (Thread.currentThread() != ourThread) {
issueRef.compareAndSet(null,
new IllegalStateException("completion did not happen on caller thread by " + Thread.currentThread().getName())
);
}
producer.close(1, TimeUnit.SECONDS);
}
});
RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES);
} catch (Throwable anything) {
issueRef.compareAndSet(null, anything);
}
}
}, "testCloseFromProduceCallbackOnSenderThread-thread");
testThread.setDaemon(true);
testThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
issueRef.compareAndSet(null, e);
}
});
testThread.start();
testThread.join(TimeUnit.MINUTES.toMillis(1));
Thread.State state = testThread.getState();
Assert.assertEquals(
state,
Thread.State.TERMINATED,
"thread was expected to finish, instead its " + state
);
Throwable issue = issueRef.get();
Throwable root = Throwables.getRootCause(issue);
Assert.assertTrue(root instanceof RecordTooLargeException, root.getMessage());
}