类org.apache.kafka.common.record.InvalidRecordException源码实例Demo

下面列出了怎么用org.apache.kafka.common.record.InvalidRecordException的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: ad   文件: CustomPartitioner.java
/**
 * 决定消息被写入哪个分区
 * @param topic topic
 * @param key key
 * @param keyBytes key serialize byte
 * @param value value
 * @param valueBytes value serialize byte
 * @param cluster kakfa cluster
 * @return
 */
@Override
public int partition(String topic, Object key, byte[] keyBytes,
                     Object value, byte[] valueBytes, Cluster cluster) {
    // 所有分区信息
    List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
    int partitionCount = partitionInfos.size();
    // 要求必须存在 key,如果key 是"name" 就分配到最后一个分区, 其他key hash取模
    if (keyBytes == null || !key.getClass().equals(String.class)) {
        throw new InvalidRecordException("kafka message must have a String key");
    }
    if (partitionCount == 1 || StringUtils.endsWithIgnoreCase("name", key.toString())) {
        return partitionCount - 1;
    }
    return Math.abs(Utils.murmur2(keyBytes)) % (partitionCount - 1);
}
 
源代码2 项目: feeyo-redisproxy   文件: Record.java
private static void readHeaders(ByteBuffer buffer, int numHeaders) {
	for (int i = 0; i < numHeaders; i++) {
		int headerKeySize = ByteUtils.readVarint(buffer);
		if (headerKeySize < 0)
			throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);

		Utils.utf8(buffer, headerKeySize);
		buffer.position(buffer.position() + headerKeySize);

		int headerValueSize = ByteUtils.readVarint(buffer);
		if (headerValueSize >= 0) {
			buffer.position(buffer.position() + headerValueSize);
		}
	}
}
 
/**
 * Simple partitioning on the message key
 * Odd keys to partition 0
 * Even keys to partition 1
 *
 * @param topic topic Name
 * @param key Message Key
 * @param keyBytes Key Bytes
 * @param value Message Value
 * @param valueBytes Value Bytes
 * @param cluster Cluster Object
 * @return Partition Id
 */
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if ((keyBytes == null) || (!(key instanceof Integer)))
        throw new InvalidRecordException("Topic Key must have a valid Integer value.");

    if (cluster.partitionsForTopic(topic).size() != 2)
        throw new InvalidTopicException("Topic must have exactly two partitions");

    return (Integer) key % 2;
}
 
 类所在包
 类方法
 同包方法