下面列出了org.apache.kafka.clients.producer.BufferExhaustedException#org.apache.kafka.common.PartitionInfo 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void getEndOffsets() {
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(new TopicPartition("topic1", 0), 123L);
offsets.put(new TopicPartition("topic1", 1), 234L);
offsets.put(new TopicPartition("topic2", 0), 345L);
offsets.put(new TopicPartition("topic2", 1), 456L);
when(consumer.partitionsFor("topic1")).thenReturn(Arrays.asList(
new PartitionInfo("topic1", 0, null, null, null),
new PartitionInfo("topic1", 1, null, null, null)));
when(consumer.partitionsFor("topic2")).thenReturn(Arrays.asList(
new PartitionInfo("topic2", 0, null, null, null),
new PartitionInfo("topic2", 1, null, null, null)));
when(consumer.endOffsets(Arrays.asList(
new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1),
new TopicPartition("topic2", 0),
new TopicPartition("topic2", 1)
))).thenReturn(offsets);
assertThat(client.getEndOffsets(Arrays.asList("topic1", "topic2")), is(offsets));
}
@Test
public void testClear() {
List<ExecutionProposal> proposals = new ArrayList<>();
proposals.add(_leaderMovement1);
proposals.add(_partitionMovement1);
ExecutionTaskPlanner planner =
new ExecutionTaskPlanner(null, new KafkaCruiseControlConfig(KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties()));
Set<PartitionInfo> partitions = new HashSet<>();
partitions.add(generatePartitionInfo(_leaderMovement1, false));
partitions.add(generatePartitionInfo(_partitionMovement1, false));
Cluster expectedCluster = new Cluster(null,
_expectedNodes,
partitions,
Collections.<String>emptySet(),
Collections.<String>emptySet());
planner.addExecutionProposals(proposals, expectedCluster, null);
assertEquals(2, planner.remainingLeadershipMovements().size());
assertEquals(2, planner.remainingInterBrokerReplicaMovements().size());
planner.clear();
assertEquals(0, planner.remainingLeadershipMovements().size());
assertEquals(0, planner.remainingInterBrokerReplicaMovements().size());
}
public static void alertOnProlongedUnderReplicatedPartitions(String[] emails,
String clusterName,
int waitTimeInSeconds,
List<PartitionInfo> urps) {
if (prolongedUrpEmails.containsKey(clusterName) &&
System.currentTimeMillis() - prolongedUrpEmails.get(clusterName) < COOLOFF_INTERVAL) {
// return to avoid spamming users if an email has been sent within the coll-time time span
return;
}
prolongedUrpEmails.put(clusterName, System.currentTimeMillis());
String title = clusterName + " has been under-replicated for > "
+ waitTimeInSeconds + " seconds (" + urps.size() + ") under-replicated partitions";
StringBuilder sb = new StringBuilder();
for (PartitionInfo partitionInfo : urps) {
sb.append(partitionInfo + "\n");
}
String content = sb.toString();
sendTo(emails, title, content);
}
@Test
public void listTopicsHappyPath() {
final Map<String, List<PartitionInfo>> topics = new HashMap<>();
topics.put("topic1", Lists.newArrayList());
topics.put("topic2", Lists.newArrayList());
topics.put("topic3", Lists.newArrayList());
when(kafkaConsumer.listTopics()).thenReturn(topics);
final Set<String> listedTopics = kafkaService.listTopics();
assertEquals(Sets.newHashSet("topic1", "topic2", "topic3"), listedTopics);
verifyNoInteractions(zkUtils);
verify(kafkaConsumer).listTopics();
verify(kafkaConsumer).close();
verifyNoMoreInteractions(kafkaConsumer, zkUtils);
}
@Override
protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws AbstractPartitionDiscoverer.WakeupException {
List<KafkaTopicPartition> partitions = new LinkedList<>();
try {
for (String topic : topics) {
for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
partitions.add(new KafkaTopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
}
} catch (org.apache.kafka.common.errors.WakeupException e) {
// rethrow our own wakeup exception
throw new AbstractPartitionDiscoverer.WakeupException();
}
return partitions;
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes != null) {
int hashCode = 0;
if (key instanceof Integer || key instanceof Long) {
hashCode = (int) key;
} else {
hashCode = key.hashCode();
}
hashCode = hashCode & 0x7fffffff;
return hashCode % numPartitions;
} else {
return 0;
}
}
@Override
protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws WakeupException, RuntimeException {
List<KafkaTopicPartition> partitions = new LinkedList<>();
try {
for (String topic : topics) {
final List<PartitionInfo> kafkaPartitions = kafkaConsumer.partitionsFor(topic);
if (kafkaPartitions == null) {
throw new RuntimeException(String.format("Could not fetch partitions for %s. Make sure that the topic exists.", topic));
}
for (PartitionInfo partitionInfo : kafkaPartitions) {
partitions.add(new KafkaTopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
}
} catch (org.apache.kafka.common.errors.WakeupException e) {
// rethrow our own wakeup exception
throw new WakeupException();
}
return partitions;
}
/**
* Remove the under-replicated partitions that are in the middle of partition reassignment.
*/
public List<PartitionInfo> filterOutInReassignmentUrps(List<PartitionInfo> urps,
Map<String, Integer> replicationFactors) {
List<PartitionInfo> result = new ArrayList<>();
for (PartitionInfo urp : urps) {
if (urp.replicas().length <= replicationFactors.get(urp.topic())) {
// # of replicas <= replication factor
result.add(urp);
} else {
// # of replicas > replication factor. this can happen after
// a failed partition reassignment
Set<Integer> liveReplicas = new HashSet<>();
for (Node node : urp.replicas()) {
if (node.host() != null && OperatorUtil.pingKafkaBroker(node.host(), 9092, 5000)) {
liveReplicas.add(node.id());
}
}
if (liveReplicas.size() < replicationFactors.get(urp.topic())) {
result.add(urp);
}
}
}
return result;
}
protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
// the fetched list is immutable, so we're creating a mutable copy in order to sort it
List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
@Override
public int compare(PartitionInfo o1, PartitionInfo o2) {
return Integer.compare(o1.partition(), o2.partition());
}
});
int[] partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
}
return partitions;
}
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
// the fetched list is immutable, so we're creating a mutable copy in order to sort it
List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
@Override
public int compare(PartitionInfo o1, PartitionInfo o2) {
return Integer.compare(o1.partition(), o2.partition());
}
});
int[] partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
}
return partitions;
}
protected void prepareConsumers() {
int numConsumers = _consumers.size();
List<List<TopicPartition>> assignments = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
assignments.add(new ArrayList<>());
}
int j = 0;
for (String topic : Arrays.asList(_partitionMetricSampleStoreTopic, _brokerMetricSampleStoreTopic)) {
for (PartitionInfo partInfo : _consumers.get(0).partitionsFor(topic)) {
assignments.get(j++ % numConsumers).add(new TopicPartition(partInfo.topic(), partInfo.partition()));
}
}
for (int i = 0; i < numConsumers; i++) {
_consumers.get(i).assign(assignments.get(i));
}
}
public static Map<Integer, Long> getLatestOffsets(final CubeInstance cubeInstance) {
final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
final String topic = kafkaConfig.getTopic();
Map<Integer, Long> startOffsets = Maps.newHashMap();
try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName())) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
long latest = getLatestOffset(consumer, topic, partitionInfo.partition());
startOffsets.put(partitionInfo.partition(), latest);
}
}
return startOffsets;
}
/**
* Test that KafkaAdminFactory can create a working KafkaConsumer when connecting to a non-ssl cluster.
*/
@Test
public void testCreateNonSslConsumer() {
// Create Cluster config
final ClusterConfig clusterConfig = ClusterConfig.newBuilder()
.withBrokerHosts(sharedKafkaTestResource.getKafkaConnectString())
.build();
// Create a topic
final String topicName = "MyRandomTopic";
sharedKafkaTestResource.getKafkaTestUtils().createTopic(topicName, 1, (short) 1);
final KafkaAdminFactory kafkaAdminFactory = new KafkaAdminFactory(new KafkaClientConfigUtil("NotUsed", "Prefix"));
// Create instance
try (final KafkaConsumer<String, String> consumerClient = kafkaAdminFactory.createConsumer(clusterConfig, "MyClientId")) {
// Call method to validate things work as expected
final Map<String, List<PartitionInfo>> results = consumerClient.listTopics();
assertNotNull(results);
assertTrue(results.containsKey(topicName), "Should have our topic.");
}
}
public Map<Integer, List<TopicPartition>> getBrokerLeaderPartitions(
Map<String, List<PartitionInfo>> topicPartitonInfoMap) {
Map<Integer, List<TopicPartition>> result = new HashMap<>();
for (String topic : topicPartitonInfoMap.keySet()) {
List<PartitionInfo> partitionInfoList = topicPartitonInfoMap.get(topic);
if (partitionInfoList == null) {
LOG.error("Failed to get partition info for {}", topic);
continue;
}
for (PartitionInfo info : partitionInfoList) {
Node leaderNode = info.leader();
if (leaderNode != null) {
result.putIfAbsent(leaderNode.id(), new ArrayList<>());
TopicPartition topicPartiton = new TopicPartition(info.topic(), info.partition());
result.get(leaderNode.id()).add(topicPartiton);
}
}
}
return result;
}
private Collection<TopicPartition> toKafkaPartitions(
String topic,
Set<PlatformPartition> newPartitions,
KafkaConsumer<?, ?> consumer
) {
List<PartitionInfo> kafkaPartitions = consumer.partitionsFor(topic);
int partitionRatio = platformPartitions / kafkaPartitions.size();
logger.info("Discovered [{}] kafka partitions and [{}] platform partitions: [{}] platform partitions per kafka partition", kafkaPartitions.size(), platformPartitions, partitionRatio);
Map<Integer, Integer> partitionMap = new LinkedHashMap<>();
for(PlatformPartition pp: newPartitions) {
int kafkaPartition = pp.getId() % kafkaPartitions.size();
partitionMap.put(kafkaPartition, partitionMap.getOrDefault(kafkaPartition, 0) + 1);
}
List<TopicPartition> tp = new ArrayList<>(Math.max(1, partitionMap.size()));
for(Map.Entry<Integer, Integer> entry: partitionMap.entrySet()) {
Preconditions.checkState(entry.getValue() == partitionRatio, "Kafka partition %d partially assigned to this node, that is not currently supported", entry.getKey());
tp.add(new TopicPartition(topic, entry.getKey()));
}
logger.info("Assigning partitions [{}] to this node", partitionMap.keySet());
return tp;
}
@Ignore // By incorporating available partitions instead of overall partition count,
// we were getting non-deterministic partitions for known keys. This is not
// what we want for some applications, so this was changed.
@Test
public void nullKeyRoundRobinThreeAvailablePartitionsTest() {
List<PartitionInfo> partitions = new ArrayList<>();
for (int i = 0; i < 3; i++) {
partitions.add(new PartitionInfo(null, i, null, null, null));
}
when(cluster.availablePartitionsForTopic(anyString())).thenReturn(partitions);
List<Integer> results = new ArrayList<>();
for (int i = 0; i < 12; i++) {
results.add(partitioner.partition("events", null, null,
null, null, cluster));
}
List<Integer> shouldBe = of(0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2);
assertThat(results).isEqualTo(shouldBe);
}
@Test
public void getPartitionsFor() {
when(consumer.partitionsFor("topic1")).thenReturn(Arrays.asList(
new PartitionInfo("topic1", 0, null, null, null),
new PartitionInfo("topic1", 1, null, null, null)));
when(consumer.partitionsFor("topic2")).thenReturn(Arrays.asList(
new PartitionInfo("topic2", 0, null, null, null),
new PartitionInfo("topic2", 1, null, null, null)));
assertThat(client.getPartitionsFor(Arrays.asList("topic1", "topic2")), is(Arrays.asList(
new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1),
new TopicPartition("topic2", 0),
new TopicPartition("topic2", 1)
)));
}
private List<TopicPartition> getAllPartitions() {
// If we have not pulled this yet
if (cachedTopicsAndPartitions == null) {
// Determine which partitions to subscribe to, for now do all
final List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(clientConfig.getTopicConfig().getTopicName());
// Pull out partitions, convert to topic partitions
cachedTopicsAndPartitions = new ArrayList<>();
for (final PartitionInfo partitionInfo : partitionInfos) {
// Skip filtered partitions
if (!clientConfig.isPartitionFiltered(partitionInfo.partition())) {
cachedTopicsAndPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
}
}
return cachedTopicsAndPartitions;
}
public static Map<Integer, Long> getEarliestOffsets(final CubeInstance cubeInstance) {
final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
final String topic = kafkaConfig.getTopic();
Map<Integer, Long> startOffsets = Maps.newHashMap();
try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName())) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
long latest = getEarliestOffset(consumer, topic, partitionInfo.partition());
startOffsets.put(partitionInfo.partition(), latest);
}
}
return startOffsets;
}
public int partition(Object object, List<PartitionInfo> partitions) {
int numPartitions = partitions.size();
Preconditions.checkArgument(numPartitions > 0);
byte[] key = (byte[]) object;
String strKey = new String((byte[]) key);
int partitionNum = hf.newHasher(16).putString(strKey, Charsets.UTF_8).hash().asInt()
% numPartitions;
if (partitionNum < 0) {
partitionNum += numPartitions;
}
LOG.debug(String.format("The partition number for key {} is {}", key, partitionNum));
return partitionNum;
}
public boolean verifyTopicsExist(String kafkaBrokers, Set<String> requiredTopics,
boolean checkPartitionCounts) {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBrokers);
props.put("group.id", UUID.randomUUID().toString());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
if (serviceProperties != null) {
SaslConfigurator configurator = new SaslConfigurator();
configurator.configureSasl(props, serviceProperties.getServiceName(), serviceProperties.getKafkaPassword());
} else {
logger.warn("TopicVerification was not initialized, SASL will not be supported for this connection");
}
KafkaConsumer consumer = new KafkaConsumer(props);
try {
@SuppressWarnings("unchecked")
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
Set<Integer> partitionCount = new HashSet<>();
for (String requiredTopic : requiredTopics) {
List<PartitionInfo> partitions = topics.get(requiredTopic);
if (partitions == null) {
logger.info("Required kafka topic {} not present", requiredTopic);
return false;
}
partitionCount.add(partitions.size());
}
if (checkPartitionCounts && partitionCount.size() > 1) {
logger.warn("Partition count mismatch in topics {}",
Arrays.toString(requiredTopics.toArray()));
return false;
}
return true;
} finally {
consumer.close();
}
}
/**
* Check if local partitions are available if yes then assign those to the localPartitions object
* else assigns all supplied partitions to the localPartitions object
* @param partitions
*/
protected void checkAndAssignLocalPartitions(List<PartitionInfo> partitions) {
List<PartitionInfo> retainLocalPartition = retainLocalPartitions(partitions);
if (retainLocalPartition.isEmpty()) {
OpenTsdbMetricConverter.gauge(SingerMetrics.MISSING_LOCAL_PARTITIONS, 1, "locality=" + rack,
"host=" + KafkaWriter.HOSTNAME);
// reset to all partitions if no local partitions are available
localPartitions = partitions;
} else {
localPartitions = retainLocalPartition;
}
}
@Override
public int partition(Object messageKey, List<PartitionInfo> partitions) {
if (localPartitions == null || isTimeToRefresh()) {
checkAndAssignLocalPartitions(partitions);
// set next refresh time
updateNextRefreshTime();
// NOTE we are not doing a delta update here since PartitionInfo object doesn't
// have an overridden hashcode and equals implementation therefore the delta computation
// will be cumbersome
partitionId = localPartitions.get(Math.abs(random.nextInt() % localPartitions.size()))
.partition();
}
return partitionId;
}
@Test
public void deletingTopicThatExistShouldReturnTrue() {
final Map<String, List<PartitionInfo>> topics = new HashMap<>();
topics.put("non_existent_topic", Lists.newArrayList());
when(kafkaConsumer.listTopics()).thenReturn(topics);
assertTrue(kafkaService.deleteTopic("non_existent_topic"));
verify(kafkaConsumer).listTopics();
verify(kafkaConsumer).close();
verify(adminUtils).deleteTopic(zkUtils, "non_existent_topic");
verifyNoMoreInteractions(kafkaConsumer);
}
@Override
public List<PartitionInfo> partitionsFor(String topic) {
synchronized (producerClosingLock) {
ensureNotClosed();
return kafkaProducer.partitionsFor(topic);
}
}
private Collection<PartitionInfo> getPartitionInfo(String topic,
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
final ConsumerFactory<?, ?> consumerFactory, int partitionCount) {
return provisioningProvider.getPartitionsForTopic(partitionCount,
extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(),
() -> {
try (Consumer<?, ?> consumer = consumerFactory.createConsumer()) {
return consumer.partitionsFor(topic);
}
}, topic);
}
@Test
public void getSampleMessageProperlyReturnsAMessageFromAGivenKafkaTopic() {
final String topicName = "t";
final Node host = new Node(1, "host", 8080);
final Node[] replicas = {host};
final List<PartitionInfo> partitionInfo = Lists.newArrayList(new PartitionInfo(topicName, 1, host, replicas, replicas));
final TopicPartition topicPartition = new TopicPartition(topicName, 1);
final List<TopicPartition> topicPartitions = Lists.newArrayList(topicPartition);
final Set<TopicPartition> topicPartitionsSet = Sets.newHashSet(topicPartitions);
final ConsumerRecords<String, String> records = new ConsumerRecords<>(new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>() {{
put(topicPartition, Lists.newArrayList(new ConsumerRecord<>(topicName, 1, 1, "k", "message")));
}});
when(kafkaConsumer.listTopics()).thenReturn(new HashMap<String, List<PartitionInfo>>() {{ put(topicName, Lists.newArrayList()); }});
when(kafkaConsumer.partitionsFor(eq(topicName))).thenReturn(partitionInfo);
when(kafkaConsumer.assignment()).thenReturn(topicPartitionsSet);
when(kafkaConsumer.position(topicPartition)).thenReturn(1L);
when(kafkaConsumer.poll(100)).thenReturn(records);
assertEquals("message", kafkaService.getSampleMessage(topicName));
verify(kafkaConsumer).assign(eq(topicPartitions));
verify(kafkaConsumer).assignment();
verify(kafkaConsumer).poll(100);
verify(kafkaConsumer).unsubscribe();
verify(kafkaConsumer, times(2)).position(topicPartition);
verify(kafkaConsumer).seek(topicPartition, 0);
verifyNoInteractions(zkUtils, adminUtils);
}
@Test
public void testPartitionWithStringKey() {
List<PartitionInfo> partitions = Arrays.asList(new PartitionInfo[50]);
String strKey = "399553798187727154";
byte[] key = strKey.getBytes(Charsets.UTF_8);
assertEquals(24, partitioner.partition(key, partitions));
}
@Test(timeout = 5000)
public void kafkaBinderDoesNotAnswer() {
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation(
"group3-healthIndicator", partitions, false));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC))
.willAnswer(invocation -> {
final int fiveMinutes = 1000 * 60 * 5;
Thread.sleep(fiveMinutes);
return partitions;
});
this.indicator.setTimeout(1);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
}
private void validatePausedPartitions(Datastream datastream, List<Datastream> allDatastreams)
throws DatastreamValidationException {
Map<String, Set<String>> pausedSourcePartitionsMap = DatastreamUtils.getDatastreamSourcePartitions(datastream);
for (Map.Entry<String, Set<String>> entry : pausedSourcePartitionsMap.entrySet()) {
String source = entry.getKey();
Set<String> newPartitions = entry.getValue();
// Validate that partitions actually exist and convert any "*" to actual list of partitions.
// For that, get the list of existing partitions first.
List<PartitionInfo> partitionInfos = getKafkaTopicPartitions(datastream, source);
Set<String> allPartitions = new HashSet<>();
for (PartitionInfo info : partitionInfos) {
allPartitions.add(String.valueOf(info.partition()));
}
// if there is any * in the new list, just convert it to actual list of partitions.
if (newPartitions.contains(DatastreamMetadataConstants.REGEX_PAUSE_ALL_PARTITIONS_IN_A_TOPIC)) {
newPartitions.clear();
newPartitions.addAll(allPartitions);
} else {
// Else make sure there aren't any partitions that don't exist.
newPartitions.retainAll(allPartitions);
}
}
// Now write back the set to datastream
datastream.getMetadata()
.put(DatastreamMetadataConstants.PAUSED_SOURCE_PARTITIONS_KEY, JsonUtils.toJson(pausedSourcePartitionsMap));
}