The following examples show how to use the org.apache.kafka.streams.kstream.Grouped API class; the full source of each example can be viewed on GitHub.
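As a quick orientation before the project examples, here is a minimal self-contained sketch of the Grouped factory methods themselves. The topic names, store name, and class name are placeholders chosen for illustration, not code from any of the projects below; Grouped.with(keySerde, valueSerde), Grouped.as(name), and Grouped.with(name, keySerde, valueSerde) are the standard factories on the class.

import java.util.Arrays;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class GroupedSketch {

    // Builds a tiny word-count topology; "words-in", "counts-out" and the store name are placeholders.
    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, Long> counts = builder
            .stream("words-in", Consumed.with(Serdes.String(), Serdes.String()))
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            // Grouped.with(name, keySerde, valueSerde) overrides the default serdes for this
            // grouping and names the repartition topic created for the re-keyed stream;
            // Grouped.with(keySerde, valueSerde) and Grouped.as(name) are the narrower variants.
            .groupBy((key, word) -> word, Grouped.with("group-by-word", Serdes.String(), Serdes.String()))
            .count(Materialized.as("word-counts-store"));
        counts.toStream().to("counts-out", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}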
public Topology buildTopology(Properties envProps,
                              final SpecificAvroSerde<TicketSale> ticketSaleSerde) {
    final StreamsBuilder builder = new StreamsBuilder();
    final String inputTopic = envProps.getProperty("input.topic.name");
    final String outputTopic = envProps.getProperty("output.topic.name");
    builder.stream(inputTopic, Consumed.with(Serdes.String(), ticketSaleSerde))
        // Set key to title and value to ticket value
        .map((k, v) -> new KeyValue<>((String) v.getTitle(), (Integer) v.getTicketTotalValue()))
        // Group by title
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
        // Apply SUM aggregation
        .reduce(Integer::sum)
        // Write to stream specified by outputTopic
        .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
    return builder.build();
}
public Topology buildTopology(Properties envProps,
                              final SpecificAvroSerde<TicketSale> ticketSaleSerde) {
    final StreamsBuilder builder = new StreamsBuilder();
    final String inputTopic = envProps.getProperty("input.topic.name");
    final String outputTopic = envProps.getProperty("output.topic.name");
    builder.stream(inputTopic, Consumed.with(Serdes.String(), ticketSaleSerde))
        // Set key to title and value to ticket value
        .map((k, v) -> new KeyValue<>((String) v.getTitle(), (Integer) v.getTicketTotalValue()))
        // Group by title
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
        // Apply COUNT method
        .count()
        // Write to stream specified by outputTopic
        .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
    return builder.build();
}
@Singleton
@Named(STREAM_WORD_COUNT)
KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { // <3>
    // set default serdes
    Properties props = builder.getConfiguration();
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    KStream<String, String> source = builder
        .stream(INPUT);
    KTable<String, Long> groupedByWord = source
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
        // Store the result in a store for lookup later
        .count(Materialized.as(WORD_COUNT_STORE)); // <4>
    groupedByWord
        // convert to stream
        .toStream()
        // send to output using specific serdes
        .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));
    return source;
}
public <T> KGraph<K, VV, EV> joinWithEdgesOnSource(KTable<K, T> inputDataSet,
                                                   final EdgeJoinFunction<EV, T> edgeJoinFunction) {
    KTable<Edge<K>, EV> resultedEdges = edgesGroupedBySource()
        .leftJoin(inputDataSet,
            new ApplyLeftJoinToEdgeValuesOnEitherSourceOrTarget<>(edgeJoinFunction),
            Materialized.with(keySerde(), new KryoSerde<>()))
        .toStream()
        .flatMap((k, edgeWithValues) -> {
            List<KeyValue<Edge<K>, EV>> edges = new ArrayList<>();
            for (EdgeWithValue<K, EV> edge : edgeWithValues) {
                edges.add(new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()));
            }
            return edges;
        })
        .groupByKey(Grouped.with(new KryoSerde<>(), edgeValueSerde()))
        .<EV>reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(
            generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));
    return new KGraph<>(this.vertices, resultedEdges, serialized);
}
public <T> KGraph<K, VV, EV> joinWithEdgesOnTarget(KTable<K, T> inputDataSet,
                                                   final EdgeJoinFunction<EV, T> edgeJoinFunction) {
    KTable<Edge<K>, EV> resultedEdges = edgesGroupedByTarget()
        .leftJoin(inputDataSet,
            new ApplyLeftJoinToEdgeValuesOnEitherSourceOrTarget<>(edgeJoinFunction),
            Materialized.with(keySerde(), new KryoSerde<>()))
        .toStream()
        .flatMap((k, edgeWithValues) -> {
            List<KeyValue<Edge<K>, EV>> edges = new ArrayList<>();
            for (EdgeWithValue<K, EV> edge : edgeWithValues) {
                edges.add(new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()));
            }
            return edges;
        })
        .groupByKey(Grouped.with(new KryoSerde<>(), edgeValueSerde()))
        .<EV>reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(
            generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));
    return new KGraph<>(vertices, resultedEdges, serialized);
}
public KGraph<K, VV, EV> subgraph(Predicate<K, VV> vertexFilter, Predicate<Edge<K>, EV> edgeFilter) {
    KTable<K, VV> filteredVertices = vertices.filter(vertexFilter);
    KTable<Edge<K>, EV> remainingEdges = edgesBySource()
        .join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
        .map((k, edge) -> new KeyValue<>(edge.target(), edge))
        .join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
        .map((k, edge) -> new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()))
        .groupByKey(Grouped.with(new KryoSerde<>(), edgeValueSerde()))
        .reduce((v1, v2) -> v2, Materialized.with(new KryoSerde<>(), edgeValueSerde()));
    KTable<Edge<K>, EV> filteredEdges = remainingEdges
        .filter(edgeFilter, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
            .withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));
    return new KGraph<>(filteredVertices, filteredEdges, serialized);
}
@StreamListener
@SendTo("output")
public KStream<?, WordCount> process(
        @Input("input") KStream<Object, String> input) {
    return input
        .flatMapValues(
            value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foo-WordCounts"))
        .toStream()
        .map((key, value) -> new KeyValue<>(null,
            new WordCount(key.key(), value,
                new Date(key.window().start()),
                new Date(key.window().end()))));
}
@Bean
public Function<KStream<Object, Sensor>, KStream<String, Long>> process() {
    Map<String, Object> configs = new HashMap<>();
    configs.put("valueClass", Sensor.class);
    configs.put("contentType", "application/*+avro");
    customSerde.configure(configs, false);
    return input -> input
        .map((key, value) -> {
            String newKey = "v1";
            if (value.getId().toString().endsWith("v2")) {
                newKey = "v2";
            }
            return new KeyValue<>(newKey, value);
        })
        .groupByKey(Grouped.with(Serdes.String(), customSerde))
        .count(Materialized.as(STORE_NAME))
        .toStream();
}
@Bean
public Consumer<KStream<String, DomainEvent>> aggregate() {
    ObjectMapper mapper = new ObjectMapper();
    Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class, mapper);
    return input -> input
        .groupBy(
            (s, domainEvent) -> domainEvent.boardUuid,
            Grouped.with(null, domainEventSerde))
        .aggregate(
            String::new,
            (s, domainEvent, board) -> board.concat(domainEvent.eventType),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String())
        );
}
public Topology getTopology() {
    final StreamsBuilder builder = new StreamsBuilder();
    // Click Events
    final KStream<Integer, ClickEvent> clickEvents = builder.stream(this.clickInputTopic,
        Consumed.with(Serdes.Integer(), new JsonSerde<>(ClickEvent.class)));
    final KTable<Windowed<Integer>, Long> counts = clickEvents
        .selectKey((key, value) -> value.getStatus())
        .filter((key, value) -> key >= 400)
        .groupByKey(Grouped.with(Serdes.Integer(), new JsonSerde<>(ClickEvent.class)))
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1))) // 1-minute window
        .count();
    // Status codes
    final KTable<Integer, StatusCode> statusCodes = builder.table(this.statusInputTopic,
        Consumed.with(Serdes.Integer(), new JsonSerde<>(StatusCode.class)));
    // Join
    final KStream<Integer, ErrorOutput> errors = counts.toStream()
        .map((key, value) -> KeyValue.pair(
            key.key(),
            new ErrorOutput(key.key(), value, key.window().start(), null /* empty definition */)))
        .join(statusCodes,
            (countRecord, code) -> new ErrorOutput(
                countRecord.getStatusCode(), countRecord.getCount(), countRecord.getTime(), code.getDefinition()),
            Joined.valueSerde(new JsonSerde<>(ErrorOutput.class)));
    errors.to(this.errorOutputTopic);
    // Send an alert if a given error code occurs more than 5 times per minute
    errors.filter((key, errorOutput) -> errorOutput.getCount() > 5L).to(this.alertTopic);
    return builder.build();
}
private KTable<K, Iterable<EdgeWithValue<K, EV>>> edgesGroupedBy(Function<Edge<K>, K> fun) {
    return edges()
        .groupBy(new GroupEdges(fun), Grouped.with(keySerde(), new KryoSerde<>()))
        .aggregate(
            HashSet::new,
            (aggKey, value, aggregate) -> {
                ((Set<EdgeWithValue<K, EV>>) aggregate).add(value);
                return aggregate;
            },
            (aggKey, value, aggregate) -> {
                ((Set<EdgeWithValue<K, EV>>) aggregate).remove(value);
                return aggregate;
            },
            Materialized.with(keySerde(), new KryoSerde<>()));
}
public static <K, VV, EV> KGraph<K, VV, EV> fromEdges(
        KTable<Edge<K>, EV> edges,
        ValueMapper<K, VV> vertexValueInitializer,
        GraphSerialized<K, VV, EV> serialized) {
    KTable<K, VV> vertices = edges
        .toStream()
        .flatMap(new EmitSrcAndTarget<>(vertexValueInitializer))
        .groupByKey(Grouped.with(serialized.keySerde(), new KryoSerde<>()))
        .<VV>reduce((v1, v2) -> v2,
            Materialized.with(serialized.keySerde(), serialized.vertexValueSerde()));
    return new KGraph<>(vertices, edges, serialized);
}
public KGraph<K, VV, EV> filterOnVertices(Predicate<K, VV> vertexFilter) {
    KTable<K, VV> filteredVertices = vertices.filter(vertexFilter);
    KTable<Edge<K>, EV> remainingEdges = edgesBySource()
        .join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
        .map((k, edge) -> new KeyValue<>(edge.target(), edge))
        .join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
        .map((k, edge) -> new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()))
        .groupByKey(Grouped.with(new KryoSerde<>(), edgeValueSerde()))
        .reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
            .withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));
    return new KGraph<>(filteredVertices, remainingEdges, serialized);
}
public KGraph<K, VV, EV> undirected() {
    KTable<Edge<K>, EV> undirectedEdges = edges
        .toStream()
        .flatMap(new UndirectEdges<>())
        .groupByKey(Grouped.with(new KryoSerde<>(), serialized.edgeValueSerde()))
        .reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
            .withKeySerde(new KryoSerde<>()).withValueSerde(serialized.edgeValueSerde()));
    return new KGraph<>(vertices, undirectedEdges, serialized);
}
@SuppressWarnings("unchecked")
@Override
public KTable<Windowed<Short>, T> run(final KStream<Edge<K>, EV> edgeStream) {
    // For parallel window support we key the edge stream by partition and apply a parallel fold per partition.
    // Finally, we merge all locally combined results into our final graph aggregation property.
    KTable<Windowed<Short>, S> partialAgg = edgeStream
        .groupByKey(Grouped.with(new KryoSerde<>(), new KryoSerde<>()))
        .windowedBy(TimeWindows.of(Duration.ofMillis(timeMillis)))
        .aggregate(this::initialValue, new PartialAgg<>(updateFun()))
        .toStream()
        .groupBy((k, v) -> GLOBAL_KEY)
        .windowedBy(TimeWindows.of(Duration.ofMillis(timeMillis)))
        .reduce(combineFun())
        .mapValues(aggregator(edgeStream), Materialized.<Windowed<Short>, S, KeyValueStore<Bytes, byte[]>>
            as(KGraph.generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(new KryoSerde<>()));
    if (transform() != null) {
        return partialAgg.mapValues(
            transform(),
            Materialized.<Windowed<Short>, T, KeyValueStore<Bytes, byte[]>>
                as(KGraph.generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(new KryoSerde<>())
        );
    }
    return (KTable<Windowed<Short>, T>) partialAgg;
}
@StreamListener("input")
@SendTo("output")
public KStream<Integer, Long> process(KStream<Object, Product> input) {
return input.filter((key, product) -> product.getId() == 123)
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(new JsonSerde<>(Product.class),
new JsonSerde<>(Product.class)))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("id-count-store-x")).toStream()
.map((key, value) -> new KeyValue<>(key.key().id, value));
}
@Bean
public Function<KStream<Object, String>, KStream<String, WordCount>> process() {
    return input -> input
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(5000))
        .count(Materialized.as("foo-WordCounts"))
        .toStream()
        .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
            new Date(key.window().start()), new Date(key.window().end()))));
}
@Bean
public Function<KStream<String, Long>, Function<KTable<String, String>, KStream<String, Long>>> process() {
    return userClicksStream -> (userRegionsTable -> (userClicksStream
        .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                "UNKNOWN" : region, clicks),
            Joined.with(Serdes.String(), Serdes.Long(), null))
        .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
            regionWithClicks.getClicks()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .reduce(Long::sum)
        .toStream()));
}
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
        .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                "UNKNOWN" : region, clicks),
            Joined.with(Serdes.String(), Serdes.Long(), null))
        .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
            regionWithClicks.getClicks()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .reduce(Long::sum)
        .toStream());
}
@Bean
public Function<KStream<Bytes, String>, KStream<Bytes, WordCount>> process() {
    return input -> input
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(Duration.ofMillis(WINDOW_SIZE_MS)))
        .count(Materialized.as("WordCounts-1"))
        .toStream()
        .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
            new Date(key.window().start()), new Date(key.window().end()))));
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
    return input -> input
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(5000))
        .count(Materialized.as("WordCounts-1"))
        .toStream()
        .map((key, value) -> new KeyValue<>(null,
            new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
@Bean
public Function<KStream<Object, Product>, KStream<Integer, ProductStatus>> process() {
    return input -> input
        .filter((key, product) -> productIds().contains(product.getId()))
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey(Grouped.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class)))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
        .count(Materialized.as("product-counts"))
        .toStream()
        .map((key, value) -> new KeyValue<>(key.key().id, new ProductStatus(key.key().id,
            value, Instant.ofEpochMilli(key.window().start()).atZone(ZoneId.systemDefault()).toLocalTime(),
            Instant.ofEpochMilli(key.window().end()).atZone(ZoneId.systemDefault()).toLocalTime())));
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
    return input -> input
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
        .count(Materialized.as("WordCounts-1"))
        .toStream()
        .map((key, value) -> new KeyValue<>(null,
            new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
@Bean
public BiFunction<KStream<String, Long>, GlobalKTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> userClicksStream
        .leftJoin(userRegionsTable,
            (name, value) -> name,
            (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks)
        )
        .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .reduce((firstClicks, secondClicks) -> firstClicks + secondClicks)
        .toStream();
}
@Bean
public Function<KStream<ProductKey, InventoryUpdateEvent>, KStream<ProductKey, InventoryCountEvent>> process() {
    return input -> input
        .groupByKey(Grouped.with(keySerde, updateEventSerde))
        .aggregate(InventoryCountEvent::new,
            (key, updateEvent, summaryEvent) -> inventoryCountUpdateEventUpdater.apply(updateEvent, summaryEvent),
            Materialized.<ProductKey, InventoryCountEvent>as(storeSupplier)
                .withKeySerde(keySerde)
                .withValueSerde(countEventSerde))
        .toStream().peek((k, v) -> logger.debug("aggregated count key {} {}", k.getProductCode(), v.getCount()));
}
@Bean
public Function<KStream<Object, Product>, KStream<Integer, Long>> process() {
    return input -> input
        .filter((key, product) -> productIds().contains(product.getId()))
        .map((key, value) -> new KeyValue<>(value.id, value))
        .groupByKey(Grouped.with(Serdes.Integer(), new JsonSerde<>(Product.class)))
        .count(Materialized.<Integer, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
            .withKeySerde(Serdes.Integer())
            .withValueSerde(Serdes.Long()))
        .toStream();
}
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> userClicksStream
        .leftJoin(userRegionsTable,
            (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),
            Joined.with(Serdes.String(), Serdes.Long(), null))
        .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .reduce((firstClicks, secondClicks) -> firstClicks + secondClicks)
        .toStream();
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
    return input -> input
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(20)))
        .count(Materialized.as("WordCounts-1"))
        .toStream()
        .map((key, value) -> new KeyValue<>(null,
            new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
public Topology buildTopology(Properties envProps) {
    final StreamsBuilder builder = new StreamsBuilder();
    final String inputTopic = envProps.getProperty("input.topic.name");
    final String outputTopic = envProps.getProperty("output.topic.name");
    final String joinTopic = envProps.getProperty("join.topic.name");
    final Serde<Long> longSerde = Serdes.Long();
    final Serde<String> stringSerde = Serdes.String();
    final boolean addFilter = Boolean.parseBoolean(envProps.getProperty("add.filter"));
    final boolean addNames = Boolean.parseBoolean(envProps.getProperty("add.names"));
    KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
        .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)));
    if (addFilter) {
        inputStream = inputStream.filter((k, v) -> k != 100L);
    }
    final KStream<Long, String> joinedStream;
    final KStream<Long, Long> countStream;
    if (!addNames) {
        countStream = inputStream.groupByKey(Grouped.with(longSerde, stringSerde))
            .count()
            .toStream();
        joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
            JoinWindows.of(Duration.ofMillis(100)),
            StreamJoined.with(longSerde, stringSerde, longSerde));
    } else {
        countStream = inputStream.groupByKey(Grouped.with("count", longSerde, stringSerde))
            .count(Materialized.as("the-counting-store"))
            .toStream();
        joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
            JoinWindows.of(Duration.ofMillis(100)),
            StreamJoined.with(longSerde, stringSerde, longSerde)
                .withName("join").withStoreName("the-join-store"));
    }
    joinedStream.to(joinTopic, Produced.with(longSerde, stringSerde));
    countStream.map((k, v) -> KeyValue.pair(k.toString(), v.toString())).to(outputTopic, Produced.with(stringSerde, stringSerde));
    return builder.build();
}
/**
 * Build a {@link Grouped} using the keySerde and valueSerde of the pair
 */
public Grouped<K, V> toGrouped() {
    return Grouped.with(keySerde, valueSerde);
}
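A helper like toGrouped() reads as a method on a small key/value serde pair. A hypothetical call site might look like the sketch below; the KeyValueSerdes holder, its keySerde()/valueSerde() accessors, and the topic name are assumptions made for illustration, and only toGrouped() itself comes from the example above.

// Assumed holder wrapping a key serde and a value serde, exposing the toGrouped() helper shown above.
KeyValueSerdes<String, Long> clickSerdes = new KeyValueSerdes<>(Serdes.String(), Serdes.Long());

KTable<String, Long> totalClicks = builder
    .stream("user-clicks", Consumed.with(clickSerdes.keySerde(), clickSerdes.valueSerde()))
    // Reuse the pair's serdes for the grouping instead of repeating Grouped.with(...) at every call site.
    .groupByKey(clickSerdes.toGrouped())
    .reduce(Long::sum);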