org.apache.kafka.clients.producer.Producer#partitionsFor() 源码示例

下面列出了 org.apache.kafka.clients.producer.Producer#partitionsFor() 的示例代码，您可以点击链接到 GitHub 查看完整源代码，也可以在右侧发表评论。

源代码1 项目: Flink-CEPplus   文件: FlinkKafkaProducer.java
/**
 * Returns the partition ids of {@code topic}, as reported by {@code producer}, in ascending order.
 *
 * <p>The ids are sorted so that the resulting array is the same across subtasks, independent of
 * the order in which the producer lists the partitions.
 *
 * @param topic the topic whose partitions are looked up
 * @param producer the producer used to fetch the partition metadata
 * @return the partition ids of the topic, sorted ascending
 */
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
	// streaming never modifies the fetched list, so no mutable copy is required for sorting
	return producer.partitionsFor(topic)
		.stream()
		.mapToInt(PartitionInfo::partition)
		.sorted()
		.toArray();
}
 
源代码2 项目: Flink-CEPplus   文件: FlinkKafkaProducer011.java
/**
 * Looks up the partitions of {@code topic} through {@code producer} and returns their ids sorted
 * in ascending order, so that every subtask works with the same partition array.
 *
 * @param topic the topic whose partitions are looked up
 * @param producer the producer used to fetch the partition metadata
 * @return the partition ids of the topic, sorted ascending
 */
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
	// the fetched list is treated as immutable, hence the mutable copy that is sorted in place
	List<PartitionInfo> sortedInfos = new ArrayList<>(producer.partitionsFor(topic));
	Collections.sort(sortedInfos, Comparator.comparingInt(PartitionInfo::partition));

	int[] ids = new int[sortedInfos.size()];
	int next = 0;
	for (PartitionInfo info : sortedInfos) {
		ids[next++] = info.partition();
	}
	return ids;
}
 
源代码3 项目: flink   文件: FlinkKafkaProducer.java
/**
 * Fetches the partitions of {@code topic} from {@code producer} and returns their ids in
 * ascending order.
 *
 * <p>Ordering by partition id keeps the returned array the same across subtasks.
 *
 * @param topic the topic whose partitions are looked up
 * @param producer the producer used to fetch the partition metadata
 * @return the partition ids of the topic, sorted ascending
 */
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
	List<PartitionInfo> fetched = producer.partitionsFor(topic);

	// the fetched list is only read here; sorting happens on the extracted ids
	return fetched.stream()
		.mapToInt(PartitionInfo::partition)
		.sorted()
		.toArray();
}
 
源代码4 项目: flink   文件: FlinkKafkaProducer011.java
/**
 * Returns the ids of all partitions of {@code topic} known to {@code producer}, sorted ascending.
 *
 * <p>The sort guarantees that the array is the same across subtasks.
 *
 * @param topic the topic whose partitions are looked up
 * @param producer the producer used to fetch the partition metadata
 * @return the partition ids of the topic, sorted ascending
 */
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
	// work on a private copy: the list returned by the producer is treated as immutable
	List<PartitionInfo> infos = new ArrayList<>(producer.partitionsFor(topic));
	Collections.sort(infos, (left, right) -> Integer.compare(left.partition(), right.partition()));

	int[] result = new int[infos.size()];
	int slot = 0;
	for (PartitionInfo info : infos) {
		result[slot] = info.partition();
		slot++;
	}
	return result;
}
 
源代码5 项目: flink   文件: FlinkKafkaProducer.java
/**
 * Returns the partition ids of {@code topic}, obtained from {@code producer}, in ascending order.
 *
 * <p>Sorting by partition id makes the result the same across subtasks.
 *
 * @param topic the topic whose partitions are looked up
 * @param producer the producer used to fetch the partition metadata
 * @return the partition ids of the topic, sorted ascending
 */
protected static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
	// the fetched list is left untouched; only the extracted ids are sorted
	return producer.partitionsFor(topic)
		.stream()
		.mapToInt(PartitionInfo::partition)
		.sorted()
		.toArray();
}
 
源代码6 项目: flink   文件: FlinkKafkaProducer011.java
/**
 * Resolves the partitions of {@code topic} via {@code producer} and returns their ids sorted in
 * ascending order, which keeps the partition array the same across subtasks.
 *
 * @param topic the topic whose partitions are looked up
 * @param producer the producer used to fetch the partition metadata
 * @return the partition ids of the topic, sorted ascending
 */
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
	// copy first: the fetched list is treated as immutable and cannot be sorted directly
	List<PartitionInfo> ordered = new ArrayList<>(producer.partitionsFor(topic));
	Collections.sort(ordered, Comparator.comparingInt(PartitionInfo::partition));

	int[] partitionIds = new int[ordered.size()];
	int position = 0;
	for (PartitionInfo partitionInfo : ordered) {
		partitionIds[position++] = partitionInfo.partition();
	}
	return partitionIds;
}
 
源代码7 项目: datacollector   文件: KafkaValidationUtil09.java
/**
 * Requests the partition metadata of {@code topic} through a producer built from the given
 * broker list and client configuration.
 *
 * <p>NOTE(review): the method name indicates that this metadata request is relied upon to create
 * the topic when it is missing (presumably via broker-side topic auto-creation) — confirm.
 *
 * @param topic name of the topic to look up
 * @param kafkaClientConfigs additional client configuration passed to the producer factory
 * @param metadataBrokerList broker list passed to the producer factory
 * @throws StageException declared by the overridden method; not thrown directly by this body
 */
@Override
public void createTopicIfNotExists(String topic, Map<String, Object> kafkaClientConfigs, String metadataBrokerList) throws StageException {
  // The producer is only needed for this single metadata request, so it is closed afterwards
  // instead of being leaked.
  // NOTE(review): assumes createProducerTopicMetadataClient returns a new producer on every
  // call — confirm it is not a shared instance.
  try (Producer<String, String> kafkaProducer = createProducerTopicMetadataClient(
      metadataBrokerList,
      kafkaClientConfigs
  )) {
    kafkaProducer.partitionsFor(topic);
  }
}
 
/**
 * Should be called only by MapR Streams Producer. It creates a topic using Producer.
 *
 * <p>The producer is used for a single partition metadata request and closed afterwards.
 *
 * @param topic name of the stream topic to look up
 * @param kafkaClientConfigs client configuration passed to the producer factory
 * @param metadataBrokerList not used by this implementation
 * @throws StageException declared by the overridden method; not thrown directly by this body
 */
@Override
public void createTopicIfNotExists(String topic, Map<String, Object> kafkaClientConfigs, String metadataBrokerList)
    throws StageException {
  // Stream topic can be created through Producer if Stream Path exists already.
  // The producer is closed once the metadata request has completed so it is not leaked.
  // NOTE(review): assumes createProducerTopicMetadataClient returns a new producer on every
  // call — confirm it is not a shared instance.
  try (Producer<String, String> kafkaProducer = createProducerTopicMetadataClient(kafkaClientConfigs)) {
    kafkaProducer.partitionsFor(topic);
  }
}
 
/**
 * Checks that {@code topic} is set and that partition metadata can be obtained for it, recording
 * a config issue in {@code issues} for every problem found.
 *
 * <p>Three failures are reported: a null or empty topic name ({@code KAFKA_05}), null or empty
 * partition metadata ({@code MAPRSTREAMS_02}), and a {@link KafkaException} raised during the
 * lookup ({@code MAPRSTREAMS_01}, which is also logged).
 *
 * @param context stage context used to create the config issues
 * @param groupName configuration group the issues are attached to
 * @param configName configuration name used for the empty-topic and lookup-failure issues
 * @param kafkaBrokers not used by this implementation
 * @param metadataBrokerList not used by this implementation
 * @param topic topic name to validate
 * @param kafkaClientConfigs client configuration passed to the producer factory
 * @param issues list that receives any config issue found
 * @param producer {@code true} to look the topic up through a producer, {@code false} to use a
 *     consumer
 * @return {@code true} if no issue was recorded, {@code false} otherwise
 */
@Override
public boolean validateTopicExistence(
  Stage.Context context,
  String groupName,
  String configName,
  List<HostAndPort> kafkaBrokers,
  String metadataBrokerList,
  String topic,
  Map<String, Object> kafkaClientConfigs,
  List<Stage.ConfigIssue> issues,
  boolean producer
) {
  // Without a topic name there is nothing to look up; report it and stop.
  if (topic == null || topic.isEmpty()) {
    issues.add(context.createConfigIssue(groupName, configName, KafkaErrors.KAFKA_05));
    return false;
  }

  boolean valid = true;
  try {
    List<PartitionInfo> partitionInfos;
    // The metadata client is only needed for this one lookup, so it is closed right after use
    // instead of being leaked.
    // NOTE(review): assumes both factory methods return a new client on every call — confirm
    // they do not hand out shared instances.
    if (producer) {
      try (Producer<String, String> kafkaProducer = createProducerTopicMetadataClient(kafkaClientConfigs)) {
        partitionInfos = kafkaProducer.partitionsFor(topic);
      }
    } else {
      try (Consumer<String, String> kafkaConsumer = createTopicMetadataClient()) {
        partitionInfos = kafkaConsumer.partitionsFor(topic);
      }
    }
    if (null == partitionInfos || partitionInfos.isEmpty()) {
      issues.add(
          context.createConfigIssue(
              groupName,
              KAFKA_CONFIG_BEAN_PREFIX + "topic",
              MapRStreamsErrors.MAPRSTREAMS_02,
              topic
          )
      );
      valid = false;
    }
  } catch (KafkaException e) {
    LOG.error(MapRStreamsErrors.MAPRSTREAMS_01.getMessage(), topic, e.toString(), e);
    issues.add(
        context.createConfigIssue(
            groupName,
            configName,
            MapRStreamsErrors.MAPRSTREAMS_01,
            topic,
            e.getMessage()
        )
    );
    valid = false;
  }
  return valid;
}