下面列出了怎么用org.apache.kafka.common.record.InvalidRecordException的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* 决定消息被写入哪个分区
* @param topic topic
* @param key key
* @param keyBytes key serialize byte
* @param value value
* @param valueBytes value serialize byte
* @param cluster kakfa cluster
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 所有分区信息
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int partitionCount = partitionInfos.size();
// 要求必须存在 key,如果key 是"name" 就分配到最后一个分区, 其他key hash取模
if (keyBytes == null || !key.getClass().equals(String.class)) {
throw new InvalidRecordException("kafka message must have a String key");
}
if (partitionCount == 1 || StringUtils.endsWithIgnoreCase("name", key.toString())) {
return partitionCount - 1;
}
return Math.abs(Utils.murmur2(keyBytes)) % (partitionCount - 1);
}
private static void readHeaders(ByteBuffer buffer, int numHeaders) {
for (int i = 0; i < numHeaders; i++) {
int headerKeySize = ByteUtils.readVarint(buffer);
if (headerKeySize < 0)
throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);
Utils.utf8(buffer, headerKeySize);
buffer.position(buffer.position() + headerKeySize);
int headerValueSize = ByteUtils.readVarint(buffer);
if (headerValueSize >= 0) {
buffer.position(buffer.position() + headerValueSize);
}
}
}
/**
* Simple partitioning on the message key
* Odd keys to partition 0
* Even keys to partition 1
*
* @param topic topic Name
* @param key Message Key
* @param keyBytes Key Bytes
* @param value Message Value
* @param valueBytes Value Bytes
* @param cluster Cluster Object
* @return Partition Id
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if ((keyBytes == null) || (!(key instanceof Integer)))
throw new InvalidRecordException("Topic Key must have a valid Integer value.");
if (cluster.partitionsForTopic(topic).size() != 2)
throw new InvalidTopicException("Topic must have exactly two partitions");
return (Integer) key % 2;
}