org.apache.kafka.clients.producer.internals.DefaultPartitioner#org.apache.kafka.common.serialization.StringSerializer Source Code Examples

Listed below are example snippets that use org.apache.kafka.clients.producer.internals.DefaultPartitioner together with org.apache.kafka.common.serialization.StringSerializer. Follow each project link to view the full source on GitHub.
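For orientation, a minimal, self-contained producer that uses StringSerializer for both key and value might look like this (a generic sketch, not taken from any of the projects below; the broker address and topic name are placeholders):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MinimalStringProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // No partitioner.class is set, so the producer falls back to DefaultPartitioner.

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}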

Example 1 · Project: java-study · File: KafkaProducerTest.java
public KafkaProducerTest(String topicName) {
	Properties props = new Properties();
	props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
	//acks=0: the producer does not wait for any acknowledgement from the broker.
	//acks=1: the leader writes the record to its local log but does not wait for acknowledgement from the other replicas.
	//acks=all: the leader waits for the full set of in-sync replicas to acknowledge the record. The record is not lost as long as at least one in-sync replica stays alive; this is the strongest durability guarantee.
	props.put("acks", "all");
	//A value greater than 0 makes the client resend any record whose send fails; note this snippet uses 0, i.e. no retries.
	props.put("retries", 0);
	//When multiple records are sent to the same partition, the producer batches them together, which cuts down network requests and helps both the client and the server.
	props.put("batch.size", 16384);
	props.put("key.serializer", StringSerializer.class.getName());
	props.put("value.serializer", StringSerializer.class.getName());
	this.producer = new KafkaProducer<String, String>(props);
	this.topic = topicName;
}
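The snippet shows only the constructor; a send helper in the same class might look like the following (an assumed sketch built on the producer and topic fields initialized above, not the project's verbatim code):

public void produce(String message) {
    // Asynchronous send; the callback reports either the record metadata or the failure.
    producer.send(new ProducerRecord<String, String>(topic, message), (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.printf("Sent to %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
        }
    });
}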
 
public static void main(String[] args) {
    String brokerList = "192.168.0.101:9092";
    String topic = "topic.serialization";
    Properties properties = new Properties();
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Custom ProtostuffSerializer (user-defined; see the sketch after this example)
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtostuffSerializer.class.getName());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

    KafkaProducer<String, Company> producer = new KafkaProducer<>(properties);

    Company company = Company.builder().name("whirly").address("中国").build();
    ProducerRecord<String, Company> record = new ProducerRecord<>(topic, company);
    try {
        producer.send(record).get();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        producer.close();
    }
}
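The ProtostuffSerializer above is user-defined and its source is not shown here. A custom value serializer generally implements org.apache.kafka.common.serialization.Serializer; a rough sketch using protostuff runtime schemas (an assumption about the implementation, with Company as the payload type):

import java.util.Map;

import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import org.apache.kafka.common.serialization.Serializer;

public class ProtostuffSerializer implements Serializer<Company> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        Schema<Company> schema = RuntimeSchema.getSchema(Company.class);
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            // Serialize the POJO to protostuff's binary wire format.
            return ProtostuffIOUtil.toByteArray(data, schema, buffer);
        } finally {
            buffer.clear();
        }
    }

    @Override
    public void close() { }
}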
 
@Bean
public PatchSetEmitter roadModificationEmitter(
    @Value("${kafka.bootstrapServers}") String bootstrapServers,
    @Value("${kafka.road.modification.topic}") String topic,
    ObjectMapper mapper) {

  Map<String, Object> producerProps = new HashMap<>();
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  producerProps.put(ProducerConfig.RETRIES_CONFIG, 1);

  Producer<String, String> kafkaProducer = new KafkaProducer<>(producerProps);

  return new KafkaPatchSetEmitter(topic, kafkaProducer, mapper);
}
 
Example 4 · Project: rya · File: RunQueryCommandIT.java
@Before
public void setup() {
    // Make sure the topic that the change log uses exists.
    final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
    kafka.createTopic(changeLogTopic);

    // Set up the QueryRepository used by the test.
    final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
    final Consumer<?, QueryChange> queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
    final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
    queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));

    // Initialize the Statements Producer and the Results Consumer.
    stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class);
    resultConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class);
}
 
Example 5 · Project: feast · File: FeatureStreamConfig.java
@Bean
public KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specKafkaTemplate(
    FeastProperties feastProperties) {
  StreamProperties streamProperties = feastProperties.getStream();
  Map<String, Object> props = new HashMap<>();

  props.put(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      streamProperties.getOptions().getBootstrapServers());

  KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> t =
      new KafkaTemplate<>(
          new DefaultKafkaProducerFactory<>(
              props, new StringSerializer(), new KafkaSerialization.ProtoSerializer<>()));
  t.setDefaultTopic(streamProperties.getSpecsOptions().getSpecsTopic());
  return t;
}
 
Example 6 · Project: rya · File: KafkaQueryChangeLogFactory.java
/**
 * Creates an instance of {@link KafkaQueryChangeLog} using a new {@link Producer} and {@link Consumer}.
 *
 * @param bootstrapServers - Indicates which instance of Kafka will be connected to. (not null)
 * @param topic - The topic the QueryChangeLog is persisted to. (not null)
 * @return A new instance of {@link KafkaQueryChangeLog}.
 */
public static KafkaQueryChangeLog make(
        final String bootstrapServers,
        final String topic) {
    requireNonNull(bootstrapServers);
    requireNonNull(topic);

    final Properties producerProperties = new Properties();
    producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());

    final Properties consumerProperties = new Properties();
    consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());

    final Producer<?, QueryChange> producer = new KafkaProducer<>(producerProperties);
    final Consumer<?, QueryChange> consumer = new KafkaConsumer<>(consumerProperties);
    return new KafkaQueryChangeLog(producer, consumer, topic);
}
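Hypothetical usage of the factory above (the server list and topic are placeholders):

KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make("localhost:9092", "query-change-log");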
 
Example 7 · Project: datacollector · File: KafkaProducer09IT.java
@Test
public void testKafkaProducer09Write() throws IOException, StageException {

  final String message = "Hello StreamSets";

  HashMap<String, Object> kafkaProducerConfigs = new HashMap<>();
  kafkaProducerConfigs.put("retries", 0);
  kafkaProducerConfigs.put("batch.size", 100);
  kafkaProducerConfigs.put("linger.ms", 0);
  kafkaProducerConfigs.put(KafkaConstants.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  kafkaProducerConfigs.put(KafkaConstants.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

  String topic = getNextTopic();
  SdcKafkaProducer sdcKafkaProducer = createSdcKafkaProducer(port, kafkaProducerConfigs);
  sdcKafkaProducer.init();
  sdcKafkaProducer.enqueueMessage(topic, message.getBytes(), "0");
  sdcKafkaProducer.write(null);

  verify(topic, 1, "localhost:" + port, message);
}
 
@Before
public void setUp() {
    Properties properties = StreamsTestUtils.getStreamsConfig("integrationTest",
            EMBEDDED_KAFKA.bootstrapServers(),
            STRING_SERDE_CLASSNAME,
            STRING_SERDE_CLASSNAME,
            new Properties());
    properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
    
    streamsConfig = new StreamsConfig(properties);

    producerConfig = TestUtils.producerConfig(EMBEDDED_KAFKA.bootstrapServers(),
            StringSerializer.class,
            StringSerializer.class);

    consumerConfig = TestUtils.consumerConfig(EMBEDDED_KAFKA.bootstrapServers(),
            StringDeserializer.class,
            StringDeserializer.class);
}
 
public KafkaOperationRepository createKafkaOperationRepository(ObjectMapper objectMapper) {
    KafkaProducer<String, Operation> operationsKafka = new KafkaProducer<>(
            kafkaProperties.buildProducerProperties(),
            new StringSerializer(),
            new JsonSerializer<>(objectMapper)
    );
    KafkaProducer<String, PublishedEventWrapper> eventsKafka = new KafkaProducer<>(
            kafkaProperties.buildProducerProperties(),
            new StringSerializer(),
            new JsonSerializer<>(objectMapper)
    );
    return new KafkaOperationRepository(
            operationContext,
            userContext,
            operationsKafka,
            eventsKafka,
            kafkaProperties.getConsumer().getGroupId()
    );
}
 
Example 10 · Project: rya · File: PeriodicNotificationApplicationIT.java
@Before
public void init() throws Exception {
    final String topic = rule.getKafkaTopicName();
    rule.createTopic(topic);

    //Get user-specified props and update them with the embedded Kafka bootstrap servers and the rule-generated topic
    props = getProps();
    props.setProperty(NOTIFICATION_TOPIC, topic);
    props.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
    conf = new PeriodicNotificationApplicationConfiguration(props);

    //create Kafka Producer
    kafkaProps = getKafkaProperties(conf);
    producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new CommandNotificationSerializer());

    //extract kafka specific properties from application config
    app = PeriodicNotificationApplicationFactory.getPeriodicApplication(conf);
    registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer);
}
 
Example 11 · Project: apm-agent-java · File: KafkaLegacyClientIT.java
@BeforeClass
public static void setup() {
    // Confluent versions 5.3.x correspond to Kafka versions 2.3.x -
    // https://docs.confluent.io/current/installation/versions-interoperability.html#cp-and-apache-ak-compatibility
    kafka = new KafkaContainer("5.3.0");
    kafka.start();
    kafkaPort = kafka.getMappedPort(KafkaContainer.KAFKA_PORT);
    bootstrapServers = kafka.getBootstrapServers();
    consumerThread = new Consumer();
    consumerThread.start();
    replyConsumer = createKafkaConsumer();
    replyConsumer.subscribe(Collections.singletonList(REPLY_TOPIC));
    producer = new KafkaProducer<>(
        ImmutableMap.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
            ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
            // This should guarantee that records are batched, as long as they are sent within the configured duration
            ProducerConfig.LINGER_MS_CONFIG, 50
        ),
        new StringSerializer(),
        new StringSerializer()
    );
}
 
public KafkaKeyValueProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
  this.closer = Closer.create();

  this.topic = topic;

  Properties props = new Properties();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
  props.put(ProducerConfig.ACKS_CONFIG, "all");
  props.put(ProducerConfig.RETRIES_CONFIG, 3);
  //To guarantee ordered delivery, the maximum number of in-flight requests must be set to 1.
  props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
  props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);

  // Add the Kafka-scoped config. If any of the keys above are also specified there, those values take precedence.
  if (kafkaConfig.isPresent()) {
    props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
    this.numFuturesToBuffer = ConfigUtils.getLong(kafkaConfig.get(), MAX_NUM_FUTURES_TO_BUFFER_KEY, DEFAULT_MAX_NUM_FUTURES_TO_BUFFER);
  }

  this.producer = createProducer(props);
}
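As an aside, on Kafka clients 0.11 and newer the idempotent producer preserves ordering under retries without restricting in-flight requests to one; a hedged config sketch, not part of this project's code:

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// With idempotence enabled, ordering is preserved for up to 5 in-flight requests per connection.
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);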
 
Example 13 · Project: KafkaExample · File: WordCountTopology.java
public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka0:19092");
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper0:12181/kafka");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "words")
            .addProcessor("WordCountProcessor", WordCountProcessor::new, "SOURCE")
            .addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "WordCountProcessor")
//          .connectProcessorAndStateStores("WordCountProcessor", "Counts")
            .addSink("SINK", "count", new StringSerializer(), new IntegerSerializer(), "WordCountProcessor");

    KafkaStreams stream = new KafkaStreams(builder, props);
    stream.start();
    System.in.read();
    stream.close();
    stream.cleanUp();
}
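The WordCountProcessor referenced above is not shown in this snippet. Against the old 0.10.x Processor API used here, a hypothetical implementation backed by the "Counts" store could look like this (an illustrative sketch, not the project's actual source):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, Integer> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.store = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
    }

    @Override
    public void process(String key, String line) {
        for (String word : line.toLowerCase().split("\\W+")) {
            Integer count = store.get(word);
            int updated = (count == null) ? 1 : count + 1;
            store.put(word, updated);
            // Forward the running count downstream to the "SINK" node.
            context.forward(word, updated);
        }
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() { }
}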
 
Example 14 · Project: pulsar · File: ProducerExample.java (this example uses Pulsar's Kafka-compatible client, so the standard Kafka producer API below talks to a Pulsar broker at a pulsar:// service URL)
public static void main(String[] args) {
    String topic = "persistent://public/default/test";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<Integer, String>(topic, i, Integer.toString(i)));
        log.info("Message {} sent successfully", i);
    }

    producer.flush();
    producer.close();
}
 
Example 15 · Project: kafka-webview · File: WebKafkaConsumerTest.java
public void publishDummyData() {
    final String topic = "TestTopic";

    // Create publisher
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    final KafkaProducer<String, String> producer = new KafkaProducer<>(config);
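    // Publish one record for each uppercase ASCII letter (A = 65 through Z = 90); key equals value.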
    for (int charCode = 65; charCode < 91; charCode++) {
        final char[] key = new char[1];
        key[0] = (char) charCode;

        producer.send(new ProducerRecord<>(topic, new String(key), new String(key)));
    }
    producer.flush();
    producer.close();
}
 
Example 16 · Project: mdw · File: KafkaProducerTest.java
public static void main(String[] args) {
    KafkaProducerTest kafkaTest = new KafkaProducerTest();

    kafkaTest.runtimeContext = new MockRuntimeContext("KafkaTest Activity");
    PropertyManager.getInstance().setStringProperty(PropertyNames.MDW_ASSET_LOCATION, "C:\\workspaces\\mdw6\\mdw-workflow\\assets");

    KafkaAdapter kAdapter = new KafkaAdapter();
    kAdapter.prepare(kafkaTest.runtimeContext);
    Properties producerProps = new Properties();

    //NOTE: comma-separated list of host:port entries, e.g. localhost:9092,localhost:9093
    String server = "<host>:<port>";
    System.out.println("BOOTSTRAP_SERVERS_CONFIG : " + server);
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaMDWProducer");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put("timeout.ms", "100");

    Map<String, String> recordProps = new HashMap<String, String>();
    recordProps.put(KafkaAdapter.KAFKA_TOPIC_NAME, "testMdwInbound");
    String key = "" + System.currentTimeMillis();
    recordProps.put(KafkaAdapter.RECORD_KEY, key);
    recordProps.put(KafkaAdapter.RECORD_PARTITION,  "0");

    String request = "KafkaTest-Request:" + key;
    try {
        kAdapter.init(producerProps);
        kAdapter.directInvoke(request, 0, recordProps);
    }
    catch (AdapterException | ConnectionException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
 
Example 17 · Project: spring-kafka · File: SenderConfig.java
@Bean
public Map<String, Object> producerConfigs() {
  Map<String, Object> props = new HashMap<>();

  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);

  return props;
}
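In spring-kafka this config map is typically consumed by a ProducerFactory and a KafkaTemplate elsewhere in the same class; a sketch of the companion beans (the value type is a stand-in, since the snippet does not show it):

@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}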
 
Example 18 · Project: rya · File: TopologyFactory.java
@Override
public void meet(final Reduced node) throws TopologyBuilderException {
    // This indicates we're outputting VisibilityStatements.
    sinkEntry = new SinkEntry<>(
            new StatementOutputFormatterSupplier(),
            new StringSerializer(),
            new VisibilityStatementSerializer());
    super.meet(node);
}
 
Example 19 · Project: java-study · File: KafkaProducerTest.java
/**
 * @param topic the topic to send messages to
 */
public KafkaProducerTest(String topic) {
	Properties props = new Properties();
	props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
	props.put("acks", "all");
	props.put("retries", 0);
	props.put("batch.size", 16384);
	props.put("linger.ms", 1);
	props.put("buffer.memory", 33554432);
	props.put("key.serializer", StringSerializer.class.getName());
	props.put("value.serializer", StringSerializer.class.getName());
	this.producer = new KafkaProducer<String, String>(props);
	this.topic = topic;
}
 
Example 20 · Project: kafka-streams · File: PurchaseProcessorDriver.java
public static void main(String[] args) throws Exception {

        StreamsConfig streamingConfig = new StreamsConfig(getProperties());

        JsonDeserializer<Purchase> purchaseJsonDeserializer = new JsonDeserializer<>(Purchase.class);
        JsonSerializer<Purchase> purchaseJsonSerializer = new JsonSerializer<>();
        JsonSerializer<RewardAccumulator> rewardAccumulatorJsonSerializer = new JsonSerializer<>();
        JsonSerializer<PurchasePattern> purchasePatternJsonSerializer = new JsonSerializer<>();

        StringDeserializer stringDeserializer = new StringDeserializer();
        StringSerializer stringSerializer = new StringSerializer();

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("SOURCE", stringDeserializer, purchaseJsonDeserializer, "src-topic")

                .addProcessor("PROCESS", CreditCardAnonymizer::new, "SOURCE")
                .addProcessor("PROCESS2", PurchasePatterns::new, "PROCESS")
                .addProcessor("PROCESS3", CustomerRewards::new, "PROCESS")

                .addSink("SINK", "patterns", stringSerializer, purchasePatternJsonSerializer, "PROCESS2")
                .addSink("SINK2", "rewards",stringSerializer, rewardAccumulatorJsonSerializer, "PROCESS3")
                .addSink("SINK3", "purchases", stringSerializer, purchaseJsonSerializer, "PROCESS");

        System.out.println("Starting PurchaseProcessor Example");
        KafkaStreams streaming = new KafkaStreams(topologyBuilder, streamingConfig);
        streaming.start();
        System.out.println("Now started PurchaseProcessor Example");

    }
 
Example 21 · Project: mdw · File: KafkaAdapter.java
private static Producer<Object, Object> createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:9092,localhost:9093,localhost:9094");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaMDWProducer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            LongSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
    return new KafkaProducer<>(props);
}
 
Example 22 · Project: ja-micro · File: KafkaPublisherFactory.java
public Map<String, String> getDefaultProperties() {
    Map<String, String> retval = new HashMap<>();
    retval.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(FeatureFlags.getKafkaRequestTimeoutMs(serviceProperties)));
    retval.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Integer.toString(FeatureFlags.getKafkaMaxBlockMs(serviceProperties)));
    retval.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    retval.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    retval.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SixtPartitioner.class.getName());
    retval.put(ProducerConfig.RETRIES_CONFIG, "3");
    retval.put(ProducerConfig.ACKS_CONFIG, "all");
    return retval;
}
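SixtPartitioner is ja-micro's custom implementation of org.apache.kafka.clients.producer.Partitioner, the same interface DefaultPartitioner implements. Its actual logic is not shown here; for illustration, a minimal custom partitioner could look like this (hypothetical, not the SixtPartitioner source):

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class ExamplePartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // No key: spread records round-robin across partitions.
            return Utils.toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // Keyed records: hash so the same key always maps to the same partition.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() { }
}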
 
@Before
public void init(TestContext ctx) {
  final Properties config = kafkaCluster.useTo().getProducerProperties("testTransactional_producer");
  config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-1");
  config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  config.put(ProducerConfig.ACKS_CONFIG, "all");

  producer = producer(Vertx.vertx(), config);
  producer.exceptionHandler(ctx::fail);
}
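The transactional.id and idempotence settings above enable Kafka's transactional API. Underneath the Vert.x wrapper, the plain-client flow looks like this (a generic sketch, not this test's code):

producer.initTransactions();    // fences off any earlier producer using the same transactional.id
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("some-topic", "key", "value"));
    producer.commitTransaction();
} catch (KafkaException e) {
    // None of the records sent inside the transaction become visible to read_committed consumers.
    producer.abortTransaction();
}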
 
Example 24 · Project: piper · File: KafkaMessageBrokerConfiguration.java
@Bean
public ProducerFactory<String, Object> producerFactory(ObjectMapper aObjectMapper, KafkaProperties aKafkaProperties) {
  return new DefaultKafkaProducerFactory<>(
      producerConfigs(aKafkaProperties),
      new StringSerializer(),
      new JsonSerializer<>(aObjectMapper));
}
 
Example 26 · Project: rya · File: ListQueryCommandIT.java
@Before
public void setup() {
    // Make sure the topic that the change log uses exists.
    final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
    kafka.createTopic(changeLogTopic);

    // Set up the QueryRepository used by the test.
    final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
    final Consumer<?, QueryChange> queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
    final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
    queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
}
 
@Override
public int sendMessagesPlain(long timeoutMs) {
    String clientName = "sender-plain-" + clusterName;
    CompletableFuture<Integer> resultPromise = new CompletableFuture<>();
    IntPredicate msgCntPredicate = x -> x == messageCount;

    KafkaClientProperties properties = this.clientProperties;

    if (properties == null || properties.getProperties().isEmpty()) {
        properties = new KafkaClientProperties.KafkaClientPropertiesBuilder()
            .withNamespaceName(namespaceName)
            .withClusterName(clusterName)
            .withSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)
            .withBootstrapServerConfig(getExternalBootstrapConnect(namespaceName, clusterName))
            .withKeySerializerConfig(StringSerializer.class)
            .withValueSerializerConfig(StringSerializer.class)
            .withClientIdConfig(kafkaUsername + "-producer")
            .withSaslMechanism(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM)
            .withSaslLoginCallbackHandlerClass()
            .withSharedProperties()
            .withSaslJassConfig(this.clientId, this.clientSecretName, this.oauthTokenEndpointUri)
            .build();
    }

    try (Producer plainProducer = new Producer(properties, resultPromise, msgCntPredicate, topicName, clientName, partition)) {

        plainProducer.getVertx().deployVerticle(plainProducer);

        return plainProducer.getResultPromise().get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        e.printStackTrace();
        throw new WaitException(e);
    }
}
 
Example 28 · Project: smallrye-reactive-messaging · File: KafkaUsage.java
public void produceDoubles(int messageCount, Runnable completionCallback,
        Supplier<ProducerRecord<String, Double>> messageSupplier) {
    Serializer<String> keySer = new StringSerializer();
    Serializer<Double> valSer = new DoubleSerializer();
    String randomId = UUID.randomUUID().toString();
    this.produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier);
}
 
public static void main(String[] args) {

        StreamsConfig streamingConfig = new StreamsConfig(getProperties());

        TopologyBuilder builder = new TopologyBuilder();

        JsonSerializer<StockTransactionSummary> stockTxnSummarySerializer = new JsonSerializer<>();
        JsonDeserializer<StockTransactionSummary> stockTxnSummaryDeserializer = new JsonDeserializer<>(StockTransactionSummary.class);
        JsonDeserializer<StockTransaction> stockTxnDeserializer = new JsonDeserializer<>(StockTransaction.class);
        JsonSerializer<StockTransaction> stockTxnJsonSerializer = new JsonSerializer<>();
        StringSerializer stringSerializer = new StringSerializer();
        StringDeserializer stringDeserializer = new StringDeserializer();

        Serde<StockTransactionSummary> stockTransactionSummarySerde = Serdes.serdeFrom(stockTxnSummarySerializer, stockTxnSummaryDeserializer);

        builder.addSource("stocks-source", stringDeserializer, stockTxnDeserializer, "stocks")
                       .addProcessor("summary", StockSummaryProcessor::new, "stocks-source")
                       .addStateStore(Stores.create("stock-transactions").withStringKeys()
                               .withValues(stockTransactionSummarySerde).inMemory().maxEntries(100).build(), "summary")
                       .addSink("sink", "stocks-out", stringSerializer, stockTxnJsonSerializer, "stocks-source")
                       .addSink("sink-2", "transaction-summary", stringSerializer, stockTxnSummarySerializer, "summary");

        System.out.println("Starting StockSummaryStatefulProcessor Example");
        KafkaStreams streaming = new KafkaStreams(builder, streamingConfig);
        streaming.start();
        System.out.println("StockSummaryStatefulProcessor Example now started");

    }
 
Example 30 · Project: eventeum · File: KafkaConfiguration.java
@Bean
public ProducerFactory<String, EventeumMessage> eventeumProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, settings.getBootstrapAddresses());
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, settings.getRequestTimeoutMsConfig());
    configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, settings.getRetryBackoffMsConfig());
    configProps.put("retries", settings.getRetries());
    if ("PLAINTEXT".equals(settings.getSecurityProtocol())) {
        configurePlaintextSecurityProtocol(configProps);
    }
    return new DefaultKafkaProducerFactory<>(configProps);
}
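A KafkaTemplate bean typically consumes this factory elsewhere in the same configuration class; a hedged sketch (the project's actual wiring may differ):

@Bean
public KafkaTemplate<String, EventeumMessage> eventeumKafkaTemplate() {
    return new KafkaTemplate<>(eventeumProducerFactory());
}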