下面列出了 org.apache.kafka.clients.producer.internals.DefaultPartitioner#org.apache.kafka.common.serialization.StringSerializer 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
public KafkaProducerTest(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
//acks=0:如果设置为0,生产者不会等待kafka的响应。
//acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。
//acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。
props.put("acks", "all");
//配置为大于0的值的话,客户端会在消息发送失败时重新发送。
props.put("retries", 0);
//当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
this.producer = new KafkaProducer<String, String>(props);
this.topic = topicName;
}
/**
 * Sends a single {@code Company} record whose value is serialized with the custom
 * {@code ProtostuffSerializer}, waits for the send to complete, and closes the producer.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    String topic = "topic.serialization";
    String brokerList = "192.168.0.101:9092";

    Properties config = new Properties();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Values are written with the custom ProtostuffSerializer.
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtostuffSerializer.class.getName());

    KafkaProducer<String, Company> companyProducer = new KafkaProducer<>(config);
    Company payload = Company.builder().name("whirly").address("中国").build();
    ProducerRecord<String, Company> message = new ProducerRecord<>(topic, payload);
    try {
        // get() blocks until the send has completed or failed.
        companyProducer.send(message).get();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        companyProducer.close();
    }
}
/**
 * Creates the {@code PatchSetEmitter} bean, backed by a Kafka producer that uses
 * {@code StringSerializer} for keys and values and retries once.
 *
 * @param bootstrapServers value of the {@code kafka.bootstrapServers} property
 * @param topic value of the {@code kafka.road.modification.topic} property
 * @param mapper object mapper passed on to the emitter
 * @return a {@code KafkaPatchSetEmitter} built from the topic, the producer and the mapper
 */
@Bean
public PatchSetEmitter roadModificationEmitter(
        @Value("${kafka.bootstrapServers}") String bootstrapServers,
        @Value("${kafka.road.modification.topic}") String topic,
        ObjectMapper mapper) {
    String stringSerializerName = StringSerializer.class.getName();

    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.RETRIES_CONFIG, 1);
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializerName);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializerName);

    Producer<String, String> emitterProducer = new KafkaProducer<>(config);
    return new KafkaPatchSetEmitter(topic, emitterProducer, mapper);
}
/**
 * Creates the change log topic, builds the {@code queryRepo} on top of a Kafka backed
 * change log, and initializes {@code stmtProducer} and {@code resultConsumer}.
 */
@Before
public void setup() {
    // The topic backing the change log has to exist before it is used.
    final String topicName = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
    kafka.createTopic(topicName);

    // Wire the QueryRepository used by the test to a Kafka backed change log.
    final Producer<?, QueryChange> changeProducer =
            KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
    final Consumer<?, QueryChange> changeConsumer =
            KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
    final QueryChangeLog queryChanges = new KafkaQueryChangeLog(changeProducer, changeConsumer, topicName);
    queryRepo = new InMemoryQueryRepository(
            queryChanges,
            Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));

    // Producer for statements and consumer for results.
    stmtProducer = KafkaTestUtil.makeProducer(
            kafka, StringSerializer.class, VisibilityStatementSerializer.class);
    resultConsumer = KafkaTestUtil.fromStartConsumer(
            kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class);
}
/**
 * Creates the {@code KafkaTemplate} for feature set specs. The template uses the
 * bootstrap servers from the stream options and defaults to the configured specs topic.
 *
 * @param feastProperties source of the stream settings
 * @return the configured template
 */
@Bean
public KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specKafkaTemplate(
        FeastProperties feastProperties) {
    StreamProperties stream = feastProperties.getStream();

    Map<String, Object> producerConfig = new HashMap<>();
    producerConfig.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, stream.getOptions().getBootstrapServers());

    DefaultKafkaProducerFactory<String, FeatureSetProto.FeatureSetSpec> factory =
            new DefaultKafkaProducerFactory<>(
                    producerConfig, new StringSerializer(), new KafkaSerialization.ProtoSerializer<>());

    KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> template = new KafkaTemplate<>(factory);
    template.setDefaultTopic(stream.getSpecsOptions().getSpecsTopic());
    return template;
}
/**
 * Creates an instance of {@link KafkaQueryChangeLog} using a new {@link Producer} and {@link Consumer}.
 * <p>
 * If the consumer cannot be constructed, the producer that was already created is closed
 * before the failure is rethrown, so it is not leaked.
 *
 * @param bootstrapServers - Indicates which instance of Kafka that will be connected to. (not null)
 * @param topic - The topic the QueryChangeLog is persisted to. (not null)
 * @return A new instance of {@link KafkaQueryChangeLog}.
 * @throws NullPointerException if either argument is {@code null}
 */
public static KafkaQueryChangeLog make(
        final String bootstrapServers,
        final String topic) {
    requireNonNull(bootstrapServers);
    requireNonNull(topic);

    final Properties producerProperties = new Properties();
    producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());

    final Properties consumerProperties = new Properties();
    consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());

    final Producer<?, QueryChange> producer = new KafkaProducer<>(producerProperties);
    final Consumer<?, QueryChange> consumer;
    try {
        consumer = new KafkaConsumer<>(consumerProperties);
    } catch (final RuntimeException e) {
        // The producer is already live at this point; release it before propagating.
        try {
            producer.close();
        } catch (final RuntimeException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
    return new KafkaQueryChangeLog(producer, consumer, topic);
}
/**
 * Enqueues one message on an {@code SdcKafkaProducer}, writes it, and then calls
 * {@code verify} with the topic, the count 1, the broker address and the original text.
 */
@Test
public void testKafkaProducer09Write() throws IOException, StageException {
    final String payload = "Hello StreamSets";

    HashMap<String, Object> producerSettings = new HashMap<>();
    producerSettings.put(KafkaConstants.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerSettings.put(KafkaConstants.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    producerSettings.put("retries", 0);
    producerSettings.put("batch.size", 100);
    producerSettings.put("linger.ms", 0);

    String topic = getNextTopic();
    SdcKafkaProducer kafkaWriter = createSdcKafkaProducer(port, producerSettings);
    kafkaWriter.init();
    kafkaWriter.enqueueMessage(topic, payload.getBytes(), "0");
    kafkaWriter.write(null);

    verify(topic, 1, "localhost:" + port, payload);
}
/**
 * Builds the streams, producer and consumer configurations for the test, each using
 * {@code EMBEDDED_KAFKA.bootstrapServers()} and String (de)serialization.
 */
@Before
public void setUp() {
    final Properties streamsProps = StreamsTestUtils.getStreamsConfig(
            "integrationTest",
            EMBEDDED_KAFKA.bootstrapServers(),
            STRING_SERDE_CLASSNAME,
            STRING_SERDE_CLASSNAME,
            new Properties());
    streamsProps.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
    streamsConfig = new StreamsConfig(streamsProps);

    producerConfig = TestUtils.producerConfig(
            EMBEDDED_KAFKA.bootstrapServers(), StringSerializer.class, StringSerializer.class);

    consumerConfig = TestUtils.consumerConfig(
            EMBEDDED_KAFKA.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
}
/**
 * Creates a {@code KafkaOperationRepository} with two dedicated producers, one for
 * operations and one for published events. Both use String keys and JSON values.
 *
 * @param objectMapper mapper given to each {@code JsonSerializer}
 * @return the repository wired with both producers and the consumer group id
 */
public KafkaOperationRepository createKafkaOperationRepository(ObjectMapper objectMapper) {
    // Each producer gets its own freshly built properties and serializer instances.
    KafkaProducer<String, Operation> operationProducer =
            new KafkaProducer<>(
                    kafkaProperties.buildProducerProperties(),
                    new StringSerializer(),
                    new JsonSerializer<>(objectMapper));

    KafkaProducer<String, PublishedEventWrapper> eventProducer =
            new KafkaProducer<>(
                    kafkaProperties.buildProducerProperties(),
                    new StringSerializer(),
                    new JsonSerializer<>(objectMapper));

    return new KafkaOperationRepository(
            operationContext,
            userContext,
            operationProducer,
            eventProducer,
            kafkaProperties.getConsumer().getGroupId());
}
/**
 * Creates the rule-generated topic, then builds the application configuration, the
 * Kafka producer, the periodic application and the registration client for the test.
 *
 * @throws Exception if any part of the setup fails
 */
@Before
public void init() throws Exception {
    final String notificationTopic = rule.getKafkaTopicName();
    rule.createTopic(notificationTopic);

    // Start from the user specified props, then point them at the rule generated
    // topic and the embedded Kafka bootstrap servers.
    props = getProps();
    props.setProperty(NOTIFICATION_TOPIC, notificationTopic);
    props.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
    conf = new PeriodicNotificationApplicationConfiguration(props);

    // Obtain the Kafka properties from the application config and create the producer.
    kafkaProps = getKafkaProperties(conf);
    producer = new KafkaProducer<>(
            kafkaProps, new StringSerializer(), new CommandNotificationSerializer());

    // Build the application under test and the client that registers notifications.
    app = PeriodicNotificationApplicationFactory.getPeriodicApplication(conf);
    registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer);
}
/**
 * Starts the Kafka container, the consumer thread and the reply consumer, and creates
 * the shared String/String producer used by the tests.
 */
@BeforeClass
public static void setup() {
    // Confluent versions 5.3.x correspond to Kafka versions 2.3.x, see
    // https://docs.confluent.io/current/installation/versions-interoperability.html#cp-and-apache-ak-compatibility
    kafka = new KafkaContainer("5.3.0");
    kafka.start();

    kafkaPort = kafka.getMappedPort(KafkaContainer.KAFKA_PORT);
    bootstrapServers = kafka.getBootstrapServers();

    consumerThread = new Consumer();
    consumerThread.start();

    replyConsumer = createKafkaConsumer();
    replyConsumer.subscribe(Collections.singletonList(REPLY_TOPIC));

    ImmutableMap<String, Object> producerSettings = ImmutableMap.<String, Object>of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
            ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
            // This should guarantee that records are batched, as long as they are sent
            // within the configured duration.
            ProducerConfig.LINGER_MS_CONFIG, 50);
    producer = new KafkaProducer<>(producerSettings, new StringSerializer(), new StringSerializer());
}
/**
 * Creates a pusher whose producer writes String keys and byte-array values.
 * Built-in defaults are applied first; entries from {@code kafkaConfig}, when present,
 * are copied on top of them and therefore win.
 *
 * @param brokers bootstrap servers for the producer
 * @param topic topic stored in {@code this.topic}
 * @param kafkaConfig optional overrides; also the source of the futures buffer size
 */
public KafkaKeyValueProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
    this.closer = Closer.create();
    this.topic = topic;

    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
    producerProps.put(ProducerConfig.RETRIES_CONFIG, 3);
    // To guarantee ordered delivery, the maximum in flight requests must be set to 1.
    producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);

    // Apply the Kafka scoped config last so it overrides any default set above.
    if (kafkaConfig.isPresent()) {
        Config overrides = kafkaConfig.get();
        producerProps.putAll(ConfigUtils.configToProperties(overrides));
        this.numFuturesToBuffer =
                ConfigUtils.getLong(overrides, MAX_NUM_FUTURES_TO_BUFFER_KEY, DEFAULT_MAX_NUM_FUTURES_TO_BUFFER);
    }

    this.producer = createProducer(producerProps);
}
/**
 * Runs a word-count topology that reads the "words" topic, processes it with
 * {@code WordCountProcessor} backed by the in-memory "Counts" store, and writes to the
 * "count" topic. The streams instance runs until a byte is read from standard input,
 * after which it is closed and cleaned up.
 *
 * @param args command-line arguments (unused)
 * @throws IOException if reading from standard input fails
 */
public static void main(String[] args) throws IOException {
    Properties settings = new Properties();
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka0:19092");
    settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper0:12181/kafka");
    settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    TopologyBuilder topology = new TopologyBuilder();
    topology
            .addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "words")
            .addProcessor("WordCountProcessor", WordCountProcessor::new, "SOURCE")
            .addStateStore(
                    Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(),
                    "WordCountProcessor")
            // .connectProcessorAndStateStores("WordCountProcessor", "Counts")
            .addSink("SINK", "count", new StringSerializer(), new IntegerSerializer(), "WordCountProcessor");

    KafkaStreams streams = new KafkaStreams(topology, settings);
    streams.start();
    // Block until there is input on stdin, then shut down.
    System.in.read();
    streams.close();
    streams.cleanUp();
}
/**
 * Sends ten records (key {@code i}, value {@code i} as text) to a fixed topic through
 * a {@code KafkaProducer} configured with a {@code pulsar://} bootstrap address, then
 * flushes and closes the producer.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Properties config = new Properties();
    config.put("bootstrap.servers", "pulsar://localhost:6650");
    config.put("key.serializer", IntegerSerializer.class.getName());
    config.put("value.serializer", StringSerializer.class.getName());

    String topic = "persistent://public/default/test";
    Producer<Integer, String> producer = new KafkaProducer<>(config);

    int index = 0;
    while (index < 10) {
        ProducerRecord<Integer, String> record =
                new ProducerRecord<Integer, String>(topic, index, Integer.toString(index));
        producer.send(record);
        log.info("Message {} sent successfully", index);
        index++;
    }

    producer.flush();
    producer.close();
}
/**
 * Publishes 26 records to "TestTopic", one per letter 'A' through 'Z', using the letter
 * as both key and value.
 * <p>
 * The producer is managed with try-with-resources so it is closed even when
 * {@code send()} or {@code flush()} throws.
 */
public void publishDummyData() {
    final String topic = "TestTopic";

    // Create publisher
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
        // Char codes 65..90 are the upper-case letters 'A'..'Z'.
        for (char letter = 'A'; letter <= 'Z'; letter++) {
            final String text = String.valueOf(letter);
            producer.send(new ProducerRecord<>(topic, text, text));
        }
        // Push out anything still buffered before the producer is closed.
        producer.flush();
    }
}
/**
 * Manual driver: prepares a {@code KafkaAdapter} with a mock runtime context, initializes
 * it with producer properties and invokes it once with a timestamp-keyed request.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    KafkaProducerTest kafkaTest = new KafkaProducerTest();
    kafkaTest.runtimeContext = new MockRuntimeContext("KafkaTest Activity");
    PropertyManager.getInstance()
            .setStringProperty(PropertyNames.MDW_ASSET_LOCATION, "C:\\workspaces\\mdw6\\mdw-workflow\\assets");

    KafkaAdapter adapter = new KafkaAdapter();
    adapter.prepare(kafkaTest.runtimeContext);

    // NOTE: comma separated list of server:port, e.g. localhost:9092,localhost:9093
    String server = "<host>:<port>";
    System.out.println("BOOTSTRAP_SERVERS_CONFIG : " + server);

    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaMDWProducer");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put("timeout.ms", "100");

    // The current time doubles as the record key and as part of the request text.
    String key = String.valueOf(System.currentTimeMillis());
    String request = "KafkaTest-Request:" + key;

    Map<String, String> recordProps = new HashMap<>();
    recordProps.put(KafkaAdapter.KAFKA_TOPIC_NAME, "testMdwInbound");
    recordProps.put(KafkaAdapter.RECORD_KEY, key);
    recordProps.put(KafkaAdapter.RECORD_PARTITION, "0");

    try {
        adapter.init(producerProps);
        adapter.directInvoke(request, 0, recordProps);
    } catch (AdapterException | ConnectionException e) {
        // Failures are only reported on stderr.
        e.printStackTrace();
    }
}
/**
 * Provides the producer configuration: bootstrap servers, {@code StringSerializer} for
 * keys and {@code AvroSerializer} for values.
 *
 * @return a new mutable map holding the three settings
 */
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    return config;
}
/**
 * Records a sink entry that outputs VisibilityStatements, then delegates to the
 * superclass implementation.
 *
 * @param node the node being visited
 * @throws TopologyBuilderException as declared by the overridden method
 */
@Override
public void meet(final Reduced node) throws TopologyBuilderException {
    // Reaching this node indicates we're outputting VisibilityStatements.
    final StatementOutputFormatterSupplier formatter = new StatementOutputFormatterSupplier();
    final StringSerializer keySerializer = new StringSerializer();
    final VisibilityStatementSerializer valueSerializer = new VisibilityStatementSerializer();
    sinkEntry = new SinkEntry<>(formatter, keySerializer, valueSerializer);

    super.meet(node);
}
/**
* @param topic 消息名称
* @param
*/
public KafkaProducerTest(String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
this.producer = new KafkaProducer<String, String>(props);
this.topic = topic;
}
/**
 * Builds and starts the purchase processing topology: records from "src-topic" pass
 * through {@code CreditCardAnonymizer}, whose output feeds {@code PurchasePatterns},
 * {@code CustomerRewards} and the "purchases" sink; the two downstream processors write
 * to "patterns" and "rewards" respectively.
 *
 * @param args command-line arguments (unused)
 * @throws Exception declared by the original signature
 */
public static void main(String[] args) throws Exception {
    StreamsConfig config = new StreamsConfig(getProperties());

    // Key (de)serializers.
    StringDeserializer keyDeserializer = new StringDeserializer();
    StringSerializer keySerializer = new StringSerializer();

    // Value (de)serializers, one per payload type.
    JsonDeserializer<Purchase> purchaseDeserializer = new JsonDeserializer<>(Purchase.class);
    JsonSerializer<Purchase> purchaseSerializer = new JsonSerializer<>();
    JsonSerializer<RewardAccumulator> rewardSerializer = new JsonSerializer<>();
    JsonSerializer<PurchasePattern> patternSerializer = new JsonSerializer<>();

    TopologyBuilder topology = new TopologyBuilder();
    topology
            .addSource("SOURCE", keyDeserializer, purchaseDeserializer, "src-topic")
            .addProcessor("PROCESS", CreditCardAnonymizer::new, "SOURCE")
            .addProcessor("PROCESS2", PurchasePatterns::new, "PROCESS")
            .addProcessor("PROCESS3", CustomerRewards::new, "PROCESS")
            .addSink("SINK", "patterns", keySerializer, patternSerializer, "PROCESS2")
            .addSink("SINK2", "rewards", keySerializer, rewardSerializer, "PROCESS3")
            .addSink("SINK3", "purchases", keySerializer, purchaseSerializer, "PROCESS");

    System.out.println("Starting PurchaseProcessor Example");
    KafkaStreams streams = new KafkaStreams(topology, config);
    streams.start();
    System.out.println("Now started PurchaseProcessor Example");
}
/**
 * Creates a producer for three local brokers that serializes keys with
 * {@code LongSerializer} and values with {@code StringSerializer}.
 *
 * @return a new {@code KafkaProducer}
 */
private static Producer<Object, Object> createProducer() {
    Properties config = new Properties();
    config.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaMDWProducer");
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return new KafkaProducer<>(config);
}
/**
 * Builds the default producer properties: request timeout and max block time taken
 * from {@code FeatureFlags}, String serializers, the {@code SixtPartitioner},
 * three retries and {@code acks=all}.
 *
 * @return a new mutable map of property name to property value
 */
public Map<String, String> getDefaultProperties() {
    Map<String, String> defaults = new HashMap<>();

    // Timeouts are resolved from the service properties.
    defaults.put(
            ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
            Integer.toString(FeatureFlags.getKafkaRequestTimeoutMs(serviceProperties)));
    defaults.put(
            ProducerConfig.MAX_BLOCK_MS_CONFIG,
            Integer.toString(FeatureFlags.getKafkaMaxBlockMs(serviceProperties)));

    // Fixed settings.
    defaults.put(ProducerConfig.ACKS_CONFIG, "all");
    defaults.put(ProducerConfig.RETRIES_CONFIG, "3");
    defaults.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SixtPartitioner.class.getName());
    defaults.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    defaults.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    return defaults;
}
/**
 * Creates the producer under test with a transactional id, idempotence enabled and
 * {@code acks=all}, and routes producer exceptions to the test context.
 *
 * @param ctx test context whose {@code fail} method handles producer exceptions
 */
@Before
public void init(TestContext ctx) {
    final Properties settings = kafkaCluster.useTo().getProducerProperties("testTransactional_producer");

    settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // Transaction related settings.
    settings.put(ProducerConfig.ACKS_CONFIG, "all");
    settings.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    settings.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-1");

    producer = producer(Vertx.vertx(), settings);
    producer.exceptionHandler(ctx::fail);
}
/**
 * Creates the producer factory bean with String keys and JSON-serialized values.
 *
 * @param aObjectMapper mapper handed to the {@code JsonSerializer}
 * @param aKafkaProperties properties passed to {@code producerConfigs}
 * @return a {@code DefaultKafkaProducerFactory} built from those settings
 */
@Bean
public ProducerFactory<String, Object> producerFactory(ObjectMapper aObjectMapper, KafkaProperties aKafkaProperties) {
    final DefaultKafkaProducerFactory<String, Object> factory =
            new DefaultKafkaProducerFactory<>(
                    producerConfigs(aKafkaProperties),
                    new StringSerializer(),
                    new JsonSerializer<>(aObjectMapper));
    return factory;
}
/**
 * Provides the producer configuration: bootstrap servers, {@code StringSerializer} for
 * keys and {@code AvroSerializer} for values.
 *
 * @return a new mutable map holding the three settings
 */
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> settings = new HashMap<>();
    settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);
    settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    return settings;
}
/**
 * Creates the change log topic and builds {@code queryRepo} on top of a Kafka backed
 * change log.
 */
@Before
public void setup() {
    // The topic backing the change log has to exist before it is used.
    final String topicName = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
    kafka.createTopic(topicName);

    // Wire the QueryRepository used by the test to a Kafka backed change log.
    final Producer<?, QueryChange> changeProducer =
            KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
    final Consumer<?, QueryChange> changeConsumer =
            KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
    final QueryChangeLog queryChanges = new KafkaQueryChangeLog(changeProducer, changeConsumer, topicName);
    queryRepo = new InMemoryQueryRepository(
            queryChanges,
            Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
}
/**
 * Sends messages with a plain producer and waits for the result.
 * <p>
 * When no client properties have been supplied (null or empty), a default set is built
 * using SASL_PLAINTEXT with the OAUTHBEARER mechanism and String serializers.
 *
 * @param timeoutMs maximum time, in milliseconds, to wait for the producer's result
 * @return the value the producer's result promise completes with
 * @throws WaitException if the wait is interrupted, the producer fails, or the timeout
 *     elapses; on interruption the thread's interrupt status is restored first
 */
@Override
public int sendMessagesPlain(long timeoutMs) {
    String clientName = "sender-plain-" + clusterName;
    CompletableFuture<Integer> resultPromise = new CompletableFuture<>();
    IntPredicate msgCntPredicate = x -> x == messageCount;

    KafkaClientProperties properties = this.clientProperties;
    if (properties == null || properties.getProperties().isEmpty()) {
        properties = new KafkaClientProperties.KafkaClientPropertiesBuilder()
            .withNamespaceName(namespaceName)
            .withClusterName(clusterName)
            .withSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)
            .withBootstrapServerConfig(getExternalBootstrapConnect(namespaceName, clusterName))
            .withKeySerializerConfig(StringSerializer.class)
            .withValueSerializerConfig(StringSerializer.class)
            .withClientIdConfig(kafkaUsername + "-producer")
            .withSaslMechanism(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM)
            .withSaslLoginCallbackHandlerClass()
            .withSharedProperties()
            .withSaslJassConfig(this.clientId, this.clientSecretName, this.oauthTokenEndpointUri)
            .build();
    }

    try (Producer plainProducer = new Producer(properties, resultPromise, msgCntPredicate, topicName, clientName, partition)) {
        plainProducer.getVertx().deployVerticle(plainProducer);
        return plainProducer.getResultPromise().get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        if (e instanceof InterruptedException) {
            // Catching InterruptedException clears the flag; restore it so callers
            // further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        e.printStackTrace();
        throw new WaitException(e);
    }
}
/**
 * Produces messages with String keys and Double values by delegating to
 * {@code produce} with a freshly generated random id.
 *
 * @param messageCount number of messages, passed through to {@code produce}
 * @param completionCallback callback passed through to {@code produce}
 * @param messageSupplier supplier of the records, passed through to {@code produce}
 */
public void produceDoubles(int messageCount, Runnable completionCallback,
        Supplier<ProducerRecord<String, Double>> messageSupplier) {
    Serializer<String> keySerializer = new StringSerializer();
    Serializer<Double> valueSerializer = new DoubleSerializer();
    String producerId = UUID.randomUUID().toString();

    this.produce(
            producerId,
            messageCount,
            keySerializer,
            valueSerializer,
            completionCallback,
            messageSupplier);
}
/**
 * Builds and starts the stock summary topology: records from "stocks" are forwarded
 * unchanged to "stocks-out" and also processed by {@code StockSummaryProcessor}, which
 * is attached to the in-memory "stock-transactions" store (at most 100 entries) and
 * writes to "transaction-summary".
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    StreamsConfig config = new StreamsConfig(getProperties());
    TopologyBuilder topology = new TopologyBuilder();

    // Serializers and deserializers for the transaction summary.
    JsonSerializer<StockTransactionSummary> summarySerializer = new JsonSerializer<>();
    JsonDeserializer<StockTransactionSummary> summaryDeserializer =
            new JsonDeserializer<>(StockTransactionSummary.class);

    // Serializers and deserializers for the raw transaction.
    JsonDeserializer<StockTransaction> transactionDeserializer = new JsonDeserializer<>(StockTransaction.class);
    JsonSerializer<StockTransaction> transactionSerializer = new JsonSerializer<>();

    // Keys are plain strings.
    StringSerializer keySerializer = new StringSerializer();
    StringDeserializer keyDeserializer = new StringDeserializer();

    Serde<StockTransactionSummary> summarySerde = Serdes.serdeFrom(summarySerializer, summaryDeserializer);

    topology
            .addSource("stocks-source", keyDeserializer, transactionDeserializer, "stocks")
            .addProcessor("summary", StockSummaryProcessor::new, "stocks-source")
            .addStateStore(
                    Stores.create("stock-transactions")
                            .withStringKeys()
                            .withValues(summarySerde)
                            .inMemory()
                            .maxEntries(100)
                            .build(),
                    "summary")
            .addSink("sink", "stocks-out", keySerializer, transactionSerializer, "stocks-source")
            .addSink("sink-2", "transaction-summary", keySerializer, summarySerializer, "summary");

    System.out.println("Starting StockSummaryStatefulProcessor Example");
    KafkaStreams streams = new KafkaStreams(topology, config);
    streams.start();
    System.out.println("StockSummaryStatefulProcessor Example now started");
}
/**
 * Creates the producer factory for {@code EventeumMessage} values, with String keys and
 * JSON values (type-info headers disabled). Bootstrap addresses, request timeout, retry
 * backoff and retry count are read from {@code settings}.
 *
 * @return a {@code DefaultKafkaProducerFactory} built from the assembled configuration
 */
@Bean
public ProducerFactory<String, EventeumMessage> eventeumProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, settings.getBootstrapAddresses());
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, settings.getRequestTimeoutMsConfig());
    configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, settings.getRetryBackoffMsConfig());
    // Use the ProducerConfig constant (its value is "retries") like every other key here.
    configProps.put(ProducerConfig.RETRIES_CONFIG, settings.getRetries());

    // Extra configuration is applied only when the security protocol is "PLAINTEXT".
    if ("PLAINTEXT".equals(settings.getSecurityProtocol())) {
        configurePlaintextSecurityProtocol(configProps);
    }

    return new DefaultKafkaProducerFactory<>(configProps);
}