org.apache.kafka.clients.producer.BufferExhaustedException#org.apache.kafka.clients.producer.Producer Source Code Examples

Listed below are real-world usage examples of org.apache.kafka.clients.producer.Producer collected from open-source projects; you can also follow the project links to view the full source on GitHub.
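As a primer for the examples below, here is a minimal, self-contained sketch of the basic Producer lifecycle, including handling of the BufferExhaustedException named in the title. The broker address and topic are placeholders; note that in recent client versions BufferExhaustedException extends TimeoutException, so the subclass is caught first.

import java.util.Properties;

import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class MinimalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        } catch (BufferExhaustedException e) {
            // Thrown when the accumulator buffer is full and the record cannot be appended in time.
            System.err.println("Producer buffer exhausted: " + e.getMessage());
        } catch (TimeoutException e) {
            System.err.println("Timed out while sending: " + e.getMessage());
        }
    }
}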

Example 1  Project: nakadi  File: KafkaFactory.java
@Nullable
private Producer<String, String> takeUnderLock(final boolean canCreate) {
    final Lock lock = canCreate ? rwLock.writeLock() : rwLock.readLock();
    lock.lock();
    try {
        if (null != activeProducer) {
            useCount.get(activeProducer).incrementAndGet();
            return activeProducer;
        } else if (canCreate) {
            activeProducer = createProducerInstance();
            useCount.put(activeProducer, new AtomicInteger(1));
            LOG.info("New producer instance created: " + activeProducer);
            return activeProducer;
        } else {
            return null;
        }
    } finally {
        lock.unlock();
    }
}
 
Example 2
@Override
public <K, V, E> boolean send(Producer<K, V> producer, ProducerRecord<K, V> record, final E event,
                              final FailedDeliveryCallback<E> failedDeliveryCallback) {
    try {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    failedDeliveryCallback.onFailedDelivery(event, exception);
                }
            }
        });
        return true;
    } catch (BufferExhaustedException | TimeoutException e) {
        failedDeliveryCallback.onFailedDelivery(event, e);
        return false;
    }
}
 
Example 3  Project: rya  File: KafkaRyaStreamsClientFactory.java
/**
 * Create a {@link Producer} that is able to write to a topic in Kafka.
 *
 * @param kafkaHostname - The Kafka broker hostname. (not null)
 * @param kafkaPort - The Kafka broker port.
 * @param keySerializerClass - Serializes the keys. (not null)
 * @param valueSerializerClass - Serializes the values. (not null)
 * @return A {@link Producer} that can be used to write records to a topic.
 */
private static <K, V> Producer<K, V> makeProducer(
        final String kafkaHostname,
        final int kafkaPort,
        final Class<? extends Serializer<K>> keySerializerClass,
        final Class<? extends Serializer<V>> valueSerializerClass) {
    requireNonNull(kafkaHostname);
    requireNonNull(keySerializerClass);
    requireNonNull(valueSerializerClass);

    final Properties producerProps = new Properties();
    producerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kafkaPort);
    producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
    producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
    return new KafkaProducer<>(producerProps);
}
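For illustration only, a caller inside the same class might use this factory like so; the hostname, port, and topic are hypothetical:

// Hypothetical usage; host, port, and topic are placeholders.
final Producer<String, String> producer =
        makeProducer("localhost", 9092, StringSerializer.class, StringSerializer.class);
try {
    producer.send(new ProducerRecord<>("rya-demo-topic", "key", "value"));
} finally {
    producer.close();
}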
 
Example 4  Project: brooklin  File: LiKafkaProducerFactory.java
@Override
public Producer<byte[], byte[]> createProducer(Properties transportProps) {
  VerifiableProperties transportProviderProperties = new VerifiableProperties(transportProps);
  String clientId = transportProviderProperties.getString(ProducerConfig.CLIENT_ID_CONFIG);
  String bootstrapServers = transportProviderProperties.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
  Properties producerConfig = transportProviderProperties.getDomainProperties(DOMAIN_PRODUCER);

  Validate.notEmpty(clientId, "clientId cannot be empty.");
  Validate.notEmpty(bootstrapServers, "bootstrapServers cannot be empty.");

  producerConfig = buildProducerProperties(producerConfig, clientId, bootstrapServers, DEFAULT_ENABLE_LARGE_MESSAGE);

  // Default serializers for key and value
  producerConfig.putIfAbsent(LiKafkaProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      ByteArraySerializer.class.getCanonicalName());
  producerConfig.putIfAbsent(LiKafkaProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      ByteArraySerializer.class.getCanonicalName());

  return new LiKafkaProducerImpl<>(producerConfig);
}
 
Example 5  Project: kafka_book_demo  File: KafkaProducerDemo.java
public static void main(String[] args) throws ExecutionException, InterruptedException{
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "spark-producer-demo-client");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<String,String> producer = new KafkaProducer<>(properties);

    Random random = new Random();
    while (true) {
        int value = random.nextInt(10);
        ProducerRecord<String, String> message =
                new ProducerRecord<>(topic, value+"");
        producer.send(message, (recordMetadata, e) -> {
            if (recordMetadata != null) {
                System.out.println(recordMetadata.topic() + "-" + recordMetadata.partition() + ":" +
                        recordMetadata.offset());
            }
        });
        TimeUnit.SECONDS.sleep(1);
    }
}
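The brokerList and topic values are fields defined elsewhere in the demo class; to run the snippet standalone, one might declare them as follows (values are placeholders):

// Placeholder declarations for the fields the demo references.
private static final String brokerList = "localhost:9092";
private static final String topic = "topic-demo";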
 
Example 6  Project: javatech  File: ProducerInTransaction.java
public static Producer<String, String> buildProducer() {
	// 1. Specify the producer configuration
	Properties properties = new Properties();
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
	properties.put(ProducerConfig.ACKS_CONFIG, "all");
	properties.put(ProducerConfig.RETRIES_CONFIG, 1);
	properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
	properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
	properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
	properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "first-transactional");
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
		"org.apache.kafka.common.serialization.StringSerializer");
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
		"org.apache.kafka.common.serialization.StringSerializer");

	// 2. Initialize the Kafka producer with the configuration
	Producer<String, String> producer = new KafkaProducer<>(properties);
	return producer;
}
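Because TRANSACTIONAL_ID_CONFIG is set, this producer can only send records inside a transaction. A minimal sketch of the expected call sequence, following the standard KafkaProducer transactional pattern (the topic name is a placeholder; ProducerFencedException and KafkaException come from org.apache.kafka.common):

Producer<String, String> producer = buildProducer();
producer.initTransactions(); // registers the transactional id; required once before any transaction
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Another producer with the same transactional id took over; this instance must be closed.
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction(); // roll back records sent in this transaction
}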
 
Example 7  Project: atlas  File: KafkaNotificationMockTest.java
@Test
@SuppressWarnings("unchecked")
public void shouldThrowExceptionIfProducerFails() throws NotificationException,
        ExecutionException, InterruptedException {
    Properties configProperties = mock(Properties.class);
    KafkaNotification kafkaNotification = new KafkaNotification(configProperties);

    Producer producer = mock(Producer.class);
    String topicName = kafkaNotification.getProducerTopicName(NotificationInterface.NotificationType.HOOK);
    String message = "This is a test message";
    Future returnValue = mock(Future.class);
    when(returnValue.get()).thenThrow(new RuntimeException("Simulating exception"));
    ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
    when(producer.send(expectedRecord)).thenReturn(returnValue);

    try {
        kafkaNotification.sendInternalToProducer(producer,
            NotificationInterface.NotificationType.HOOK, Arrays.asList(new String[]{message}));
        fail("Should have thrown NotificationException");
    } catch (NotificationException e) {
        assertEquals(e.getFailedMessages().size(), 1);
        assertEquals(e.getFailedMessages().get(0), "This is a test message");
    }
}
 
Example 8  Project: brooklyn-library  File: KafkaSupport.java
/**
 * Send a message to the {@link KafkaCluster} on the given topic.
 */
public void sendMessage(String topic, String message) {
    Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
            Predicates.instanceOf(KafkaBroker.class),
            EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
    if (anyBrokerNodeInCluster.isPresent()) {
        KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get();

        Properties props = new Properties();

        props.put("metadata.broker.list", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
        props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);

        ProducerRecord<String, String> data = new ProducerRecord<>(topic, message);
        producer.send(data);
        producer.close();
    } else {
        throw new InvalidParameterException("No kafka broker node found");
    }
}
 
Example 9  Project: kafka-eagle  File: KafkaServiceImpl.java
/**
 * Send mock message to kafka topic .
 */
public boolean mockMessage(String clusterAlias, String topic, String message) {
	Properties props = new Properties();
	props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getKafkaBrokerServer(clusterAlias));
	props.put(Kafka.KEY_SERIALIZER, StringSerializer.class.getCanonicalName());
	props.put(Kafka.VALUE_SERIALIZER, StringSerializer.class.getCanonicalName());
	props.put(Kafka.PARTITION_CLASS, KafkaPartitioner.class.getName());

	if (SystemConfigUtils.getBooleanProperty(clusterAlias + ".kafka.eagle.sasl.enable")) {
		sasl(props, clusterAlias);
	}
	if (SystemConfigUtils.getBooleanProperty(clusterAlias + ".kafka.eagle.ssl.enable")) {
		ssl(props, clusterAlias);
	}
	Producer<String, String> producer = new KafkaProducer<>(props);
	producer.send(new ProducerRecord<String, String>(topic, new Date().getTime() + "", message));
	producer.close();

	return true;
}
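Note that send() is asynchronous, so the unconditional return true above does not confirm delivery. A hedged variant that blocks on the returned Future before reporting success (the timeout value is arbitrary) could look like:

try {
    producer.send(new ProducerRecord<String, String>(topic, new Date().getTime() + "", message))
            .get(10, TimeUnit.SECONDS); // block until the broker acks, the send fails, or the wait times out
    return true;
} catch (InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
    return false;
} finally {
    producer.close();
}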
 
Example 10  Project: components  File: KafkaDatasetOtherDelimTestIT.java
@Before
public void init() throws TimeoutException {
    // There may be topics other than the built-in ones (configured in pom.xml); ignore them.

    // ----------------- Send sample data to TOPIC_IN start --------------------
    String testID = "sampleTest" + new Random().nextInt();

    List<Person> expectedPersons = Person.genRandomList(testID, 10);

    Properties props = new Properties();
    props.put("bootstrap.servers", BOOTSTRAP_HOST);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<Void, String> producer = new KafkaProducer<>(props);
    for (Person person : expectedPersons) {
        ProducerRecord<Void, String> message = new ProducerRecord<>(TOPIC_IN, person.toCSV(fieldDelimiter));
        producer.send(message);
    }
    producer.close();
    // ----------------- Send sample data to TOPIC_IN end --------------------
}
 
Example 11  Project: flink  File: FlinkKafkaProducer.java
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
	// the fetched list is immutable, so we're creating a mutable copy in order to sort it
	List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));

	// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
	Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
		@Override
		public int compare(PartitionInfo o1, PartitionInfo o2) {
			return Integer.compare(o1.partition(), o2.partition());
		}
	});

	int[] partitions = new int[partitionsList.size()];
	for (int i = 0; i < partitions.length; i++) {
		partitions[i] = partitionsList.get(i).partition();
	}

	return partitions;
}
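Flink's fixed partitioner then assigns each parallel subtask one element of this sorted array, roughly as in the following sketch (parallelInstanceId is hypothetical here):

int[] partitions = getPartitionsByTopic("demo-topic", producer);
// Each subtask deterministically maps onto one partition of the sorted list.
int partition = partitions[parallelInstanceId % partitions.length];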
 
Example 12  Project: rya  File: RunQueryCommandIT.java
@Before
public void setup() {
    // Make sure the topic that the change log uses exists.
    final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
    kafka.createTopic(changeLogTopic);

    // Setup the QueryRepository used by the test.
    final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
    final Consumer<?, QueryChange> queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
    final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
    queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));

    // Initialize the Statements Producer and the Results Consumer.
    stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class);
    resultConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class);
}
 
Example 13
@Test
void testTransactionalProducerCreation() {
    assumeFalse(
            System.getProperty("os.name").contains("Windows"),
            "Transactional producers not supported on Windows"
    );

    ProducerFactory<String, String> producerFactory = transactionalProducerFactory(kafkaBroker, "xyz");
    Producer<String, String> testProducer = producerFactory.createProducer();

    testProducer.beginTransaction();
    testProducer.commitTransaction();
    assertFalse(testProducer.metrics().isEmpty());

    cleanup(producerFactory, testProducer);
}
 
Example 14  Project: oryx  File: ProduceData.java
public void start() throws InterruptedException {
  RandomGenerator random = RandomManager.getRandom();

  Properties props = ConfigUtils.keyValueToProperties(
      "bootstrap.servers", "localhost:" + kafkaPort,
      "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
      "value.serializer", "org.apache.kafka.common.serialization.StringSerializer",
      "compression.type", "gzip",
      "linger.ms", 0,
      "batch.size", 0,
      "acks", 1,
      "max.request.size", 1 << 26 // TODO
  );
  try (Producer<String,String> producer = new KafkaProducer<>(props)) {
    for (int i = 0; i < howMany; i++) {
      Pair<String,String> datum = datumGenerator.generate(i, random);
      ProducerRecord<String,String> record =
          new ProducerRecord<>(topic, datum.getFirst(), datum.getSecond());
      producer.send(record);
      log.debug("Sent datum {} = {}", record.key(), record.value());
      if (intervalMsec > 0) {
        Thread.sleep(intervalMsec);
      }
    }
  }
}
 
Example 15  Project: alcor  File: MessageProducerFactory.java
public Producer Create() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaAddress);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConfiguration.PRODUCER_CLIENT_ID);

    // Key is set as long and Value is given by concrete implementation
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());

    Serializer serializer = getSerializer();
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer.getClass().getName());

    //TODO: Optimizing partition
    // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

    return new KafkaProducer<>(props);
}
 
Example 16  Project: DBus  File: BoltCommandHandlerHelper.java
public static void writeEmailMessage(String subject, String contents, String dataSchema, Producer<String, String> producer) {
    try {
        // Send the email notification
        ControlMessage gm = new ControlMessage(System.currentTimeMillis(), ControlType.COMMON_EMAIL_MESSAGE.toString(), BoltCommandHandlerHelper.class.getName());

        gm.addPayload("subject", subject);
        gm.addPayload("contents", contents);
        gm.addPayload("datasource_schema", Utils.getDatasource().getDsName() + "/" + dataSchema);

        String topic = PropertiesHolder.getProperties(Constants.Properties.CONFIGURE, Constants.ConfigureKey.GLOBAL_EVENT_TOPIC);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, gm.getType(), gm.toJSONString());
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                logger.error("Send global event error.{}", exception.getMessage());
            }
        });
    } catch (Exception e) {
        logger.error("send email error. schema:{}, subject:{}, content:{}", dataSchema, subject, contents, e);
    } finally {
        if (producer != null) producer.close();
    }
}
 
Example 17
private String produceMessage(String topicName, Object msg, Boolean storeSchemaInHeader) {
    String bootstrapServers = CLUSTER.bootstrapServers();
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.putAll(SCHEMA_REGISTRY_TEST_SERVER_CLIENT_WRAPPER.exportClientConf(true));
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    config.put(KafkaAvroSerializer.STORE_SCHEMA_VERSION_ID_IN_HEADER, storeSchemaInHeader.toString());

    final Producer<String, Object> producer = new KafkaProducer<>(config);
    final Callback callback = new ProducerCallback();
    LOG.info("Sending message: [{}] to topic: [{}]", msg, topicName);
    ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topicName, getKey(msg), msg);
    producer.send(producerRecord, callback);
    producer.flush();
    LOG.info("Message successfully sent to topic: [{}]", topicName);
    producer.close(5, TimeUnit.SECONDS);

    return bootstrapServers;
}
 
Example 18
@Test void shouldCreateMetersWithTags() {
    try (Producer<String, String> producer = createProducer()) {
        metrics = new KafkaClientMetrics(producer, tags);
        MeterRegistry registry = new SimpleMeterRegistry();

        metrics.bindTo(registry);

        assertThat(registry.getMeters())
                .hasSizeGreaterThan(0)
                .extracting(meter -> meter.getId().getTag("app"))
                .allMatch(s -> s.equals("myapp"));
    }
}
 
Example 19  Project: data-highway  File: OnrampServiceConfiguration.java
@SuppressWarnings("deprecation")
@Bean
public Producer<byte[], byte[]> kafkaProducer(
    @Value("${kafka.bootstrapServers}") String bootstrapServers,
    @Value("${kafka.road.batch.size:16384}") int batchSize,
    @Value("${kafka.road.linger.ms:0}") int lingerMs,
    @Value("${kafka.road.buffer.memory:33554432}") long bufferMemory,
    @Value("${kafka.road.acks:1}") String acks,
    @Value("${kafka.road.compression:none}") String compressionType,
    @Value("${kafka.producer.request.timeout.ms:10000}") int requestTimeout,
    MeterRegistry registry) {
  Properties props = new Properties();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ProducerConfig.ACKS_CONFIG, acks);
  props.put(ProducerConfig.RETRIES_CONFIG, 100);
  props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
  props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
  props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());

  Producer<byte[], byte[]> producer = new KafkaProducer<>(props);

  producer.metrics().forEach((metricName, metric) -> {
    String name = "onramp_kafka_producer_" + metricName.group() + "_" + metricName.name();
    registry.gauge(name, metric, m -> m.value());
  });

  return producer;
}
 
Example 20  Project: nifi  File: PublisherPool.java
private PublisherLease createLease() {
    final Producer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProperties);
    final PublisherLease lease = new PublisherLease(producer, maxMessageSize, maxAckWaitMillis, logger) {
        @Override
        public void close() {
            if (isPoisoned() || isClosed()) {
                super.close();
            } else {
                publisherQueue.offer(this);
            }
        }
    };

    return lease;
}
 
Example 21  Project: extension-kafka  File: KafkaPublisherTest.java
@SuppressWarnings("unchecked")
private static DefaultProducerFactory<String, byte[]> producerFactoryWithFencedExceptionOnAbort() {
    DefaultProducerFactory<String, byte[]> producerFactory =
            mock(DefaultProducerFactory.class, "FactoryForExceptionOnAbortTx");
    Producer<String, byte[]> producer = mock(Producer.class, "ExceptionOnAbortTx");
    when(producerFactory.confirmationMode()).thenReturn(ConfirmationMode.TRANSACTIONAL);
    when(producerFactory.createProducer()).thenReturn(producer);
    doThrow(RuntimeException.class).when(producer).abortTransaction();
    return producerFactory;
}
 
Example 22  Project: camel-quarkus  File: CamelKafkaResource.java
@Path("/kafka/{topicName}")
@POST
@Produces(MediaType.APPLICATION_JSON)
public JsonObject post(@PathParam("topicName") String topicName, String message) throws Exception {
    try (Producer<Integer, String> producer = CamelKafkaSupport.createProducer()) {
        RecordMetadata meta = producer.send(new ProducerRecord<>(topicName, 1, message)).get();

        return Json.createObjectBuilder()
                .add("topicName", meta.topic())
                .add("partition", meta.partition())
                .add("offset", meta.offset())
                .build();
    }
}
 
Example 23  Project: DBus  File: KafkaContainer.java
public Producer getProducer(Properties props) {
    if (producerMap.containsKey(props)) {
        return producerMap.get(props);
    } else {
        Producer producer = new KafkaProducer<>(props);
        producerMap.put(props, producer);
        return producer;
    }
}
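This check-then-act sequence is not atomic, so two threads can race and create two producers for the same Properties key. Assuming producerMap is a ConcurrentHashMap, a thread-safe sketch of the same cache is:

public Producer getProducer(Properties props) {
    // Atomically creates at most one producer per distinct Properties key.
    return producerMap.computeIfAbsent(props, p -> new KafkaProducer<>(p));
}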
 
Example 24  Project: phoebus  File: AlarmConfigProducerDemo.java
private void sendItemRemoval(final Producer<String, AlarmTreeItem<?>> producer,
        final String topic, final String path)
{
    final String key = AlarmSystem.CONFIG_PREFIX + path;
    final ProducerRecord<String, AlarmTreeItem<?>> record = new ProducerRecord<>(topic, partition, key, null);
    producer.send(record);
}
 
Example 25
@Profile("!test")
@Bean(destroyMethod = "shutdown")
public RpcClient rpcClient(Producer<String, Request> requestProducer, RequestResponseMatcher responseMatcher,
                           ServerResponseListener responseListener) {
    KafkaRpcClient client = new KafkaRpcClient(REQUEST_TOPIC, RESPONSE_TOPIC, requestProducer, responseMatcher, responseListener);
    client.start();
    return client;
}
 
Example 26  Project: quarkus  File: KafkaProducerManager.java
public static Producer<Integer, String> createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return new KafkaProducer<Integer, String>(props);
}
 
Example 27  Project: java-study  File: TestProducer.java
public static void main(String[] args) {
    System.out.println("Starting...");
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.169.0.23:9092");
    // The "all" setting blocks on the full commit of the record: the slowest but most durable setting.
    props.put("acks", "all");
    // If a request fails, the producer can retry automatically; a value of 0 disables retries.
    props.put("retries", 0);

    // The producer maintains buffers of unsent records for each partition.
    props.put("batch.size", 16384);
    // Records are sent immediately by default; linger.ms adds an artificial delay in milliseconds.
    props.put("linger.ms", 1);
    // Total buffer memory; once it is exhausted, further send calls block, and a TimeoutException
    // is thrown after max.block.ms elapses.
    props.put("buffer.memory", 33554432);
    // key.serializer and value.serializer tell the producer how to turn the key and value
    // objects of a ProducerRecord into bytes.
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // Create the Kafka producer
    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    long startTime = System.currentTimeMillis();
    producer.send(new ProducerRecord<String, String>("test1", 1, startTime, "a", "b"));
    producer.close();
    // Other key producer methods:
    //   close();                            // close this producer
    //   close(long timeout, TimeUnit unit); // wait up to timeout for in-flight sends to complete
    //   flush();                            // send all buffered records immediately
    // for (int i = 0; i < 100; i++) {
    //     // spread writes evenly across 4 partitions
    //     producer.send(new ProducerRecord<String, String>("foo", i % 4, Integer.toString(i), Integer.toString(i)));
    // }
    System.out.println("Done");
}
 
Example 28  Project: rya  File: KafkaTestUtil.java
/**
 * Create a {@link Producer} that is able to write to a topic that is hosted within an embedded instance of Kafka.
 *
 * @param kafka - The Kafka rule used to connect to the embedded Kafka instance. (not null)
 * @param keySerializerClass - Serializes the keys. (not null)
 * @param valueSerializerClass - Serializes the values. (not null)
 * @return A {@link Producer} that can be used to write records to a topic.
 */
public static <K, V> Producer<K, V> makeProducer(
        final KafkaTestInstanceRule kafka,
        final Class<? extends Serializer<K>> keySerializerClass,
        final Class<? extends Serializer<V>> valueSerializerClass) {
    requireNonNull(kafka);
    requireNonNull(keySerializerClass);
    requireNonNull(valueSerializerClass);

    final Properties props = kafka.createBootstrapServerConfig();
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
    return new KafkaProducer<>(props);
}
 
Example 29  Project: kbear  File: DefaultKafkaClientFactory.java
@Override
public <K, V> Producer<K, V> newProducer(Map<String, Object> configs, Serializer<K> keySerializer,
        Serializer<V> valueSerializer) {
    ObjectExtension.requireNonNull(configs, "configs");
    Properties properties = new Properties();
    properties.putAll(configs);
    return newProducer(properties, keySerializer, valueSerializer);
}
 
Example 30  Project: common-kafka  File: KafkaProducerPoolTest.java
@Test
public void sameConfiguration() {
    Properties props = KafkaTests.getProps();
    props.setProperty(KAFKA_PRODUCER_CONCURRENCY, "1");
    props.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<String, String> p1 = pool.getProducer(props);
    Producer<String, String> p2 = pool.getProducer(props);
    assertThat(p1, is(p2));
}