下面列出了org.apache.kafka.clients.producer.Producer#partitionsFor ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
// the fetched list is immutable, so we're creating a mutable copy in order to sort it
List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
@Override
public int compare(PartitionInfo o1, PartitionInfo o2) {
return Integer.compare(o1.partition(), o2.partition());
}
});
int[] partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
}
return partitions;
}
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
// the fetched list is immutable, so we're creating a mutable copy in order to sort it
List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
@Override
public int compare(PartitionInfo o1, PartitionInfo o2) {
return Integer.compare(o1.partition(), o2.partition());
}
});
int[] partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
}
return partitions;
}
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
// the fetched list is immutable, so we're creating a mutable copy in order to sort it
List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
@Override
public int compare(PartitionInfo o1, PartitionInfo o2) {
return Integer.compare(o1.partition(), o2.partition());
}
});
int[] partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
}
return partitions;
}
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
// the fetched list is immutable, so we're creating a mutable copy in order to sort it
List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
@Override
public int compare(PartitionInfo o1, PartitionInfo o2) {
return Integer.compare(o1.partition(), o2.partition());
}
});
int[] partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
}
return partitions;
}
protected static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
// the fetched list is immutable, so we're creating a mutable copy in order to sort it
List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
@Override
public int compare(PartitionInfo o1, PartitionInfo o2) {
return Integer.compare(o1.partition(), o2.partition());
}
});
int[] partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
}
return partitions;
}
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
// the fetched list is immutable, so we're creating a mutable copy in order to sort it
List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
@Override
public int compare(PartitionInfo o1, PartitionInfo o2) {
return Integer.compare(o1.partition(), o2.partition());
}
});
int[] partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
}
return partitions;
}
@Override
public void createTopicIfNotExists(String topic, Map<String, Object> kafkaClientConfigs, String metadataBrokerList) throws StageException {
Producer<String, String> kafkaProducer = createProducerTopicMetadataClient(
metadataBrokerList,
kafkaClientConfigs
);
kafkaProducer.partitionsFor(topic);
}
/**
* Should be called only by MapR Streams Producer. It creates a topic using Producer.
* @param topic
* @param kafkaClientConfigs
* @param metadataBrokerList
* @throws StageException
*/
@Override
public void createTopicIfNotExists(String topic, Map<String, Object> kafkaClientConfigs, String metadataBrokerList)
throws StageException {
// Stream topic can be created through Producer if Stream Path exists already
Producer<String, String> kafkaProducer = createProducerTopicMetadataClient(kafkaClientConfigs);
kafkaProducer.partitionsFor(topic);
}
@Override
public boolean validateTopicExistence(
Stage.Context context,
String groupName,
String configName,
List<HostAndPort> kafkaBrokers,
String metadataBrokerList,
String topic,
Map<String, Object> kafkaClientConfigs,
List<Stage.ConfigIssue> issues,
boolean producer
) {
boolean valid = true;
if(topic == null || topic.isEmpty()) {
issues.add(context.createConfigIssue(groupName, configName, KafkaErrors.KAFKA_05));
valid = false;
} else {
List<PartitionInfo> partitionInfos;
try {
if (producer) {
Producer<String, String> kafkaProducer = createProducerTopicMetadataClient(kafkaClientConfigs);
partitionInfos = kafkaProducer.partitionsFor(topic);
} else {
Consumer<String, String> kafkaConsumer = createTopicMetadataClient();
partitionInfos = kafkaConsumer.partitionsFor(topic);
}
if (null == partitionInfos || partitionInfos.isEmpty()) {
issues.add(
context.createConfigIssue(
groupName,
KAFKA_CONFIG_BEAN_PREFIX + "topic",
MapRStreamsErrors.MAPRSTREAMS_02,
topic
)
);
valid = false;
}
} catch (KafkaException e) {
LOG.error(MapRStreamsErrors.MAPRSTREAMS_01.getMessage(), topic, e.toString(), e);
issues.add(
context.createConfigIssue(
groupName,
configName,
MapRStreamsErrors.MAPRSTREAMS_01,
topic,
e.getMessage()
)
);
valid = false;
}
}
return valid;
}