下面列出了 org.apache.kafka.common.protocol.types.Schema 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Looks up the message key schema registered in {@code MESSAGE_TYPE_SCHEMAS}
 * for the given version.
 *
 * @param version key schema version used as the lookup key
 * @return the schema mapped to {@code version}; never {@code null}
 * @throws KafkaException if no schema is mapped to {@code version}
 */
private static Schema schemaForKey(int version) {
    Schema schema = MESSAGE_TYPE_SCHEMAS.get(version);
    if (schema == null) {
        // Name the key schema in the message so this failure cannot be mistaken
        // for a failed offset-value schema lookup.
        throw new KafkaException("Unknown message key schema version " + version);
    }
    return schema;
}
/**
 * Resolves the offset value schema registered in {@code OFFSET_VALUE_SCHEMAS}
 * for the given version.
 *
 * @param version offset value schema version used as the lookup key
 * @return the schema mapped to {@code version}; never {@code null}
 * @throws KafkaException if no schema is mapped to {@code version}
 */
private static Schema schemaForOffset(int version) {
    final Schema found = OFFSET_VALUE_SCHEMAS.get(version);
    if (found != null) {
        return found;
    }
    throw new KafkaException("Unknown offset schema version " + version);
}
/**
 * Resolves the group metadata value schema registered in
 * {@code GROUP_VALUE_SCHEMAS} for the given version.
 *
 * @param version group metadata schema version used as the lookup key
 * @return the schema mapped to {@code version}; never {@code null}
 * @throws KafkaException if no schema is mapped to {@code version}
 */
private static Schema schemaForGroup(int version) {
    final Schema found = GROUP_VALUE_SCHEMAS.get(version);
    if (found != null) {
        return found;
    }
    throw new KafkaException("Unknown group metadata version " + version);
}
/**
 * Returns the message key schema stored in {@code MESSAGE_TYPE_SCHEMAS} for
 * the given version.
 *
 * <p>An unregistered version is logged as an error and reported to the caller
 * as {@code null} rather than as an exception.
 *
 * @param version key schema version used as the lookup key
 * @return the mapped schema, or {@code null} when the version is not registered
 */
private static Schema schemaForKey(int version) {
    if (!MESSAGE_TYPE_SCHEMAS.containsKey(version)) {
        LOG.error("Unknown message key schema version " + version);
        return null;
    }
    return MESSAGE_TYPE_SCHEMAS.get(version);
}
/**
 * Returns the offset value schema stored in {@code OFFSET_VALUE_SCHEMAS} for
 * the given version.
 *
 * <p>An unregistered version is logged as an error and reported to the caller
 * as {@code null} rather than as an exception.
 *
 * @param version offset value schema version used as the lookup key
 * @return the mapped schema, or {@code null} when the version is not registered
 */
private static Schema schemaForOffsetValue(int version) {
    if (!OFFSET_VALUE_SCHEMAS.containsKey(version)) {
        LOG.error("Unknown offset schema version " + version);
        return null;
    }
    return OFFSET_VALUE_SCHEMAS.get(version);
}
/**
 * Returns the group metadata value schema stored in
 * {@code GROUP_VALUE_SCHEMAS} for the given version.
 *
 * <p>An unregistered version is logged as an error and reported to the caller
 * as {@code null} rather than as an exception.
 *
 * @param version group metadata schema version used as the lookup key
 * @return the mapped schema, or {@code null} when the version is not registered
 */
private static Schema schemaForGroupValue(int version) {
    if (!GROUP_VALUE_SCHEMAS.containsKey(version)) {
        LOG.error("Unknown group metadata version " + version);
        return null;
    }
    return GROUP_VALUE_SCHEMAS.get(version);
}
/**
 * Reads a version-prefixed message value from {@code buffer}.
 *
 * <p>The first 16-bit value in the buffer is taken as the schema version; the
 * value schema returned by {@code schemaFor(version)} then reads the remaining
 * bytes. A {@code null} buffer produces a result holding a {@code null} value
 * and version {@code -1}.
 *
 * @param buffer serialized message value, or {@code null}
 * @return holder carrying the decoded struct together with its version
 */
private static MessageValueStructAndVersionInfo readMessageValueStruct(ByteBuffer buffer) {
    final MessageValueStructAndVersionInfo result = new MessageValueStructAndVersionInfo();
    if (buffer == null) {
        result.setValue(null);
        result.setVersion(Short.valueOf((short) -1));
        return result;
    }

    final short version = buffer.getShort();
    final Struct decoded = (Struct) schemaFor(version).getValueSchema().read(buffer);
    result.setValue(decoded);
    result.setVersion(version);
    return result;
}
/**
 * Reads a version-prefixed message value from {@code buffer}.
 *
 * <p>The first 16-bit value in the buffer is taken as the schema version; the
 * value schema returned by {@code schemaFor(version)} then reads the remaining
 * bytes. A {@code null} buffer produces a result holding a {@code null} value
 * and version {@code -1}.
 *
 * @param buffer serialized message value, or {@code null}
 * @return holder carrying the decoded struct together with its version
 */
private static MessageValueStructAndVersionInfo readMessageValueStruct(ByteBuffer buffer) {
    final MessageValueStructAndVersionInfo result = new MessageValueStructAndVersionInfo();
    if (buffer == null) {
        result.setValue(null);
        result.setVersion(Short.valueOf((short) -1));
        return result;
    }

    final short version = buffer.getShort();
    final Struct decoded = (Struct) schemaFor(version).getValueSchema().read(buffer);
    result.setValue(decoded);
    result.setVersion(version);
    return result;
}
/**
 * Decodes a version-prefixed message key into its group, topic and partition.
 *
 * <p>The first 16-bit value in the buffer is taken as the schema version; the
 * key schema returned by {@code schemaFor(version)} then reads the remaining
 * bytes.
 *
 * @param buffer serialized message key; must not be {@code null}
 * @return the group id paired with the topic partition read from the key
 */
private static GroupTopicPartition readMessageKey(ByteBuffer buffer) {
    final short version = buffer.getShort();
    final Struct keyStruct = (Struct) schemaFor(version).getKeySchema().read(buffer);

    final String groupName = keyStruct.getString(KEY_GROUP_FIELD);
    final String topicName = keyStruct.getString(KEY_TOPIC_FIELD);
    final int partitionId = keyStruct.getInt(KEY_PARTITION_FIELD);

    final TopicPartition topicPartition = new TopicPartition(topicName, partitionId);
    return new GroupTopicPartition(groupName, topicPartition);
}
/**
 * Reads a version-prefixed message value from {@code buffer}.
 *
 * <p>The first 16-bit value in the buffer is taken as the schema version; the
 * value schema returned by {@code schemaFor(version)} then reads the remaining
 * bytes. A {@code null} buffer produces a result holding a {@code null} value
 * and version {@code -1}.
 *
 * @param buffer serialized message value, or {@code null}
 * @return holder carrying the decoded struct together with its version
 */
private static MessageValueStructAndVersionInfo readMessageValueStruct(ByteBuffer buffer) {
    final MessageValueStructAndVersionInfo result = new MessageValueStructAndVersionInfo();
    if (buffer == null) {
        result.setValue(null);
        result.setVersion(Short.valueOf((short) -1));
        return result;
    }

    final short version = buffer.getShort();
    final Struct decoded = (Struct) schemaFor(version).getValueSchema().read(buffer);
    result.setValue(decoded);
    result.setVersion(version);
    return result;
}
/**
 * Decodes a serialized group metadata value into a {@code GroupMetadata}.
 *
 * <p>The first 16-bit value in the buffer is the schema version, which selects
 * the schema via {@code schemaForGroup}. Only versions 0 and 1 are decoded.
 * The group is loaded as {@code Empty} when the member array is empty and as
 * {@code Stable} otherwise.
 *
 * @param groupId id of the group the value belongs to
 * @param buffer  serialized value, or {@code null} for a tombstone
 * @return the loaded group, or {@code null} when {@code buffer} is {@code null}
 * @throws IllegalStateException if the version is neither 0 nor 1
 */
static GroupMetadata readGroupMessageValue(String groupId,
                                           ByteBuffer buffer) {
    // A null value is a tombstone: there is nothing to load.
    if (buffer == null) {
        return null;
    }

    final short version = buffer.getShort();
    final Struct value = schemaForGroup(version).read(buffer);
    if (version != 0 && version != 1) {
        throw new IllegalStateException("Unknown group metadata message version");
    }

    final int generationId = value.getInt(GENERATION_KEY);
    final String protocolType = value.getString(PROTOCOL_TYPE_KEY);
    final String protocol = value.getString(PROTOCOL_KEY);
    final String leaderId = value.getString(LEADER_KEY);
    final Object[] rawMembers = value.getArray(MEMBERS_KEY);

    final GroupState initialState =
        rawMembers.length == 0 ? GroupState.Empty : GroupState.Stable;

    final List<MemberMetadata> members = Lists.newArrayList(rawMembers)
        .stream()
        .map(rawMember -> {
            final Struct member = (Struct) rawMember;
            final String memberId = member.getString(MEMBER_ID_KEY);
            final String clientId = member.getString(CLIENT_ID_KEY);
            final String clientHost = member.getString(CLIENT_HOST_KEY);
            final int sessionTimeout = member.getInt(SESSION_TIMEOUT_KEY);
            // Version 0 reuses the session timeout as the rebalance timeout.
            final int rebalanceTimeout = version == 0
                ? sessionTimeout
                : member.getInt(REBALANCE_TIMEOUT_KEY);

            // Copy the remaining subscription bytes out of the buffer.
            final ByteBuffer subscriptionBuffer = member.getBytes(SUBSCRIPTION_KEY);
            final byte[] subscriptionBytes = new byte[subscriptionBuffer.remaining()];
            subscriptionBuffer.get(subscriptionBytes);

            final Map<String, byte[]> supportedProtocols = new HashMap<>();
            supportedProtocols.put(protocol, subscriptionBytes);

            return new MemberMetadata(memberId, groupId, clientId, clientHost,
                rebalanceTimeout, sessionTimeout, protocolType, supportedProtocols);
        })
        .collect(Collectors.toList());

    return GroupMetadata.loadGroup(groupId, initialState, generationId,
        protocolType, protocol, leaderId, members);
}
/**
 * Returns the key schema currently held by this object.
 *
 * @return the stored key schema
 */
public Schema getKeySchema() {
    return this.keySchema;
}
/**
 * Replaces the key schema held by this object.
 *
 * @param keySchema the schema to store
 */
public void setKeySchema(Schema keySchema) {
this.keySchema = keySchema;
}
/**
 * Returns the value schema currently held by this object.
 *
 * @return the stored value schema
 */
public Schema getValueSchema() {
    return this.valueSchema;
}
/**
 * Replaces the value schema held by this object.
 *
 * @param valueSchema the schema to store
 */
public void setValueSchema(Schema valueSchema) {
this.valueSchema = valueSchema;
}
/**
 * Returns the key schema currently held by this object.
 *
 * @return the stored key schema
 */
public Schema getKeySchema() {
    return this.keySchema;
}
/**
 * Replaces the key schema held by this object.
 *
 * @param keySchema the schema to store
 */
public void setKeySchema(Schema keySchema) {
this.keySchema = keySchema;
}
/**
 * Returns the value schema currently held by this object.
 *
 * @return the stored value schema
 */
public Schema getValueSchema() {
    return this.valueSchema;
}
/**
 * Replaces the value schema held by this object.
 *
 * @param valueSchema the schema to store
 */
public void setValueSchema(Schema valueSchema) {
this.valueSchema = valueSchema;
}