下面列出了com.google.common.collect.Maps.EntryTransformer#kafka.javaapi.PartitionMetadata 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public boolean run() throws Exception {
if (init) {
return init;
}
// find the meta data about the topic and partition we are interested in
PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
if (metadata == null) {
throw new SynapseException("Can't find metadata for Topic and Partition. Exiting");
}
if (metadata.leader() == null) {
throw new SynapseException("Can't find Leader for Topic and Partition. Exiting");
}
this.leadBroker = metadata.leader().host();
this.clientName = "Client_" + topic + "_" + partition;
this.consumer = new SimpleConsumer(leadBroker, port, KAFKAConstants.BUFFER_SIZE, KAFKAConstants.SO_TIMEOUT,
clientName);
this.readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
init = true;
return init;
}
private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(replicaBrokers, port, topic, partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
goToSleep = true;
} else {
return metadata.leader().host();
}
if (goToSleep) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
throw new SynapseException("Unable to find new leader after Broker failure. Exiting");
}
public Map<Integer, Long> fetch(String topic, int partitionCount) {
Map<Integer, PartitionMetadata> metadatas = fetchPartitionMetadata(brokerList, port, topic, partitionCount);
Map<Integer, Long> ret = new HashMap<>();
for (int partition = 0; partition < partitionCount; partition++) {
PartitionMetadata metadata = metadatas.get(partition);
if (metadata == null || metadata.leader() == null) {
ret.put(partition, -1L);
//throw new RuntimeException("Can't find Leader for Topic and Partition. Exiting");
}
String leadBroker = metadata.leader().host();
String clientName = "Client_" + topic + "_" + partition;
SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
long latestOffset = getLatestOffset(consumer, topic, partition, clientName);
if (consumer != null) {
consumer.close();
}
ret.put(partition, latestOffset);
}
return ret;
}
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
// first time through if the leader hasn't changed give ZooKeeper a second to recover
// second time, assume the broker did recover before failover, or it was a non-Broker issue
//
goToSleep = true;
} else {
return metadata.leader().host();
}
if (goToSleep) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
LOG.info("Unable to find new leader after Broker failure. Exiting");
throw new Exception("Unable to find new leader after Broker failure. Exiting");
}
private HostAndPort findNewLeader(HostAndPort oldLeader, String topic, int partition) throws StageException {
//try 3 times to find a new leader
for (int i = 0; i < 3; i++) {
boolean sleep;
PartitionMetadata metadata = getPartitionMetadata(replicaBrokers, topic, partition);
if (metadata == null || metadata.leader() == null) {
sleep = true;
} else if (oldLeader.getHostText().equalsIgnoreCase(metadata.leader().host()) && i == 0) {
//leader has not yet changed, give zookeeper sometime
sleep = true;
} else {
return HostAndPort.fromParts(metadata.leader().host(), metadata.leader().port());
}
if (sleep) {
ThreadUtil.sleep(ONE_SECOND);
}
}
LOG.error(KafkaErrors.KAFKA_21.getMessage());
throw new StageException(KafkaErrors.KAFKA_21);
}
private HostAndPort findNewLeader(HostAndPort oldLeader, String topic, int partition) throws StageException {
//try 3 times to find a new leader
for (int i = 0; i < 3; i++) {
boolean sleep;
PartitionMetadata metadata = getPartitionMetadata(replicaBrokers, topic, partition);
if (metadata == null || metadata.leader() == null) {
sleep = true;
} else if (oldLeader.getHostText().equalsIgnoreCase(metadata.leader().host()) && i == 0) {
//leader has not yet changed, give zookeeper sometime
sleep = true;
} else {
return HostAndPort.fromParts(metadata.leader().host(), metadata.leader().port());
}
if (sleep) {
ThreadUtil.sleep(ONE_SECOND);
}
}
LOG.error(KafkaErrors.KAFKA_21.getMessage());
throw new StageException(KafkaErrors.KAFKA_21);
}
private Broker findNewLeader(Broker oldLeader) throws InterruptedException {
long retryCnt = 0;
while (true) {
PartitionMetadata metadata = findLeader();
logger.debug("findNewLeader - meta leader {}, previous leader {}", metadata, oldLeader);
if (metadata != null && metadata.leader() != null && (oldLeader == null ||
(!(oldLeader.host().equalsIgnoreCase(metadata.leader().host()) &&
(oldLeader.port() == metadata.leader().port())) || retryCnt != 0))) {
// first time through if the leader hasn't changed give ZooKeeper a second to recover
// second time, assume the broker did recover before failover, or it was a non-Broker issue
logger.info("findNewLeader - using new leader {} from meta data, previous leader {}", metadata.leader(), oldLeader);
return metadata.leader();
}
//TODO: backoff retry
Thread.sleep(1000L);
retryCnt ++;
// if could not find the leader for current replicaBrokers, let's try to find one via allBrokers
if (retryCnt >= 3 && (retryCnt - 3) % 5 == 0) {
logger.warn("can nof find leader for {} - {} after {} retries", topic, partitionId, retryCnt);
replicaBrokers.clear();
replicaBrokers.addAll(allBrokers);
}
}
}
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
// first time through if the leader hasn't changed give ZooKeeper a second to recover
// second time, assume the broker did recover before failover, or it was a non-Broker issue
//
goToSleep = true;
} else {
return metadata.leader().host();
}
if (goToSleep) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
System.out.println("Unable to find new leader after Broker failure. Exiting");
throw new Exception("Unable to find new leader after Broker failure. Exiting");
}
private List<KafkaPartition> getPartitionsForTopic(TopicMetadata topicMetadata) {
List<KafkaPartition> partitions = Lists.newArrayList();
for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
if (null == partitionMetadata) {
LOG.error("Ignoring topic with null partition metadata " + topicMetadata.topic());
return Collections.emptyList();
}
if (null == partitionMetadata.leader()) {
LOG.error(
"Ignoring topic with null partition leader " + topicMetadata.topic() + " metatada=" + partitionMetadata);
return Collections.emptyList();
}
partitions.add(new KafkaPartition.Builder().withId(partitionMetadata.partitionId())
.withTopicName(topicMetadata.topic()).withLeaderId(partitionMetadata.leader().id())
.withLeaderHostAndPort(partitionMetadata.leader().host(), partitionMetadata.leader().port()).build());
}
return partitions;
}
private void refreshTopicMetadata(KafkaPartition partition) {
for (String broker : KafkaWrapper.this.getBrokers()) {
List<TopicMetadata> topicMetadataList = fetchTopicMetadataFromBroker(broker, partition.getTopicName());
if (topicMetadataList != null && !topicMetadataList.isEmpty()) {
TopicMetadata topicMetadata = topicMetadataList.get(0);
for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
if (partitionMetadata.partitionId() == partition.getId()) {
partition.setLeader(partitionMetadata.leader().id(), partitionMetadata.leader().host(),
partitionMetadata.leader().port());
break;
}
}
break;
}
}
}
private List<KafkaPartition> getPartitionsForTopic(TopicMetadata topicMetadata) {
List<KafkaPartition> partitions = Lists.newArrayList();
for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
if (null == partitionMetadata) {
log.error("Ignoring topic with null partition metadata " + topicMetadata.topic());
return Collections.emptyList();
}
if (null == partitionMetadata.leader()) {
log.error("Ignoring topic with null partition leader " + topicMetadata.topic() + " metatada="
+ partitionMetadata);
return Collections.emptyList();
}
partitions.add(new KafkaPartition.Builder().withId(partitionMetadata.partitionId())
.withTopicName(topicMetadata.topic()).withLeaderId(partitionMetadata.leader().id())
.withLeaderHostAndPort(partitionMetadata.leader().host(), partitionMetadata.leader().port()).build());
}
return partitions;
}
private void refreshTopicMetadata(KafkaPartition partition) {
for (String broker : this.brokers) {
List<TopicMetadata> topicMetadataList = fetchTopicMetadataFromBroker(broker, partition.getTopicName());
if (topicMetadataList != null && !topicMetadataList.isEmpty()) {
TopicMetadata topicMetadata = topicMetadataList.get(0);
for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
if (partitionMetadata.partitionId() == partition.getId()) {
partition.setLeader(partitionMetadata.leader().id(), partitionMetadata.leader().host(), partitionMetadata
.leader().port());
break;
}
}
break;
}
}
}
private HostPort findNewLeader(String oldHost, int oldPort) throws LostLeadershipException {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(replicaBrokers, topic, partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (oldHost.equalsIgnoreCase(metadata.leader().host()) &&
oldPort == metadata.leader().port()) {
// first time through if the leader hasn't changed give ZooKeeper a second to recover
// second time, assume the broker did recover before failover, or it was a non-Broker issue
goToSleep = true;
} else {
return new HostPort(metadata.leader().host(), metadata.leader().port());
}
if (goToSleep) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
// Can't recover from a leadership disappearance.
throw new LostLeadershipException();
}
public void run() {
try {
while (!exit) {
KafkaTool kafkaTool = new KafkaTool(topic, cluster.getZKConnect());
kafkaTool.connect();
TopicMetadata topicMeta = kafkaTool.findTopicMetadata(topic);
PartitionMetadata partitionMeta = findPartition(topicMeta, partition);
Broker partitionLeader = partitionMeta.leader();
Server kafkaServer = cluster.findKafkaServerByPort(partitionLeader.port());
System.out.println("Shutdown kafka server " + kafkaServer.getPort());
kafkaServer.shutdown();
failureCount++;
Thread.sleep(sleepBeforeRestart);
kafkaServer.start();
kafkaTool.close();
Thread.sleep(10000); //wait to make sure that the kafka server start
}
} catch (Exception e) {
e.printStackTrace();
}
synchronized (this) {
notify();
}
}
private void info(List<PartitionMetadata> holder) {
String[] header = {
"Partition Id", "Leader", "Replicas"
};
TabularFormater formater = new TabularFormater(header);
formater.setTitle("Partitions");
for(PartitionMetadata sel : holder) {
StringBuilder replicas = new StringBuilder();
for(Broker broker : sel.replicas()) {
if(replicas.length() > 0) replicas.append(",");
replicas.append(broker.port());
}
formater.addRow(sel.partitionId(), sel.leader().port(), replicas.toString());
}
System.out.println(formater.getFormatText());
}
private void readFromPartition(String consumerName, int partition, int maxRead) throws Exception {
KafkaTool kafkaTool = new KafkaTool(consumerName, cluster.getZKConnect());
kafkaTool.connect();
TopicMetadata topicMetadata = kafkaTool.findTopicMetadata("hello");
PartitionMetadata partitionMetadata = findPartition(topicMetadata.partitionsMetadata(), partition);
KafkaPartitionReader partitionReader =
new KafkaPartitionReader(consumerName, cluster.getZKConnect(), "hello", partitionMetadata);
List<byte[]> messages = partitionReader.fetch(10000, maxRead);
for(int i = 0; i < messages.size(); i++) {
byte[] message = messages.get(i) ;
System.out.println((i + 1) + ". " + new String(message));
}
partitionReader.commit();
partitionReader.close();
kafkaTool.close();
}
/**
* @param a_oldLeader
* @param a_topic
* @param a_partition
* @param a_port
* @return String
* @throws Exception
* 找一个leader broker
*/
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
// first time through if the leader hasn't changed give
// ZooKeeper a second to recover
// second time, assume the broker did recover before failover,
// or it was a non-Broker issue
//
goToSleep = true;
} else {
return metadata.leader().host();
}
if (goToSleep) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
System.out.println("Unable to find new leader after Broker failure. Exiting");
throw new Exception("Unable to find new leader after Broker failure. Exiting");
}
/**
* @param brokerList brokers in same cluster
* @param topic
* @return Get the partition metadata list for the specific topic via the brokerList <br>
* null if topic is not found
*/
public static List<PartitionMetadata> getPartitionsForTopic(Set<String> brokerList, String topic)
{
TopicMetadata tmd = getTopicMetadata(brokerList, topic);
if (tmd == null) {
return null;
}
return tmd.partitionsMetadata();
}
/**
* @param brokers in multiple clusters, keyed by cluster id
* @param topic
* @return Get the partition metadata list for the specific topic via the brokers
* null if topic is not found
*/
public static Map<String, List<PartitionMetadata>> getPartitionsForTopic(SetMultimap<String, String> brokers, final String topic)
{
return Maps.transformEntries(brokers.asMap(), new EntryTransformer<String, Collection<String>, List<PartitionMetadata>>()
{
@Override
public List<PartitionMetadata> transformEntry(String key, Collection<String> bs)
{
return getPartitionsForTopic(new HashSet<String>(bs), topic);
}
});
}
/**
* @param brokerList
* @param topic
* @param partition
* @return Get the partition metadata for specific topic and partition via the brokerList<br>
* null if topic is not found
*/
public static PartitionMetadata getPartitionForTopic(Set<String> brokerList, String topic, int partition)
{
List<PartitionMetadata> pmds = getPartitionsForTopic(brokerList, topic);
if (pmds == null) {
return null;
}
for (PartitionMetadata pmd : pmds) {
if (pmd.partitionId() != partition) {
continue;
}
return pmd;
}
return null;
}
private void initializeLastProcessingOffset()
{
// read last received kafka message
TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(Sets.newHashSet((String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST)), this.getTopic());
if (tm == null) {
throw new RuntimeException("Failed to retrieve topic metadata");
}
partitionNum = tm.partitionsMetadata().size();
lastMsgs = new HashMap<Integer, Pair<byte[],byte[]>>(partitionNum);
for (PartitionMetadata pm : tm.partitionsMetadata()) {
String leadBroker = pm.leader().host();
int port = pm.leader().port();
String clientName = this.getClass().getName().replace('$', '.') + "_Client_" + tm.topic() + "_" + pm.partitionId();
SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
long readOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(), kafka.api.OffsetRequest.LatestTime(), clientName);
FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(tm.topic(), pm.partitionId(), readOffset - 1, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(tm.topic(), pm.partitionId())) {
Message m = messageAndOffset.message();
ByteBuffer payload = m.payload();
ByteBuffer key = m.key();
byte[] valueBytes = new byte[payload.limit()];
byte[] keyBytes = new byte[key.limit()];
payload.get(valueBytes);
key.get(keyBytes);
lastMsgs.put(pm.partitionId(), new Pair<byte[], byte[]>(keyBytes, valueBytes));
}
}
}
@Override
public void init() throws StageException {
List<HostAndPort> brokers = new ArrayList<>();
brokers.add(broker);
PartitionMetadata metadata = getPartitionMetadata(brokers, topic, partition);
if (metadata == null) {
LOG.error(KafkaErrors.KAFKA_23.getMessage(), topic, partition);
throw new StageException(KafkaErrors.KAFKA_23, topic, partition);
}
if (metadata.leader() == null) {
LOG.error(KafkaErrors.KAFKA_24.getMessage(), topic, partition);
throw new StageException(KafkaErrors.KAFKA_24, topic, partition);
}
leader = HostAndPort.fromParts(metadata.leader().host(), metadata.leader().port());
//recreate consumer instance with the leader information for that topic
LOG.info(
"Creating SimpleConsumer using the following configuration: host {}, port {}, max wait time {}, max " +
"fetch size {}, client columnName {}",
leader.getHostText(),
leader.getPort(),
maxWaitTime,
maxFetchSize,
clientName
);
consumer = new SimpleConsumer(
leader.getHostText(),
leader.getPort(),
maxWaitTime,
maxFetchSize,
clientName
);
}
@Override
public void init() throws StageException {
List<HostAndPort> brokers = new ArrayList<>();
brokers.add(broker);
PartitionMetadata metadata = getPartitionMetadata(brokers, topic, partition);
if (metadata == null) {
LOG.error(KafkaErrors.KAFKA_23.getMessage(), topic, partition);
throw new StageException(KafkaErrors.KAFKA_23, topic, partition);
}
if (metadata.leader() == null) {
LOG.error(KafkaErrors.KAFKA_24.getMessage(), topic, partition);
throw new StageException(KafkaErrors.KAFKA_24, topic, partition);
}
leader = HostAndPort.fromParts(metadata.leader().host(), metadata.leader().port());
//recreate consumer instance with the leader information for that topic
LOG.info(
"Creating SimpleConsumer using the following configuration: host {}, port {}, max wait time {}, max " +
"fetch size {}, client columnName {}",
leader.getHostText(),
leader.getPort(),
maxWaitTime,
maxFetchSize,
clientName
);
consumer = new SimpleConsumer(
leader.getHostText(),
leader.getPort(),
maxWaitTime,
maxFetchSize,
clientName
);
}
private HostAndPort findLeader(TopicPartition topicPartition) {
SimpleConsumer consumer = null;
try {
LOG.debug("looking up leader for topic {} partition {}", topicPartition.getTopic(), topicPartition.getPartition());
consumer = createConsumer(
mConfig.getKafkaSeedBrokerHost(),
mConfig.getKafkaSeedBrokerPort(),
"leaderLookup");
List<String> topics = new ArrayList<String>();
topics.add(topicPartition.getTopic());
TopicMetadataRequest request = new TopicMetadataRequest(topics);
TopicMetadataResponse response = consumer.send(request);
List<TopicMetadata> metaData = response.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == topicPartition.getPartition()) {
return HostAndPort.fromParts(part.leader().host(), part.leader().port());
}
}
}
} finally {
if (consumer != null) {
consumer.close();
}
}
return null;
}
@Override
protected List<String> buildCommandList(ScribeConsumerConfig c) {
LOG.info("buildCommandList. ");
List<String> r = new ArrayList<String>();
for ( Map.Entry<String, Map<Integer, PartitionMetadata> > entry : topicMetadataMap.entrySet() ) {
String t = entry.getKey();
LOG.info("topic : " + t);
for ( Map.Entry<Integer, PartitionMetadata> innerEntry: entry.getValue().entrySet()) {
Integer partition = innerEntry.getKey();
LOG.info("partition: " + partition);
StringBuilder sb = new StringBuilder();
sb.append(Environment.JAVA_HOME.$()).append("/bin/java").append(" ");
sb.append("-cp scribeconsumer.jar " + com.neverwinterdp.scribengin.scribeconsumer.ScribeConsumer.class.getName())
.append(" --"+Constants.OPT_BROKER_LIST+" "+getBrokerListStr())
.append(" --"+Constants.OPT_CHECK_POINT_INTERVAL+" "+Long.toString(c.commitCheckPointInterval))
.append(" --"+Constants.OPT_COMMIT_PATH_PREFIX+" "+c.COMMIT_PATH_PREFIX)
.append(" --"+Constants.OPT_PARTITION+" "+ Integer.toString(partition))
.append(" --"+Constants.OPT_PRE_COMMIT_PATH_PREFIX+" "+ c.PRE_COMMIT_PATH_PREFIX)
.append(" --"+Constants.OPT_KAFKA_TOPIC+" "+t)
;
if(c.hdfsPath != null){
sb.append(" --"+Constants.OPT_HDFS_PATH+" "+c.hdfsPath);
}
if(c.cleanStart){
sb.append(" --"+Constants.OPT_CLEAN_START);
}
r.add(sb.toString());
}
}
LOG.info("Command list "+ r);
return r;
}
private void getMetaData(String topic) {
LOG.info("inside getMetaData"); //xxx
LOG.info("seedBrokerList" + this.brokerList); //xxx
for (HostPort seed: brokerList) {
SimpleConsumer consumer = new SimpleConsumer(
seed.getHost(),
seed.getPort(),
10000, // timeout
64*1024, // bufferSize
"metaLookup" // clientId
);
List <String> topicList = Collections.singletonList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topicList);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaDataList = resp.topicsMetadata();
LOG.info("metaDataList: " + metaDataList); //xxxx
for (TopicMetadata m: metaDataList) {
LOG.info("inside the metadatalist loop"); //xxx
LOG.info("m partitionsMetadata: " + m.partitionsMetadata()); //xxx
for (PartitionMetadata part : m.partitionsMetadata()) {
LOG.info("inside the partitionmetadata loop"); //xxx
storeMetadata(topic, part);
}
}
}
}
private void storeMetadata(String topic, PartitionMetadata p) {
Integer id = new Integer(p.partitionId());
Map<Integer, PartitionMetadata> m;
if (topicMetadataMap.containsKey(id)) {
LOG.info("already crreated a partitionMap. Just retrieve it."); //xxx
m = topicMetadataMap.get(topic);
} else {
LOG.info("making a new partitionMap"); //xxx
m = new HashMap<Integer, PartitionMetadata>();
topicMetadataMap.put(topic, m);
}
m.put(id, p);
}
public boolean connectToTopic() {
boolean r = true;
PartitionMetadata metadata = findLeader(brokerList, topic, partition);
if (metadata == null) {
r = false;
LOG.error("Can't find meta data for Topic: " + topic + " partition: " + partition
+ ". In fact, meta is null.");
}
else if (metadata.leader() == null) {
r = false;
LOG.error("Can't find meta data for Topic: " + topic + " partition: " + partition);
}
if (r) {
storeReplicaBrokers(metadata);
consumer = new SimpleConsumer(
metadata.leader().host(),
metadata.leader().port(),
10000, // timeout
64 * 1024, // buffersize
getClientName());
//scheduleCommitTimer();
}
return r;
}
void init(StorageDescriptor descriptor) throws Exception {
this.descriptor = descriptor;
KafkaTool kafkaTool = new KafkaTool(descriptor.attribute("name"), descriptor.attribute("zk.connect"));
kafkaTool.connect();
TopicMetadata topicMetdadata = kafkaTool.findTopicMetadata(descriptor.attribute("topic"));
List<PartitionMetadata> partitionMetadatas = topicMetdadata.partitionsMetadata();
for(int i = 0; i < partitionMetadatas.size(); i++) {
PartitionMetadata partitionMetadata = partitionMetadatas.get(i);
KafkaSourceStream sourceStream = new KafkaSourceStream(descriptor, partitionMetadata);
sourceStreams.put(sourceStream.getId(), sourceStream);
}
kafkaTool.close();
}
public KafkaPartitionReader(String name, String zkConnect, String topic, PartitionMetadata partitionMetadata) {
this.name = name;
this.zkConnect = zkConnect;
this.topic = topic;
this.partitionMetadata = partitionMetadata;
reconnect() ;
currentOffset = getLastCommitOffset();
}