下面列出了org.apache.kafka.clients.producer.BufferExhaustedException#org.apache.kafka.clients.producer.Producer 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Nullable
private Producer<String, String> takeUnderLock(final boolean canCreate) {
final Lock lock = canCreate ? rwLock.writeLock() : rwLock.readLock();
lock.lock();
try {
if (null != activeProducer) {
useCount.get(activeProducer).incrementAndGet();
return activeProducer;
} else if (canCreate) {
activeProducer = createProducerInstance();
useCount.put(activeProducer, new AtomicInteger(1));
LOG.info("New producer instance created: " + activeProducer);
return activeProducer;
} else {
return null;
}
} finally {
lock.unlock();
}
}
@Override
public <K, V, E> boolean send(Producer<K, V> producer, ProducerRecord<K, V> record, final E event,
final FailedDeliveryCallback<E> failedDeliveryCallback) {
try {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
failedDeliveryCallback.onFailedDelivery(event, exception);
}
}
});
return true;
} catch (BufferExhaustedException | TimeoutException e) {
failedDeliveryCallback.onFailedDelivery(event, e);
return false;
}
}
/**
* Create a {@link Producer} that is able to write to a topic in Kafka.
*
* @param kafkaHostname - The Kafka broker hostname. (not null)
* @param kafkaPort - The Kafka broker port.
* @param keySerializerClass - Serializes the keys. (not null)
* @param valueSerializerClass - Serializes the values. (not null)
* @return A {@link Producer} that can be used to write records to a topic.
*/
private static <K, V> Producer<K, V> makeProducer(
final String kafkaHostname,
final int kakfaPort,
final Class<? extends Serializer<K>> keySerializerClass,
final Class<? extends Serializer<V>> valueSerializerClass) {
requireNonNull(kafkaHostname);
requireNonNull(keySerializerClass);
requireNonNull(valueSerializerClass);
final Properties producerProps = new Properties();
producerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kakfaPort);
producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
return new KafkaProducer<>(producerProps);
}
@Override
public Producer<byte[], byte[]> createProducer(Properties transportProps) {
VerifiableProperties transportProviderProperties = new VerifiableProperties(transportProps);
String clientId = transportProviderProperties.getString(ProducerConfig.CLIENT_ID_CONFIG);
String bootstrapServers = transportProviderProperties.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
Properties producerConfig = transportProviderProperties.getDomainProperties(DOMAIN_PRODUCER);
Validate.notEmpty(clientId, "clientId cannot be empty.");
Validate.notEmpty(bootstrapServers, "bootstrapServers cannot be empty.");
producerConfig = buildProducerProperties(producerConfig, clientId, bootstrapServers, DEFAULT_ENABLE_LARGE_MESSAGE);
// Default DeSerializer for Key and Payload
producerConfig.putIfAbsent(LiKafkaProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getCanonicalName());
producerConfig.putIfAbsent(LiKafkaProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getCanonicalName());
return new LiKafkaProducerImpl<>(producerConfig);
}
public static void main(String[] args) throws ExecutionException, InterruptedException{
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "spark-producer-demo-client");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String,String> producer = new KafkaProducer<>(properties);
Random random = new Random();
while (true) {
int value = random.nextInt(10);
ProducerRecord<String, String> message =
new ProducerRecord<>(topic, value+"");
producer.send(message, (recordMetadata, e) -> {
if (recordMetadata != null) {
System.out.println(recordMetadata.topic() + "-" + recordMetadata.partition() + ":" +
recordMetadata.offset());
}
});
TimeUnit.SECONDS.sleep(1);
}
}
public static Producer buildProducer() {
// 1. 指定生产者的配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, 1);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "first-transactional");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 2. 使用配置初始化 Kafka 生产者
Producer<String, String> producer = new KafkaProducer<>(properties);
return producer;
}
@Test
@SuppressWarnings("unchecked")
public void shouldThrowExceptionIfProducerFails() throws NotificationException,
ExecutionException, InterruptedException {
Properties configProperties = mock(Properties.class);
KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
Producer producer = mock(Producer.class);
String topicName = kafkaNotification.getProducerTopicName(NotificationInterface.NotificationType.HOOK);
String message = "This is a test message";
Future returnValue = mock(Future.class);
when(returnValue.get()).thenThrow(new RuntimeException("Simulating exception"));
ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
when(producer.send(expectedRecord)).thenReturn(returnValue);
try {
kafkaNotification.sendInternalToProducer(producer,
NotificationInterface.NotificationType.HOOK, Arrays.asList(new String[]{message}));
fail("Should have thrown NotificationException");
} catch (NotificationException e) {
assertEquals(e.getFailedMessages().size(), 1);
assertEquals(e.getFailedMessages().get(0), "This is a test message");
}
}
/**
* Send a message to the {@link KafkaCluster} on the given topic.
*/
public void sendMessage(String topic, String message) {
Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
Predicates.instanceOf(KafkaBroker.class),
EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
if (anyBrokerNodeInCluster.isPresent()) {
KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get();
Properties props = new Properties();
props.put("metadata.broker.list", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);
ProducerRecord<String, String> data = new ProducerRecord<>(topic, message);
producer.send(data);
producer.close();
} else {
throw new InvalidParameterException("No kafka broker node found");
}
}
/**
* Send mock message to kafka topic .
*/
public boolean mockMessage(String clusterAlias, String topic, String message) {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getKafkaBrokerServer(clusterAlias));
props.put(Kafka.KEY_SERIALIZER, StringSerializer.class.getCanonicalName());
props.put(Kafka.VALUE_SERIALIZER, StringSerializer.class.getCanonicalName());
props.put(Kafka.PARTITION_CLASS, KafkaPartitioner.class.getName());
if (SystemConfigUtils.getBooleanProperty(clusterAlias + ".kafka.eagle.sasl.enable")) {
sasl(props, clusterAlias);
}
if (SystemConfigUtils.getBooleanProperty(clusterAlias + ".kafka.eagle.ssl.enable")) {
ssl(props, clusterAlias);
}
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>(topic, new Date().getTime() + "", message));
producer.close();
return true;
}
@Before
public void init() throws TimeoutException {
// there may exists other topics than these build in(configured in pom.xml) topics, but ignore them
// ----------------- Send sample data to TOPIC_IN start --------------------
String testID = "sampleTest" + new Random().nextInt();
List<Person> expectedPersons = Person.genRandomList(testID, 10);
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_HOST);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<Void, String> producer = new KafkaProducer<>(props);
for (Person person : expectedPersons) {
ProducerRecord<Void, String> message = new ProducerRecord<>(TOPIC_IN, person.toCSV(fieldDelimiter));
producer.send(message);
}
producer.close();
// ----------------- Send sample data to TOPIC_IN end --------------------
}
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
// the fetched list is immutable, so we're creating a mutable copy in order to sort it
List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
@Override
public int compare(PartitionInfo o1, PartitionInfo o2) {
return Integer.compare(o1.partition(), o2.partition());
}
});
int[] partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
}
return partitions;
}
@Before
public void setup() {
// Make sure the topic that the change log uses exists.
final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
kafka.createTopic(changeLogTopic);
// Setup the QueryRepository used by the test.
final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
final Consumer<?, QueryChange> queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
// Initialize the Statements Producer and the Results Consumer.
stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class);
resultConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class);
}
@Test
void testTransactionalProducerCreation() {
assumeFalse(
System.getProperty("os.name").contains("Windows"),
"Transactional producers not supported on Windows"
);
ProducerFactory<String, String> producerFactory = transactionalProducerFactory(kafkaBroker, "xyz");
Producer<String, String> testProducer = producerFactory.createProducer();
testProducer.beginTransaction();
testProducer.commitTransaction();
assertFalse(testProducer.metrics().isEmpty());
cleanup(producerFactory, testProducer);
}
public void start() throws InterruptedException {
RandomGenerator random = RandomManager.getRandom();
Properties props = ConfigUtils.keyValueToProperties(
"bootstrap.servers", "localhost:" + kafkaPort,
"key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer", "org.apache.kafka.common.serialization.StringSerializer",
"compression.type", "gzip",
"linger.ms", 0,
"batch.size", 0,
"acks", 1,
"max.request.size", 1 << 26 // TODO
);
try (Producer<String,String> producer = new KafkaProducer<>(props)) {
for (int i = 0; i < howMany; i++) {
Pair<String,String> datum = datumGenerator.generate(i, random);
ProducerRecord<String,String> record =
new ProducerRecord<>(topic, datum.getFirst(), datum.getSecond());
producer.send(record);
log.debug("Sent datum {} = {}", record.key(), record.value());
if (intervalMsec > 0) {
Thread.sleep(intervalMsec);
}
}
}
}
public Producer Create() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaAddress);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConfiguration.PRODUCER_CLIENT_ID);
// Key is set as long and Value is given by concrete implementation
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
Serializer serializer = getSerializer();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer.getClass().getName());
//TODO: Optimizing partition
// props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
return new KafkaProducer<>(props);
}
public static void writeEmailMessage(String subject, String contents, String dataSchema, Producer<String, String> producer) {
try {
// 发邮件
ControlMessage gm = new ControlMessage(System.currentTimeMillis(), ControlType.COMMON_EMAIL_MESSAGE.toString(), BoltCommandHandlerHelper.class.getName());
gm.addPayload("subject", subject);
gm.addPayload("contents", contents);
gm.addPayload("datasource_schema", Utils.getDatasource().getDsName() + "/" + dataSchema);
String topic = PropertiesHolder.getProperties(Constants.Properties.CONFIGURE, Constants.ConfigureKey.GLOBAL_EVENT_TOPIC);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, gm.getType(), gm.toJSONString());
producer.send(record, (metadata, exception) -> {
if (exception != null) {
logger.error("Send global event error.{}", exception.getMessage());
}
});
} catch (Exception e) {
logger.error("send email error. schema:{}, subject:{}, content:{}", dataSchema, subject, contents, e);
} finally {
if (producer != null) producer.close();
}
}
private String produceMessage(String topicName, Object msg, Boolean storeSchemaInHeader) {
String bootstrapServers = CLUSTER.bootstrapServers();
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.putAll(SCHEMA_REGISTRY_TEST_SERVER_CLIENT_WRAPPER.exportClientConf(true));
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
config.put(KafkaAvroSerializer.STORE_SCHEMA_VERSION_ID_IN_HEADER, storeSchemaInHeader.toString());
final Producer<String, Object> producer = new KafkaProducer<>(config);
final Callback callback = new ProducerCallback();
LOG.info("Sending message: [{}] to topic: [{}]", msg, topicName);
ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topicName, getKey(msg), msg);
producer.send(producerRecord, callback);
producer.flush();
LOG.info("Message successfully sent to topic: [{}]", topicName);
producer.close(5, TimeUnit.SECONDS);
return bootstrapServers;
}
@Test void shouldCreateMetersWithTags() {
try (Producer<String, String> producer = createProducer()) {
metrics = new KafkaClientMetrics(producer, tags);
MeterRegistry registry = new SimpleMeterRegistry();
metrics.bindTo(registry);
assertThat(registry.getMeters())
.hasSizeGreaterThan(0)
.extracting(meter -> meter.getId().getTag("app"))
.allMatch(s -> s.equals("myapp"));
}
}
@SuppressWarnings("deprecation")
@Bean
public Producer<byte[], byte[]> kafkaProducer(
@Value("${kafka.bootstrapServers}") String bootstrapServers,
@Value("${kafka.road.batch.size:16384}") int batchSize,
@Value("${kafka.road.linger.ms:0}") int lingerMs,
@Value("${kafka.road.buffer.memory:33554432}") long bufferMemory,
@Value("${kafka.road.acks:1}") String acks,
@Value("${kafka.road.compression:none}") String compressionType,
@Value("${kafka.producer.request.timeout.ms:10000}") int requestTimeout,
MeterRegistry registry) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.ACKS_CONFIG, acks);
props.put(ProducerConfig.RETRIES_CONFIG, 100);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
producer.metrics().forEach((metricName, metric) -> {
String name = "onramp_kafka_producer_" + metricName.group() + "_" + metricName.name();
registry.gauge(name, metric, m -> m.value());
});
return producer;
}
private PublisherLease createLease() {
final Producer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProperties);
final PublisherLease lease = new PublisherLease(producer, maxMessageSize, maxAckWaitMillis, logger) {
@Override
public void close() {
if (isPoisoned() || isClosed()) {
super.close();
} else {
publisherQueue.offer(this);
}
}
};
return lease;
}
@SuppressWarnings("unchecked")
private static DefaultProducerFactory<String, byte[]> producerFactoryWithFencedExceptionOnAbort() {
DefaultProducerFactory<String, byte[]> producerFactory =
mock(DefaultProducerFactory.class, "FactoryForExceptionOnAbortTx");
Producer<String, byte[]> producer = mock(Producer.class, "ExceptionOnAbortTx");
when(producerFactory.confirmationMode()).thenReturn(ConfirmationMode.TRANSACTIONAL);
when(producerFactory.createProducer()).thenReturn(producer);
doThrow(RuntimeException.class).when(producer).abortTransaction();
return producerFactory;
}
@Path("/kafka/{topicName}")
@POST
@Produces(MediaType.APPLICATION_JSON)
public JsonObject post(@PathParam("topicName") String topicName, String message) throws Exception {
try (Producer<Integer, String> producer = CamelKafkaSupport.createProducer()) {
RecordMetadata meta = producer.send(new ProducerRecord<>(topicName, 1, message)).get();
return Json.createObjectBuilder()
.add("topicName", meta.topic())
.add("partition", meta.partition())
.add("offset", meta.offset())
.build();
}
}
public Producer getProducer(Properties props) {
if (producerMap.containsKey(props)) {
return producerMap.get(props);
} else {
Producer producer = new KafkaProducer<>(props);
producerMap.put(props, producer);
return producer;
}
}
private void sendItemRemoval(final Producer<String, AlarmTreeItem<?>> producer,
final String topic, final String path)
{
final String key = AlarmSystem.CONFIG_PREFIX + path;
final ProducerRecord<String, AlarmTreeItem<?>> record = new ProducerRecord<>(topic, partition, key, null);
producer.send(record);
}
@Profile("!test")
@Bean(destroyMethod = "shutdown")
public RpcClient rpcClient(Producer<String, Request> requestProducer, RequestResponseMatcher responseMatcher,
ServerResponseListener responseListener) {
KafkaRpcClient client = new KafkaRpcClient(REQUEST_TOPIC, RESPONSE_TOPIC, requestProducer, responseMatcher, responseListener);
client.start();
return client;
}
public static Producer<Integer, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<Integer, String>(props);
}
public static void main(String[] args) {
System.out.println("开始...");
Properties props = new Properties();
props.put("bootstrap.servers", "192.169.0.23:9092");
//The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
//“所有”设置将导致记录的完整提交阻塞,最慢的,但最持久的设置。
props.put("acks", "all");
//如果请求失败,生产者也会自动重试,即使设置成0 the producer can automatically retry.
props.put("retries", 0);
//The producer maintains buffers of unsent records for each partition.
props.put("batch.size", 16384);
//默认立即发送,这里这是延时毫秒数
props.put("linger.ms", 1);
//生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException
props.put("buffer.memory", 33554432);
//The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//创建kafka的生产者类
Producer<String, String> producer = new KafkaProducer<String, String>(props);
long startTime=System.currentTimeMillis();
producer.send(new ProducerRecord<String, String>("test1",1,startTime,"a","b"));
producer.close();
//生产者的主要方法
// close();//Close this producer.
// close(long timeout, TimeUnit timeUnit); //This method waits up to timeout for the producer to complete the sending of all incomplete requests.
// flush() ;所有缓存记录被立刻发送
// for(int i = 0; i < 100; i++){
// //这里平均写入4个分区
// producer.send(new ProducerRecord<String, String>("foo",i%4, Integer.toString(i), Integer.toString(i)));
// producer.close();
// }
System.out.println("结束");
}
/**
* Create a {@link Producer} that is able to write to a topic that is hosted within an embedded instance of Kafka.
*
* @param kafka - The Kafka rule used to connect to the embedded Kafka instance. (not null)
* @param keySerializerClass - Serializes the keys. (not null)
* @param valueSerializerClass - Serializes the values. (not null)
* @return A {@link Producer} that can be used to write records to a topic.
*/
public static <K, V> Producer<K, V> makeProducer(
final KafkaTestInstanceRule kafka,
final Class<? extends Serializer<K>> keySerializerClass,
final Class<? extends Serializer<V>> valueSerializerClass) {
requireNonNull(kafka);
requireNonNull(keySerializerClass);
requireNonNull(valueSerializerClass);
final Properties props = kafka.createBootstrapServerConfig();
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
return new KafkaProducer<>(props);
}
@Override
public <K, V> Producer<K, V> newProducer(Map<String, Object> configs, Serializer<K> keySerializer,
Serializer<V> valueSerializer) {
ObjectExtension.requireNonNull(configs, "configs");
Properties properties = new Properties();
properties.putAll(configs);
return newProducer(properties, keySerializer, valueSerializer);
}
@Test
public void sameConfiguration() {
Properties props = KafkaTests.getProps();
props.setProperty(KAFKA_PRODUCER_CONCURRENCY, "1");
props.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> p1 = pool.getProducer(props);
Producer<String, String> p2 = pool.getProducer(props);
assertThat(p1, is(p2));
}