类org.apache.kafka.streams.kstream.Grouped源码实例Demo

下面列出了怎么用org.apache.kafka.streams.kstream.Grouped的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: kafka-tutorials   文件: AggregatingSum.java
public Topology buildTopology(Properties envProps,
                              final SpecificAvroSerde<TicketSale> ticketSaleSerde) {
  final StreamsBuilder builder = new StreamsBuilder();

  final String inputTopic = envProps.getProperty("input.topic.name");
  final String outputTopic = envProps.getProperty("output.topic.name");

  builder.stream(inputTopic, Consumed.with(Serdes.String(), ticketSaleSerde))
      // Set key to title and value to ticket value
      .map((k, v) -> new KeyValue<>((String) v.getTitle(), (Integer) v.getTicketTotalValue()))
      // Group by title
      .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
      // Apply SUM aggregation
      .reduce(Integer::sum)
      // Write to stream specified by outputTopic
      .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));

  return builder.build();
}
 
源代码2 项目: kafka-tutorials   文件: AggregatingCount.java
public Topology buildTopology(Properties envProps,
                              final SpecificAvroSerde<TicketSale> ticketSaleSerde) {
  final StreamsBuilder builder = new StreamsBuilder();

  final String inputTopic = envProps.getProperty("input.topic.name");
  final String outputTopic = envProps.getProperty("output.topic.name");

  builder.stream(inputTopic, Consumed.with(Serdes.String(), ticketSaleSerde))
      // Set key to title and value to ticket value
      .map((k, v) -> new KeyValue<>((String) v.getTitle(), (Integer) v.getTicketTotalValue()))
      // Group by title
      .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
      // Apply COUNT method
      .count()
      // Write to stream specified by outputTopic
      .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

  return builder.build();
}
 
源代码3 项目: micronaut-kafka   文件: WordCountStream.java
@Singleton
@Named(STREAM_WORD_COUNT)
KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { // <3>
    // set default serdes
    Properties props = builder.getConfiguration();
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KStream<String, String> source = builder
            .stream(INPUT);

    KTable<String, Long> groupedByWord = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
            //Store the result in a store for lookup later
            .count(Materialized.as(WORD_COUNT_STORE)); // <4>

    groupedByWord
            //convert to stream
            .toStream()
            //send to output using specific serdes
            .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));

    return source;
}
 
源代码4 项目: kafka-graphs   文件: KGraph.java
public <T> KGraph<K, VV, EV> joinWithEdgesOnSource(KTable<K, T> inputDataSet,
                                                   final EdgeJoinFunction<EV, T> edgeJoinFunction) {

    KTable<Edge<K>, EV> resultedEdges = edgesGroupedBySource()
        .leftJoin(inputDataSet,
            new ApplyLeftJoinToEdgeValuesOnEitherSourceOrTarget<>(edgeJoinFunction),
            Materialized.with(keySerde(), new KryoSerde<>()))
        .toStream()
        .flatMap((k, edgeWithValues) -> {
            List<KeyValue<Edge<K>, EV>> edges = new ArrayList<>();
            for (EdgeWithValue<K, EV> edge : edgeWithValues) {
                edges.add(new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()));
            }
            return edges;
        })
        .groupByKey(Grouped.with(new KryoSerde<>(), edgeValueSerde()))
        .<EV>reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(
            generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));

    return new KGraph<>(this.vertices, resultedEdges, serialized);
}
 
源代码5 项目: kafka-graphs   文件: KGraph.java
public <T> KGraph<K, VV, EV> joinWithEdgesOnTarget(KTable<K, T> inputDataSet,
                                                   final EdgeJoinFunction<EV, T> edgeJoinFunction) {

    KTable<Edge<K>, EV> resultedEdges = edgesGroupedByTarget()
        .leftJoin(inputDataSet,
            new ApplyLeftJoinToEdgeValuesOnEitherSourceOrTarget<>(edgeJoinFunction),
            Materialized.with(keySerde(), new KryoSerde<>()))
        .toStream()
        .flatMap((k, edgeWithValues) -> {
            List<KeyValue<Edge<K>, EV>> edges = new ArrayList<>();
            for (EdgeWithValue<K, EV> edge : edgeWithValues) {
                edges.add(new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()));
            }
            return edges;
        })
        .groupByKey(Grouped.with(new KryoSerde<>(), edgeValueSerde()))
        .<EV>reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(
            generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));

    return new KGraph<>(vertices, resultedEdges, serialized);
}
 
源代码6 项目: kafka-graphs   文件: KGraph.java
public KGraph<K, VV, EV> subgraph(Predicate<K, VV> vertexFilter, Predicate<Edge<K>, EV> edgeFilter) {
    KTable<K, VV> filteredVertices = vertices.filter(vertexFilter);

    KTable<Edge<K>, EV> remainingEdges = edgesBySource()
        .join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
        .map((k, edge) -> new KeyValue<>(edge.target(), edge))
        .join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
        .map((k, edge) -> new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()))
        .groupByKey(Grouped.with(new KryoSerde<>(), edgeValueSerde()))
        .reduce((v1, v2) -> v2, Materialized.with(new KryoSerde<>(), edgeValueSerde()));

    KTable<Edge<K>, EV> filteredEdges = remainingEdges
        .filter(edgeFilter, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));

    return new KGraph<>(filteredVertices, filteredEdges, serialized);
}
 
@StreamListener
@SendTo("output")
public KStream<?, WordCount> process(
		@Input("input") KStream<Object, String> input) {

	return input
			.flatMapValues(
					value -> Arrays.asList(value.toLowerCase().split("\\W+")))
			.map((key, value) -> new KeyValue<>(value, value))
			.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
			.windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foo-WordCounts"))
			.toStream()
			.map((key, value) -> new KeyValue<>(null,
					new WordCount(key.key(), value,
							new Date(key.window().start()),
							new Date(key.window().end()))));
}
 
@Bean
public Function<KStream<Object, Sensor>, KStream<String, Long>> process() {

    Map<String, Object> configs = new HashMap<>();
    configs.put("valueClass", Sensor.class);
    configs.put("contentType", "application/*+avro");
    customSerde.configure(configs, false);

    return input -> input
            .map((key, value) -> {

                String newKey = "v1";
                if (value.getId().toString().endsWith("v2")) {
                    newKey = "v2";
                }
                return new KeyValue<>(newKey, value);
            })
            .groupByKey(Grouped.with(Serdes.String(), customSerde))
            .count(Materialized.as(STORE_NAME))
            .toStream();
}
 
@Bean
public Consumer<KStream<String, DomainEvent>> aggregate() {

	ObjectMapper mapper = new ObjectMapper();
	Serde<DomainEvent> domainEventSerde = new JsonSerde<>( DomainEvent.class, mapper );

	return input -> input
			.groupBy(
					(s, domainEvent) -> domainEvent.boardUuid,
					Grouped.with(null, domainEventSerde))
			.aggregate(
					String::new,
					(s, domainEvent, board) -> board.concat(domainEvent.eventType),
					Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots")
							.withKeySerde(Serdes.String()).
							withValueSerde(Serdes.String())
			);
}
 
public Topology getTopology() {
    final StreamsBuilder builder = new StreamsBuilder();

    // Click Events
    final KStream<Integer, ClickEvent> clickEvents = builder.stream(this.clickInputTopic,
            Consumed.with(Serdes.Integer(), new JsonSerde<>(ClickEvent.class)));

    final KTable<Windowed<Integer>, Long> counts = clickEvents
            .selectKey(((key, value) -> value.getStatus()))
            .filter(((key, value) -> key >= 400))
            .groupByKey(Grouped.with(Serdes.Integer(), new JsonSerde<>(ClickEvent.class)))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))  // 1 Minute in ms
            .count();

    // Status codes
    final KTable<Integer, StatusCode> statusCodes = builder.table(this.statusInputTopic,
            Consumed.with(Serdes.Integer(), new JsonSerde<>(StatusCode.class)));

    // Join
    final KStream<Integer, ErrorOutput> errors = counts.toStream()
            .map((key, value) -> KeyValue.pair(
                    key.key(),
                    new ErrorOutput(key.key(), value, key.window().start(), null /*empty definition*/)))
            .join(statusCodes,
                    (countRecord, code) -> new ErrorOutput(
                            countRecord.getStatusCode(), countRecord.getCount(), countRecord.getTime(), code.getDefinition()),
                    Joined.valueSerde(new JsonSerde<>(ErrorOutput.class)));
    errors.to(this.errorOutputTopic);

    // Send alert if more than 5x a certain error code per minute
    errors.filter((key, errorOutput) -> errorOutput.getCount() > 5L).to(this.alertTopic);

    return builder.build();
}
 
源代码11 项目: kafka-graphs   文件: KGraph.java
private KTable<K, Iterable<EdgeWithValue<K, EV>>> edgesGroupedBy(Function<Edge<K>, K> fun) {
    return edges()
        .groupBy(new GroupEdges(fun), Grouped.with(keySerde(), new KryoSerde<>()))
        .aggregate(
            HashSet::new,
            (aggKey, value, aggregate) -> {
                ((Set<EdgeWithValue<K, EV>>) aggregate).add(value);
                return aggregate;
            },
            (aggKey, value, aggregate) -> {
                ((Set<EdgeWithValue<K, EV>>) aggregate).remove(value);
                return aggregate;
            },
            Materialized.with(keySerde(), new KryoSerde<>()));
}
 
源代码12 项目: kafka-graphs   文件: KGraph.java
public static <K, VV, EV> KGraph<K, VV, EV> fromEdges(
    KTable<Edge<K>, EV> edges,
    ValueMapper<K, VV> vertexValueInitializer,
    GraphSerialized<K, VV, EV> serialized) {

    KTable<K, VV> vertices = edges
        .toStream()
        .flatMap(new EmitSrcAndTarget<>(vertexValueInitializer))
        .groupByKey(Grouped.with(serialized.keySerde(), new KryoSerde<>()))
        .<VV>reduce((v1, v2) -> v2,
            Materialized.with(serialized.keySerde(), serialized.vertexValueSerde()));

    return new KGraph<>(vertices, edges, serialized);
}
 
源代码13 项目: kafka-graphs   文件: KGraph.java
public KGraph<K, VV, EV> filterOnVertices(Predicate<K, VV> vertexFilter) {
    KTable<K, VV> filteredVertices = vertices.filter(vertexFilter);

    KTable<Edge<K>, EV> remainingEdges = edgesBySource()
        .join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
        .map((k, edge) -> new KeyValue<>(edge.target(), edge))
        .join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
        .map((k, edge) -> new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()))
        .groupByKey(Grouped.with(new KryoSerde<>(), edgeValueSerde()))
        .reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));

    return new KGraph<>(filteredVertices, remainingEdges, serialized);
}
 
源代码14 项目: kafka-graphs   文件: KGraph.java
public KGraph<K, VV, EV> undirected() {

        KTable<Edge<K>, EV> undirectedEdges = edges
            .toStream()
            .flatMap(new UndirectEdges<>())
            .groupByKey(Grouped.with(new KryoSerde<>(), serialized.edgeValueSerde()))
            .reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
                .withKeySerde(new KryoSerde<>()).withValueSerde(serialized.edgeValueSerde()));

        return new KGraph<>(vertices, undirectedEdges, serialized);
    }
 
源代码15 项目: kafka-graphs   文件: SummaryBulkAggregation.java
@SuppressWarnings("unchecked")
@Override
public KTable<Windowed<Short>, T> run(final KStream<Edge<K>, EV> edgeStream) {

    //For parallel window support we key the edge stream by partition and apply a parallel fold per partition.
    //Finally, we merge all locally combined results into our final graph aggregation property.
    KTable<Windowed<Short>, S> partialAgg = edgeStream
        .groupByKey(Grouped.with(new KryoSerde<>(), new KryoSerde<>()))
        .windowedBy(TimeWindows.of(Duration.ofMillis(timeMillis)))
        .aggregate(this::initialValue, new PartialAgg<>(updateFun()))
        .toStream()
        .groupBy((k, v) -> GLOBAL_KEY)
        .windowedBy(TimeWindows.of(Duration.ofMillis(timeMillis)))
        .reduce(combineFun())
        .mapValues(aggregator(edgeStream), Materialized.<Windowed<Short>, S, KeyValueStore<Bytes, byte[]>>
            as(KGraph.generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(new KryoSerde<>()));

    if (transform() != null) {
        return partialAgg.mapValues(
            transform(),
            Materialized.<Windowed<Short>, T, KeyValueStore<Bytes, byte[]>>
                as(KGraph.generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(new KryoSerde<>())
        );
    }

    return (KTable<Windowed<Short>, T>) partialAgg;
}
 
@StreamListener("input")
@SendTo("output")
public KStream<Integer, Long> process(KStream<Object, Product> input) {
	return input.filter((key, product) -> product.getId() == 123)
			.map((key, value) -> new KeyValue<>(value, value))
			.groupByKey(Grouped.with(new JsonSerde<>(Product.class),
					new JsonSerde<>(Product.class)))
			.windowedBy(TimeWindows.of(5000))
			.count(Materialized.as("id-count-store-x")).toStream()
			.map((key, value) -> new KeyValue<>(key.key().id, value));
}
 
@Bean
public Function<KStream<Object, String>, KStream<String, WordCount>> process() {

	return input -> input
			.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
			.map((key, value) -> new KeyValue<>(value, value))
			.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
			.windowedBy(TimeWindows.of(5000))
			.count(Materialized.as("foo-WordCounts"))
			.toStream()
			.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
					new Date(key.window().start()), new Date(key.window().end()))));
}
 
@Bean
public Function<KStream<String, Long>, Function<KTable<String, String>, KStream<String, Long>>> process() {
	return userClicksStream -> (userRegionsTable -> (userClicksStream
			.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
							"UNKNOWN" : region, clicks),
					Joined.with(Serdes.String(), Serdes.Long(), null))
			.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
					regionWithClicks.getClicks()))
			.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
			.reduce(Long::sum)
			.toStream()));
}
 
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
	return (userClicksStream, userRegionsTable) -> (userClicksStream
			.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
							"UNKNOWN" : region, clicks),
					Joined.with(Serdes.String(), Serdes.Long(), null))
			.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
					regionWithClicks.getClicks()))
			.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
			.reduce(Long::sum)
			.toStream());
}
 
@Bean
public Function<KStream<Bytes, String>, KStream<Bytes, WordCount>> process() {

	return input -> input
			.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
			.map((key, value) -> new KeyValue<>(value, value))
			.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
			.windowedBy(TimeWindows.of(Duration.ofMillis(WINDOW_SIZE_MS)))
			.count(Materialized.as("WordCounts-1"))
			.toStream()
			.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
 
@Bean
public Function<KStream<Object, String>,KStream<?, WordCount>> process() {

	return input -> input
			.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
			.map((key, value) -> new KeyValue<>(value, value))
			.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
			.windowedBy(TimeWindows.of(5000))
			.count(Materialized.as("WordCounts-1"))
			.toStream()
			.map((key, value) -> new KeyValue<>(null,
					new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
 
@Bean
public Function<KStream<Object, Product>, KStream<Integer, ProductStatus>> process() {
	return input -> input
			.filter((key, product) -> productIds().contains(product.getId()))
			.map((key, value) -> new KeyValue<>(value, value))
			.groupByKey(Grouped.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class)))
			.windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
			.count(Materialized.as("product-counts"))
			.toStream()
			.map((key, value) -> new KeyValue<>(key.key().id, new ProductStatus(key.key().id,
					value, Instant.ofEpochMilli(key.window().start()).atZone(ZoneId.systemDefault()).toLocalTime(),
					Instant.ofEpochMilli(key.window().end()).atZone(ZoneId.systemDefault()).toLocalTime())));
}
 
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

	return input -> input
			.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
			.map((key, value) -> new KeyValue<>(value, value))
			.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
			.windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
			.count(Materialized.as("WordCounts-1"))
			.toStream()
			.map((key, value) -> new KeyValue<>(null,
					new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
 
@Bean
public BiFunction<KStream<String, Long>, GlobalKTable<String, String>, KStream<String, Long>> process() {

	return (userClicksStream, userRegionsTable) -> userClicksStream
			.leftJoin(userRegionsTable,
					(name,value) -> name,
					(clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks)
					)
			.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
			.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
			.reduce((firstClicks, secondClicks) -> firstClicks + secondClicks)
			.toStream();
}
 
@Bean
public Function<KStream<ProductKey, InventoryUpdateEvent>, KStream<ProductKey, InventoryCountEvent>> process() {
    return input -> input
            .groupByKey(Grouped.with(keySerde, updateEventSerde))
            .aggregate(InventoryCountEvent::new,
                    (key, updateEvent, summaryEvent) -> inventoryCountUpdateEventUpdater.apply(updateEvent, summaryEvent),
                      Materialized.<ProductKey, InventoryCountEvent>as(storeSupplier)
                            .withKeySerde(keySerde)
                            .withValueSerde(countEventSerde))
            .toStream().peek((k, v) -> logger.debug("aggregated count key {} {}", k.getProductCode(), v.getCount()));
}
 
@Bean
public Function<KStream<Object, Product>, KStream<Integer, Long>> process() {

	return input -> input
			.filter((key, product) -> productIds().contains(product.getId()))
			.map((key, value) -> new KeyValue<>(value.id, value))
			.groupByKey(Grouped.with(Serdes.Integer(), new JsonSerde<>(Product.class)))
			.count(Materialized.<Integer, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
				.withKeySerde(Serdes.Integer())
				.withValueSerde(Serdes.Long()))
			.toStream();
}
 
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {

	return (userClicksStream, userRegionsTable) -> userClicksStream
			.leftJoin(userRegionsTable,
					(clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),
					Joined.with(Serdes.String(), Serdes.Long(), null))
			.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
			.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
			.reduce((firstClicks, secondClicks) -> firstClicks + secondClicks)
			.toStream();
}
 
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

	return input -> input
			.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
			.map((key, value) -> new KeyValue<>(value, value))
			.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
			.windowedBy(TimeWindows.of(Duration.ofSeconds(20)))
			.count(Materialized.as("WordCounts-1"))
			.toStream()
			.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
 
public Topology buildTopology(Properties envProps) {
  final StreamsBuilder builder = new StreamsBuilder();
  final String inputTopic = envProps.getProperty("input.topic.name");
  final String outputTopic = envProps.getProperty("output.topic.name");
  final String joinTopic = envProps.getProperty("join.topic.name");

  final Serde<Long> longSerde = Serdes.Long();
  final Serde<String> stringSerde = Serdes.String();

  final boolean addFilter = Boolean.parseBoolean(envProps.getProperty("add.filter"));
  final boolean addNames = Boolean.parseBoolean(envProps.getProperty("add.names"));

  KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
                                                .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)));
  if (addFilter) {
    inputStream = inputStream.filter((k, v) -> k != 100L);
  }

  final KStream<Long, String> joinedStream;
  final KStream<Long, Long> countStream;

  if (!addNames) {
       countStream = inputStream.groupByKey(Grouped.with(longSerde, stringSerde))
                                  .count()
                                  .toStream();

      joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
                                                            JoinWindows.of(Duration.ofMillis(100)),
                                                            StreamJoined.with(longSerde, stringSerde, longSerde));
  } else {
      countStream = inputStream.groupByKey(Grouped.with("count", longSerde, stringSerde))
                                 .count(Materialized.as("the-counting-store"))
                                 .toStream();

      joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
                                                            JoinWindows.of(Duration.ofMillis(100)),
                                                            StreamJoined.with(longSerde, stringSerde, longSerde)
                                                                        .withName("join").withStoreName("the-join-store"));
  }

  joinedStream.to(joinTopic, Produced.with(longSerde, stringSerde));
  countStream.map((k,v) -> KeyValue.pair(k.toString(), v.toString())).to(outputTopic, Produced.with(stringSerde, stringSerde));


  return builder.build();
}
 
源代码30 项目: kafka-encryption   文件: SerdesPair.java
/**
 * Build a {@link Grouped} using the keySerde and valueSerde of the pair
 */
public Grouped<K, V> toGrouped() {
    return Grouped.with(keySerde, valueSerde);
}