org.apache.kafka.common.PartitionInfo Source Code Examples

Listed below are example usages of org.apache.kafka.common.PartitionInfo; each entry names the project and file, and the full source can be viewed on GitHub.
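
Before the examples, a quick orientation: nearly every snippet below obtains PartitionInfo from KafkaConsumer.partitionsFor(topic) or Cluster.partitionsForTopic(topic) and then reads its accessors. The following minimal, self-contained sketch shows that surface; the broker address and topic name are illustrative, not taken from any example below.

import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class PartitionInfoSketch {
    public static void main(String[] args) {
        // PartitionInfo is plain metadata: topic, partition id, leader, replicas, ISR.
        Node broker = new Node(1, "localhost", 9092);
        Node[] replicas = {broker};
        PartitionInfo info = new PartitionInfo("demo-topic", 0, broker, replicas, replicas);

        System.out.println(info.topic());     // demo-topic
        System.out.println(info.partition()); // 0
        System.out.println(info.leader());    // the broker node

        // Recurring pattern in the examples: convert the metadata into a TopicPartition,
        // the value-equality handle used for assignment, seeking, and offset lookups.
        TopicPartition tp = new TopicPartition(info.topic(), info.partition());
        System.out.println(tp); // demo-topic-0
    }
}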

Example 1 Project: common-kafka   File: ConsumerOffsetClientTest.java
@Test
public void getEndOffsets() {
    Map<TopicPartition, Long> offsets = new HashMap<>();
    offsets.put(new TopicPartition("topic1", 0), 123L);
    offsets.put(new TopicPartition("topic1", 1), 234L);
    offsets.put(new TopicPartition("topic2", 0), 345L);
    offsets.put(new TopicPartition("topic2", 1), 456L);

    when(consumer.partitionsFor("topic1")).thenReturn(Arrays.asList(
            new PartitionInfo("topic1", 0, null, null, null),
            new PartitionInfo("topic1", 1, null, null, null)));
    when(consumer.partitionsFor("topic2")).thenReturn(Arrays.asList(
            new PartitionInfo("topic2", 0, null, null, null),
            new PartitionInfo("topic2", 1, null, null, null)));

    when(consumer.endOffsets(Arrays.asList(
            new TopicPartition("topic1", 0),
            new TopicPartition("topic1", 1),
            new TopicPartition("topic2", 0),
            new TopicPartition("topic2", 1)
    ))).thenReturn(offsets);

    assertThat(client.getEndOffsets(Arrays.asList("topic1", "topic2")), is(offsets));
}
 
Example 2 Project: cruise-control   File: ExecutionTaskPlannerTest.java
@Test
public void testClear() {
  List<ExecutionProposal> proposals = new ArrayList<>();
  proposals.add(_leaderMovement1);
  proposals.add(_partitionMovement1);
  ExecutionTaskPlanner planner =
      new ExecutionTaskPlanner(null, new KafkaCruiseControlConfig(KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties()));

  Set<PartitionInfo> partitions = new HashSet<>();

  partitions.add(generatePartitionInfo(_leaderMovement1, false));
  partitions.add(generatePartitionInfo(_partitionMovement1, false));

  Cluster expectedCluster = new Cluster(null,
                                        _expectedNodes,
                                        partitions,
                                        Collections.<String>emptySet(),
                                        Collections.<String>emptySet());

  planner.addExecutionProposals(proposals, expectedCluster, null);
  assertEquals(2, planner.remainingLeadershipMovements().size());
  assertEquals(2, planner.remainingInterBrokerReplicaMovements().size());
  planner.clear();
  assertEquals(0, planner.remainingLeadershipMovements().size());
  assertEquals(0, planner.remainingInterBrokerReplicaMovements().size());
}
 
Example 3 Project: doctorkafka   File: Email.java
public static void alertOnProlongedUnderReplicatedPartitions(String[] emails,
                                                             String clusterName,
                                                             int waitTimeInSeconds,
                                                             List<PartitionInfo> urps) {
  if (prolongedUrpEmails.containsKey(clusterName) &&
      System.currentTimeMillis() - prolongedUrpEmails.get(clusterName) < COOLOFF_INTERVAL) {
    // return early to avoid spamming users if an email was already sent within the cool-off interval
    return;
  }

  prolongedUrpEmails.put(clusterName, System.currentTimeMillis());
  String title = clusterName + " has been under-replicated for > "
      + waitTimeInSeconds + " seconds (" + urps.size() + " under-replicated partitions)";
  StringBuilder sb = new StringBuilder();
  for (PartitionInfo partitionInfo : urps) {
    sb.append(partitionInfo + "\n");
  }
  String content = sb.toString();
  sendTo(emails, title, content);
}
 
Example 4 Project: metron   File: KafkaServiceImplTest.java
@Test
public void listTopicsHappyPath() {
  final Map<String, List<PartitionInfo>> topics = new HashMap<>();
  topics.put("topic1", Lists.newArrayList());
  topics.put("topic2", Lists.newArrayList());
  topics.put("topic3", Lists.newArrayList());

  when(kafkaConsumer.listTopics()).thenReturn(topics);

  final Set<String> listedTopics = kafkaService.listTopics();

  assertEquals(Sets.newHashSet("topic1", "topic2", "topic3"), listedTopics);

  verifyNoInteractions(zkUtils);
  verify(kafkaConsumer).listTopics();
  verify(kafkaConsumer).close();
  verifyNoMoreInteractions(kafkaConsumer, zkUtils);
}
 
Example 5 Project: Flink-CEPplus   File: KafkaPartitionDiscoverer.java
@Override
protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws AbstractPartitionDiscoverer.WakeupException {
	List<KafkaTopicPartition> partitions = new LinkedList<>();

	try {
		for (String topic : topics) {
			for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
				partitions.add(new KafkaTopicPartition(partitionInfo.topic(), partitionInfo.partition()));
			}
		}
	} catch (org.apache.kafka.common.errors.WakeupException e) {
		// rethrow our own wakeup exception
		throw new AbstractPartitionDiscoverer.WakeupException();
	}

	return partitions;
}
 
Example 6 Project: KafkaExample   File: HashPartitioner.java
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
	List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
	int numPartitions = partitions.size();
	if (keyBytes != null) {
		int hashCode = 0;
		if (key instanceof Integer || key instanceof Long) {
			// (int) key would throw ClassCastException for a Long key,
			// so go through Number to truncate safely
			hashCode = ((Number) key).intValue();
		} else {
			hashCode = key.hashCode();
		}
		// clear the sign bit so the modulo result is never negative
		hashCode = hashCode & 0x7fffffff;
		return hashCode % numPartitions;
	} else {
		return 0;
	}
}
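
A custom Partitioner like the one above only takes effect once it is registered on the producer. Below is a minimal wiring sketch, assuming the HashPartitioner class above is on the classpath; the broker address, topic, and record contents are illustrative.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class HashPartitionerWiring {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Kafka instantiates the partitioner reflectively from this config key
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, HashPartitioner.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // a String key falls into the key.hashCode() branch of HashPartitioner
            producer.send(new ProducerRecord<>("demo-topic", "42", "payload"));
        }
    }
}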
 
Example 7 Project: flink   File: Kafka010PartitionDiscoverer.java
@Override
protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws WakeupException, RuntimeException {
	List<KafkaTopicPartition> partitions = new LinkedList<>();

	try {
		for (String topic : topics) {
			final List<PartitionInfo> kafkaPartitions = kafkaConsumer.partitionsFor(topic);

			if (kafkaPartitions == null) {
				throw new RuntimeException(String.format("Could not fetch partitions for %s. Make sure that the topic exists.", topic));
			}

			for (PartitionInfo partitionInfo : kafkaPartitions) {
				partitions.add(new KafkaTopicPartition(partitionInfo.topic(), partitionInfo.partition()));
			}
		}
	} catch (org.apache.kafka.common.errors.WakeupException e) {
		// rethrow our own wakeup exception
		throw new WakeupException();
	}

	return partitions;
}
 
Example 8 Project: doctorkafka   File: KafkaClusterManager.java
/**
 *  Remove the under-replicated partitions that are in the middle of partition reassignment.
 */
public List<PartitionInfo> filterOutInReassignmentUrps(List<PartitionInfo> urps,
                                                       Map<String, Integer> replicationFactors) {
  List<PartitionInfo> result = new ArrayList<>();
  for (PartitionInfo urp : urps) {
    if (urp.replicas().length <= replicationFactors.get(urp.topic())) {
      // # of replicas <= replication factor
      result.add(urp);
    } else {
      // # of replicas > replication factor. this can happen after
      // a failed partition reassignment
      Set<Integer> liveReplicas = new HashSet<>();
      for (Node node : urp.replicas()) {
        if (node.host() != null && OperatorUtil.pingKafkaBroker(node.host(), 9092, 5000)) {
          liveReplicas.add(node.id());
        }
      }
      if (liveReplicas.size() < replicationFactors.get(urp.topic())) {
        result.add(urp);
      }
    }
  }
  return result;
}
 
Example 9 Project: flink   File: FlinkKafkaProducerBase.java
protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
	// the fetched list is immutable, so we're creating a mutable copy in order to sort it
	List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));

	// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
	Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
		@Override
		public int compare(PartitionInfo o1, PartitionInfo o2) {
			return Integer.compare(o1.partition(), o2.partition());
		}
	});

	int[] partitions = new int[partitionsList.size()];
	for (int i = 0; i < partitions.length; i++) {
		partitions[i] = partitionsList.get(i).partition();
	}

	return partitions;
}
 
Example 10 Project: flink   File: FlinkKafkaProducer.java
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
	// the fetched list is immutable, so we're creating a mutable copy in order to sort it
	List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));

	// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
	Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
		@Override
		public int compare(PartitionInfo o1, PartitionInfo o2) {
			return Integer.compare(o1.partition(), o2.partition());
		}
	});

	int[] partitions = new int[partitionsList.size()];
	for (int i = 0; i < partitions.length; i++) {
		partitions[i] = partitionsList.get(i).partition();
	}

	return partitions;
}
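
Both variants sort by partition id for the same reason: each parallel subtask fetches the partition list independently, and only an identical ordering lets the subtasks carve the partitions into disjoint, complete shares. The helper below illustrates the kind of round-robin split this determinism enables; it is a sketch with illustrative parameters, not Flink's actual API.

// every subtask takes each numSubtasks-th entry, starting at its own index;
// because all subtasks see the same sorted array, the shares never overlap
static int[] assignedPartitions(int[] sortedPartitions, int subtaskIndex, int numSubtasks) {
	int count = 0;
	for (int i = subtaskIndex; i < sortedPartitions.length; i += numSubtasks) {
		count++;
	}
	int[] assigned = new int[count];
	int pos = 0;
	for (int i = subtaskIndex; i < sortedPartitions.length; i += numSubtasks) {
		assigned[pos++] = sortedPartitions[i];
	}
	return assigned;
}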
 
Example 11 Project: cruise-control   File: KafkaSampleStore.java
protected void prepareConsumers() {
  int numConsumers = _consumers.size();
  List<List<TopicPartition>> assignments = new ArrayList<>();
  for (int i = 0; i < numConsumers; i++) {
    assignments.add(new ArrayList<>());
  }
  int j = 0;
  for (String topic : Arrays.asList(_partitionMetricSampleStoreTopic, _brokerMetricSampleStoreTopic)) {
    for (PartitionInfo partInfo : _consumers.get(0).partitionsFor(topic)) {
      assignments.get(j++ % numConsumers).add(new TopicPartition(partInfo.topic(), partInfo.partition()));
    }
  }
  for (int i = 0; i < numConsumers; i++) {
    _consumers.get(i).assign(assignments.get(i));
  }
}
 
Example 12 Project: kylin   File: KafkaClient.java
public static Map<Integer, Long> getLatestOffsets(final CubeInstance cubeInstance) {
    final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());

    final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
    final String topic = kafkaConfig.getTopic();

    Map<Integer, Long> startOffsets = Maps.newHashMap();
    try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName())) {
        final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        for (PartitionInfo partitionInfo : partitionInfos) {
            long latest = getLatestOffset(consumer, topic, partitionInfo.partition());
            startOffsets.put(partitionInfo.partition(), latest);
        }
    }
    return startOffsets;
}
 
Example 13 Project: kafka-webview   File: KafkaAdminFactoryTest.java
/**
 * Test that KafkaAdminFactory can create a working KafkaConsumer when connecting to a non-ssl cluster.
 */
@Test
public void testCreateNonSslConsumer() {
    // Create Cluster config
    final ClusterConfig clusterConfig = ClusterConfig.newBuilder()
        .withBrokerHosts(sharedKafkaTestResource.getKafkaConnectString())
        .build();

    // Create a topic
    final String topicName = "MyRandomTopic";
    sharedKafkaTestResource.getKafkaTestUtils().createTopic(topicName, 1, (short) 1);

    final KafkaAdminFactory kafkaAdminFactory = new KafkaAdminFactory(new KafkaClientConfigUtil("NotUsed", "Prefix"));

    // Create instance
    try (final KafkaConsumer<String, String> consumerClient = kafkaAdminFactory.createConsumer(clusterConfig, "MyClientId")) {

        // Call method to validate things work as expected
        final Map<String, List<PartitionInfo>> results = consumerClient.listTopics();
        assertNotNull(results);
        assertTrue(results.containsKey(topicName), "Should have our topic.");
    }
}
 
Example 14 Project: doctorkafka   File: KafkaClusterManager.java
public Map<Integer, List<TopicPartition>> getBrokerLeaderPartitions(
    Map<String, List<PartitionInfo>> topicPartitionInfoMap) {
  Map<Integer, List<TopicPartition>> result = new HashMap<>();

  for (String topic : topicPartitionInfoMap.keySet()) {
    List<PartitionInfo> partitionInfoList = topicPartitionInfoMap.get(topic);
    if (partitionInfoList == null) {
      LOG.error("Failed to get partition info for {}", topic);
      continue;
    }

    for (PartitionInfo info : partitionInfoList) {
      Node leaderNode = info.leader();
      if (leaderNode != null) {
        result.putIfAbsent(leaderNode.id(), new ArrayList<>());
        TopicPartition topicPartition = new TopicPartition(info.topic(), info.partition());
        result.get(leaderNode.id()).add(topicPartition);
      }
    }
  }
  return result;
}
 
Example 15 Project: arcusplatform   File: KafkaDispatcherImpl.java
private Collection<TopicPartition> toKafkaPartitions(
		String topic, 
		Set<PlatformPartition> newPartitions,
		KafkaConsumer<?, ?> consumer
) {
	List<PartitionInfo> kafkaPartitions = consumer.partitionsFor(topic);
	int partitionRatio = platformPartitions / kafkaPartitions.size(); 
	logger.info("Discovered [{}] kafka partitions and [{}] platform partitions: [{}] platform partitions per kafka partition", kafkaPartitions.size(), platformPartitions, partitionRatio);
	Map<Integer, Integer> partitionMap = new LinkedHashMap<>();
	for(PlatformPartition pp: newPartitions) {
		int kafkaPartition = pp.getId() % kafkaPartitions.size();
		partitionMap.put(kafkaPartition, partitionMap.getOrDefault(kafkaPartition, 0) + 1);
	}
	List<TopicPartition> tp = new ArrayList<>(Math.max(1, partitionMap.size()));
	for(Map.Entry<Integer, Integer> entry: partitionMap.entrySet()) {
		Preconditions.checkState(entry.getValue() == partitionRatio, "Kafka partition %s partially assigned to this node, that is not currently supported", entry.getKey());
		tp.add(new TopicPartition(topic, entry.getKey()));
	}
	logger.info("Assigning partitions [{}] to this node", partitionMap.keySet());
	return tp;
}
 
Example 16 Project: ja-micro   File: SixtPartitionerTest.java
@Ignore // By incorporating available partitions instead of overall partition count,
        // we were getting non-deterministic partitions for known keys.  This is not
        // what we want for some applications, so this was changed.
@Test
public void nullKeyRoundRobinThreeAvailablePartitionsTest() {
    List<PartitionInfo> partitions = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
        partitions.add(new PartitionInfo(null, i, null, null, null));
    }
    when(cluster.availablePartitionsForTopic(anyString())).thenReturn(partitions);

    List<Integer> results = new ArrayList<>();
    for (int i = 0; i < 12; i++) {
        results.add(partitioner.partition("events", null, null,
                null, null, cluster));
    }
    List<Integer> shouldBe = of(0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2);
    assertThat(results).isEqualTo(shouldBe);
}
 
Example 17 Project: common-kafka   File: ConsumerOffsetClientTest.java
@Test
public void getPartitionsFor() {
    when(consumer.partitionsFor("topic1")).thenReturn(Arrays.asList(
            new PartitionInfo("topic1", 0, null, null, null),
            new PartitionInfo("topic1", 1, null, null, null)));
    when(consumer.partitionsFor("topic2")).thenReturn(Arrays.asList(
            new PartitionInfo("topic2", 0, null, null, null),
            new PartitionInfo("topic2", 1, null, null, null)));

    assertThat(client.getPartitionsFor(Arrays.asList("topic1", "topic2")), is(Arrays.asList(
            new TopicPartition("topic1", 0),
            new TopicPartition("topic1", 1),
            new TopicPartition("topic2", 0),
            new TopicPartition("topic2", 1)
    )));
}
 
Example 18 Project: kafka-webview   File: DefaultWebKafkaConsumer.java
private List<TopicPartition> getAllPartitions() {
    // If we have not pulled this yet
    if (cachedTopicsAndPartitions == null) {
        // Determine which partitions to subscribe to, for now do all
        final List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(clientConfig.getTopicConfig().getTopicName());

        // Pull out partitions, convert to topic partitions
        cachedTopicsAndPartitions = new ArrayList<>();
        for (final PartitionInfo partitionInfo : partitionInfos) {
            // Skip filtered partitions
            if (!clientConfig.isPartitionFiltered(partitionInfo.partition())) {
                cachedTopicsAndPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
        }
    }
    return cachedTopicsAndPartitions;
}
 
Example 19 Project: kylin   File: KafkaClient.java
public static Map<Integer, Long> getEarliestOffsets(final CubeInstance cubeInstance) {
    final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());

    final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
    final String topic = kafkaConfig.getTopic();

    Map<Integer, Long> startOffsets = Maps.newHashMap();
    try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName())) {
        final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        for (PartitionInfo partitionInfo : partitionInfos) {
            long earliest = getEarliestOffset(consumer, topic, partitionInfo.partition());
            startOffsets.put(partitionInfo.partition(), earliest);
        }
    }
    return startOffsets;
}
 
Example 20 Project: singer   File: StringKeyMd5Partitioner.java
public int partition(Object object, List<PartitionInfo> partitions) {
  int numPartitions = partitions.size();
  Preconditions.checkArgument(numPartitions > 0);
  byte[] key = (byte[]) object;
  String strKey = new String(key, Charsets.UTF_8);
  int partitionNum = hf.newHasher(16).putString(strKey, Charsets.UTF_8).hash().asInt()
      % numPartitions;
  if (partitionNum < 0) {
    partitionNum += numPartitions;
  }
  LOG.debug("The partition number for key {} is {}", strKey, partitionNum);
  return partitionNum;
}
 
Example 21 Project: ja-micro   File: TopicVerification.java
public boolean verifyTopicsExist(String kafkaBrokers, Set<String> requiredTopics,
                                 boolean checkPartitionCounts) {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaBrokers);
    props.put("group.id", UUID.randomUUID().toString());
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    if (serviceProperties != null) {
        SaslConfigurator configurator = new SaslConfigurator();
        configurator.configureSasl(props, serviceProperties.getServiceName(), serviceProperties.getKafkaPassword());
    } else {
        logger.warn("TopicVerification was not initialized, SASL will not be supported for this connection");
    }
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    try {
        Map<String, List<PartitionInfo>> topics = consumer.listTopics();

        Set<Integer> partitionCount = new HashSet<>();
        for (String requiredTopic : requiredTopics) {
            List<PartitionInfo> partitions = topics.get(requiredTopic);
            if (partitions == null) {
                logger.info("Required kafka topic {} not present", requiredTopic);
                return false;
            }
            partitionCount.add(partitions.size());
        }
        if (checkPartitionCounts && partitionCount.size() > 1) {
            logger.warn("Partition count mismatch in topics {}",
                    Arrays.toString(requiredTopics.toArray()));
            return false;
        }
        return true;
    } finally {
        consumer.close();
    }
}
 
Example 22 Project: singer   File: LocalityAwarePartitioner.java
/**
 * Checks whether local partitions are available; if so, assigns them to localPartitions,
 * otherwise falls back to assigning all supplied partitions.
 * @param partitions all partitions currently known for the topic
 */
protected void checkAndAssignLocalPartitions(List<PartitionInfo> partitions) {
  List<PartitionInfo> retainLocalPartition = retainLocalPartitions(partitions);
  if (retainLocalPartition.isEmpty()) {
    OpenTsdbMetricConverter.gauge(SingerMetrics.MISSING_LOCAL_PARTITIONS, 1, "locality=" + rack,
        "host=" + KafkaWriter.HOSTNAME);
    // reset to all partitions if no local partitions are available
    localPartitions = partitions;
  } else {
    localPartitions = retainLocalPartition;
  }
}
 
@Override
public int partition(Object messageKey, List<PartitionInfo> partitions) {
  if (localPartitions == null || isTimeToRefresh()) {
    checkAndAssignLocalPartitions(partitions);
    // set next refresh time
    updateNextRefreshTime();
    // NOTE: we are not doing a delta update here since PartitionInfo doesn't
    // override hashCode and equals, so computing the delta would be cumbersome
    partitionId = localPartitions.get(Math.abs(random.nextInt() % localPartitions.size()))
        .partition();
  }
  return partitionId;
}
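
The NOTE inside partition() above points at a real pitfall: PartitionInfo does not override equals/hashCode, so set operations on it compare object identity, which makes snapshot-to-snapshot delta computation unreliable. A workable alternative (a sketch, not the project's code) is to key by TopicPartition, which does implement value equality:

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

final class PartitionDelta {
  /** Partitions present in the new metadata snapshot but not in the old one. */
  static Set<TopicPartition> added(List<PartitionInfo> oldSnapshot, List<PartitionInfo> newSnapshot) {
    Set<TopicPartition> result = toKeys(newSnapshot);
    result.removeAll(toKeys(oldSnapshot));
    return result;
  }

  private static Set<TopicPartition> toKeys(List<PartitionInfo> infos) {
    // TopicPartition overrides equals/hashCode, so set semantics behave as expected
    return infos.stream()
        .map(info -> new TopicPartition(info.topic(), info.partition()))
        .collect(Collectors.toCollection(HashSet::new));
  }
}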
 
Example 24 Project: metron   File: KafkaServiceImplTest.java
@Test
public void deletingTopicThatExistShouldReturnTrue() {
  final Map<String, List<PartitionInfo>> topics = new HashMap<>();
  topics.put("non_existent_topic", Lists.newArrayList());

  when(kafkaConsumer.listTopics()).thenReturn(topics);

  assertTrue(kafkaService.deleteTopic("non_existent_topic"));

  verify(kafkaConsumer).listTopics();
  verify(kafkaConsumer).close();
  verify(adminUtils).deleteTopic(zkUtils, "non_existent_topic");
  verifyNoMoreInteractions(kafkaConsumer);
}
 
Example 25 Project: flink   File: FlinkKafkaProducer.java
@Override
public List<PartitionInfo> partitionsFor(String topic) {
	synchronized (producerClosingLock) {
		ensureNotClosed();
		return kafkaProducer.partitionsFor(topic);
	}
}
 
private Collection<PartitionInfo> getPartitionInfo(String topic,
		final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
		final ConsumerFactory<?, ?> consumerFactory, int partitionCount) {
	return provisioningProvider.getPartitionsForTopic(partitionCount,
			extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(),
			() -> {
				try (Consumer<?, ?> consumer = consumerFactory.createConsumer()) {
					return consumer.partitionsFor(topic);
				}
			}, topic);
}
 
Example 27 Project: metron   File: KafkaServiceImplTest.java
@Test
public void getSampleMessageProperlyReturnsAMessageFromAGivenKafkaTopic() {
  final String topicName = "t";
  final Node host = new Node(1, "host", 8080);
  final Node[] replicas = {host};
  final List<PartitionInfo> partitionInfo = Lists.newArrayList(new PartitionInfo(topicName, 1, host, replicas, replicas));
  final TopicPartition topicPartition = new TopicPartition(topicName, 1);
  final List<TopicPartition> topicPartitions = Lists.newArrayList(topicPartition);
  final Set<TopicPartition> topicPartitionsSet = Sets.newHashSet(topicPartitions);
  final ConsumerRecords<String, String> records = new ConsumerRecords<>(new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>() {{
    put(topicPartition, Lists.newArrayList(new ConsumerRecord<>(topicName, 1, 1, "k", "message")));
  }});

  when(kafkaConsumer.listTopics()).thenReturn(new HashMap<String, List<PartitionInfo>>() {{ put(topicName, Lists.newArrayList()); }});
  when(kafkaConsumer.partitionsFor(eq(topicName))).thenReturn(partitionInfo);
  when(kafkaConsumer.assignment()).thenReturn(topicPartitionsSet);
  when(kafkaConsumer.position(topicPartition)).thenReturn(1L);
  when(kafkaConsumer.poll(100)).thenReturn(records);

  assertEquals("message", kafkaService.getSampleMessage(topicName));

  verify(kafkaConsumer).assign(eq(topicPartitions));
  verify(kafkaConsumer).assignment();
  verify(kafkaConsumer).poll(100);
  verify(kafkaConsumer).unsubscribe();
  verify(kafkaConsumer, times(2)).position(topicPartition);
  verify(kafkaConsumer).seek(topicPartition, 0);

  verifyNoInteractions(zkUtils, adminUtils);
}
 
Example 28 Project: singer   File: StringKeyMd5PartitionerTest.java
@Test
public void testPartitionWithStringKey() {
  List<PartitionInfo> partitions = Arrays.asList(new PartitionInfo[50]);
  String strKey = "399553798187727154";
  byte[] key = strKey.getBytes(Charsets.UTF_8);
  assertEquals(24, partitioner.partition(key, partitions));
}
 
@Test(timeout = 5000)
public void kafkaBinderDoesNotAnswer() {
	final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
	topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation(
			"group3-healthIndicator", partitions, false));
	org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC))
			.willAnswer(invocation -> {
				final int fiveMinutes = 1000 * 60 * 5;
				Thread.sleep(fiveMinutes);
				return partitions;
			});
	this.indicator.setTimeout(1);
	Health health = indicator.health();
	assertThat(health.getStatus()).isEqualTo(Status.DOWN);
}
 
Example 30 Project: brooklin   File: AbstractKafkaConnector.java
private void validatePausedPartitions(Datastream datastream, List<Datastream> allDatastreams)
    throws DatastreamValidationException {
  Map<String, Set<String>> pausedSourcePartitionsMap = DatastreamUtils.getDatastreamSourcePartitions(datastream);

  for (Map.Entry<String, Set<String>> entry : pausedSourcePartitionsMap.entrySet()) {
    String source = entry.getKey();
    Set<String> newPartitions = entry.getValue();

    // Validate that partitions actually exist and convert any "*" to actual list of partitions.
    // For that, get the list of existing partitions first.
    List<PartitionInfo> partitionInfos = getKafkaTopicPartitions(datastream, source);
    Set<String> allPartitions = new HashSet<>();
    for (PartitionInfo info : partitionInfos) {
      allPartitions.add(String.valueOf(info.partition()));
    }

    // if there is any * in the new list, just convert it to actual list of partitions.
    if (newPartitions.contains(DatastreamMetadataConstants.REGEX_PAUSE_ALL_PARTITIONS_IN_A_TOPIC)) {
      newPartitions.clear();
      newPartitions.addAll(allPartitions);
    } else {
      // Else make sure there aren't any partitions that don't exist.
      newPartitions.retainAll(allPartitions);
    }
  }

  // Now write back the set to datastream
  datastream.getMetadata()
      .put(DatastreamMetadataConstants.PAUSED_SOURCE_PARTITIONS_KEY, JsonUtils.toJson(pausedSourcePartitionsMap));
}