下面列出了org.springframework.boot.test.context.FilteredClassLoader#org.apache.kafka.streams.KafkaStreams 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Produces
@ApplicationScoped
public ExtReadOnlyKeyValueStore<String, Str.Data> storageKeyValueStore(
KafkaStreams streams,
HostInfo storageLocalHost,
StreamsProperties properties,
FilterPredicate<String, Str.Data> filterPredicate
) {
return new DistributedReadOnlyKeyValueStore<>(
streams,
storageLocalHost,
properties.getStorageStoreName(),
Serdes.String(), ProtoSerde.parsedWith(Str.Data.parser()),
new DefaultGrpcChannelProvider(),
true,
filterPredicate
);
}
@Produces
@ApplicationScoped
public ReadOnlyKeyValueStore<Long, Str.TupleValue> globalIdKeyValueStore(
KafkaStreams streams,
HostInfo storageLocalHost,
StreamsProperties properties
) {
return new DistributedReadOnlyKeyValueStore<>(
streams,
storageLocalHost,
properties.getGlobalIdStoreName(),
Serdes.Long(), ProtoSerde.parsedWith(Str.TupleValue.parser()),
new DefaultGrpcChannelProvider(),
true,
(filter, over, id, tuple) -> true
);
}
@Produces
@ApplicationScoped
@Current
public AsyncBiFunctionService<Void, Void, KafkaStreams.State> stateService(
KafkaStreams streams,
HostInfo storageLocalHost,
LocalService<AsyncBiFunctionService.WithSerdes<Void, Void, KafkaStreams.State>> localStateService
) {
return new DistributedAsyncBiFunctionService<>(
streams,
storageLocalHost,
"stateStore",
localStateService,
new DefaultGrpcChannelProvider()
);
}
@Test
public void notFoundWithNoResult(TestContext context){
KafkaStreams streamMock = mock(KafkaStreams.class);
ReadOnlyKeyValueStore<Object, Object> storeMock = mock(ReadOnlyKeyValueStore.class);
KeyValueIterator<Object, Object> iteratorMock = mock(KeyValueIterator.class);
when(streamMock.store(eq("store"), any(QueryableStoreType.class))).thenReturn(storeMock);
SimpleKeyValueIterator iterator = new SimpleKeyValueIterator();
when(storeMock.range(any(), any())).thenReturn(iterator);
rule.vertx().deployVerticle(new RangeKeyValueQueryVerticle("host", streamMock), context.asyncAssertSuccess(deployment->{
RangeKeyValueQuery query = new RangeKeyValueQuery("store", Serdes.String().getClass().getName(), Serdes.String().getClass().getName(), "key".getBytes(), "key".getBytes());
rule.vertx().eventBus().send(Config.RANGE_KEY_VALUE_QUERY_ADDRESS_PREFIX + "host", query, context.asyncAssertSuccess(reply ->{
context.assertTrue(reply.body() instanceof MultiValuedKeyValueQueryResponse);
MultiValuedKeyValueQueryResponse response = (MultiValuedKeyValueQueryResponse) reply.body();
context.assertEquals(0, response.getResults().size());
context.assertTrue(iterator.closed);
}));
}));
}
/**
* Query local state store to extract metrics
*
* @return local Metrics
*/
private Metrics getLocalMetrics() {
HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();
KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();
String source = thisInstance.host() + ":" + thisInstance.port();
Metrics localMetrics = new Metrics();
ReadOnlyKeyValueStore<String, Double> averageStore = ks
.store(storeName,
QueryableStoreTypes.<String, Double>keyValueStore());
LOGGER.log(Level.INFO, "Entries in store {0}", averageStore.approximateNumEntries());
KeyValueIterator<String, Double> storeIterator = averageStore.all();
while (storeIterator.hasNext()) {
KeyValue<String, Double> kv = storeIterator.next();
localMetrics.add(source, kv.key, String.valueOf(kv.value));
}
LOGGER.log(Level.INFO, "Local store state {0}", localMetrics);
return localMetrics;
}
/**
* @param streams The {@link KafkaStreams} application
* @param localApplicationServer The {@link HostInfo} derived from the
* {@link StreamsConfig#APPLICATION_SERVER_CONFIG application.server}
* configuration property of local kafka streams node for the streams application.
* This is used to identify requests for local service, bypassing gRPC calls
* @param storeName The name of the store registered in the streams application and used for distribution
* of keys among kafka streams processing nodes.
* @param keySerde the {@link Serde} for keys of the service which are also the distribution keys of the
* corresponding store.
* @param grpcChannelProvider A function that establishes gRPC {@link Channel} to a remote service
* for the given {@link HostInfo} parameter
* @param parallel {@code true} if service calls that need to dispatch to many local services in
* the cluster are to be performed in parallel
*/
public DistributedService(
KafkaStreams streams,
HostInfo localApplicationServer,
String storeName,
Serde<K> keySerde,
Function<? super HostInfo, ? extends Channel> grpcChannelProvider,
boolean parallel
) {
this.streams = Objects.requireNonNull(streams, "streams");
this.localApplicationServer = Objects.requireNonNull(localApplicationServer, "localApplicationServer");
this.storeName = Objects.requireNonNull(storeName, "storeName");
this.keySerde = Objects.requireNonNull(keySerde, "keySerde");
this.grpcChannelProvider = Objects.requireNonNull(grpcChannelProvider, "grpcChannelProvider");
this.parallel = parallel;
}
/**
* @param streams The {@link KafkaStreams} application
* @param localApplicationServer The {@link HostInfo} derived from the
* {@link StreamsConfig#APPLICATION_SERVER_CONFIG application.server}
* configuration property of local kafka streams node for the streams application.
* This is used to identify requests for local store, bypassing gRPC calls
* @param storeName The name of the {@link ReadOnlyKeyValueStore} registered in the streams application
* @param keySerde The {@link Serde} for keys of the store
* @param valSerde The {@link Serde} for values of the store
* @param grpcChannelProvider A function that establishes gRPC {@link Channel} to a remote store service
* for the given {@link HostInfo} parameter
* @param parallel {@code true} if lookups that need to query many stores in the cluster are
* to be performed in parallel
* @param filterPredicate filter predicate to filter out keys and values
*/
public DistributedReadOnlyKeyValueStore(
KafkaStreams streams,
HostInfo localApplicationServer,
String storeName,
Serde<K> keySerde, Serde<V> valSerde,
Function<? super HostInfo, ? extends Channel> grpcChannelProvider,
boolean parallel,
FilterPredicate<K, V> filterPredicate
) {
super(
streams,
localApplicationServer,
storeName,
keySerde,
valSerde,
grpcChannelProvider,
parallel
);
this.filterPredicate = filterPredicate;
}
@Override
public void startStreamingDataSources(List<StreamingDataSource<?>> streamingDataSourceList) {
for (StreamingDataSource<?> streamingDataSource : streamingDataSourceList) {
((KafkaStreamingDataSource)streamingDataSource).setStreamingBuilder(streamsBuilder);
((KafkaStreamingDataSource)streamingDataSource).setTopic(topic);
streamingDataSource.run();
}
//streamingContext.start();
KafkaStreams streams= new KafkaStreams(streamsBuilder.build(),config);
try {
streams.start();;
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
Properties kafkaStreamProperties = new Properties();
kafkaStreamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-wordCount");
kafkaStreamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
kafkaStreamProperties.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
kafkaStreamProperties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
kafkaStreamProperties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
KStreamBuilder streamTopology = new KStreamBuilder();
KStream<String, String> topicRecords = streamTopology.stream(stringSerde, stringSerde, "input");
KStream<String, Long> wordCounts = topicRecords
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Count")
.toStream();
wordCounts.to(stringSerde, longSerde, "wordCount");
KafkaStreams streamManager = new KafkaStreams(streamTopology, kafkaStreamProperties);
streamManager.start();
Runtime.getRuntime().addShutdownHook(new Thread(streamManager::close));
}
public static void main(String[] args) {
// 1. 指定流的配置
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 设置流构造器
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
// 根据流构造器和流配置初始化 Kafka 流
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
}
@Produces
@Singleton
public KafkaStreams storageStreams(
StreamsProperties properties,
ForeachAction<? super String, ? super Str.Data> dataDispatcher,
ArtifactTypeUtilProviderFactory factory
) {
Topology topology = new StreamsTopologyProvider(properties, dataDispatcher, factory).get();
KafkaStreams streams = new KafkaStreams(topology, properties.getProperties());
streams.setGlobalStateRestoreListener(new LoggingStateRestoreListener());
return streams;
}
public static void main( String[] args ) {
Properties streamsConfig = new Properties();
// The name must be unique on the Kafka cluster
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
// Brokers
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
// Zookeeper
//streamsConfig.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, args[1]);
// SerDes for key and values
streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Serdes for the word and count
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
KStream<String, Long> wordCounts = sentences
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "wordcounts");
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
@Bean
public KafkaStreams kafkaStreams() {
StreamsBuilder streamsBuilder = new StreamsBuilder();
StatisticsKeeper statisticsBuilder = new StatisticsKeeper(streamsBuilder, stomp);
statisticsBuilder.build();
Topology topology = streamsBuilder.build();
KafkaStreamsStarter starter = new KafkaStreamsStarter(kafkaBootstrapAddress, topology, APP_ID);
starter.setKafkaTimeout(kafkaTimeout);
starter.setStreamsStartupTimeout(streamsStartupTimeout);
return starter.start();
}
@Produces
@Singleton
public LocalService<AsyncBiFunctionService.WithSerdes<Void, Void, KafkaStreams.State>> localStateService(
StateService localService
) {
return new LocalService<>(
StateService.NAME,
localService
);
}
public void start() {
KStreamBuilder builder = new KStreamBuilder();
Serde<UUID> keySerde = new FressianSerde();
Serde<Map> valSerde = new FressianSerde();
KStream<UUID, Map> commands = builder.stream(keySerde, valSerde, commandsTopic);
KStream<UUID, Map> customerEvents = commands
.filter((id, command) -> command.get(new Keyword("action")).equals(new Keyword("create-customer")))
.map((id, command) -> {
logger.debug("Command received");
Map userEvent = new HashMap(command);
userEvent.put(new Keyword("action"), new Keyword("customer-created"));
userEvent.put(new Keyword("parent"), id);
Map userValue = (Map) userEvent.get(new Keyword("data"));
userValue.put(new Keyword("id"), UUID.randomUUID());
return new KeyValue<>(UUID.randomUUID(), userEvent);
}).through(keySerde, valSerde, eventsTopic);
KStream<UUID, Map> customers = customerEvents
.map((id, event) -> {
Map customer = (Map) event.get(new Keyword("data"));
UUID customerId = (UUID) customer.get(new Keyword("id"));
return new KeyValue<UUID, Map>(customerId, customer);
});
customers.through(keySerde, valSerde, customersTopic);
StateStoreSupplier store = Stores.create("Customers")
.withKeys(keySerde)
.withValues(valSerde)
.persistent()
.build();
builder.addStateStore(store);
customers.process(customerStore, "Customers");
this.kafkaStreams = new KafkaStreams(builder, kafkaStreamsConfig);
this.kafkaStreams.start();
}
@Override
protected final void doStart() {
_streams = new KafkaStreams(topology(), _streamsConfiguration);
_streams.setUncaughtExceptionHandler((thread, throwable) -> {
_uncaughtException.compareAndSet(null, throwable);
_fatalErrorEncountered.set(true);
_streamsExceptionMeter.mark();
_streams.close(Duration.ofMillis(1));
});
_streams.setStateListener(this);
_streams.start();
notifyStarted();
}
@Test
public void should_create_spans_from_stream_with_tracing_filter_not_predicate_false() {
String inputTopic = testName.getMethodName() + "-input";
String outputTopic = testName.getMethodName() + "-output";
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
.transform(kafkaStreamsTracing.filterNot("filterNot-2", (key, value) -> false))
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
Topology topology = builder.build();
KafkaStreams streams = buildKafkaStreams(topology);
send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE));
waitForStreamToRun(streams);
MutableSpan spanInput = testSpanHandler.takeRemoteSpan(CONSUMER);
assertThat(spanInput.tags()).containsEntry("kafka.topic", inputTopic);
MutableSpan spanProcessor = testSpanHandler.takeLocalSpan();
assertChildOf(spanProcessor, spanInput);
assertThat(spanProcessor.tags()).containsEntry(KAFKA_STREAMS_FILTERED_TAG, "false");
// the filter transformer returns true so record is not dropped
MutableSpan spanOutput = testSpanHandler.takeRemoteSpan(PRODUCER);
assertThat(spanOutput.tags()).containsEntry("kafka.topic", outputTopic);
assertChildOf(spanOutput, spanProcessor);
streams.close();
streams.cleanUp();
}
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.applicationID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
props.put(StreamsConfig.STATE_DIR_CONFIG, AppConfigs.stateStoreName);
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, Employee> KS0 = streamsBuilder.stream(AppConfigs.topicName,
Consumed.with(AppSerdes.String(), AppSerdes.Employee()));
KGroupedStream<String, Employee> KGS1 = KS0.groupBy(
(k, v) -> v.getDepartment(),
Serialized.with(AppSerdes.String(),
AppSerdes.Employee()));
KTable<String, DepartmentAggregate> KT2 = KGS1.aggregate(
//Initializer
() -> new DepartmentAggregate()
.withEmployeeCount(0)
.withTotalSalary(0)
.withAvgSalary(0D),
//Aggregator
(k, v, aggV) -> new DepartmentAggregate()
.withEmployeeCount(aggV.getEmployeeCount() + 1)
.withTotalSalary(aggV.getTotalSalary() + v.getSalary())
.withAvgSalary((aggV.getTotalSalary() + v.getSalary()) / (aggV.getEmployeeCount() + 1D)),
//Serializer
Materialized.<String, DepartmentAggregate, KeyValueStore<Bytes, byte[]>>as("agg-store")
.withKeySerde(AppSerdes.String())
.withValueSerde(AppSerdes.DepartmentAggregate())
);
KT2.toStream().foreach(
(k, v) -> System.out.println("Key = " + k + " Value = " + v.toString()));
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
@Test
public void stopQuery() throws Exception {
// Test values.
final String ryaInstance = "rya";
final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
// Mock the streams factory so that we can tell if the stop function is invoked by the executor.
final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
final KafkaStreams queryJob = mock(KafkaStreams.class);
when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
// Start the executor that will be tested.
final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
executor.startAndWait();
try {
// Tell the executor to start the query.
executor.startQuery(ryaInstance, query);
// Tell the executor to stop the query.
executor.stopQuery(query.getQueryId());
// Show a job was stopped for that query's ID.
verify(queryJob).close();
} finally {
executor.stopAndWait();
}
}
@Test
@Ignore("it needs to have kafka broker running on local")
public void shouldTestKafkaStreams() throws InterruptedException {
//given
String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use a temporary directory for storing state, which will be automatically removed after the test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
//when
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
String outputTopic = "outputTopic";
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
wordCounts.to(stringSerde, longSerde, outputTopic);
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
//then
Thread.sleep(30000);
streams.close();
}
private KafkaStreams buildStreams(
final OutputNode outputNode,
final StreamsBuilder builder,
final String applicationId,
final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties
) {
Map<String, Object> newStreamsProperties = ksqlConfig.getKsqlStreamConfigProps();
newStreamsProperties.putAll(overriddenProperties);
newStreamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
newStreamsProperties.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
ksqlConfig.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
);
newStreamsProperties.put(
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
ksqlConfig.get(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
);
newStreamsProperties.put(
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
ksqlConfig.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)
);
final Integer timestampIndex = (Integer) ksqlConfig.get(KsqlConfig.KSQL_TIMESTAMP_COLUMN_INDEX);
if (timestampIndex != null && timestampIndex >= 0) {
outputNode.getSourceTimestampExtractionPolicy().applyTo(ksqlConfig, newStreamsProperties);
}
updateListProperty(
newStreamsProperties,
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
ConsumerCollector.class.getCanonicalName()
);
updateListProperty(
newStreamsProperties,
StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),
ProducerCollector.class.getCanonicalName()
);
return kafkaStreamsBuilder.buildKafkaStreams(builder, new StreamsConfig(newStreamsProperties));
}
GetRemoteServiceNamesCall(KafkaStreams traceStoreStream, String serviceName,
BiFunction<String, Integer, String> httpBaseUrl) {
super(traceStoreStream, REMOTE_SERVICE_NAMES_STORE_NAME, httpBaseUrl,
"/serviceNames/" + serviceName + "/remoteServiceNames", serviceName);
this.traceStoreStream = traceStoreStream;
this.serviceName = serviceName;
this.httpBaseUrl = httpBaseUrl;
}
GetTraceCall(KafkaStreams traceStoreStream,
BiFunction<String, Integer, String> httpBaseUrl,
String traceId) {
super(traceStoreStream, TRACES_STORE_NAME, httpBaseUrl, String.format("/traces/%s", traceId),
traceId);
this.traceStoreStream = traceStoreStream;
this.httpBaseUrl = httpBaseUrl;
this.traceId = traceId;
}
GetTraceManyCall(KafkaStreams traceStoreStream,
BiFunction<String, Integer, String> httpBaseUrl,
String traceIds) {
super(traceStoreStream, TRACES_STORE_NAME, httpBaseUrl, "/traceMany?traceIds=" + traceIds);
this.traceStoreStream = traceStoreStream;
this.httpBaseUrl = httpBaseUrl;
this.traceIds = traceIds;
}
@Test
public void should_create_spans_from_stream_with_tracing_peek() {
String inputTopic = testName.getMethodName() + "-input";
String outputTopic = testName.getMethodName() + "-output";
long now = System.currentTimeMillis();
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
.transformValues(kafkaStreamsTracing.peek("peek-1", (key, value) -> {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
tracing.tracer().currentSpan().annotate(now, "test");
}))
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
Topology topology = builder.build();
KafkaStreams streams = buildKafkaStreams(topology);
send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE));
waitForStreamToRun(streams);
MutableSpan spanInput = testSpanHandler.takeRemoteSpan(CONSUMER);
assertThat(spanInput.tags()).containsEntry("kafka.topic", inputTopic);
MutableSpan spanProcessor = testSpanHandler.takeLocalSpan();
assertChildOf(spanProcessor, spanInput);
assertThat(spanProcessor.annotations()).contains(entry(now, "test"));
MutableSpan spanOutput = testSpanHandler.takeRemoteSpan(PRODUCER);
assertThat(spanOutput.tags()).containsEntry("kafka.topic", outputTopic);
assertChildOf(spanOutput, spanProcessor);
streams.close();
streams.cleanUp();
}
@Test void shouldCreateMetersWithTags() {
try (KafkaStreams kafkaStreams = createStreams()) {
metrics = new KafkaStreamsMetrics(kafkaStreams, tags);
MeterRegistry registry = new SimpleMeterRegistry();
metrics.bindTo(registry);
assertThat(registry.getMeters())
.hasSizeGreaterThan(0)
.extracting(meter -> meter.getId().getTag("app"))
.allMatch(s -> s.equals("myapp"));
}
}
public static void main(String[] args) throws Exception {
StreamsConfig streamsConfig = new StreamsConfig(getProperties());
Serde<String> stringSerde = Serdes.String();
Serde<StockPerformance> stockPerformanceSerde = StreamsSerdes.StockPerformanceSerde();
Serde<StockTransaction> stockTransactionSerde = StreamsSerdes.StockTransactionSerde();
StreamsBuilder builder = new StreamsBuilder();
String stocksStateStore = "stock-performance-store";
double differentialThreshold = 0.02;
KeyValueBytesStoreSupplier storeSupplier = Stores.lruMap(stocksStateStore, 100);
StoreBuilder<KeyValueStore<String, StockPerformance>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), stockPerformanceSerde);
builder.addStateStore(storeBuilder);
builder.stream("stock-transactions", Consumed.with(stringSerde, stockTransactionSerde))
.transform(() -> new StockPerformanceTransformer(stocksStateStore, differentialThreshold), stocksStateStore)
.print(Printed.<String, StockPerformance>toSysOut().withLabel("StockPerformance"));
//Uncomment this line and comment out the line above for writing to a topic
//.to(stringSerde, stockPerformanceSerde, "stock-performance");
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
MockDataProducer.produceStockTransactionsWithKeyFunction(50, 50, 25, StockTransaction::getSymbol);
System.out.println("Stock Analysis KStream/Process API App Started");
kafkaStreams.cleanUp();
kafkaStreams.start();
Thread.sleep(70000);
System.out.println("Shutting down the Stock KStream/Process API Analysis App now");
kafkaStreams.close();
MockDataProducer.shutdown();
}
public void createStreamSystemOut(String inputTopic) {
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic, Consumed.with(Serdes.String(), GenericSerdes.jsonNodeSerde())).process(() -> new LoggingProcessor<>());
KafkaStreams streams = new KafkaStreams(builder.build(), KafkaUtils.createKStreamProperties(getProcessConsumer().getIdProcess() + ProcessConstants.SYSOUT_PROCESS, getBootstrapServer()));
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
addStreams(streams);
}
private void runRecipe(final String configPath) throws IOException {
Properties envProps = this.loadEnvProperties(configPath);
Properties streamProps = this.buildStreamsProperties(envProps);
Topology topology = this.buildTopology(envProps, this.ticketSaleSerde(envProps));
this.createTopics(envProps);
final KafkaStreams streams = new KafkaStreams(topology, streamProps);
final CountDownLatch latch = new CountDownLatch(1);
// Attach shutdown handler to catch Control-C.
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
private void run() {
Properties envProps = this.loadEnvProperties();
Properties streamProps = this.buildStreamsProperties(envProps);
Topology topology = this.buildTopology(new StreamsBuilder(), envProps);
this.createTopics(envProps);
final KafkaStreams streams = new KafkaStreams(topology, streamProps);
final CountDownLatch latch = new CountDownLatch(1);
// Attach shutdown handler to catch Control-C.
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close(Duration.ofSeconds(5));
latch.countDown();
}
});
try {
streams.cleanUp();
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}