类org.apache.kafka.common.errors.RecordTooLargeException源码实例Demo

下面列出了怎么用org.apache.kafka.common.errors.RecordTooLargeException的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: kafka-graphs   文件: PregelComputation.java
private Callback callback(int superstep, K readOnlyKey, K vertex, List<Message> messages) {
    return (metadata, error) -> {
        if (error == null) {
            try {
                // Activate partition for next step
                int p = vertexToPartition(vertex, serialized.keySerde().serializer(), numPartitions);
                log.debug("Step {}, adding partition {} for vertex {}", superstep, p, vertex);
                ZKUtils.addChild(curator, applicationId, new PregelState(State.RUNNING, superstep + 1, Stage.SEND), childPath(p));

                Map<Integer, Long> endOffsets = lastWrittenOffsets.computeIfAbsent(superstep, k -> new ConcurrentHashMap<>());
                endOffsets.merge(metadata.partition(), metadata.offset(), Math::max);
            } catch (Exception e) {
                throw toRuntimeException(e);
            }
        } else if (error instanceof RecordTooLargeException && messages.size() > 1) {
            log.warn("Record too large, retrying with smaller messages");
            for (Message message : messages) {
                List<Message> singleton = Collections.singletonList(message);
                Tuple3<Integer, K, List<Message>> tuple = new Tuple3<>(superstep + 1, readOnlyKey, singleton);
                ProducerRecord<K, Tuple3<Integer, K, List<Message>>> record =
                    new ProducerRecord<>(workSetTopic, vertex, tuple);
                producer.send(record, callback(superstep, readOnlyKey, vertex, singleton));
            }
        } else {
            log.error("Failed to send record to {}: {}", workSetTopic, error);
        }
    };
}
 
源代码2 项目: common-kafka   文件: KafkaProducerWrapperTest.java
@Test
public void testSynchronous_messageTooLarge() throws IOException {
    kafkaAdminClient.createTopic(topic, 4, 1, new Properties());

    Properties props = KafkaTests.getProps();
    props.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "10000");
    props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "60000");

    // create a payload that is too large
    StringBuilder bldr = new StringBuilder();
    for(int i = 0; i < 100000; i++) {
        bldr.append(i);
    }

    List<ProducerRecord<String, String>> records = new ArrayList<>(2);
    records.add(new ProducerRecord<>(topic, "key", bldr.toString()));
    records.add(new ProducerRecord<>(topic, "key2", "small"));
    boolean caughtRecordTooLargeException = false;
    try (KafkaProducerWrapper<String, String> producer =
                 new KafkaProducerWrapper<>(new KafkaProducer<>(props))) {
        producer.sendSynchronously(records);
    } catch (KafkaExecutionException e) {
        Throwable cause = e.getCause();

        assertThat(cause, instanceOf(ExecutionException.class));

        Throwable cause2 = cause.getCause();

        assertThat(cause2, instanceOf(RecordTooLargeException.class));

        caughtRecordTooLargeException = true;
    }

    assertThat(caughtRecordTooLargeException, is(true));
}
 
源代码3 项目: zipkin-reporter-java   文件: ITKafkaSender.java
@Test
public void shouldFailWhenMessageIsBiggerThanMaxSize() throws Exception {
  thrown.expect(RecordTooLargeException.class);
  sender.close();
  sender = sender.toBuilder().messageMaxBytes(1).build();

  send(CLIENT_SPAN, CLIENT_SPAN).execute();
}
 
源代码4 项目: datacollector   文件: BaseKafkaProducer09.java
@Override
public List<Record> write(Stage.Context context) throws StageException {
  // force all records in the buffer to be written out
  producer.flush();
  // make sure each record was written and handle exception if any
  List<Integer> failedRecordIndices = new ArrayList<Integer>();
  List<Exception> failedRecordExceptions = new ArrayList<Exception>();
  List<Record> responseRecords = new ArrayList<>();
  for (int i = 0; i < futureList.size(); i++) {
    Future<RecordMetadata> f = futureList.get(i);
    try {
      RecordMetadata recordMetadata = f.get();
      if (sendWriteResponse ) {
        Record record = context.createRecord("responseRecord");
        LinkedHashMap<String, Field> recordMetadataVal = new LinkedHashMap<>();
        recordMetadataVal.put("offset", Field.create(recordMetadata.offset()));
        recordMetadataVal.put("partition", Field.create(recordMetadata.partition()));
        recordMetadataVal.put("topic", Field.create(recordMetadata.topic()));
        record.set(Field.createListMap(recordMetadataVal));
        responseRecords.add(record);
      }
    } catch (InterruptedException | ExecutionException e) {
      Throwable actualCause = e.getCause();
      if (actualCause != null && actualCause instanceof RecordTooLargeException) {
        failedRecordIndices.add(i);
        failedRecordExceptions.add((Exception)actualCause);
      } else {
        throw createWriteException(e);
      }
    }
  }
  futureList.clear();
  if (!failedRecordIndices.isEmpty()) {
    throw new StageException(KafkaErrors.KAFKA_69, failedRecordIndices, failedRecordExceptions);
  }
  return responseRecords;
}
 
@Test
public void testProducerLiveConfigReload() throws Exception {
  String topic = "testProducerLiveConfigReload";
  createTopic(topic, 1);
  EmbeddableMario mario = new EmbeddableMario(null);
  Random random = new Random();

  // register kafka cluster to EmbeddableMario
  KafkaClusterDescriptor kafkaClusterDescriptor = new KafkaClusterDescriptor(
      null,
      0,
      "test",
      "test",
      "test",
      zkConnect(),
      bootstrapServers(),
      "test",
      0L
  );
  mario.addKafkaCluster(kafkaClusterDescriptor).get();

  Properties extra = new Properties();
  extra.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 1500);
  extra.setProperty(ProducerConfig.ACKS_CONFIG, "1");
  extra.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  extra.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  Properties baseProducerConfig = getProducerProperties(extra);
  LiKafkaInstrumentedProducerImpl<byte[], byte[]> producer = new LiKafkaInstrumentedProducerImpl<>(
      baseProducerConfig,
      Collections.emptyMap(),
      (baseConfig, overrideConfig) -> new LiKafkaProducerImpl<>(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)),
      mario::getUrl);

  byte[] key = new byte[500];
  byte[] value = new byte[500];
  random.nextBytes(key);
  random.nextBytes(value);
  ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, 0, key, value);
  RecordMetadata recordMetadata = producer.send(record).get();

  Producer<byte[], byte[]> delegate = producer.getDelegate();

  key = new byte[3000];
  value = new byte[3000];
  random.nextBytes(key);
  random.nextBytes(value);
  record = new ProducerRecord<>(topic, 0, key, value);
  try {
    producer.send(record).get();
    Assert.fail("record expected to fail");
  } catch (Exception e) {
    Throwable root = Throwables.getRootCause(e);
    Assert.assertTrue(root instanceof RecordTooLargeException, root.getClass() + " is not a RecordTooLargeException");
  }

  //install a new config policy, wait for the push
  mario.setConfigPolicy(new ClientConfigRules(Collections.singletonList(
      new ClientConfigRule(ClientPredicates.ALL, ImmutableMap.of("max.request.size", "" + 9000)))));

  KafkaTestUtils.waitUntil("delegate recreated", () -> {
    Producer<byte[], byte[]> delegateNow = producer.getDelegate();
    return delegateNow != delegate;
  }, 1, 2, TimeUnit.MINUTES, false);

  producer.send(record).get(); //should succeed this time

  producer.close(Duration.ofSeconds(30));
  mario.close();
}
 
@Test
public void testCloseFromProduceCallbackOnSenderThread() throws Exception {
  String topic = "testCloseFromProduceCallbackOnSenderThread";
  createTopic(topic, 1);

  Random random = new Random(666);
  Properties extra = new Properties();
  extra.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 50000000); //~50MB (larger than broker-size setting)
  extra.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
  extra.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  extra.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  Properties baseProducerConfig = getProducerProperties(extra);
  LiKafkaInstrumentedProducerImpl<byte[], byte[]> producer = new LiKafkaInstrumentedProducerImpl<byte[], byte[]>(
      baseProducerConfig,
      Collections.emptyMap(),
      (baseConfig, overrideConfig) -> new LiKafkaProducerImpl<byte[], byte[]>(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)),
      () -> "bogus",
      10 //dont wait for a mario connection
  );

  byte[] key = new byte[3000];
  byte[] value = new byte[49000000];
  random.nextBytes(key);
  random.nextBytes(value); //random data is incompressible, making sure our request is large
  ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, value);

  AtomicReference<Throwable> issueRef = new AtomicReference<>();
  Thread testThread = new Thread(new Runnable() {
    @Override
    public void run() {
      try {
        final Thread ourThread = Thread.currentThread();
        Future<RecordMetadata> future = producer.send(record, new Callback() {
          @Override
          public void onCompletion(RecordMetadata metadata, Exception exception) {
            //we expect a RecordTooLargeException. we also expect this to happen
            //on the same thread.
            if (Thread.currentThread() != ourThread) {
              issueRef.compareAndSet(null,
                  new IllegalStateException("completion did not happen on caller thread by " + Thread.currentThread().getName())
              );
            }
            producer.close(1, TimeUnit.SECONDS);
          }
        });
        RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES);
      } catch (Throwable anything) {
        issueRef.compareAndSet(null, anything);
      }
    }
  }, "testCloseFromProduceCallbackOnSenderThread-thread");
  testThread.setDaemon(true);
  testThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
      issueRef.compareAndSet(null, e);
    }
  });
  testThread.start();

  testThread.join(TimeUnit.MINUTES.toMillis(1));
  Thread.State state = testThread.getState();
  Assert.assertEquals(
      state,
      Thread.State.TERMINATED,
      "thread was expected to finish, instead its " + state
  );
  Throwable issue = issueRef.get();
  Throwable root = Throwables.getRootCause(issue);
  Assert.assertTrue(root instanceof RecordTooLargeException, root.getMessage());
}
 
 类所在包
 类方法
 同包方法