下面列出了 com.google.protobuf.DynamicMessage#Builder() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Encodes a row as a dynamic message of {@code messageType}.
 *
 * <p>Each non-null column is written through the matching entry of {@code conversions}.
 * For a null column, the corresponding entry of {@code nullFields} (when the array and
 * the entry both exist) is set to {@code true} instead.
 *
 * @param row the row to encode
 * @return the built message
 */
@Override
public DynamicMessage encode(Row row) {
    DynamicMessage.Builder message = DynamicMessage.newBuilder(messageType);
    for (int column = 0; column < fields.length; column++) {
        if (!row.value(column).isNull()) {
            conversions[column].setValue(message, fields[column], row.value(column));
            continue;
        }
        // Null column: flag it through its companion field, if one is configured.
        FieldDescriptor marker = (nullFields == null) ? null : nullFields[column];
        if (marker != null) {
            message.setField(marker, Boolean.TRUE);
        }
    }
    return message.build();
}
/**
 * Feeds a message with two values in {@code repeated_field} plus an empty message to
 * {@code testAllOutTypesWith}, expecting the repeated values, a zero timestamp and the
 * Kafka offset in the result.
 */
@Test
public void concatenateMessageWithRepeatedFieldWithEmpty() throws Descriptors.DescriptorValidationException {
    final long kafkaOffset = 0L;

    DynamicMessage.Builder repeatedBuilder = createMessageWithRepeatedField();
    Descriptors.Descriptor repeatedType = repeatedBuilder.getDescriptorForType();
    repeatedBuilder.addRepeatedField(repeatedType.findFieldByName("repeated_field"), 1);
    repeatedBuilder.addRepeatedField(repeatedType.findFieldByName("repeated_field"), 2);
    Message msg = repeatedBuilder.build();

    ArrayList<Object> ints = new ArrayList<>();
    ints.add(1);
    ints.add(2);

    Map<String, Object> expectedValues = new HashMap<>();
    expectedValues.put("repeated_field", ints);
    expectedValues.put(ProtoConcatenator.TIMESTAMP_FIELD_NAME, 0L);
    expectedValues.put(ProtoConcatenator.KAFKA_OFFSET, kafkaOffset);

    testAllOutTypesWith(0L, Arrays.asList(msg, createEmptyMessage()), expectedValues, kafkaOffset);
}
/**
 * Copies unknown protobuf fields stored in a record header attribute onto the builder.
 *
 * <p>The attribute name is {@code PROTOBUF_UNKNOWN_FIELDS_PREFIX} followed by the field
 * path (or {@code FORWARD_SLASH} when the path is empty). Its value is Base64-decoded and
 * parsed as a length-delimited {@link UnknownFieldSet}. Nothing happens when the attribute
 * is absent.
 *
 * @param record    record whose header is inspected
 * @param fieldPath path of the field being serialized; may be empty
 * @param builder   message builder that receives the unknown fields
 * @throws IOException if the decoded bytes cannot be parsed
 */
private static void handleUnknownFields(
    Record record,
    String fieldPath,
    DynamicMessage.Builder builder
) throws IOException {
    final String path = fieldPath.isEmpty() ? FORWARD_SLASH : fieldPath;
    final String encoded =
        record.getHeader().getAttribute(ProtobufTypeUtil.PROTOBUF_UNKNOWN_FIELDS_PREFIX + path);
    if (encoded == null) {
        return;
    }

    UnknownFieldSet.Builder unknownFields = UnknownFieldSet.newBuilder();
    byte[] decoded =
        org.apache.commons.codec.binary.Base64.decodeBase64(encoded.getBytes(StandardCharsets.UTF_8));
    unknownFields.mergeDelimitedFrom(new ByteArrayInputStream(decoded));
    builder.setUnknownFields(unknownFields.build());
}
/**
 * Builds a dynamic {@code TestRecordsNulls3Proto.MyNullRecord} message.
 *
 * @param name        value for the name field; always set
 * @param intValue    value for the int field, left unset when {@code null}
 * @param stringValue value for the string field, left unset when {@code null}
 * @return the built message
 */
protected static DynamicMessage buildRecord3Dynamic(@Nonnull String name, @Nullable Integer intValue, @Nullable String stringValue) {
    final Descriptors.Descriptor recordType = TestRecordsNulls3Proto.MyNullRecord.getDescriptor();
    final DynamicMessage.Builder record = DynamicMessage.newBuilder(recordType);

    record.setField(recordType.findFieldByNumber(TestRecordsNulls3Proto.MyNullRecord.NAME_FIELD_NUMBER), name);
    // Optional values are only written when the caller supplied them.
    if (intValue != null) {
        record.setField(
                recordType.findFieldByNumber(TestRecordsNulls3Proto.MyNullRecord.INT_VALUE_FIELD_NUMBER),
                intValue);
    }
    if (stringValue != null) {
        record.setField(
                recordType.findFieldByNumber(TestRecordsNulls3Proto.MyNullRecord.STRING_VALUE_FIELD_NUMBER),
                stringValue);
    }
    return record.build();
}
/**
 * Parses all the messages and returns them in a list.
 *
 * <p>A message is a run of consecutive non-empty lines, which are concatenated (without
 * line separators) and merged into a fresh builder via {@code jsonParser}. Reading stops
 * at two consecutive empty lines; a {@code null} line from the reader counts as empty.
 *
 * @return the parsed messages, in input order
 * @throws IllegalArgumentException if reading or parsing fails for any reason
 */
public ImmutableList<DynamicMessage> read() {
    ImmutableList.Builder<DynamicMessage> messages = ImmutableList.builder();
    try {
        boolean previousLineBlank = false;
        for (;;) {
            String current = bufferedReader.readLine();
            if (Strings.isNullOrEmpty(current)) {
                // Two consecutive empty lines mark the end of the stream.
                if (previousLineBlank) {
                    return messages.build();
                }
                previousLineBlank = true;
                continue;
            }

            // Accumulate the lines of the next message; 'current' is non-empty on entry.
            StringBuilder json = new StringBuilder();
            do {
                json.append(current);
                current = bufferedReader.readLine();
            } while (!Strings.isNullOrEmpty(current));
            // The line that terminated the message was itself empty (or end of input).
            previousLineBlank = true;

            DynamicMessage.Builder parsed = DynamicMessage.newBuilder(descriptor);
            jsonParser.merge(json.toString(), parsed);
            messages.add(parsed.build());
        }
    } catch (Exception e) {
        throw new IllegalArgumentException("Unable to read messages from: " + source, e);
    }
}
/**
 * Appends every element of the entry's value to a repeated field on the builder.
 *
 * @param builder            builder that receives the values
 * @param dstFieldDescriptor repeated field to append to
 * @param entry              source entry; its value is cast to a {@code Collection}
 */
private static void setRepeatedField(DynamicMessage.Builder builder, Descriptors.FieldDescriptor dstFieldDescriptor,
        Map.Entry<Descriptors.FieldDescriptor, Object> entry) {
    // The cast is unchecked: the entry value is typed Object but is treated as a collection here.
    @SuppressWarnings("unchecked")
    final Collection<Object> elements = (Collection<Object>) entry.getValue();
    elements.forEach(element -> builder.addRepeatedField(dstFieldDescriptor, element));
}
/**
 * Writes a oneof from its plain-text representation.
 *
 * <p>{@code parsePlainDataOneof} yields an array whose first element is used as the
 * selected sub-field and whose second element, when present, is used as that sub-field's
 * text. With only the sub-field present, its default value is written.
 *
 * @return {@code false} when {@code field} is null, nothing was parsed, or no sub-field
 *         was selected; otherwise {@code true} or the result of dumping the sub-field
 */
private boolean dumpPlainField(DynamicMessage.Builder builder, IdentifyDescriptor ident,
        DataDstWriterNode.DataDstOneofDescriptor field, boolean isTopLevel, String input) throws ConvException {
    if (field == null) {
        return false;
    }

    Object[] parsed = parsePlainDataOneof(input, ident, field);
    if (parsed == null || parsed.length < 1) {
        return false;
    }

    DataDstWriterNode.DataDstFieldDescriptor selected = (DataDstWriterNode.DataDstFieldDescriptor) parsed[0];
    if (selected == null) {
        return false;
    }

    if (parsed.length > 1) {
        // Not top level, so the type does not need to be validated.
        return dumpPlainField(builder, null, selected, false, (String) parsed[1]);
    }

    dumpDefault(builder, (Descriptors.FieldDescriptor) selected.getRawDescriptor());
    return true;
}
/**
 * Sets a single (non-repeated) field on the builder.
 *
 * <p>The value is taken from the record when {@code valueAsMap} contains the field name;
 * otherwise it is looked up in {@code defaultValueMap} under
 * {@code <message full name>.<field name>}. A {@code null} value leaves the field unset.
 *
 * @throws DataGeneratorException with {@code PROTOBUF_04} when the record lacks the field,
 *         no default is registered, and the field is not optional
 */
private static void handleNonRepeatedField(
    Record record,
    Map<String, Field> valueAsMap,
    String fieldPath,
    Map<String, Set<Descriptors.FieldDescriptor>> messageTypeToExtensionMap,
    Map<String, Object> defaultValueMap,
    Descriptors.Descriptor desc,
    Descriptors.FieldDescriptor f,
    DynamicMessage.Builder builder
) throws DataGeneratorException {
    final String name = f.getName();
    final Object resolved;

    if (!valueAsMap.containsKey(name)) {
        // Record does not contain the field: fall back to the default value.
        final String defaultKey = desc.getFullName() + "." + name;
        if (!(defaultValueMap.containsKey(defaultKey) || f.isOptional())) {
            throw new DataGeneratorException(
                Errors.PROTOBUF_04,
                record.getHeader().getSourceId(),
                defaultKey
            );
        }
        resolved = defaultValueMap.get(defaultKey);
    } else {
        resolved = getValue(
            f,
            valueAsMap.get(name),
            record,
            fieldPath + FORWARD_SLASH + name,
            messageTypeToExtensionMap,
            defaultValueMap
        );
    }

    if (resolved != null) {
        builder.setField(f, resolved);
    }
}
/**
 * Converts a row into a message by pairing each row value, in order, with the converter
 * at the same position and letting it write onto a fresh builder.
 *
 * @param input the row to convert
 * @return the built message, cast to {@code T}
 */
@Override
public T apply(Row input) {
    DynamicMessage.Builder message = context.invokeNewBuilder();
    Iterator<Convert> converterCursor = converters.iterator();
    Iterator valueCursor = input.getValues().iterator();
    // Walk values and converters in lock-step; one converter is consumed per value.
    int remaining = 0;
    while (remaining < input.getValues().size()) {
        Convert converter = converterCursor.next();
        Object value = valueCursor.next();
        converter.setOnProtoMessage(message, value);
        remaining++;
    }
    return (T) message.build();
}
/**
 * Returns a new builder for this context's message type.
 * The descriptor is fetched from {@code domain} by {@code messageName} the first time it
 * is needed and reused afterwards.
 */
@Override
public DynamicMessage.Builder invokeNewBuilder() {
    if (null == descriptor) {
        // First use: resolve and remember the descriptor.
        descriptor = domain.getDescriptor(messageName);
    }
    DynamicMessage.Builder fresh = DynamicMessage.newBuilder(descriptor);
    return fresh;
}
/**
 * Encodes a row with the converter registered for the row's table and wraps the result
 * in a {@code messageType} message, stored in the field mapped to that table id.
 *
 * @param row the row to encode
 * @return the wrapping message
 */
@Override
public DynamicMessage encode(Row row) {
    final Integer tableId = row.rowType().table().getTableId();
    final DynamicMessage inside = tableConvertersByTableId.get(tableId).encode(row);
    return DynamicMessage.newBuilder(messageType)
            .setField(groupFieldsByTabelId.get(tableId), inside)
            .build();
}
/**
 * Creates a builder for a dynamically defined {@code Body} message with a required
 * {@code int32 bodyInt = 3} and an optional {@code string bodyString = 4}.
 *
 * @throws Descriptors.DescriptorValidationException if the schema cannot be built
 */
private static DynamicMessage.Builder createBodyBuilder() throws Descriptors.DescriptorValidationException {
    final String messageName = "Body";

    MessageDefinition.Builder bodyDefinition = MessageDefinition.newBuilder(messageName);
    bodyDefinition.addField("required", "int32", "bodyInt", 3);
    bodyDefinition.addField("optional", "string", "bodyString", 4);

    DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
    schemaBuilder.addMessageDefinition(bodyDefinition.build());

    return schemaBuilder.build().newMessageBuilder(messageName);
}
/**
 * Serializes a record map field into a protobuf map field.
 *
 * <p>Each entry of the record map becomes one map-entry message (key and value fields of
 * the field's message type) appended to {@code fieldDescriptor} on the builder.
 *
 * <p>A {@code null} {@code field} means the record does not carry this map field at all;
 * in that case nothing is written.
 *
 * @param record                    record being serialized
 * @param field                     the record's map field, or {@code null} when absent
 * @param fieldPath                 path of the enclosing field
 * @param messageTypeToExtensionMap protobuf extension map
 * @param defaultValueMap           protobuf default field values
 * @param fieldDescriptor           descriptor of the protobuf map field
 * @param builder                   builder that receives the entries
 * @throws DataGeneratorException if a value cannot be converted
 */
private static void handleMapField(
    Record record,
    Field field,
    String fieldPath,
    Map<String, Set<Descriptors.FieldDescriptor>> messageTypeToExtensionMap,
    Map<String, Object> defaultValueMap,
    Descriptors.FieldDescriptor fieldDescriptor,
    DynamicMessage.Builder builder
) throws DataGeneratorException {
    // The caller looks the field up by name and may pass null when the record lacks it.
    if (field == null) {
        return;
    }
    Map<String, Field> sdcMapField = field.getValueAsMap();
    // NOTE(review): presumably a field holding a null value yields a null map; skip it too.
    if (sdcMapField == null) {
        return;
    }

    // MapEntry contains key and value fields; resolve both descriptors once.
    Descriptors.Descriptor mapEntryDescriptor = fieldDescriptor.getMessageType();
    Descriptors.FieldDescriptor keyDescriptor = mapEntryDescriptor.findFieldByName(KEY);
    Descriptors.FieldDescriptor valueDescriptor = mapEntryDescriptor.findFieldByName(VALUE);

    for (Map.Entry<String, Field> entry : sdcMapField.entrySet()) {
        Object value = getValue(
            valueDescriptor,
            entry.getValue(),
            record,
            fieldPath + FORWARD_SLASH + entry.getKey(),
            messageTypeToExtensionMap,
            defaultValueMap
        );
        builder.addRepeatedField(
            fieldDescriptor,
            DynamicMessage.newBuilder(mapEntryDescriptor)
                .setField(keyDescriptor, entry.getKey())
                .setField(valueDescriptor, value)
                .build()
        );
    }
}
/**
 * Builds the descriptor of a synthetic {@code OffsetResetter} message whose fields are the
 * {@code EventHeaderProtos.Header} fields followed by the fields of {@code classDescriptor}.
 *
 * @param classDescriptor descriptor whose fields are appended after the header fields
 * @return the descriptor reported by the builder from {@code ProtoConcatenator.buildMessageBuilder}
 * @throws Descriptors.DescriptorValidationException if the message definition is invalid
 */
private static Descriptors.Descriptor descriptorForTypeWithHeader(Descriptors.Descriptor classDescriptor)
        throws Descriptors.DescriptorValidationException {
    // Header fields first, then the payload fields.
    final Collection<Descriptors.FieldDescriptor> combined =
            new ArrayList<>(EventHeaderProtos.Header.getDescriptor().getFields());
    combined.addAll(classDescriptor.getFields());
    return ProtoConcatenator.buildMessageBuilder("OffsetResetter", combined).getDescriptorForType();
}
/**
 * Builds a dynamic {@code TestRecordsNulls2Proto.MyNullRecord} message.
 *
 * @param name        value for the name field; always set
 * @param intValue    value for the int field, left unset when {@code null}
 * @param stringValue value for the string field, left unset when {@code null}
 * @return the built message
 */
protected static DynamicMessage buildRecord2Dynamic(@Nonnull String name, @Nullable Integer intValue, @Nullable String stringValue) {
    final Descriptors.Descriptor descriptor = TestRecordsNulls2Proto.MyNullRecord.getDescriptor();
    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
    // Field numbers are taken from the same generated class as the descriptor, so the lookup
    // cannot silently hit the wrong field if the two test protos ever diverge.
    // NOTE(review): assumes TestRecordsNulls2Proto.MyNullRecord declares name / int_value /
    // string_value fields like the Nulls3 variant - confirm against the .proto file.
    builder.setField(descriptor.findFieldByNumber(TestRecordsNulls2Proto.MyNullRecord.NAME_FIELD_NUMBER), name);
    if (intValue != null) {
        builder.setField(descriptor.findFieldByNumber(TestRecordsNulls2Proto.MyNullRecord.INT_VALUE_FIELD_NUMBER), intValue);
    }
    if (stringValue != null) {
        builder.setField(descriptor.findFieldByNumber(TestRecordsNulls2Proto.MyNullRecord.STRING_VALUE_FIELD_NUMBER), stringValue);
    }
    return builder.build();
}
/**
 * Build a dynamic message builder based on a list of fields. Fields will be numbered in the order they were
 * provided, starting from 1. Two extra optional {@code int64} fields, named by
 * {@code TIMESTAMP_FIELD_NAME} and {@code KAFKA_OFFSET}, are appended after the provided fields.
 *
 * <p>Enum types used by the fields are re-declared (once each) inside the generated message so
 * that enum fields can refer to them by simple name.
 *
 * @param msgName Name of the output message
 * @param fields Fields to be added to the output message definition
 * @return A builder able to fill a message made of all input fields
 * @throws Descriptors.DescriptorValidationException In case of a bug (shouldn't happen)
 */
public static DynamicMessage.Builder buildMessageBuilder(String msgName, Collection<Descriptors.FieldDescriptor> fields)
        throws Descriptors.DescriptorValidationException {
    final MessageDefinition.Builder msgDef = MessageDefinition.newBuilder(msgName);

    // Add Enum definitions before adding fields
    fields
        .stream()
        .filter(fd -> Descriptors.FieldDescriptor.Type.ENUM.equals(fd.getType()))
        .map(Descriptors.FieldDescriptor::getEnumType)
        .distinct()
        .forEach(enumDescriptor -> {
            EnumDefinition.Builder enumDefinitionBuilder = EnumDefinition.newBuilder(enumDescriptor.getName());
            enumDescriptor.getValues().forEach(desc -> enumDefinitionBuilder.addValue(desc.getName(), desc.getNumber()));
            msgDef.addEnumDefinition(enumDefinitionBuilder.build());
        });

    int currentIndex = 1;
    for (Descriptors.FieldDescriptor fieldDescriptor : fields) {
        final String label;
        if (fieldDescriptor.isRepeated()) {
            label = "repeated";
        } else {
            label = fieldDescriptor.isRequired() ? "required" : "optional";
        }

        final String typeName;
        if (fieldDescriptor.getType() == Descriptors.FieldDescriptor.Type.ENUM) {
            typeName = fieldDescriptor.getEnumType().getName();
        } else {
            // Locale.ROOT keeps the type keyword stable regardless of the JVM default locale
            // (e.g. under a Turkish locale "INT64" would otherwise lower-case to "\u0131nt64").
            typeName = fieldDescriptor.getType().toString().toLowerCase(java.util.Locale.ROOT);
        }
        msgDef.addField(label, typeName, fieldDescriptor.getName(), currentIndex++);
    }

    msgDef.addField("optional", "int64", TIMESTAMP_FIELD_NAME, currentIndex++);
    msgDef.addField("optional", "int64", KAFKA_OFFSET, currentIndex++);

    final DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
    schemaBuilder.addMessageDefinition(msgDef.build());
    final DynamicSchema schema = schemaBuilder.build();
    return schema.newMessageBuilder(msgName);
}
/**
 * Serializes a field path in a record to a protobuf message using the specified descriptor.
 *
 * <p>Every field of the descriptor, plus any extensions registered for its full name, is
 * dispatched to the map, repeated or non-repeated handler. The repeated handler is only
 * invoked when the record contains the field. Unknown fields stored for this field path
 * are applied last.
 *
 * @param record                    Record with the field to serialize
 * @param field                     The field to serialize
 * @param fieldPath                 The field path of the specified field
 * @param desc                      Protobuf descriptor
 * @param messageTypeToExtensionMap Protobuf extension map
 * @param defaultValueMap           Protobuf default field values
 * @return serialized message, or {@code null} when {@code field} is {@code null}
 * @throws DataGeneratorException if a value cannot be converted, or with {@code PROTOBUF_05}
 *         when the stored unknown fields cannot be read
 */
private static DynamicMessage sdcFieldToProtobufMsg(
    Record record,
    Field field,
    String fieldPath,
    Descriptors.Descriptor desc,
    Map<String, Set<Descriptors.FieldDescriptor>> messageTypeToExtensionMap,
    Map<String, Object> defaultValueMap
) throws DataGeneratorException {
    if (field == null) {
        return null;
    }

    DynamicMessage.Builder builder = DynamicMessage.newBuilder(desc);

    // Compute all fields to look for: declared fields first, then extensions.
    final String typeName = desc.getFullName();
    List<Descriptors.FieldDescriptor> candidates = new ArrayList<>(desc.getFields());
    if (messageTypeToExtensionMap.containsKey(typeName)) {
        candidates.addAll(messageTypeToExtensionMap.get(typeName));
    }

    // Root field is always a Map in a record representing protobuf data.
    Map<String, Field> valueAsMap = field.getValueAsMap();
    for (Descriptors.FieldDescriptor f : candidates) {
        Field mapField = valueAsMap.get(f.getName());

        if (f.isMapField()) {
            handleMapField(record, mapField, fieldPath, messageTypeToExtensionMap, defaultValueMap, f, builder);
            continue;
        }

        if (!f.isRepeated()) {
            handleNonRepeatedField(
                record, valueAsMap, fieldPath, messageTypeToExtensionMap, defaultValueMap, desc, f, builder
            );
            continue;
        }

        // Repeated field: only written when present in the record.
        if (mapField != null) {
            handleRepeatedField(
                record, mapField, fieldPath, messageTypeToExtensionMap, defaultValueMap, f, builder
            );
        }
    }

    // If the record has unknown fields for this field path, carry them over.
    try {
        handleUnknownFields(record, fieldPath, builder);
    } catch (IOException e) {
        throw new DataGeneratorException(Errors.PROTOBUF_05, e.toString(), e);
    }

    return builder.build();
}
/**
 * Writes a type-appropriate default value for the given field through {@code dumpValue}.
 *
 * <p>Message fields get a freshly built sub-message in which this method is applied
 * recursively to every sub-field that is required (per {@code checkFieldIsRequired}) or,
 * when the {@code enbleEmptyList} program option is on, to every sub-field.
 *
 * <p>NOTE(review): all integer types, including the 64-bit ones, pass the int literal
 * {@code 0}, and GROUP passes an empty byte array - confirm {@code dumpValue} handles both.
 */
private void dumpDefault(DynamicMessage.Builder builder, Descriptors.FieldDescriptor fd) {
    switch (fd.getType()) {
        case BOOL:
            dumpValue(builder, fd, false);
            break;
        case STRING:
            dumpValue(builder, fd, "");
            break;
        case BYTES:
        case GROUP:
            dumpValue(builder, fd, new byte[0]);
            break;
        case ENUM:
            dumpValue(builder, fd, fd.getEnumType().findValueByNumber(0));
            break;
        case FLOAT:
            dumpValue(builder, fd, Float.valueOf(0));
            break;
        case DOUBLE:
            dumpValue(builder, fd, Double.valueOf(0.0));
            break;
        case INT32:
        case UINT32:
        case SINT32:
        case FIXED32:
        case SFIXED32:
        case INT64:
        case UINT64:
        case SINT64:
        case FIXED64:
        case SFIXED64:
            dumpValue(builder, fd, 0);
            break;
        case MESSAGE: {
            final Descriptors.Descriptor nestedType = fd.getMessageType();
            DynamicMessage.Builder nested = DynamicMessage.newBuilder(nestedType);
            // Fill required sub-fields (or all of them when empty lists are enabled).
            for (Descriptors.FieldDescriptor child : nestedType.getFields()) {
                if (checkFieldIsRequired(child) || ProgramOptions.getInstance().enbleEmptyList) {
                    dumpDefault(nested, child);
                }
            }
            dumpValue(builder, fd, nested.build());
            break;
        }
        default:
            break;
    }
}
/**
 * Not supported by this implementation.
 *
 * @throws IllegalStateException always
 */
public DynamicMessage.Builder invokeNewBuilder() {
    final String reason = "Should not be calling invokeNewBuilder";
    throw new IllegalStateException(reason);
}
/**
 * Creates a new dynamic message builder for the given message type.
 *
 * @param msgTypeName the message type name
 * @return the message builder, or {@code null} if {@code getMessageDescriptor} finds no
 *         descriptor for that name
 */
public DynamicMessage.Builder newMessageBuilder(String msgTypeName) {
    final Descriptor resolved = getMessageDescriptor(msgTypeName);
    return (resolved != null) ? DynamicMessage.newBuilder(resolved) : null;
}