下面列出了怎么用org.apache.kafka.common.utils.Utils的API类实例代码及写法,或者点击链接到github查看源代码。
@SuppressWarnings("unchecked")
public static <T> T getConfiguredInstance(String className, Map<String, ?> configs) {
try {
Class<T> cls = (Class<T>) Class.forName(className);
if (cls == null) {
return null;
}
Object o = Utils.newInstance(cls);
if (o instanceof Configurable) {
((Configurable) o).configure(configs);
}
return cls.cast(o);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
@Before
public void setUp() throws Exception {
zookeeper = new EmbeddedZookeeper();
zkConnect = String.format("localhost:%d", zookeeper.port());
configs = new Vector<>();
servers = new Vector<>();
for (int i = 0; i < numBrokers; i++) {
KafkaConfig config = getKafkaConfig(i);
configs.add(config);
KafkaServer server = TestUtils.createServer(config, Time.SYSTEM);
servers.add(server);
}
String[] serverUrls = new String[servers.size()];
ListenerName listenerType = ListenerName.forSecurityProtocol(getSecurityProtocol());
for (int i = 0; i < servers.size(); i++) {
serverUrls[i] =
Utils.formatAddress(
servers.get(i).config().advertisedListeners().head().host(),
servers.get(i).boundPort(listenerType)
);
}
bootstrapServers = Utils.join(serverUrls, ",");
}
protected void handleSyncGroupRequest(KafkaHeaderAndRequest syncGroup,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(syncGroup.getRequest() instanceof SyncGroupRequest);
SyncGroupRequest request = (SyncGroupRequest) syncGroup.getRequest();
groupCoordinator.handleSyncGroup(
request.groupId(),
request.generationId(),
request.memberId(),
CoreUtils.mapValue(
request.groupAssignment(), Utils::toArray
)
).thenAccept(syncGroupResult -> {
SyncGroupResponse response = new SyncGroupResponse(
syncGroupResult.getKey(),
ByteBuffer.wrap(syncGroupResult.getValue())
);
resultFuture.complete(response);
});
}
/**
* 决定消息被写入哪个分区
* @param topic topic
* @param key key
* @param keyBytes key serialize byte
* @param value value
* @param valueBytes value serialize byte
* @param cluster kakfa cluster
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 所有分区信息
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int partitionCount = partitionInfos.size();
// 要求必须存在 key,如果key 是"name" 就分配到最后一个分区, 其他key hash取模
if (keyBytes == null || !key.getClass().equals(String.class)) {
throw new InvalidRecordException("kafka message must have a String key");
}
if (partitionCount == 1 || StringUtils.endsWithIgnoreCase("name", key.toString())) {
return partitionCount - 1;
}
return Math.abs(Utils.murmur2(keyBytes)) % (partitionCount - 1);
}
@Test
public void shouldSelectAllFromUsers() throws Exception {
final QueuedQueryMetadata queryMetadata = executeQuery(
"SELECT * from %s;", userTable);
BlockingQueue<KeyValue<String, GenericRow>> rowQueue = queryMetadata.getRowQueue();
Set<String> actualUsers = new HashSet<>();
Set<String> expectedUsers = Utils.mkSet("USER_0", "USER_1", "USER_2", "USER_3", "USER_4");
while (actualUsers.size() < expectedUsers.size()) {
KeyValue<String, GenericRow> nextRow = rowQueue.poll();
if (nextRow != null) {
List<Object> columns = nextRow.value.getColumns();
assertEquals(6, columns.size());
actualUsers.add((String) columns.get(1));
}
}
assertEquals(expectedUsers, actualUsers);
}
private void handleJoinGroupRequest(ChannelHandlerContext ctx, Request request) {
JoinGroupRequest joinGroupRequest = (JoinGroupRequest) request.getBody();
ResponseHeader responseHeader = new ResponseHeader(request.getHeader().correlationId());
List<ProtocolEntry> protocols = joinGroupRequest.groupProtocols().stream().map(protocol -> new ProtocolEntry(protocol.name(), Utils.toArray(protocol.metadata()))).collect(Collectors.toList());
coordinator.handleJoinGroup(
joinGroupRequest.groupId(),
joinGroupRequest.memberId(),
request.getHeader().clientId(),
request.getClientAddress().toString(),
joinGroupRequest.rebalanceTimeout(),
joinGroupRequest.sessionTimeout(),
joinGroupRequest.protocolType(),
protocols,
(joinResult) -> {
Map<String, ByteBuffer> members = joinResult.getMembers().entrySet().stream().collect(Collectors.toMap(k -> k.getKey(), k -> ByteBuffer.wrap(k.getValue())));
JoinGroupResponse responseBody = new JoinGroupResponse(request.getHeader().apiVersion(), joinResult.getErrorCode(), joinResult.getGenerationId(),
joinResult.getSubProtocol(), joinResult.getMemberId(), joinResult.getLeaderId(), members);
logger.trace(String.format("Sending join group response %s for correlation id %d to client %s.",
responseBody, request.getHeader().correlationId(), request.getHeader().clientId()));
sendResponse(ctx, new Response(responseHeader, responseBody));
}
);
}
private static MirusOffsetTool newOffsetTool(Args args) throws IOException {
// This needs to be the admin topic properties.
// By default these are in the worker properties file, as this has the has admin producer and
// consumer settings. Separating these might be wise - also useful for storing state in
// source cluster if it proves necessary.
final Map<String, String> properties =
!args.propertiesFile.isEmpty()
? Utils.propsToStringMap(Utils.loadProps(args.propertiesFile))
: Collections.emptyMap();
final DistributedConfig config = new DistributedConfig(properties);
final KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
offsetBackingStore.configure(config);
// Avoid initializing the entire Kafka Connect plugin system by assuming the
// internal.[key|value].converter is org.apache.kafka.connect.json.JsonConverter
final Converter internalConverter = new JsonConverter();
internalConverter.configure(config.originalsWithPrefix("internal.key.converter."), true);
final OffsetSetter offsetSetter = new OffsetSetter(internalConverter, offsetBackingStore);
final OffsetFetcher offsetFetcher = new OffsetFetcher(config, internalConverter);
final OffsetSerDe offsetSerDe = OffsetSerDeFactory.create(args.format);
return new MirusOffsetTool(args, offsetFetcher, offsetSetter, offsetSerDe);
}
/**
* Create a temporary relative directory in the specified parent directory with the given prefix.
*
* @param parent The parent folder path name, if null using the default temporary-file directory
* @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
* @return the temp dir
*/
public static File tempDirectory(final Path parent, String prefix) {
final File file;
prefix = prefix == null ? "kafka-" : prefix;
try {
file = parent == null
?
Files.createTempDirectory(prefix).toFile()
: Files.createTempDirectory(parent, prefix).toFile();
} catch (final IOException ex) {
throw new RuntimeException("Failed to create a temp dir", ex);
}
file.deleteOnExit();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
Utils.delete(file);
} catch (IOException e) {
log.error("Error deleting {}", file.getAbsolutePath(), e);
}
}));
return file;
}
@Before
public void setUp() throws Exception {
zookeeper = new EmbeddedZookeeper();
zkConnect = String.format("localhost:%d", zookeeper.port());
configs = new Vector<>();
servers = new Vector<>();
for (int i = 0; i < numBrokers; i++) {
KafkaConfig config = getKafkaConfig(i);
configs.add(config);
KafkaServer server = TestUtils.createServer(config, Time.SYSTEM);
servers.add(server);
}
String[] serverUrls = new String[servers.size()];
ListenerName listenerType = ListenerName.forSecurityProtocol(getSecurityProtocol());
for (int i = 0; i < servers.size(); i++) {
serverUrls[i] =
Utils.formatAddress(
servers.get(i).config().advertisedListeners().head().host(),
servers.get(i).boundPort(listenerType)
);
}
bootstrapServers = Utils.join(serverUrls, ",");
}
@Test(timeout = 120000)
public void isKafkaReadyWithSASLAndSSL() throws Exception {
Properties clientSecurityProps = kafka.getClientSecurityConfig();
Map<String, String> config = Utils.propsToStringMap(clientSecurityProps);
config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapBroker
(SecurityProtocol.SASL_SSL));
// Set password and enabled protocol as the Utils.propsToStringMap just returns toString()
// representations and these properties don't have a valid representation.
Password trustStorePassword = (Password) clientSecurityProps.get("ssl.truststore.password");
config.put("ssl.truststore.password", trustStorePassword.value());
config.put("ssl.enabled.protocols", "TLSv1.2");
assertThat(ClusterStatus.isKafkaReady(config, 3, 10000)).isTrue();
}
@Test(timeout = 120000)
public void isKafkaReadyWithSASLAndSSLUsingZK() throws Exception {
Properties clientSecurityProps = kafka.getClientSecurityConfig();
boolean zkReady = ClusterStatus.isZookeeperReady(this.kafka.getZookeeperConnectString(), 30000);
if (!zkReady) {
throw new RuntimeException(
"Could not reach zookeeper " + this.kafka.getZookeeperConnectString());
}
Map<String, String> endpoints = ClusterStatus.getKafkaEndpointFromZookeeper(
this.kafka.getZookeeperConnectString(),
30000
);
String bootstrap_broker = endpoints.get("SASL_SSL");
Map<String, String> config = Utils.propsToStringMap(clientSecurityProps);
config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrap_broker);
// Set password and enabled protocol as the Utils.propsToStringMap just returns toString()
// representations and these properties don't have a valid representation.
Password trustStorePassword = (Password) clientSecurityProps.get("ssl.truststore.password");
config.put("ssl.truststore.password", trustStorePassword.value());
config.put("ssl.enabled.protocols", "TLSv1.2");
assertThat(ClusterStatus.isKafkaReady(config, 3, 10000)).isTrue();
}
public <T> T getConfiguredInstance(String key, Class<T> t, Producer<byte[], byte[]> producer) {
Class<?> c = getClass(key);
if (c == null) {
return null;
}
Object o = Utils.newInstance(c);
if (!t.isInstance(o)) {
throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
}
if (o instanceof Configurable) {
((Configurable) o).configure(configsWithCurrentProducer(producer));
}
return t.cast(o);
}
@SuppressWarnings("unchecked")
protected <T> T newInstance(Map<String, ?> map, String key, Class<T> klass) throws KafkaException {
Object val = map.get(key);
if (val == null) {
throw new KafkaException("No value for '" + key + "' found");
} else if (val instanceof String) {
try {
return (T) Utils.newInstance(Class.forName((String) val));
} catch (Exception e) {
throw new KafkaException(e);
}
} else if (val instanceof Class) {
return (T) Utils.newInstance((Class<T>) val);
} else {
throw new KafkaException("Unexpected type '" + val.getClass() + "' for '" + key + "'");
}
}
public ConnectEmbedded(Properties workerConfig, Properties... connectorConfigs) throws Exception {
Time time = new SystemTime();
DistributedConfig config = new DistributedConfig(Utils.propsToStringMap(workerConfig));
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
offsetBackingStore.configure(config);
//not sure if this is going to work but because we don't have advertised url we can get at least a fairly random
String workerId = UUID.randomUUID().toString();
worker = new Worker(workerId, time, config, offsetBackingStore);
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
statusBackingStore.configure(config);
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter());
configBackingStore.configure(config);
//advertisedUrl = "" as we don't have the rest server - hopefully this will not break anything
herder = new DistributedHerder(config, time, worker, statusBackingStore, configBackingStore, "");
this.connectorConfigs = connectorConfigs;
shutdownHook = new ShutdownHook();
}
private Serde<?> getKeySerde(String keySerdeString) {
Serde<?> keySerde;
try {
if (StringUtils.hasText(keySerdeString)) {
keySerde = Utils.newInstance(keySerdeString, Serde.class);
}
else {
keySerde = getFallbackSerde("default.key.serde");
}
keySerde.configure(this.streamConfigGlobalProperties, true);
}
catch (ClassNotFoundException ex) {
throw new IllegalStateException("Serde class not found: ", ex);
}
return keySerde;
}
private Serde<?> getKeySerde(String keySerdeString, ResolvableType resolvableType) {
Serde<?> keySerde = null;
try {
if (StringUtils.hasText(keySerdeString)) {
keySerde = Utils.newInstance(keySerdeString, Serde.class);
}
else {
if (resolvableType != null &&
(isResolvalbeKafkaStreamsType(resolvableType) || isResolvableKStreamArrayType(resolvableType))) {
ResolvableType generic = resolvableType.isArray() ? resolvableType.getComponentType().getGeneric(0) : resolvableType.getGeneric(0);
Serde<?> fallbackSerde = getFallbackSerde("default.key.serde");
keySerde = getSerde(generic, fallbackSerde);
}
if (keySerde == null) {
keySerde = Serdes.ByteArray();
}
}
keySerde.configure(this.streamConfigGlobalProperties, true);
}
catch (ClassNotFoundException ex) {
throw new IllegalStateException("Serde class not found: ", ex);
}
return keySerde;
}
@SuppressWarnings("unchecked")
private Serde<?> getValueSerde(String valueSerdeString, ResolvableType resolvableType)
throws ClassNotFoundException {
Serde<?> valueSerde = null;
if (StringUtils.hasText(valueSerdeString)) {
valueSerde = Utils.newInstance(valueSerdeString, Serde.class);
}
else {
if (resolvableType != null && ((isResolvalbeKafkaStreamsType(resolvableType)) ||
(isResolvableKStreamArrayType(resolvableType)))) {
Serde<?> fallbackSerde = getFallbackSerde("default.value.serde");
ResolvableType generic = resolvableType.isArray() ? resolvableType.getComponentType().getGeneric(1) : resolvableType.getGeneric(1);
valueSerde = getSerde(generic, fallbackSerde);
}
if (valueSerde == null) {
valueSerde = Serdes.ByteArray();
}
}
valueSerde.configure(streamConfigGlobalProperties, false);
return valueSerde;
}
@VisibleForTesting
public List<Future> asyncSendFutures(List<Coordinate> coordinateList) {
List<Future> futures = coordinateList
.stream()
// Using the minimum UUID here to make sure the time is always beyond the FCL so that the resolver is certain to put the document in to actual Megabus.
// This way we wouldn't be in a situation where there is a ref in Ref topic but not in the Megabus topic.
.map(coordinate -> new MegabusRef(coordinate.getTable(), coordinate.getId(), TimeUUIDs.minimumUuid(), _clock.instant(), MegabusRef.RefType.TOUCH))
.collect(Collectors.groupingBy(ref -> {
String key = Coordinate.of(ref.getTable(), ref.getKey()).toString();
return Utils.toPositive(Utils.murmur2(key.getBytes())) % _topic.getPartitions();
}, Collectors.toList()))
.entrySet()
.stream()
.map(entry -> _producer.send(new ProducerRecord<>(_topic.getName(), entry.getKey(), TimeUUIDs.newUUID().toString(),
_objectMapper.valueToTree(entry.getValue()))))
.collect(Collectors.toList());
return futures;
}
@SuppressWarnings("unchecked")
@Override
public Producer<Integer, Long> apply(Map<String, Object> config) {
// Make sure the config is correctly set up for serializers.
Utils.newInstance(
(Class<? extends Serializer<?>>)
((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
.asSubclass(Serializer.class))
.configure(config, true);
Utils.newInstance(
(Class<? extends Serializer<?>>)
((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
.asSubclass(Serializer.class))
.configure(config, false);
// Returning same producer in each instance in a pipeline seems to work fine currently.
// If DirectRunner creates multiple DoFn instances for sinks, we might need to handle
// it appropriately. I.e. allow multiple producers for each producerKey and concatenate
// all the messages written to each producer for verification after the pipeline finishes.
return MOCK_PRODUCER_MAP.get(producerKey);
}
/**
* here does not seem to be a public interface for embedding a Kafka connect runtime,
* therefore, this code is modeled from the behavior taken from
* https://github.com/apache/kafka/blob/2.1/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
* and performs the initialization in a roughly similar manner.
*
*/
private void init() {
LOG.info("Started worked initialization");
Time time = Time.SYSTEM;
// Initializes the system runtime information and logs some of the information
WorkerInfo initInfo = new WorkerInfo();
initInfo.logAll();
Properties props = kafkaConnectPropertyFactory.getProperties();
Map<String, String> standAloneProperties = Utils.propsToStringMap(props);
// Not needed, but we need this one to initialize the worker
Plugins plugins = new Plugins(standAloneProperties);
StandaloneConfig config = new StandaloneConfig(standAloneProperties);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
AllConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
RestServer rest = new RestServer(config);
rest.initializeServer();
/*
According to the Kafka source code "... Worker runs a (dynamic) set of tasks
in a set of threads, doing the work of actually moving data to/from Kafka ..."
*/
Worker worker = new Worker(bootstrapServer, time, plugins, config, new FileOffsetBackingStore(), allConnectorClientConfigOverridePolicy);
/*
From Kafka source code: " ... The herder interface tracks and manages workers
and connectors ..."
*/
herder = new StandaloneHerder(worker, kafkaClusterId, allConnectorClientConfigOverridePolicy);
connect = new Connect(herder, rest);
LOG.info("Finished initializing the worker");
}
public void initializeConnector(ConnectorPropertyFactory connectorPropertyFactory,
Consumer<ConnectorInitState> callback) throws ExecutionException, InterruptedException {
Properties connectorProps = connectorPropertyFactory.getProperties();
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) ->
callback.accept(new ConnectorInitState(info.result().config(), info.created(), error)));
herder.putConnectorConfig(
connectorProps.getProperty(ConnectorConfig.NAME_CONFIG),
Utils.propsToStringMap(connectorProps), false, cb);
cb.get();
}
public <T> void initializeConnector(ConnectorPropertyFactory connectorPropertyFactory,
BiConsumer<ConnectorInitState, T> callback, T payload) throws ExecutionException, InterruptedException {
Properties connectorProps = connectorPropertyFactory.getProperties();
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) ->
callback.accept(new ConnectorInitState(info.result().config(), info.created(), error), payload));
herder.putConnectorConfig(
connectorProps.getProperty(ConnectorConfig.NAME_CONFIG),
Utils.propsToStringMap(connectorProps), false, cb);
cb.get();
}
@Override
public void ensureValid(String name, Object invocationMode) {
try {
InvocationMode.valueOf(((String)invocationMode).trim());
} catch (Exception e) {
throw new ConfigException(name, invocationMode, "Value must be one of [" +
Utils.join(InvocationMode.values(), ", ") + "]");
}
}
@Override
public void ensureValid(String name, Object invocationFailure) {
try {
InvocationFailure.valueOf(((String)invocationFailure).trim());
} catch (Exception e) {
throw new ConfigException(name, invocationFailure, "Value must be one of [" +
Utils.join(InvocationFailure.values(), ", ") + "]");
}
}
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (null == keyBytes) {
return counter.getAndIncrement() % numPartitions;
} else
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Test
@SuppressWarnings("unchecked")
public void shouldExtractCorrectSourceForSimpleQuery() {
PlanNode planNode = logicalPlanBuilder.buildLogicalPlan("select col0 from TEST2 limit 5;");
PlanSourceExtractorVisitor planSourceExtractorVisitor = new PlanSourceExtractorVisitor();
planSourceExtractorVisitor.process(planNode, null);
Set<String> sourceNames = planSourceExtractorVisitor.getSourceNames();
assertThat(sourceNames.size(), equalTo(1));
assertThat(sourceNames, equalTo(Utils.mkSet("TEST2")));
}
@Test
@SuppressWarnings("unchecked")
public void shouldExtractCorrectSourceForJoinQuery() {
PlanNode planNode = logicalPlanBuilder.buildLogicalPlan(
"SELECT t1.col1, t2.col1, t1.col4, t2.col2 FROM test1 t1 LEFT JOIN "
+ "test2 t2 ON t1.col1 = t2.col1;");
PlanSourceExtractorVisitor planSourceExtractorVisitor = new PlanSourceExtractorVisitor();
planSourceExtractorVisitor.process(planNode, null);
Set<String> sourceNames = planSourceExtractorVisitor.getSourceNames();
assertThat(sourceNames, equalTo(Utils.mkSet("TEST1", "TEST2")));
}
@Test
public void shouldHaveLeftJoin() {
setupTopicClientExpectations(1, 1);
buildJoin();
final Topology topology = builder.build();
final TopologyDescription.Processor leftJoin
= (TopologyDescription.Processor) getNodeByName(topology, "KSTREAM-LEFTJOIN-0000000014");
final List<String> predecessors = leftJoin.predecessors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
assertThat(leftJoin.stores(), equalTo(Utils.mkSet("KSTREAM-REDUCE-STATE-STORE-0000000003")));
assertThat(predecessors, equalTo(Collections.singletonList("KSTREAM-SOURCE-0000000013")));
}
@Test
public void shouldUpdateReferentialIntegrityTableCorrectly() throws Exception {
ksqlEngine.createQueries("create table bar as select * from test2;" +
"create table foo as select * from test2;");
MetaStore metaStore = ksqlEngine.getMetaStore();
assertThat(metaStore.getQueriesWithSource("TEST2"),
equalTo(Utils.mkSet("CTAS_BAR", "CTAS_FOO")));
assertThat(metaStore.getQueriesWithSink("BAR"), equalTo(Utils.mkSet("CTAS_BAR")));
assertThat(metaStore.getQueriesWithSink("FOO"), equalTo(Utils.mkSet("CTAS_FOO")));
}
@Test
public void shouldRetryListTopics() {
expect(adminClient.listTopics()).andReturn(listTopicResultWithNotControllerException()).once();
expect(adminClient.listTopics()).andReturn(getListTopicsResult());
replay(adminClient);
KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);
Set<String> names = kafkaTopicClient.listTopicNames();
assertThat(names, equalTo(Utils.mkSet(topicName1, topicName2, topicName3)));
verify(adminClient);
}