org.apache.kafka.streams.KafkaStreams Source Code Examples

The following lists example code for org.apache.kafka.streams.KafkaStreams; you can also follow the project links to view the full source on GitHub.

@Produces
@ApplicationScoped
public ExtReadOnlyKeyValueStore<String, Str.Data> storageKeyValueStore(
    KafkaStreams streams,
    HostInfo storageLocalHost,
    StreamsProperties properties,
    FilterPredicate<String, Str.Data> filterPredicate
) {
    return new DistributedReadOnlyKeyValueStore<>(
        streams,
        storageLocalHost,
        properties.getStorageStoreName(),
        Serdes.String(), ProtoSerde.parsedWith(Str.Data.parser()),
        new DefaultGrpcChannelProvider(),
        true,
        filterPredicate
    );
}
 
@Produces
@ApplicationScoped
public ReadOnlyKeyValueStore<Long, Str.TupleValue> globalIdKeyValueStore(
    KafkaStreams streams,
    HostInfo storageLocalHost,
    StreamsProperties properties
) {
    return new DistributedReadOnlyKeyValueStore<>(
        streams,
        storageLocalHost,
        properties.getGlobalIdStoreName(),
        Serdes.Long(), ProtoSerde.parsedWith(Str.TupleValue.parser()),
        new DefaultGrpcChannelProvider(),
        true,
        (filter, over, id, tuple) -> true
    );
}
 
@Produces
@ApplicationScoped
@Current
public AsyncBiFunctionService<Void, Void, KafkaStreams.State> stateService(
    KafkaStreams streams,
    HostInfo storageLocalHost,
    LocalService<AsyncBiFunctionService.WithSerdes<Void, Void, KafkaStreams.State>> localStateService
) {
    return new DistributedAsyncBiFunctionService<>(
        streams,
        storageLocalHost,
        "stateStore",
        localStateService,
        new DefaultGrpcChannelProvider()
    );
}
 
Example 4   Project: kiqr   File: RangeKeyValuesQueryVerticleTest.java
@Test
public void notFoundWithNoResult(TestContext context){
    KafkaStreams streamMock = mock(KafkaStreams.class);
    ReadOnlyKeyValueStore<Object, Object> storeMock = mock(ReadOnlyKeyValueStore.class);
    KeyValueIterator<Object, Object> iteratorMock = mock(KeyValueIterator.class);
    when(streamMock.store(eq("store"), any(QueryableStoreType.class))).thenReturn(storeMock);
    SimpleKeyValueIterator iterator = new SimpleKeyValueIterator();
    when(storeMock.range(any(), any())).thenReturn(iterator);


    rule.vertx().deployVerticle(new RangeKeyValueQueryVerticle("host", streamMock), context.asyncAssertSuccess(deployment->{

        RangeKeyValueQuery query = new RangeKeyValueQuery("store", Serdes.String().getClass().getName(), Serdes.String().getClass().getName(), "key".getBytes(), "key".getBytes());

        rule.vertx().eventBus().send(Config.RANGE_KEY_VALUE_QUERY_ADDRESS_PREFIX + "host", query, context.asyncAssertSuccess(reply ->{

            context.assertTrue(reply.body() instanceof MultiValuedKeyValueQueryResponse);
            MultiValuedKeyValueQueryResponse response = (MultiValuedKeyValueQueryResponse) reply.body();
            context.assertEquals(0, response.getResults().size());
            context.assertTrue(iterator.closed);

        }));

    }));

}
 
Example 5   Project: kafka-streams-example   File: MetricsResource.java
/**
 * Query local state store to extract metrics
 *
 * @return local Metrics
 */
private Metrics getLocalMetrics() {
    HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();
    KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();

    String source = thisInstance.host() + ":" + thisInstance.port();
    Metrics localMetrics = new Metrics();

    ReadOnlyKeyValueStore<String, Double> averageStore = ks
            .store(storeName,
                    QueryableStoreTypes.<String, Double>keyValueStore());

    LOGGER.log(Level.INFO, "Entries in store {0}", averageStore.approximateNumEntries());
    KeyValueIterator<String, Double> storeIterator = averageStore.all();

    while (storeIterator.hasNext()) {
        KeyValue<String, Double> kv = storeIterator.next();
        localMetrics.add(source, kv.key, String.valueOf(kv.value));
    }
    // Close the iterator to release the underlying state store resources.
    storeIterator.close();
    LOGGER.log(Level.INFO, "Local store state {0}", localMetrics);
    return localMetrics;
}
 
Example 6   Project: apicurio-registry   File: DistributedService.java
/**
 * @param streams                The {@link KafkaStreams} application
 * @param localApplicationServer The {@link HostInfo} derived from the
 *                               {@link StreamsConfig#APPLICATION_SERVER_CONFIG application.server}
 *                               configuration property of local kafka streams node for the streams application.
 *                               This is used to identify requests for local service, bypassing gRPC calls
 * @param storeName              The name of the store registered in the streams application and used for distribution
 *                               of keys among kafka streams processing nodes.
 * @param keySerde               the {@link Serde} for keys of the service which are also the distribution keys of the
 *                               corresponding store.
 * @param grpcChannelProvider    A function that establishes gRPC {@link Channel} to a remote service
 *                               for the given {@link HostInfo} parameter
 * @param parallel               {@code true} if service calls that need to dispatch to many local services in
 *                               the cluster are to be performed in parallel
 */
public DistributedService(
    KafkaStreams streams,
    HostInfo localApplicationServer,
    String storeName,
    Serde<K> keySerde,
    Function<? super HostInfo, ? extends Channel> grpcChannelProvider,
    boolean parallel
) {
    this.streams = Objects.requireNonNull(streams, "streams");
    this.localApplicationServer = Objects.requireNonNull(localApplicationServer, "localApplicationServer");
    this.storeName = Objects.requireNonNull(storeName, "storeName");
    this.keySerde = Objects.requireNonNull(keySerde, "keySerde");
    this.grpcChannelProvider = Objects.requireNonNull(grpcChannelProvider, "grpcChannelProvider");
    this.parallel = parallel;
}
 
/**
 * @param streams                The {@link KafkaStreams} application
 * @param localApplicationServer The {@link HostInfo} derived from the
 *                               {@link StreamsConfig#APPLICATION_SERVER_CONFIG application.server}
 *                               configuration property of local kafka streams node for the streams application.
 *                               This is used to identify requests for local store, bypassing gRPC calls
 * @param storeName              The name of the {@link ReadOnlyKeyValueStore} registered in the streams application
 * @param keySerde               The {@link Serde} for keys of the store
 * @param valSerde               The {@link Serde} for values of the store
 * @param grpcChannelProvider    A function that establishes gRPC {@link Channel} to a remote store service
 *                               for the given {@link HostInfo} parameter
 * @param parallel               {@code true} if lookups that need to query many stores in the cluster are
 *                               to be performed in parallel
 * @param filterPredicate        filter predicate to filter out keys and values
 */
public DistributedReadOnlyKeyValueStore(
    KafkaStreams streams,
    HostInfo localApplicationServer,
    String storeName,
    Serde<K> keySerde, Serde<V> valSerde,
    Function<? super HostInfo, ? extends Channel> grpcChannelProvider,
    boolean parallel,
    FilterPredicate<K, V> filterPredicate
) {
    super(
        streams,
        localApplicationServer,
        storeName,
        keySerde,
        valSerde,
        grpcChannelProvider,
        parallel
    );
    this.filterPredicate = filterPredicate;
}
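Both constructors above take a HostInfo derived from the application.server property described in the Javadoc. For reference, here is a minimal sketch (the endpoint value is hypothetical) of how such a HostInfo is typically built from the Kafka Streams configuration:

Properties props = new Properties();
// application.server is a plain "host:port" endpoint identifying this Streams instance.
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "registry-node-1:9000"); // hypothetical endpoint

String endpoint = props.getProperty(StreamsConfig.APPLICATION_SERVER_CONFIG);
String[] hostPort = endpoint.split(":");
HostInfo localApplicationServer = new HostInfo(hostPort[0], Integer.parseInt(hostPort[1]));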
 
Example 8   Project: jMetalSP   File: KafkaRuntime.java
@Override
public void startStreamingDataSources(List<StreamingDataSource<?>> streamingDataSourceList) {
    for (StreamingDataSource<?> streamingDataSource : streamingDataSourceList) {
        ((KafkaStreamingDataSource)streamingDataSource).setStreamingBuilder(streamsBuilder);
        ((KafkaStreamingDataSource)streamingDataSource).setTopic(topic);
        streamingDataSource.run();
    }

    //streamingContext.start();
    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), config);

    try {
        streams.start();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
 
public static void main(String[] args) throws Exception {
    Properties kafkaStreamProperties = new Properties();
    kafkaStreamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-wordCount");
    kafkaStreamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    kafkaStreamProperties.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
    kafkaStreamProperties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    kafkaStreamProperties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    Serde<String> stringSerde = Serdes.String();
    Serde<Long> longSerde = Serdes.Long();

    KStreamBuilder streamTopology = new KStreamBuilder();
    KStream<String, String> topicRecords = streamTopology.stream(stringSerde, stringSerde, "input");
    KStream<String, Long> wordCounts = topicRecords
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .map((key, word) -> new KeyValue<>(word, word))
            .countByKey("Count")
            .toStream();
    wordCounts.to(stringSerde, longSerde, "wordCount");

    KafkaStreams streamManager = new KafkaStreams(streamTopology, kafkaStreamProperties);
    streamManager.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streamManager::close));
}
 
Example 10   Project: javatech   File: StreamDemo.java
public static void main(String[] args) {
	// 1. Specify the stream configuration
	Properties config = new Properties();
	config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
	config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
	config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
	config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

	// Set up the streams builder
	StreamsBuilder builder = new StreamsBuilder();
	KStream<String, String> textLines = builder.stream("TextLinesTopic");
	KTable<String, Long> wordCounts = textLines
		.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
		.groupBy((key, word) -> word)
		.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
	wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

	// Initialize the Kafka Streams instance from the builder and the configuration
	KafkaStreams streams = new KafkaStreams(builder.build(), config);
	streams.start();
}
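Unlike the other examples on this page, this snippet starts the streams instance without registering a shutdown hook. A minimal addition, following the same pattern used in the surrounding examples, closes it cleanly on JVM shutdown:

// Close the streams instance cleanly when the JVM shuts down (e.g. on Ctrl-C).
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));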
 
@Produces
@Singleton
public KafkaStreams storageStreams(
    StreamsProperties properties,
    ForeachAction<? super String, ? super Str.Data> dataDispatcher,
    ArtifactTypeUtilProviderFactory factory
) {
    Topology topology = new StreamsTopologyProvider(properties, dataDispatcher, factory).get();

    KafkaStreams streams = new KafkaStreams(topology, properties.getProperties());
    streams.setGlobalStateRestoreListener(new LoggingStateRestoreListener());

    return streams;
}
 
Example 12   Project: hdinsight-kafka-java-get-started   File: Stream.java
public static void main( String[] args ) {
    Properties streamsConfig = new Properties();
    // The name must be unique on the Kafka cluster
    streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
    // Brokers
    streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
    // Zookeeper
    //streamsConfig.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, args[1]);
    // SerDes for key and values
    streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    // Serdes for the word and count
    Serde<String> stringSerde = Serdes.String();
    Serde<Long> longSerde = Serdes.Long();

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
    KStream<String, Long> wordCounts = sentences
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .map((key, word) -> new KeyValue<>(word, word))
            .countByKey("Counts")
            .toStream();
    wordCounts.to(stringSerde, longSerde, "wordcounts");

    KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
 
Example 13   Project: football-events   File: UiApplication.java
@Bean
public KafkaStreams kafkaStreams() {
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    StatisticsKeeper statisticsBuilder = new StatisticsKeeper(streamsBuilder, stomp);
    statisticsBuilder.build();
    Topology topology = streamsBuilder.build();
    KafkaStreamsStarter starter = new KafkaStreamsStarter(kafkaBootstrapAddress, topology, APP_ID);
    starter.setKafkaTimeout(kafkaTimeout);
    starter.setStreamsStartupTimeout(streamsStartupTimeout);
    return starter.start();
}
 
@Produces
@Singleton
public LocalService<AsyncBiFunctionService.WithSerdes<Void, Void, KafkaStreams.State>> localStateService(
    StateService localService
) {
    return new LocalService<>(
        StateService.NAME,
        localService
    );
}
 
public void start() {
    KStreamBuilder builder = new KStreamBuilder();

    Serde<UUID> keySerde = new FressianSerde();
    Serde<Map> valSerde = new FressianSerde();

    KStream<UUID, Map> commands = builder.stream(keySerde, valSerde, commandsTopic);
    KStream<UUID, Map> customerEvents = commands
            .filter((id, command) -> command.get(new Keyword("action")).equals(new Keyword("create-customer")))
            .map((id, command) -> {
                logger.debug("Command received");
                Map userEvent = new HashMap(command);
                userEvent.put(new Keyword("action"), new Keyword("customer-created"));
                userEvent.put(new Keyword("parent"), id);
                Map userValue = (Map) userEvent.get(new Keyword("data"));
                userValue.put(new Keyword("id"), UUID.randomUUID());
                return new KeyValue<>(UUID.randomUUID(), userEvent);
    }).through(keySerde, valSerde, eventsTopic);

    KStream<UUID, Map> customers = customerEvents
            .map((id, event) -> {
                Map customer = (Map) event.get(new Keyword("data"));
                UUID customerId = (UUID) customer.get(new Keyword("id"));
                return new KeyValue<UUID, Map>(customerId, customer);
            });

    customers.through(keySerde, valSerde, customersTopic);

    StateStoreSupplier store = Stores.create("Customers")
            .withKeys(keySerde)
            .withValues(valSerde)
            .persistent()
            .build();
    builder.addStateStore(store);

    customers.process(customerStore, "Customers");

    this.kafkaStreams = new KafkaStreams(builder, kafkaStreamsConfig);
    this.kafkaStreams.start();
}
 
Example 16   Project: emodb   File: KafkaStreamsService.java
@Override
protected final void doStart() {
    _streams = new KafkaStreams(topology(), _streamsConfiguration);
    _streams.setUncaughtExceptionHandler((thread, throwable) -> {
        _uncaughtException.compareAndSet(null, throwable);
        _fatalErrorEncountered.set(true);
        _streamsExceptionMeter.mark();
        _streams.close(Duration.ofMillis(1));
    });
    _streams.setStateListener(this);
    _streams.start();
    notifyStarted();
}
 
Example 17   Project: brave   File: ITKafkaStreamsTracing.java
@Test
public void should_create_spans_from_stream_with_tracing_filter_not_predicate_false() {
  String inputTopic = testName.getMethodName() + "-input";
  String outputTopic = testName.getMethodName() + "-output";

  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
    .transform(kafkaStreamsTracing.filterNot("filterNot-2", (key, value) -> false))
    .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
  Topology topology = builder.build();

  KafkaStreams streams = buildKafkaStreams(topology);

  send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE));

  waitForStreamToRun(streams);

  MutableSpan spanInput = testSpanHandler.takeRemoteSpan(CONSUMER);
  assertThat(spanInput.tags()).containsEntry("kafka.topic", inputTopic);

  MutableSpan spanProcessor = testSpanHandler.takeLocalSpan();
  assertChildOf(spanProcessor, spanInput);
  assertThat(spanProcessor.tags()).containsEntry(KAFKA_STREAMS_FILTERED_TAG, "false");

  // the filterNot predicate returns false here, so the record is not dropped

  MutableSpan spanOutput = testSpanHandler.takeRemoteSpan(PRODUCER);
  assertThat(spanOutput.tags()).containsEntry("kafka.topic", outputTopic);
  assertChildOf(spanOutput, spanProcessor);

  streams.close();
  streams.cleanUp();
}
 
public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.applicationID);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
    props.put(StreamsConfig.STATE_DIR_CONFIG, AppConfigs.stateStoreName);

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, Employee> KS0 = streamsBuilder.stream(AppConfigs.topicName,
        Consumed.with(AppSerdes.String(), AppSerdes.Employee()));

    KGroupedStream<String, Employee> KGS1 = KS0.groupBy(
        (k, v) -> v.getDepartment(),
        Serialized.with(AppSerdes.String(),
            AppSerdes.Employee()));

    KTable<String, DepartmentAggregate> KT2 = KGS1.aggregate(
        //Initializer
        () -> new DepartmentAggregate()
            .withEmployeeCount(0)
            .withTotalSalary(0)
            .withAvgSalary(0D),
        //Aggregator
        (k, v, aggV) -> new DepartmentAggregate()
            .withEmployeeCount(aggV.getEmployeeCount() + 1)
            .withTotalSalary(aggV.getTotalSalary() + v.getSalary())
            .withAvgSalary((aggV.getTotalSalary() + v.getSalary()) / (aggV.getEmployeeCount() + 1D)),
        //Materialized store with key/value serdes
        Materialized.<String, DepartmentAggregate, KeyValueStore<Bytes, byte[]>>as("agg-store")
            .withKeySerde(AppSerdes.String())
            .withValueSerde(AppSerdes.DepartmentAggregate())
    );

    KT2.toStream().foreach(
        (k, v) -> System.out.println("Key = " + k + " Value = " + v.toString()));

    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props);
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

}
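Because the aggregate is materialized as "agg-store", it can also be read back through the interactive-query API once the application is running. A minimal sketch using the same KafkaStreams#store call shown in the MetricsResource example above (the department key is hypothetical):

// Query the materialized "agg-store" after the streams instance has reached RUNNING state.
ReadOnlyKeyValueStore<String, DepartmentAggregate> aggStore =
        streams.store("agg-store", QueryableStoreTypes.<String, DepartmentAggregate>keyValueStore());
DepartmentAggregate engineering = aggStore.get("engineering"); // hypothetical department key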
 
Example 19   Project: rya   File: LocalQueryExecutorTest.java
@Test
public void stopQuery() throws Exception {
    // Test values.
    final String ryaInstance = "rya";
    final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);

    // Mock the streams factory so that we can tell if the stop function is invoked by the executor.
    final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
    final KafkaStreams queryJob = mock(KafkaStreams.class);
    when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);

    // Start the executor that will be tested.
    final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
    executor.startAndWait();
    try {
        // Tell the executor to start the query.
        executor.startQuery(ryaInstance, query);

        // Tell the executor to stop the query.
        executor.stopQuery(query.getQueryId());

        // Show a job was stopped for that query's ID.
        verify(queryJob).close();
    } finally {
        executor.stopAndWait();
    }
}
 
Example 20   Project: tutorials   File: KafkaStreamsLiveTest.java
@Test
@Ignore("it needs to have kafka broker running on local")
public void shouldTestKafkaStreams() throws InterruptedException {
    //given
    String inputTopic = "inputTopic";

    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Use a temporary directory for storing state, which will be automatically removed after the test.
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());

    //when
    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textLines = builder.stream(inputTopic);
    Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    KTable<String, Long> wordCounts = textLines
            .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
            .groupBy((key, word) -> word)
            .count();

    wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));

    String outputTopic = "outputTopic";
    final Serde<String> stringSerde = Serdes.String();
    final Serde<Long> longSerde = Serdes.Long();
    wordCounts.to(stringSerde, longSerde, outputTopic);

    KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.start();

    //then
    Thread.sleep(30000);
    streams.close();
}
 
private KafkaStreams buildStreams(
    final OutputNode outputNode,
    final StreamsBuilder builder,
    final String applicationId,
    final KsqlConfig ksqlConfig,
    final Map<String, Object> overriddenProperties
) {
  Map<String, Object> newStreamsProperties = ksqlConfig.getKsqlStreamConfigProps();
  newStreamsProperties.putAll(overriddenProperties);
  newStreamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
  newStreamsProperties.put(
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
      ksqlConfig.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
  );
  newStreamsProperties.put(
      StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
      ksqlConfig.get(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
  );
  newStreamsProperties.put(
      StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
      ksqlConfig.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)
  );

  final Integer timestampIndex = (Integer) ksqlConfig.get(KsqlConfig.KSQL_TIMESTAMP_COLUMN_INDEX);
  if (timestampIndex != null && timestampIndex >= 0) {
    outputNode.getSourceTimestampExtractionPolicy().applyTo(ksqlConfig, newStreamsProperties);
  }

  updateListProperty(
      newStreamsProperties,
      StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
      ConsumerCollector.class.getCanonicalName()
  );
  updateListProperty(
      newStreamsProperties,
      StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),
      ProducerCollector.class.getCanonicalName()
  );
  return kafkaStreamsBuilder.buildKafkaStreams(builder, new StreamsConfig(newStreamsProperties));
}
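The updateListProperty helper is not part of this excerpt. A hypothetical sketch of what such a helper typically does, namely appending a class name to a possibly absent comma-separated list property (the actual KSQL implementation may differ):

// Hypothetical helper: append a value to a comma-separated list property, creating it if absent.
private static void updateListProperty(Map<String, Object> properties, String key, String value) {
  Object current = properties.get(key);
  if (current == null || current.toString().isEmpty()) {
    properties.put(key, value);
  } else {
    properties.put(key, current + "," + value);
  }
}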
 
Example 22   Project: zipkin-storage-kafka   File: KafkaSpanStore.java
GetRemoteServiceNamesCall(KafkaStreams traceStoreStream, String serviceName,
    BiFunction<String, Integer, String> httpBaseUrl) {
  super(traceStoreStream, REMOTE_SERVICE_NAMES_STORE_NAME, httpBaseUrl,
      "/serviceNames/" + serviceName + "/remoteServiceNames", serviceName);
  this.traceStoreStream = traceStoreStream;
  this.serviceName = serviceName;
  this.httpBaseUrl = httpBaseUrl;
}
 
Example 23   Project: zipkin-storage-kafka   File: KafkaSpanStore.java
GetTraceCall(KafkaStreams traceStoreStream,
    BiFunction<String, Integer, String> httpBaseUrl,
    String traceId) {
  super(traceStoreStream, TRACES_STORE_NAME, httpBaseUrl, String.format("/traces/%s", traceId),
      traceId);
  this.traceStoreStream = traceStoreStream;
  this.httpBaseUrl = httpBaseUrl;
  this.traceId = traceId;
}
 
Example 24   Project: zipkin-storage-kafka   File: KafkaSpanStore.java
GetTraceManyCall(KafkaStreams traceStoreStream,
    BiFunction<String, Integer, String> httpBaseUrl,
    String traceIds) {
  super(traceStoreStream, TRACES_STORE_NAME, httpBaseUrl, "/traceMany?traceIds=" + traceIds);
  this.traceStoreStream = traceStoreStream;
  this.httpBaseUrl = httpBaseUrl;
  this.traceIds = traceIds;
}
 
Example 25   Project: brave   File: ITKafkaStreamsTracing.java
@Test
public void should_create_spans_from_stream_with_tracing_peek() {
  String inputTopic = testName.getMethodName() + "-input";
  String outputTopic = testName.getMethodName() + "-output";

  long now = System.currentTimeMillis();

  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
    .transformValues(kafkaStreamsTracing.peek("peek-1", (key, value) -> {
      try {
        Thread.sleep(100L);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      tracing.tracer().currentSpan().annotate(now, "test");
    }))
    .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
  Topology topology = builder.build();

  KafkaStreams streams = buildKafkaStreams(topology);

  send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE));

  waitForStreamToRun(streams);

  MutableSpan spanInput = testSpanHandler.takeRemoteSpan(CONSUMER);
  assertThat(spanInput.tags()).containsEntry("kafka.topic", inputTopic);

  MutableSpan spanProcessor = testSpanHandler.takeLocalSpan();
  assertChildOf(spanProcessor, spanInput);
  assertThat(spanProcessor.annotations()).contains(entry(now, "test"));

  MutableSpan spanOutput = testSpanHandler.takeRemoteSpan(PRODUCER);
  assertThat(spanOutput.tags()).containsEntry("kafka.topic", outputTopic);
  assertChildOf(spanOutput, spanProcessor);

  streams.close();
  streams.cleanUp();
}
 
Example 26   Project: micrometer   File: KafkaStreamsMetricsTest.java
@Test void shouldCreateMetersWithTags() {
    try (KafkaStreams kafkaStreams = createStreams()) {
        metrics = new KafkaStreamsMetrics(kafkaStreams, tags);
        MeterRegistry registry = new SimpleMeterRegistry();

        metrics.bindTo(registry);

        assertThat(registry.getMeters())
                .hasSizeGreaterThan(0)
                .extracting(meter -> meter.getId().getTag("app"))
                .allMatch(s -> s.equals("myapp"));
    }
}
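The createStreams() helper and the tags field are not shown in this excerpt. A hypothetical sketch of a fixture that would satisfy the assertion on the "app" tag above (topic names and application id are placeholders):

// Hypothetical test fixture: a trivial topology plus the "app" tag asserted above.
private final Tags tags = Tags.of("app", "myapp");

private KafkaStreams createStreams() {
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input-topic").to("output-topic"); // placeholder topics
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-metrics-test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaStreams(builder.build(), props);
}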
 
public static void main(String[] args) throws Exception {


        StreamsConfig streamsConfig = new StreamsConfig(getProperties());
        Serde<String> stringSerde = Serdes.String();
        Serde<StockPerformance> stockPerformanceSerde = StreamsSerdes.StockPerformanceSerde();
        Serde<StockTransaction> stockTransactionSerde = StreamsSerdes.StockTransactionSerde();


        StreamsBuilder builder = new StreamsBuilder();

        String stocksStateStore = "stock-performance-store";
        double differentialThreshold = 0.02;

        KeyValueBytesStoreSupplier storeSupplier = Stores.lruMap(stocksStateStore, 100);
        StoreBuilder<KeyValueStore<String, StockPerformance>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), stockPerformanceSerde);

        builder.addStateStore(storeBuilder);

        builder.stream("stock-transactions", Consumed.with(stringSerde, stockTransactionSerde))
                .transform(() -> new StockPerformanceTransformer(stocksStateStore, differentialThreshold), stocksStateStore)
                .print(Printed.<String, StockPerformance>toSysOut().withLabel("StockPerformance"));

        //Uncomment this line and comment out the line above for writing to a topic
        //.to(stringSerde, stockPerformanceSerde, "stock-performance");


        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
        MockDataProducer.produceStockTransactionsWithKeyFunction(50, 50, 25, StockTransaction::getSymbol);
        System.out.println("Stock Analysis KStream/Process API App Started");
        kafkaStreams.cleanUp();
        kafkaStreams.start();
        Thread.sleep(70000);
        System.out.println("Shutting down the Stock KStream/Process API Analysis App now");
        kafkaStreams.close();
        MockDataProducer.shutdown();
    }
 
Example 28   Project: SkaETL   File: ProcessStreamService.java
public void createStreamSystemOut(String inputTopic) {

        StreamsBuilder builder = new StreamsBuilder();

        builder.stream(inputTopic, Consumed.with(Serdes.String(), GenericSerdes.jsonNodeSerde())).process(() -> new LoggingProcessor<>());

        KafkaStreams streams = new KafkaStreams(builder.build(), KafkaUtils.createKStreamProperties(getProcessConsumer().getIdProcess() + ProcessConstants.SYSOUT_PROCESS, getBootstrapServer()));
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
        addStreams(streams);
    }
 
Example 29   Project: kafka-tutorials   File: AggregatingSum.java
private void runRecipe(final String configPath) throws IOException {
  Properties envProps = this.loadEnvProperties(configPath);
  Properties streamProps = this.buildStreamsProperties(envProps);

  Topology topology = this.buildTopology(envProps, this.ticketSaleSerde(envProps));
  this.createTopics(envProps);

  final KafkaStreams streams = new KafkaStreams(topology, streamProps);
  final CountDownLatch latch = new CountDownLatch(1);

  // Attach shutdown handler to catch Control-C.
  Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
      streams.close();
      latch.countDown();
    }
  });

  try {
    streams.start();
    latch.await();
  } catch (Throwable e) {
    System.exit(1);
  }
  System.exit(0);

}
 
Example 30   Project: kafka-tutorials   File: RunningAverage.java
private void run() {

    Properties envProps = this.loadEnvProperties();
    Properties streamProps = this.buildStreamsProperties(envProps);
    Topology topology = this.buildTopology(new StreamsBuilder(), envProps);

    this.createTopics(envProps);

    final KafkaStreams streams = new KafkaStreams(topology, streamProps);
    final CountDownLatch latch = new CountDownLatch(1);

    // Attach shutdown handler to catch Control-C.
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
      @Override
      public void run() {
        streams.close(Duration.ofSeconds(5));
        latch.countDown();
      }
    });

    try {
      streams.cleanUp();
      streams.start();
      latch.await();
    } catch (Throwable e) {
      System.exit(1);
    }
    System.exit(0);
  }