Usage examples for the class org.apache.kafka.common.utils.Utils

The following examples show how to use the org.apache.kafka.common.utils.Utils API in practice; each snippet is taken from an open-source project on GitHub, where the full source can be viewed.
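
Before going through the project code, here is a minimal self-contained sketch (not taken from any of the projects below) of the Utils helpers that recur throughout these examples: mkSet, join, murmur2/toPositive, propsToStringMap and newInstance. It assumes an older kafka-clients release (for example 2.x) in which all of these helpers still exist; several of them are internal utilities and have been removed or relocated in newer Kafka versions.

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.utils.Utils;

public class UtilsOverview {
    public static void main(String[] args) throws Exception {
        // Build a small Set and join its elements with a separator
        Set<String> topics = Utils.mkSet("orders", "payments");
        String joined = Utils.join(topics, ",");                 // e.g. "orders,payments"

        // Hash key bytes with murmur2 and map the hash to a non-negative partition index
        byte[] keyBytes = "user-42".getBytes(StandardCharsets.UTF_8);
        int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % 6;

        // Convert java.util.Properties into a Map<String, String>
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        Map<String, String> asMap = Utils.propsToStringMap(props);

        // Reflectively create an instance of a class
        Object list = Utils.newInstance(Class.forName("java.util.ArrayList"));

        System.out.println(joined + " / " + partition + " / " + asMap + " / " + list);
    }
}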

Example 1 - Project: kareldb, File: KarelDbEngine.java
@SuppressWarnings("unchecked")
public static <T> T getConfiguredInstance(String className, Map<String, ?> configs) {
    try {
        Class<T> cls = (Class<T>) Class.forName(className);
        if (cls == null) {
            return null;
        }
        Object o = Utils.newInstance(cls);
        if (o instanceof Configurable) {
            ((Configurable) o).configure(configs);
        }
        return cls.cast(o);
    } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
    }
}
 
Example 2 - Project: kareldb, File: ClusterTestHarness.java
@Before
public void setUp() throws Exception {
    zookeeper = new EmbeddedZookeeper();
    zkConnect = String.format("localhost:%d", zookeeper.port());

    configs = new Vector<>();
    servers = new Vector<>();
    for (int i = 0; i < numBrokers; i++) {
        KafkaConfig config = getKafkaConfig(i);
        configs.add(config);

        KafkaServer server = TestUtils.createServer(config, Time.SYSTEM);
        servers.add(server);
    }

    String[] serverUrls = new String[servers.size()];
    ListenerName listenerType = ListenerName.forSecurityProtocol(getSecurityProtocol());
    for (int i = 0; i < servers.size(); i++) {
        serverUrls[i] =
            Utils.formatAddress(
                servers.get(i).config().advertisedListeners().head().host(),
                servers.get(i).boundPort(listenerType)
            );
    }
    bootstrapServers = Utils.join(serverUrls, ",");
}
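
For context on the two helpers used above: Utils.formatAddress renders a host/port pair as "host:port" (IPv6 literals are wrapped in brackets), and Utils.join concatenates the array with the given separator, so the loop ends up producing a comma-separated bootstrap.servers value. A small illustrative sketch, with made-up hosts and ports:

String[] serverUrls = {
    Utils.formatAddress("localhost", 9092),   // "localhost:9092"
    Utils.formatAddress("::1", 9093)          // "[::1]:9093"
};
String bootstrapServers = Utils.join(serverUrls, ",");  // "localhost:9092,[::1]:9093"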
 
Example 3 - Project: kop, File: KafkaRequestHandler.java
protected void handleSyncGroupRequest(KafkaHeaderAndRequest syncGroup,
                                      CompletableFuture<AbstractResponse> resultFuture) {
    checkArgument(syncGroup.getRequest() instanceof SyncGroupRequest);
    SyncGroupRequest request = (SyncGroupRequest) syncGroup.getRequest();

    groupCoordinator.handleSyncGroup(
        request.groupId(),
        request.generationId(),
        request.memberId(),
        CoreUtils.mapValue(
            request.groupAssignment(), Utils::toArray
        )
    ).thenAccept(syncGroupResult -> {
        SyncGroupResponse response = new SyncGroupResponse(
            syncGroupResult.getKey(),
            ByteBuffer.wrap(syncGroupResult.getValue())
        );

        resultFuture.complete(response);
    });
}
 
Example 4 - Project: ad, File: CustomPartitioner.java
/**
 * Decides which partition a message is written to.
 * @param topic topic
 * @param key key
 * @param keyBytes serialized key bytes
 * @param value value
 * @param valueBytes serialized value bytes
 * @param cluster kafka cluster
 * @return the chosen partition
 */
@Override
public int partition(String topic, Object key, byte[] keyBytes,
                     Object value, byte[] valueBytes, Cluster cluster) {
    // All partitions for this topic
    List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
    int partitionCount = partitionInfos.size();
    // A key is required: if the key is "name" it goes to the last partition; other keys are hashed and taken modulo the remaining partitions
    if (keyBytes == null || !key.getClass().equals(String.class)) {
        throw new InvalidRecordException("kafka message must have a String key");
    }
    if (partitionCount == 1 || StringUtils.endsWithIgnoreCase("name", key.toString())) {
        return partitionCount - 1;
    }
    return Math.abs(Utils.murmur2(keyBytes)) % (partitionCount - 1);
}
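
To use a partitioner like this one, it has to be registered through the producer configuration. The sketch below is not part of the project above; it assumes the class is on the classpath under the hypothetical fully-qualified name com.example.CustomPartitioner and that a broker is reachable at localhost:9092.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomPartitionerUsage {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Hypothetical fully-qualified name of the partitioner shown above
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.CustomPartitioner");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The partitioner above rejects records without a String key
            producer.send(new ProducerRecord<>("ad-events", "name", "payload"));
        }
    }
}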
 
@Test
public void shouldSelectAllFromUsers() throws Exception {
  final QueuedQueryMetadata queryMetadata = executeQuery(
      "SELECT * from %s;", userTable);

  BlockingQueue<KeyValue<String, GenericRow>> rowQueue = queryMetadata.getRowQueue();

  Set<String> actualUsers = new HashSet<>();
  Set<String> expectedUsers = Utils.mkSet("USER_0", "USER_1", "USER_2", "USER_3", "USER_4");
  while (actualUsers.size() < expectedUsers.size()) {
    KeyValue<String, GenericRow> nextRow = rowQueue.poll();
    if (nextRow != null) {
      List<Object> columns = nextRow.value.getColumns();
      assertEquals(6, columns.size());
      actualUsers.add((String) columns.get(1));
    }
  }
  assertEquals(expectedUsers, actualUsers);
}
 
Example 6 - Project: DataLink, File: SessionHandler.java
private void handleJoinGroupRequest(ChannelHandlerContext ctx, Request request) {
    JoinGroupRequest joinGroupRequest = (JoinGroupRequest) request.getBody();
    ResponseHeader responseHeader = new ResponseHeader(request.getHeader().correlationId());

    List<ProtocolEntry> protocols = joinGroupRequest.groupProtocols().stream().map(protocol -> new ProtocolEntry(protocol.name(), Utils.toArray(protocol.metadata()))).collect(Collectors.toList());
    coordinator.handleJoinGroup(
            joinGroupRequest.groupId(),
            joinGroupRequest.memberId(),
            request.getHeader().clientId(),
            request.getClientAddress().toString(),
            joinGroupRequest.rebalanceTimeout(),
            joinGroupRequest.sessionTimeout(),
            joinGroupRequest.protocolType(),
            protocols,
            (joinResult) -> {
                Map<String, ByteBuffer> members = joinResult.getMembers().entrySet().stream().collect(Collectors.toMap(k -> k.getKey(), k -> ByteBuffer.wrap(k.getValue())));
                JoinGroupResponse responseBody = new JoinGroupResponse(request.getHeader().apiVersion(), joinResult.getErrorCode(), joinResult.getGenerationId(),
                        joinResult.getSubProtocol(), joinResult.getMemberId(), joinResult.getLeaderId(), members);

                logger.trace(String.format("Sending join group response %s for correlation id %d to client %s.",
                        responseBody, request.getHeader().correlationId(), request.getHeader().clientId()));

                sendResponse(ctx, new Response(responseHeader, responseBody));
            }
    );
}
 
Example 7 - Project: mirus, File: MirusOffsetTool.java
private static MirusOffsetTool newOffsetTool(Args args) throws IOException {
  // This needs to be the admin topic properties.
  // By default these are in the worker properties file, as this has the admin producer and
  // consumer settings.  Separating these might be wise - also useful for storing state in
  // source cluster if it proves necessary.
  final Map<String, String> properties =
      !args.propertiesFile.isEmpty()
          ? Utils.propsToStringMap(Utils.loadProps(args.propertiesFile))
          : Collections.emptyMap();
  final DistributedConfig config = new DistributedConfig(properties);
  final KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
  offsetBackingStore.configure(config);

  // Avoid initializing the entire Kafka Connect plugin system by assuming the
  // internal.[key|value].converter is org.apache.kafka.connect.json.JsonConverter
  final Converter internalConverter = new JsonConverter();
  internalConverter.configure(config.originalsWithPrefix("internal.key.converter."), true);

  final OffsetSetter offsetSetter = new OffsetSetter(internalConverter, offsetBackingStore);
  final OffsetFetcher offsetFetcher = new OffsetFetcher(config, internalConverter);
  final OffsetSerDe offsetSerDe = OffsetSerDeFactory.create(args.format);

  return new MirusOffsetTool(args, offsetFetcher, offsetSetter, offsetSerDe);
}
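
The Utils.loadProps / Utils.propsToStringMap pair used above is a common idiom for turning a worker properties file into the Map<String, String> that Connect configuration classes expect. A minimal sketch (the file name worker.properties is hypothetical; loadProps throws IOException if the file cannot be read):

Properties props = Utils.loadProps("worker.properties");
Map<String, String> workerConfig = Utils.propsToStringMap(props);
workerConfig.forEach((k, v) -> System.out.println(k + "=" + v));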
 
Example 8 - Project: kafka-graphs, File: ClientUtils.java
/**
 * Create a temporary relative directory in the specified parent directory with the given prefix.
 *
 * @param parent The parent folder path name, if null using the default temporary-file directory
 * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
 * @return the temp dir
 */
public static File tempDirectory(final Path parent, String prefix) {
    final File file;
    prefix = prefix == null ? "kafka-" : prefix;
    try {
        file = parent == null
            ? Files.createTempDirectory(prefix).toFile()
            : Files.createTempDirectory(parent, prefix).toFile();
    } catch (final IOException ex) {
        throw new RuntimeException("Failed to create a temp dir", ex);
    }
    file.deleteOnExit();

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        try {
            Utils.delete(file);
        } catch (IOException e) {
            log.error("Error deleting {}", file.getAbsolutePath(), e);
        }
    }));

    return file;
}
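
A short usage sketch for the helper above: with a null parent the directory is created under java.io.tmpdir, and with a null prefix it is named "kafka-..."; in both cases Utils.delete removes it when the JVM shuts down. The parent path used below is made up.

File dir = tempDirectory(null, null);                            // e.g. /tmp/kafka-1234567890
File graphDir = tempDirectory(Paths.get("/var/tmp"), "graphs-"); // needs java.nio.file.Paths
System.out.println(dir.getAbsolutePath() + " " + graphDir.getAbsolutePath());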
 
Example 9 - Project: kcache, File: ClusterTestHarness.java
@Before
public void setUp() throws Exception {
    zookeeper = new EmbeddedZookeeper();
    zkConnect = String.format("localhost:%d", zookeeper.port());

    configs = new Vector<>();
    servers = new Vector<>();
    for (int i = 0; i < numBrokers; i++) {
        KafkaConfig config = getKafkaConfig(i);
        configs.add(config);

        KafkaServer server = TestUtils.createServer(config, Time.SYSTEM);
        servers.add(server);
    }

    String[] serverUrls = new String[servers.size()];
    ListenerName listenerType = ListenerName.forSecurityProtocol(getSecurityProtocol());
    for (int i = 0; i < servers.size(); i++) {
        serverUrls[i] =
            Utils.formatAddress(
                servers.get(i).config().advertisedListeners().head().host(),
                servers.get(i).boundPort(listenerType)
            );
    }
    bootstrapServers = Utils.join(serverUrls, ",");
}
 
Example 10 - Project: common-docker, File: ClusterStatusSASLTest.java
@Test(timeout = 120000)
public void isKafkaReadyWithSASLAndSSL() throws Exception {
  Properties clientSecurityProps = kafka.getClientSecurityConfig();

  Map<String, String> config = Utils.propsToStringMap(clientSecurityProps);
  config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapBroker
      (SecurityProtocol.SASL_SSL));

  // Set password and enabled protocol as the Utils.propsToStringMap just returns toString()
  // representations and these properties don't have a valid representation.
  Password trustStorePassword = (Password) clientSecurityProps.get("ssl.truststore.password");
  config.put("ssl.truststore.password", trustStorePassword.value());
  config.put("ssl.enabled.protocols", "TLSv1.2");

  assertThat(ClusterStatus.isKafkaReady(config, 3, 10000)).isTrue();
}
 
Example 11 - Project: common-docker, File: ClusterStatusSASLTest.java
@Test(timeout = 120000)
public void isKafkaReadyWithSASLAndSSLUsingZK() throws Exception {
  Properties clientSecurityProps = kafka.getClientSecurityConfig();

  boolean zkReady = ClusterStatus.isZookeeperReady(this.kafka.getZookeeperConnectString(), 30000);
  if (!zkReady) {
    throw new RuntimeException(
        "Could not reach zookeeper " + this.kafka.getZookeeperConnectString());
  }
  Map<String, String> endpoints = ClusterStatus.getKafkaEndpointFromZookeeper(
      this.kafka.getZookeeperConnectString(),
      30000
  );

  String bootstrap_broker = endpoints.get("SASL_SSL");
  Map<String, String> config = Utils.propsToStringMap(clientSecurityProps);
  config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrap_broker);

  // Set password and enabled protocol as the Utils.propsToStringMap just returns toString()
  // representations and these properties don't have a valid representation.
  Password trustStorePassword = (Password) clientSecurityProps.get("ssl.truststore.password");
  config.put("ssl.truststore.password", trustStorePassword.value());
  config.put("ssl.enabled.protocols", "TLSv1.2");

  assertThat(ClusterStatus.isKafkaReady(config, 3, 10000)).isTrue();
}
 
public <T> T getConfiguredInstance(String key, Class<T> t, Producer<byte[], byte[]> producer) {
  Class<?> c = getClass(key);
  if (c == null) {
    return null;
  }
  Object o = Utils.newInstance(c);

  if (!t.isInstance(o)) {
    throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
  }

  if (o instanceof Configurable) {
    ((Configurable) o).configure(configsWithCurrentProducer(producer));
  }

  return t.cast(o);
}
 
@SuppressWarnings("unchecked")
protected <T> T newInstance(Map<String, ?> map, String key, Class<T> klass) throws KafkaException {
    Object val = map.get(key);
    if (val == null) {
        throw new KafkaException("No value for '" + key + "' found");
    } else if (val instanceof String) {
        try {
            return (T) Utils.newInstance(Class.forName((String) val));
        } catch (Exception e) {
            throw new KafkaException(e);
        }
    } else if (val instanceof Class) {
        return (T) Utils.newInstance((Class<T>) val);
    } else {
        throw new KafkaException("Unexpected type '" + val.getClass() + "' for '" + key + "'");
    }
}
 
Example 14 - Project: hello-kafka-streams, File: ConnectEmbedded.java
public ConnectEmbedded(Properties workerConfig, Properties... connectorConfigs) throws Exception {
    Time time = new SystemTime();
    DistributedConfig config = new DistributedConfig(Utils.propsToStringMap(workerConfig));

    KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
    offsetBackingStore.configure(config);

    // Not sure if this is going to work, but since we don't have an advertised url we can at least get a fairly random worker id
    String workerId = UUID.randomUUID().toString();
    worker = new Worker(workerId, time, config, offsetBackingStore);

    StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
    statusBackingStore.configure(config);

    ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter());
    configBackingStore.configure(config);

    //advertisedUrl = "" as we don't have the rest server - hopefully this will not break anything
    herder = new DistributedHerder(config, time, worker, statusBackingStore, configBackingStore, "");
    this.connectorConfigs = connectorConfigs;

    shutdownHook = new ShutdownHook();
}
 
private Serde<?> getKeySerde(String keySerdeString) {
	Serde<?> keySerde;
	try {
		if (StringUtils.hasText(keySerdeString)) {
			keySerde = Utils.newInstance(keySerdeString, Serde.class);
		}
		else {
			keySerde = getFallbackSerde("default.key.serde");
		}
		keySerde.configure(this.streamConfigGlobalProperties, true);

	}
	catch (ClassNotFoundException ex) {
		throw new IllegalStateException("Serde class not found: ", ex);
	}
	return keySerde;
}
 
private Serde<?> getKeySerde(String keySerdeString, ResolvableType resolvableType) {
	Serde<?> keySerde = null;
	try {
		if (StringUtils.hasText(keySerdeString)) {
			keySerde = Utils.newInstance(keySerdeString, Serde.class);
		}
		else {
			if (resolvableType != null &&
					(isResolvalbeKafkaStreamsType(resolvableType) || isResolvableKStreamArrayType(resolvableType))) {
				ResolvableType generic = resolvableType.isArray() ? resolvableType.getComponentType().getGeneric(0) : resolvableType.getGeneric(0);
				Serde<?> fallbackSerde = getFallbackSerde("default.key.serde");
				keySerde = getSerde(generic, fallbackSerde);
			}
			if (keySerde == null) {
				keySerde = Serdes.ByteArray();
			}
		}
		keySerde.configure(this.streamConfigGlobalProperties, true);
	}
	catch (ClassNotFoundException ex) {
		throw new IllegalStateException("Serde class not found: ", ex);
	}
	return keySerde;
}
 
@SuppressWarnings("unchecked")
private Serde<?> getValueSerde(String valueSerdeString, ResolvableType resolvableType)
		throws ClassNotFoundException {
	Serde<?> valueSerde = null;
	if (StringUtils.hasText(valueSerdeString)) {
		valueSerde = Utils.newInstance(valueSerdeString, Serde.class);
	}
	else {

		if (resolvableType != null && ((isResolvalbeKafkaStreamsType(resolvableType)) ||
				(isResolvableKStreamArrayType(resolvableType)))) {
			Serde<?> fallbackSerde = getFallbackSerde("default.value.serde");
			ResolvableType generic = resolvableType.isArray() ? resolvableType.getComponentType().getGeneric(1) : resolvableType.getGeneric(1);
			valueSerde = getSerde(generic, fallbackSerde);
		}
		if (valueSerde == null) {

			valueSerde = Serdes.ByteArray();
		}
	}
	valueSerde.configure(streamConfigGlobalProperties, false);
	return valueSerde;
}
 
Example 18 - Project: emodb, File: DefaultMegabusSource.java
@VisibleForTesting
public List<Future> asyncSendFutures(List<Coordinate> coordinateList) {
    List<Future> futures = coordinateList
            .stream()
            // Using the minimum UUID here to make sure the time is always beyond the FCL so that the resolver is certain to put the document into the actual Megabus.
            // This way we wouldn't be in a situation where there is a ref in the Ref topic but not in the Megabus topic.
            .map(coordinate -> new MegabusRef(coordinate.getTable(), coordinate.getId(), TimeUUIDs.minimumUuid(), _clock.instant(), MegabusRef.RefType.TOUCH))
            .collect(Collectors.groupingBy(ref -> {
                String key = Coordinate.of(ref.getTable(), ref.getKey()).toString();
                return Utils.toPositive(Utils.murmur2(key.getBytes())) % _topic.getPartitions();
            }, Collectors.toList()))
            .entrySet()
            .stream()
            .map(entry -> _producer.send(new ProducerRecord<>(_topic.getName(), entry.getKey(), TimeUUIDs.newUUID().toString(),
                    _objectMapper.valueToTree(entry.getValue()))))
            .collect(Collectors.toList());

    return futures;
}
 
Example 19 - Project: beam, File: KafkaIOTest.java
@SuppressWarnings("unchecked")
@Override
public Producer<Integer, Long> apply(Map<String, Object> config) {

  // Make sure the config is correctly set up for serializers.
  Utils.newInstance(
          (Class<? extends Serializer<?>>)
              ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
                  .asSubclass(Serializer.class))
      .configure(config, true);

  Utils.newInstance(
          (Class<? extends Serializer<?>>)
              ((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
                  .asSubclass(Serializer.class))
      .configure(config, false);

  // Returning same producer in each instance in a pipeline seems to work fine currently.
  // If DirectRunner creates multiple DoFn instances for sinks, we might need to handle
  // it appropriately. I.e. allow multiple producers for each producerKey and concatenate
  // all the messages written to each producer for verification after the pipeline finishes.

  return MOCK_PRODUCER_MAP.get(producerKey);
}
 
Example 20 - Project: camel-kafka-connector, File: KafkaConnectRunner.java
/**
 * There does not seem to be a public interface for embedding a Kafka Connect runtime,
 * therefore this code is modeled on the behavior of
 * https://github.com/apache/kafka/blob/2.1/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
 * and performs the initialization in a roughly similar manner.
 *
 */
private void init() {
    LOG.info("Started worked initialization");

    Time time = Time.SYSTEM;

    // Initializes the system runtime information and logs some of the information
    WorkerInfo initInfo = new WorkerInfo();
    initInfo.logAll();

    Properties props = kafkaConnectPropertyFactory.getProperties();

    Map<String, String> standAloneProperties = Utils.propsToStringMap(props);

    // Not needed, but we need this one to initialize the worker
    Plugins plugins = new Plugins(standAloneProperties);

    StandaloneConfig config = new StandaloneConfig(standAloneProperties);
    String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
    AllConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();

    RestServer rest = new RestServer(config);
    rest.initializeServer();

    /*
     According to the Kafka source code "... Worker runs a (dynamic) set of tasks
     in a set of threads, doing the work of actually moving data to/from Kafka ..."
     */
    Worker worker = new Worker(bootstrapServer, time, plugins, config, new FileOffsetBackingStore(), allConnectorClientConfigOverridePolicy);

    /*
    From Kafka source code: " ... The herder interface tracks and manages workers
    and connectors ..."
     */
    herder = new StandaloneHerder(worker, kafkaClusterId, allConnectorClientConfigOverridePolicy);
    connect = new Connect(herder, rest);
    LOG.info("Finished initializing the worker");
}
 
Example 21 - Project: camel-kafka-connector, File: KafkaConnectRunner.java
public void initializeConnector(ConnectorPropertyFactory connectorPropertyFactory,
                                Consumer<ConnectorInitState> callback) throws ExecutionException, InterruptedException {
    Properties connectorProps = connectorPropertyFactory.getProperties();

    FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) ->
            callback.accept(new ConnectorInitState(info.result().config(), info.created(), error)));

    herder.putConnectorConfig(
            connectorProps.getProperty(ConnectorConfig.NAME_CONFIG),
            Utils.propsToStringMap(connectorProps), false, cb);

    cb.get();
}
 
Example 22 - Project: camel-kafka-connector, File: KafkaConnectRunner.java
public <T> void initializeConnector(ConnectorPropertyFactory connectorPropertyFactory,
                                    BiConsumer<ConnectorInitState, T> callback, T payload) throws ExecutionException, InterruptedException {
    Properties connectorProps = connectorPropertyFactory.getProperties();

    FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) ->
            callback.accept(new ConnectorInitState(info.result().config(), info.created(), error), payload));

    herder.putConnectorConfig(
            connectorProps.getProperty(ConnectorConfig.NAME_CONFIG),
            Utils.propsToStringMap(connectorProps), false, cb);

    cb.get();
}
 
@Override
public void ensureValid(String name, Object invocationMode) {
    try {
        InvocationMode.valueOf(((String)invocationMode).trim());
    } catch (Exception e) {
        throw new ConfigException(name, invocationMode, "Value must be one of [" +
            Utils.join(InvocationMode.values(), ", ") + "]");
    }
}
 
@Override
public void ensureValid(String name, Object invocationFailure) {
    try {
        InvocationFailure.valueOf(((String)invocationFailure).trim());
    } catch (Exception e) {
        throw new ConfigException(name, invocationFailure, "Value must be one of [" +
            Utils.join(InvocationFailure.values(), ", ") + "]");
    }
}
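
Validators like the two above are normally attached to a connector's ConfigDef. The sketch below is illustrative only: the key name "invocation.mode" and the InvocationModeValidator class (a wrapper around the ensureValid logic shown above) are hypothetical names, not part of the project these snippets come from.

ConfigDef configDef = new ConfigDef()
    .define("invocation.mode",
            ConfigDef.Type.STRING,
            ConfigDef.NO_DEFAULT_VALUE,
            new InvocationModeValidator(),
            ConfigDef.Importance.MEDIUM,
            "Invocation mode, one of [" + Utils.join(InvocationMode.values(), ", ") + "]");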
 
Example 25 - Project: kafka_book_demo, File: DemoPartitioner.java
@Override
public int partition(String topic, Object key, byte[] keyBytes,
                     Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (null == keyBytes) {
        return counter.getAndIncrement() % numPartitions;
    } else
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
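
Unlike Example 4, which used Math.abs, this partitioner relies on Utils.toPositive, which simply masks off the sign bit (v & 0x7fffffff). That keeps the result non-negative even for Integer.MIN_VALUE, where Math.abs still returns a negative number. A minimal sketch of the difference:

int hash = Integer.MIN_VALUE;                  // a value murmur2 can, in principle, produce
System.out.println(Math.abs(hash));            // -2147483648, still negative
System.out.println(Utils.toPositive(hash));    // 0
System.out.println(Utils.toPositive(-7) % 6);  // always a valid partition index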
 
@Test
@SuppressWarnings("unchecked")
public void shouldExtractCorrectSourceForSimpleQuery() {
  PlanNode planNode = logicalPlanBuilder.buildLogicalPlan("select col0 from TEST2 limit 5;");
  PlanSourceExtractorVisitor planSourceExtractorVisitor = new PlanSourceExtractorVisitor();
  planSourceExtractorVisitor.process(planNode, null);
  Set<String> sourceNames = planSourceExtractorVisitor.getSourceNames();
  assertThat(sourceNames.size(), equalTo(1));
  assertThat(sourceNames, equalTo(Utils.mkSet("TEST2")));
}
 
@Test
@SuppressWarnings("unchecked")
public void shouldExtractCorrectSourceForJoinQuery() {
  PlanNode planNode = logicalPlanBuilder.buildLogicalPlan(
      "SELECT t1.col1, t2.col1, t1.col4, t2.col2 FROM test1 t1 LEFT JOIN "
                        + "test2 t2 ON t1.col1 = t2.col1;");
  PlanSourceExtractorVisitor planSourceExtractorVisitor = new PlanSourceExtractorVisitor();
  planSourceExtractorVisitor.process(planNode, null);
  Set<String> sourceNames = planSourceExtractorVisitor.getSourceNames();
  assertThat(sourceNames, equalTo(Utils.mkSet("TEST1", "TEST2")));
}
 
@Test
public void shouldHaveLeftJoin() {
  setupTopicClientExpectations(1, 1);
  buildJoin();
  final Topology topology = builder.build();
  final TopologyDescription.Processor leftJoin
      = (TopologyDescription.Processor) getNodeByName(topology, "KSTREAM-LEFTJOIN-0000000014");
  final List<String> predecessors = leftJoin.predecessors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
  assertThat(leftJoin.stores(), equalTo(Utils.mkSet("KSTREAM-REDUCE-STATE-STORE-0000000003")));
  assertThat(predecessors, equalTo(Collections.singletonList("KSTREAM-SOURCE-0000000013")));
}
 
@Test
public void shouldUpdateReferentialIntegrityTableCorrectly() throws Exception {
  ksqlEngine.createQueries("create table bar as select * from test2;" +
                                 "create table foo as select * from test2;");
  MetaStore metaStore = ksqlEngine.getMetaStore();
  assertThat(metaStore.getQueriesWithSource("TEST2"),
             equalTo(Utils.mkSet("CTAS_BAR", "CTAS_FOO")));
  assertThat(metaStore.getQueriesWithSink("BAR"), equalTo(Utils.mkSet("CTAS_BAR")));
  assertThat(metaStore.getQueriesWithSink("FOO"), equalTo(Utils.mkSet("CTAS_FOO")));
}
 
@Test
public void shouldRetryListTopics() {
  expect(adminClient.listTopics()).andReturn(listTopicResultWithNotControllerException()).once();
  expect(adminClient.listTopics()).andReturn(getListTopicsResult());
  replay(adminClient);
  KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);
  Set<String> names = kafkaTopicClient.listTopicNames();
  assertThat(names, equalTo(Utils.mkSet(topicName1, topicName2, topicName3)));
  verify(adminClient);
}
 