下面列出了 org.apache.kafka.clients.producer.Producer#send() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Demo entry point: sends ten integer-keyed string records to a fixed topic
 * through the Kafka {@code Producer} API and flushes them.
 *
 * @param args unused
 */
public static void main(String[] args) {
    String topic = "persistent://public/default/test";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    // try-with-resources guarantees the producer is closed even if a send or flush throws.
    try (Producer<Integer, String> producer = new KafkaProducer<>(props)) {
        for (int i = 0; i < 10; i++) {
            // The Future returned by send() is discarded, so the log line below only
            // records that the record was handed to the producer.
            producer.send(new ProducerRecord<Integer, String>(topic, i, Integer.toString(i)));
            log.info("Message {} sent successfully", i);
        }
        producer.flush();
    }
}
/**
 * Test fixture: publishes the CSV form of each person returned by
 * {@code Person.genRandomList(testID, 10)} to {@code TOPIC_IN}.
 *
 * @throws TimeoutException declared by the original signature; nothing in this body throws it directly
 */
@Before
public void init() throws TimeoutException {
    // Topics other than the built-in ones (configured in pom.xml) may exist; they are ignored.
    String testID = "sampleTest" + new Random().nextInt();
    List<Person> expectedPersons = Person.genRandomList(testID, 10);

    Properties props = new Properties();
    props.put("bootstrap.servers", BOOTSTRAP_HOST);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // ----------------- Send sample data to TOPIC_IN start --------------------
    // try-with-resources closes the producer even when a send or toCSV call fails.
    try (Producer<Void, String> producer = new KafkaProducer<>(props)) {
        for (Person person : expectedPersons) {
            // Records are sent without a key (two-argument ProducerRecord constructor).
            ProducerRecord<Void, String> message = new ProducerRecord<>(TOPIC_IN, person.toCSV(fieldDelimiter));
            producer.send(message);
        }
    }
    // ----------------- Send sample data to TOPIC_IN end --------------------
}
/**
 * Demonstrates a transaction that contains only produce operations.
 *
 * <p>The producer obtained from {@code buildProducer()} is always closed before
 * this method returns. A failure inside the transaction aborts it and is
 * otherwise swallowed, exactly as before; failures while initializing or
 * beginning the transaction still propagate to the caller.
 */
public static void onlyProduceInTransaction() {
    // Raw type kept on purpose: the return type of buildProducer() is not visible here.
    Producer producer = buildProducer();
    try {
        // 1. Initialize transactions.
        producer.initTransactions();
        // 2. Begin the transaction.
        producer.beginTransaction();
        try {
            // 3. Kafka write operations.
            // 3.1 Run business logic.
            // 3.2 Send messages.
            producer.send(new ProducerRecord<String, String>("test", "transaction-data-1"));
            producer.send(new ProducerRecord<String, String>("test", "transaction-data-2"));
            // 3.3 Run further business logic; messages for other topics may be sent here too.
            // 4. Commit the transaction.
            producer.commitTransaction();
        } catch (Exception e) {
            // 5. Abort the transaction.
            // NOTE(review): the cause is dropped here; consider logging it once a logger is available.
            producer.abortTransaction();
        }
    } finally {
        // Release the producer's resources on every path.
        producer.close();
    }
}
/**
 * Publishes one action to Kafka, keyed by its action id.
 *
 * <p>The topic is taken from the action's {@code PROP_TOPIC} property, falling
 * back to the value resolved by {@code HawkularProperties.getProperty(...)}.
 *
 * @param a the action to publish; its properties also configure the producer
 * @throws Exception if building the payload or sending fails
 */
protected void writeAlert(Action a) throws Exception {
    Properties props = initKafkaProperties(a.getProperties());
    // Resolve the topic before opening the producer so a lookup failure leaks nothing.
    String topic = a.getProperties().getOrDefault(PROP_TOPIC,
            HawkularProperties.getProperty(KAFKA_TOPIC, KAFKA_TOPIC_ENV, KAFKA_TOPIC_DEFAULT));

    // try-with-resources closes the producer even if transform() or send() throws.
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        producer.send(new ProducerRecord<>(topic, a.getActionId(), transform(a)));
    }
}
/**
 * Sends {@code messageCount} records (key and value are both the loop index as
 * a string) without a callback, then waits on every returned future.
 *
 * <p>All futures share one overall budget of {@code _waitTimeout} milliseconds,
 * measured from the end of the send loop; once it is used up each remaining
 * future is still given 1 ms. Any failure while waiting fails the test.
 *
 * @param producer producer used for sending
 * @param topic    destination topic
 * @throws InterruptedException if interrupted while pausing between sends
 */
protected void produceMessagesWithoutCallback(Producer<String, String> producer, String topic)
        throws InterruptedException {
    List<Future<RecordMetadata>> pending = new ArrayList<>();
    for (int index = 0; index < messageCount; index++) {
        String payload = String.valueOf(index);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, payload, payload);
        pending.add(producer.send(record));
        System.out.printf("\nrecord: %s, sent\n", record);
        if (sendInterval > 0) {
            Thread.sleep(sendInterval);
        }
    }

    long waitStart = System.currentTimeMillis();
    for (Future<RecordMetadata> ack : pending) {
        try {
            // Remaining share of the overall budget, never below 1 ms.
            long remaining = Math.max(1L, _waitTimeout - (System.currentTimeMillis() - waitStart));
            Assert.assertNotNull(ack.get(remaining, TimeUnit.MILLISECONDS));
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }
}
/**
 * Sends one traced record, consumes it on a background thread using a consumer
 * built with an empty decorator list, and checks that the second finished span
 * is tagged as a consumer span.
 *
 * @throws InterruptedException if interrupted while waiting for the consumer thread
 */
@Test
public void testConsumerBuilderWithoutDecorators() throws InterruptedException {
    Producer<Integer, String> producer = createTracingProducer();
    producer.send(new ProducerRecord<>("messages", 1, "test"));
    producer.close();
    assertEquals(1, mockTracer.finishedSpans().size());

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    final CountDownLatch latch = new CountDownLatch(1);
    try {
        executorService.execute(() -> {
            Consumer<Integer, String> consumer = createConsumerWithDecorators(new ArrayList());
            try {
                // Also stop polling once the executor is shut down, so a failed or
                // timed-out test does not leave this thread spinning forever.
                while (latch.getCount() > 0 && !executorService.isShutdown()) {
                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<Integer, String> record : records) {
                        SpanContext spanContext = TracingKafkaUtils
                                .extractSpanContext(record.headers(), mockTracer);
                        assertNotNull(spanContext);
                        assertEquals("test", record.value());
                        assertEquals((Integer) 1, record.key());
                        consumer.commitSync();
                        latch.countDown();
                    }
                }
            } finally {
                // Close the consumer even when one of the assertions above fails.
                consumer.close();
            }
        });
        assertTrue(latch.await(30, TimeUnit.SECONDS));
    } finally {
        // shutdown() (not shutdownNow()) lets the worker finish closing its consumer.
        executorService.shutdown();
    }

    List<MockSpan> mockSpans = mockTracer.finishedSpans();
    // Index 0 is the producer span asserted above; index 1 is expected to be the consumer span.
    MockSpan span = mockSpans.get(1);
    assertEquals("consumer", span.tags().get("span.kind"));
}
/**
 * Demo entry point: sends 100 string messages to the {@code HelloWorld} topic.
 *
 * <p>Any failure, including a failure to construct the producer, is printed
 * to standard error.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "127.0.0.1:9092");
    properties.put("acks", "all");
    properties.put("retries", 0);
    properties.put("batch.size", 16384);
    properties.put("linger.ms", 1);
    properties.put("buffer.memory", 33554432);
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // Values are serialized as strings.
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // Alternative: plug in a custom value serializer.
    // properties.put("value.serializer", "client.KafkaSerialization");

    // try-with-resources closes the producer only if it was actually created; the
    // previous finally block called close() on null when the constructor failed.
    try (Producer<String, String> producer = new KafkaProducer<String, String>(properties)) {
        for (int i = 0; i < 100; i++) {
            //TestMsg testMsg = new TestMsg(i, "hello".concat(i+""));
            String msg = "Message " + i;
            producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
            System.out.println("Sent:" + msg);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Sends ten records through the Kafka producer API and verifies that a Pulsar
 * consumer subscribed to the same topic receives {@code hello-0} .. {@code hello-9}
 * in order.
 *
 * @throws Exception on any client failure
 */
@Test
public void testSimpleProducer() throws Exception {
    String topic = "testSimpleProducer";

    @Cleanup
    PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
    org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer().topic(topic)
            .subscriptionName("my-subscription")
            .subscribe();

    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    // try-with-resources closes the producer even if a send fails.
    try (Producer<Integer, String> producer = new KafkaProducer<>(props)) {
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
        }
        producer.flush();
    }

    for (int i = 0; i < 10; i++) {
        Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
        // Fail with a clear message instead of a NullPointerException when nothing arrives in time.
        if (msg == null) {
            throw new AssertionError("Timed out waiting for message " + i);
        }
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        assertEquals(new String(msg.getData(), java.nio.charset.StandardCharsets.UTF_8), "hello-" + i);
        pulsarConsumer.acknowledge(msg);
    }
}
/**
 * Demo entry point: configures a Kafka producer and hands 100 string messages
 * for the {@code HelloWorld} topic to it.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";

    // 1. Producer configuration.
    Properties config = new Properties();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.RETRIES_CONFIG, 0);
    config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    config.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);

    // 2. Create the Kafka producer from the configuration.
    Producer<String, String> producer = new KafkaProducer<>(config);
    try {
        // 3. Send the messages with send(); the returned futures are not awaited.
        int sent = 0;
        while (sent < 100) {
            String payload = "Message " + sent;
            producer.send(new ProducerRecord<>("HelloWorld", payload));
            System.out.println("Sent:" + payload);
            sent++;
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // 4. Close the producer.
        producer.close();
    }
}
/**
 * Handles a failure to encode (mask) a column: logs the failure, writes the
 * offending message to the table's error topic, and sends an alert e-mail
 * message through the same producer.
 *
 * <p>Failures inside this handler are logged and not rethrown.
 *
 * @param dbusMessage the message whose column could not be encoded
 * @param version     metadata providing the schema and table names
 * @param column      the column being encoded when the failure occurred
 * @param e           the exception raised by the encoder
 */
public static void onEncodeError(final DbusMessage dbusMessage, final MetaVersion version, EncodeColumn column, Exception e) {
    Producer<String, String> producer = null;
    try {
        // Previously an empty message was logged here and the cause was lost.
        logger.error("Data masking failed for column {}.", column.getFieldName(), e);
        producer = createProducer();
        TopicProvider provider = new DataOutputTopicProvider();
        List<String> topics = provider.provideTopics(version.getSchema(), version.getTable());
        // The error topic is derived from the first output topic.
        String errorTopic = topics.get(0) + ".error";
        // Write the data that failed masking to Kafka.
        ProducerRecord<String, String> record = new ProducerRecord<>(errorTopic, buildKey(dbusMessage), dbusMessage.toString());
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                logger.error("Send message to 'error topic' error.{}", dbusMessage.toString(), exception);
            }
        });
        // Notify the administrator by e-mail.
        String content =
                "您好:\n" +
                " 报警类型: 数据脱敏异常\n" +
                " 数据源:" + Utils.getDatasource().getDsName() + "\n" +
                " 数据库:" + version.getSchema() + "\n" +
                " 表名: " + version.getTable() + "\n" +
                " 脱敏列:" + column.getFieldName() + "\n" +
                " 异常信息:" + e.getMessage() + "\n" +
                "请及时处理.\n";
        writeEmailMessage("Dbus数据脱敏异常", content, version.getSchema(), producer);
    } catch (Exception e1) {
        logger.error("exception data process error.", e1);
    } finally {
        if (producer != null) {
            producer.close();
        }
    }
}
/**
 * Ignored test: sends 100 records to {@code my-topic}, using the loop index
 * (as a string) for both key and value.
 *
 * @throws Exception on any producer failure
 */
@Ignore
@Test
public void simpleProducer() throws Exception {
    // try-with-resources closes the producer even if a send throws.
    try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        }
    }
}
/**
 * Endlessly publishes batches of 99 records (payloads built from 1..99) to
 * partition 0 of {@code avro-topic}, pausing one second between batches.
 * Every record uses the same key, {@code "0"}.
 *
 * @param producer producer used for sending
 * @throws IOException          declared for the {@code record(...)} helper
 * @throws InterruptedException if interrupted during the pause between batches
 */
private static void sendRecords(Producer<String, byte[]> producer) throws IOException, InterruptedException {
    final String topic = "avro-topic";
    final int partition = 0;
    final String key = Integer.toString(0);

    for (;;) {
        int seq = 1;
        while (seq < 100) {
            byte[] payload = record(Integer.toString(seq));
            producer.send(new ProducerRecord<String, byte[]>(topic, partition, key, payload));
            seq++;
        }
        Thread.sleep(1000);
    }
}
/**
 * Demo entry point: publishes the string form of a fixed list of geolocations,
 * some deliberately out of range, to the {@code geolocationJob} topic.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.99.100:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    List<GeoLocation> geolocations = Arrays.asList(
            new GeoLocation(38.6270, 90.1994),
            new GeoLocation(93.9879, 76.9876),   // invalid lat
            new GeoLocation(41.8034, -88.1440),
            new GeoLocation(40.9879, -200.9876), // invalid long
            new GeoLocation(-93.9879, 76.9876),  // invalid lat
            new GeoLocation(9.5680, 77.9624),
            new GeoLocation(13.0827, 80.2707),
            new GeoLocation(40.9879, 200.9876),  // invalid long
            new GeoLocation(9.9252, 78.1198));

    // try-with-resources closes the producer even if a send throws.
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        for (GeoLocation geolocation : geolocations) {
            System.out.println("Sending geolocaiton [" + geolocation.toString() + "]");
            // Records are sent without a key.
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    "geolocationJob",
                    geolocation.toString());
            producer.send(record);
        }
    }
}
/**
 * Verifies that the client identity can write to the {@code test} topic but
 * is rejected when writing to the {@code dev} topic.
 *
 * @throws Exception if the authorized write fails
 */
@Test
public void testUnauthorizedWrite() throws Exception {
    // Create the Producer
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + port);
    producerProps.put("acks", "all");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, clientKeystorePath);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "cspass");
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "ckpass");
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");

    // try-with-resources closes the producer even when an assertion fails.
    try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
        // Send a message to the topic this client is allowed to write to
        Future<RecordMetadata> record =
                producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
        producer.flush();
        record.get();

        try {
            record = producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
            producer.flush();
            record.get();
            // Without this the test passed silently when the write was wrongly allowed.
            // Assert.fail throws an Error, so the catch (Exception) below does not swallow it.
            Assert.fail("Authorization failure expected");
        } catch (Exception ex) {
            String message = ex.getMessage();
            Assert.assertTrue(message != null && message.contains("Not authorized to access topics"));
        }
    }
}
/**
 * Handles a failure to build a DBus message: marks the table as aborted, sends
 * a control message asking appenders to reload their configuration, and
 * publishes a global e-mail event containing the stack trace.
 *
 * <p>Failures inside this handler are logged and not rethrown.
 *
 * @param errId   identifier included in the alert text
 * @param version metadata providing the schema and table names
 * @param e       the exception raised while building the message
 */
public static void onBuildMessageError(String errId, MetaVersion version, Exception e) {
    Producer<String, String> producer = null;
    try {
        // Set the table status to ABORT.
        BoltCommandHandlerHelper.changeDataTableStatus(version.getSchema(), version.getTable(), DataTable.STATUS_ABORT);
        producer = createProducer();

        // Send a control message telling every appender thread to reload and resync its state.
        ControlMessage message = new ControlMessage(System.currentTimeMillis(), ControlType.APPENDER_RELOAD_CONFIG.name(), BoltCommandHandlerHelper.class.getName());
        ProducerRecord<String, String> record = new ProducerRecord<>(Utils.getDatasource().getControlTopic(), message.getType(), message.toJSONString());
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                logger.error("Send control message error.{}", message.toJSONString(), exception);
            }
        });

        // Send the alert e-mail event.
        ControlMessage gm = new ControlMessage(System.currentTimeMillis(), ControlType.COMMON_EMAIL_MESSAGE.toString(), BoltCommandHandlerHelper.class.getName());
        StringWriter sw = new StringWriter();
        PrintWriter writer = new PrintWriter(sw);
        e.printStackTrace(writer);
        gm.addPayload("subject", "DBus生成消息异常报警");
        gm.addPayload("contents", String.format("[%s]dbus-stream 生成dbus message失败:%s/%s/%s,原因:%s", errId, Utils.getDatasource().getDsName(), version.getSchema(), version.getTable(), sw));
        gm.addPayload("datasource_schema", Utils.getDatasource().getDsName() + "/" + version.getSchema());
        String topic = PropertiesHolder.getProperties(Constants.Properties.CONFIGURE, Constants.ConfigureKey.GLOBAL_EVENT_TOPIC);
        record = new ProducerRecord<>(topic, gm.getType(), gm.toJSONString());
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // Pass the exception itself so the stack trace is logged, as in the callback above.
                logger.error("Send global event error.{}", exception.getMessage(), exception);
            }
        });
    } catch (Exception e1) {
        logger.error("exception data process error.", e1);
    } finally {
        if (producer != null) {
            producer.close();
        }
    }
}
/**
 * Demo entry point: sends the same {@code Foo} key / {@code Bar} value pair
 * ten times to a fixed topic using Avro schemas, passing the loop index as the
 * record's partition argument.
 *
 * @param args unused
 */
public static void main(String[] args) {
    String topic = "persistent://public/default/test-avro";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
    AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    Bar bar = new Bar();
    bar.setField1(true);

    Foo foo = new Foo();
    foo.setField1("field1");
    foo.setField2("field2");
    foo.setField3(3);

    // try-with-resources closes the producer even if a send or flush throws.
    try (Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema)) {
        for (int i = 0; i < 10; i++) {
            // The Future returned by send() is discarded, so the log line below only
            // records that the record was handed to the producer.
            producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));
            log.info("Message {} sent successfully", i);
        }
        producer.flush();
    }
}
/**
 * Produces {@code mNumMessages} Thrift-serialized {@code TestMessage} records
 * to {@code mTopic}, keyed by the message index.
 *
 * <p>The broker list defaults to {@code localhost:9092} when
 * {@code mMetadataBrokerList} is null or empty. {@code mType} selects the
 * Thrift protocol: {@code "json"} or {@code "binary"}.
 *
 * @throws RuntimeException if {@code mType} is not a known encoding or a
 *         message cannot be serialized
 */
public void run() {
    Properties properties = new Properties();
    if (mMetadataBrokerList == null || mMetadataBrokerList.isEmpty()) {
        properties.put("bootstrap.servers", "localhost:9092");
    } else {
        // Was misspelled "bootstrap.severs", so the configured brokers were never applied.
        properties.put("bootstrap.servers", mMetadataBrokerList);
    }
    properties.put("value.serializer", ByteArraySerializer.class);
    properties.put("key.serializer", StringSerializer.class);
    properties.put("acks", "1");

    // Pick the protocol before opening the producer so a bad type leaks nothing.
    TProtocolFactory protocol;
    if ("json".equals(mType)) {
        protocol = new TSimpleJSONProtocol.Factory();
    } else if ("binary".equals(mType)) {
        protocol = new TBinaryProtocol.Factory();
    } else {
        throw new RuntimeException("Undefined message encoding type: " + mType);
    }
    TSerializer serializer = new TSerializer(protocol);

    // try-with-resources closes the producer even if serialization or a send fails.
    try (Producer<String, byte[]> producer = new KafkaProducer<>(properties)) {
        for (int i = 0; i < mNumMessages; ++i) {
            // Current time shifted back by mTimeshift seconds, scaled by 1e6, plus the index.
            long time = (System.currentTimeMillis() - mTimeshift * 1000L) * 1000000L + i;
            TestMessage testMessage = new TestMessage(time,
                    "some_value_" + i);
            // Alternate the enum value between even and odd messages.
            if (i % 2 == 0) {
                testMessage.setEnumField(TestEnum.SOME_VALUE);
            } else {
                testMessage.setEnumField(TestEnum.SOME_OTHER_VALUE);
            }
            byte[] bytes;
            try {
                bytes = serializer.serialize(testMessage);
            } catch (TException e) {
                throw new RuntimeException("Failed to serialize message " + testMessage, e);
            }
            ProducerRecord<String, byte[]> data = new ProducerRecord<>(mTopic, Integer.toString(i), bytes);
            producer.send(data);
        }
    }
}
/**
 * Produces ten {@code Foo} values with string keys through the Kafka producer
 * API, then consumes them through the Kafka consumer API and checks keys and
 * fields in order. Keys use a Kafka serializer-backed schema and values use a
 * JSON schema.
 *
 * @throws Exception on any client failure
 */
@Test
public void testProducerConsumerMixedSchemaWithPulsarKafkaClient() throws Exception {
    String topic = "testProducerConsumerMixedSchemaWithPulsarKafkaClient";

    Schema<String> keySchema = new PulsarKafkaSchema<>(new StringSerializer(), new StringDeserializer());
    JSONSchema<Foo> valueSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "false");
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    @Cleanup
    Consumer<String, Foo> consumer = new KafkaConsumer<>(props, keySchema, valueSchema);
    consumer.subscribe(Arrays.asList(topic));

    // try-with-resources closes the producer even if a send or flush throws.
    try (Producer<String, Foo> producer = new KafkaProducer<>(props, keySchema, valueSchema)) {
        for (int i = 0; i < 10; i++) {
            Foo foo = new Foo();
            foo.setField1("field1");
            foo.setField2("field2");
            foo.setField3(i);
            producer.send(new ProducerRecord<>(topic, "hello" + i, foo));
        }
        producer.flush();
    }

    // The counter doubles as the expected index of the next record.
    AtomicInteger received = new AtomicInteger();
    while (received.get() < 10) {
        ConsumerRecords<String, Foo> records = consumer.poll(100);
        if (!records.isEmpty()) {
            records.forEach(record -> {
                String key = record.key();
                Assert.assertEquals(key, "hello" + received.get());
                Foo value = record.value();
                Assert.assertEquals(value.getField1(), "field1");
                Assert.assertEquals(value.getField2(), "field2");
                Assert.assertEquals(value.getField3(), received.get());
                received.incrementAndGet();
            });
            consumer.commitSync();
        }
    }
}
/**
 * Endless load generator: builds random sales records and publishes most of
 * them to the {@code SalesStringExample} topic, sleeping a random 0-9 ms
 * between iterations.
 *
 * <p>Connection settings come from the {@code BROKERS}, {@code ZK} and
 * {@code SCHEMA_REGISTRY} environment variables.
 *
 * @param args unused
 * @throws InterruptedException if interrupted while sleeping between records
 */
public static void main(String[] args) throws InterruptedException {
    Random r = new Random();
    long counter = 0;
    // Example local values: brokers 192.168.99.100:9092, schema registry
    // http://192.168.99.100:8081, zookeepers 192.168.99.100:2181.
    String brokers = System.getenv("BROKERS");
    String zookeepers = System.getenv("ZK");
    String schemaregistry = System.getenv("SCHEMA_REGISTRY");

    // Raw type kept on purpose: the return type of getStringAvroProducer() is not visible here.
    Producer myProducer = getStringAvroProducer(brokers, schemaregistry);
    String topicName = "SalesStringExample";
    Schema valueSchema = getSchema(VALUE_SCHEMA);
    KafkaTools.createTopic(zookeepers, topicName, 3, 1);

    while (true) {
        // Primitive long; autoboxed to Long where the record needs an Object.
        long randomInt = r.nextInt(10);
        GenericRecord valueRecord = new GenericData.Record(valueSchema);
        valueRecord.put("itemID", randomInt);
        valueRecord.put("storeCode", "store-code-" + randomInt);
        valueRecord.put("count", randomInt);
        ProducerRecord<String, GenericRecord> newRecord =
                new ProducerRecord<>(topicName, "Key-" + randomInt, valueRecord);

        // Take the modulo before narrowing to int. The old "(int) counter % 100" cast first,
        // so the bound could become non-positive once counter exceeded Integer.MAX_VALUE.
        int dice = r.nextInt(1 + (int) (counter % 100));
        // Skip the record when the roll is odd and below 20; send it otherwise.
        if (dice >= 20 || dice % 2 != 1) {
            myProducer.send(newRecord);
        }
        myProducer.flush();
        Thread.sleep(randomInt);
        counter++;
        // Print a progress marker every 1K iterations.
        if (counter % 1000 == 0) {
            System.out.print(" . " + (counter / 1000) + "K");
        }
    }
}
/**
 * Sends one record to the {@code messages} topic with the service identity and
 * verifies that the client identity is able to consume it.
 *
 * @throws Exception on any client failure
 */
@Test
public void testAuthorizedReadUsingTagPolicy() throws Exception {
    // Producer configuration (service keystore)
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + port);
    producerProps.put("acks", "all");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");

    // Consumer configuration (client keystore)
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:" + port);
    consumerProps.put("group.id", "test");
    consumerProps.put("enable.auto.commit", "true");
    consumerProps.put("auto.offset.reset", "earliest");
    consumerProps.put("auto.commit.interval.ms", "1000");
    consumerProps.put("session.timeout.ms", "30000");
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    consumerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    consumerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, clientKeystorePath);
    consumerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "cspass");
    consumerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "ckpass");
    consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");

    // try-with-resources closes both clients even when a poll or an assertion fails.
    try (Producer<String, String> producer = new KafkaProducer<>(producerProps);
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
        consumer.subscribe(Arrays.asList("messages"));

        // Send a message
        producer.send(new ProducerRecord<String, String>("messages", "somekey", "somevalue"));
        producer.flush();

        // Poll until we consume it (at most 1000 attempts, sleeping one second between them)
        ConsumerRecord<String, String> record = null;
        for (int i = 0; i < 1000; i++) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            if (records.count() > 0) {
                record = records.iterator().next();
                break;
            }
            Thread.sleep(1000);
        }

        Assert.assertNotNull(record);
        Assert.assertEquals("somevalue", record.value());
    }
}