下面列出了怎么用com.google.protobuf.DynamicMessage的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void deserializeUnionWithUnknownFields() throws Descriptors.DescriptorValidationException {
// Add a field to the union descriptor message so as
// to make it possible to read an unknown field.
final Descriptors.Descriptor biggerUnionDescriptor = addFieldToUnionDescriptor("dummy_field", DescriptorProtos.FieldDescriptorProto.Type.TYPE_BOOL);
final DynamicMessage message1 = DynamicMessage.newBuilder(biggerUnionDescriptor)
.setField(biggerUnionDescriptor.findFieldByName("_MySimpleRecord"), TestRecords1Proto.MySimpleRecord.newBuilder().setRecNo(1066L).build())
.setField(biggerUnionDescriptor.findFieldByName("dummy_field"), Boolean.TRUE)
.build();
RecordSerializationException ex1 = assertThrows(RecordSerializationException.class,
() -> serializer.deserializeUnion(metaData.getUnionDescriptor(), Tuple.from(1066L), message1.toByteArray(), metaData.getVersion()));
assertThat(ex1.getMessage(), containsString("Could not deserialize union message"));
assertThat(ex1.getMessage(), containsString("there are unknown fields"));
assertThat((Collection<?>)ex1.getLogInfo().get("unknownFields"), not(empty()));
// Remove a field from the union descriptor and set it.
final Descriptors.Descriptor smallerUnionDescriptor = removeFieldFromUnionDescriptor("_MySimpleRecord");
final Message message2 = TestRecords1Proto.RecordTypeUnion.newBuilder()
.setMySimpleRecord(TestRecords1Proto.MySimpleRecord.newBuilder().setRecNo(1066L).build())
.build();
RecordSerializationException ex2 = assertThrows(RecordSerializationException.class,
() -> serializer.deserializeUnion(smallerUnionDescriptor, Tuple.from(1066L), message2.toByteArray(), metaData.getVersion()));
assertThat(ex2.getMessage(), containsString("Could not deserialize union message"));
assertThat(ex2.getMessage(), containsString("there are unknown fields"));
assertThat((Collection<?>)ex2.getLogInfo().get("unknownFields"), not(empty()));
}
private Map<String, Object> toOptions(
Map<Descriptors.FieldDescriptor, Object> optionFields, Boolean useFullName) {
if (optionFields.size() == 0) {
return null;
}
Map<String, Object> options = new HashMap<>();
optionFields.forEach(
(k, v) -> {
if (v.getClass().getName().equals("com.google.protobuf.DynamicMessage")) {
options.put(k.getFullName(), toOptions((((DynamicMessage) v).getAllFields()), false));
} else {
final String fieldName = useFullName ? k.getFullName() : k.getName();
options.put(fieldName, v);
}
});
return options;
}
private void addRepeatedFields(Map<String, Object> row, String key, Object value, List<Object> fieldValue) {
if (fieldValue.isEmpty()) {
return;
}
List<Object> repeatedNestedFields = new ArrayList<>();
String columnName = null;
for (Object f : fieldValue) {
if (f instanceof DynamicMessage) {
ColumnMapping nestedMappings = (ColumnMapping) value;
repeatedNestedFields.add(getMappings((DynamicMessage) f, nestedMappings));
columnName = getNestedColumnName(nestedMappings);
} else {
repeatedNestedFields.add(f);
columnName = (String) value;
}
}
row.put(columnName, repeatedNestedFields);
}
@Override
public Row expandRow (FDBStore store, Session session,
FDBStoreData storeData, Schema schema) {
ensureRowConverter();
DynamicMessage msg;
try {
msg = DynamicMessage.parseFrom(rowConverter.getMessageType(), storeData.rawValue);
} catch (InvalidProtocolBufferException ex) {
ProtobufReadException nex = new ProtobufReadException(rowDataConverter.getMessageType().getName(), ex.getMessage());
nex.initCause(ex);
throw nex;
}
Row row = rowConverter.decode(msg);
row = overlayBlobData(row.rowType(), row, store, session);
return row;
}
@Override
void setOnProtoMessage(Message.Builder message, Map map) {
if (map != null) {
FieldDescriptor fieldDescriptor = getFieldDescriptor(message);
List<Message> messageMap = new ArrayList<>();
map.forEach(
(k, v) -> {
DynamicMessage.Builder builder =
DynamicMessage.newBuilder(fieldDescriptor.getMessageType());
FieldDescriptor keyFieldDescriptor =
fieldDescriptor.getMessageType().findFieldByName("key");
builder.setField(
keyFieldDescriptor, this.key.convertToProtoValue(keyFieldDescriptor, k));
FieldDescriptor valueFieldDescriptor =
fieldDescriptor.getMessageType().findFieldByName("value");
builder.setField(
valueFieldDescriptor, value.convertToProtoValue(valueFieldDescriptor, v));
messageMap.add(builder.build());
});
message.setField(fieldDescriptor, messageMap);
}
}
@Override
public void write(Record record) throws IOException, DataGeneratorException {
if (closed) {
throw new IOException("generator has been closed");
}
DynamicMessage message = ProtobufTypeUtil.sdcFieldToProtobufMsg(
record,
descriptor,
messageTypeToExtensionMap,
defaultValueMap
);
if (isDelimited) {
message.writeDelimitedTo(outputStream);
} else {
message.writeTo(outputStream);
}
}
@Test
public void testSdcToProtobufExtensions() throws Exception {
List<Record> protobufRecords = ProtobufTestUtil.getProtobufRecords();
ByteArrayOutputStream bOut = new ByteArrayOutputStream();
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(bOut);
for (int i = 0; i < protobufRecords.size(); i++) {
DynamicMessage dynamicMessage = ProtobufTypeUtil.sdcFieldToProtobufMsg(
protobufRecords.get(i),
md,
typeToExtensionMap,
defaultValueMap
);
dynamicMessage.writeDelimitedTo(bufferedOutputStream);
}
bufferedOutputStream.flush();
bufferedOutputStream.close();
ProtobufTestUtil.checkProtobufDataExtensions(bOut.toByteArray());
}
/**
* Handle all the logic leading to the decoding of a Protobuf-encoded binary given a schema file path.
* @param schema Schema used to decode the binary data
* @param messageType Type of Protobuf Message
* @param encodedData Encoded data source
* @return A JSON representation of the data, contained in a Java String
* @throws InvalidProtocolBufferException Thrown when an error occurs during the encoding of the decoded data into JSON
* @throws Descriptors.DescriptorValidationException Thrown when the schema is invalid
* @throws UnknownMessageTypeException Thrown when the given message type is not contained in the schema
* @throws MessageDecodingException Thrown when an error occurs during the binary decoding
* @throws SchemaLoadingException Thrown when an error occurs while reading the schema file
*/
public static String decodeProtobuf(DynamicSchema schema, String messageType, InputStream encodedData) throws InvalidProtocolBufferException, Descriptors.DescriptorValidationException, UnknownMessageTypeException, MessageDecodingException, SchemaLoadingException {
Descriptors.Descriptor descriptor;
DynamicMessage message;
descriptor = schema.getMessageDescriptor(messageType);
if (descriptor == null) {
throw new UnknownMessageTypeException(messageType);
}
try {
message = DynamicMessage.parseFrom(descriptor, encodedData);
} catch (IOException e) {
throw new MessageDecodingException(e);
}
return JSONMapper.toJSON(message);
}
private boolean dumpPlainField(DynamicMessage.Builder builder, IdentifyDescriptor ident,
DataDstWriterNode.DataDstFieldDescriptor field, boolean isTopLevel) throws ConvException {
Descriptors.FieldDescriptor fd = (Descriptors.FieldDescriptor) field.getRawDescriptor();
if (null == fd) {
// 不需要提示,如果从其他方式解包协议描述的时候可能有可选字段丢失的
return false;
}
if (null == ident) {
if (ProgramOptions.getInstance().enbleEmptyList) {
dumpDefault(builder, fd);
}
return false;
}
DataContainer<String> res = DataSrcImpl.getOurInstance().getValue(ident, "");
if (null == res || !res.valid) {
if (field.isRequired() || ProgramOptions.getInstance().enbleEmptyList) {
dumpDefault(builder, fd);
}
return false;
}
return dumpPlainField(builder, ident, field, isTopLevel, res.value);
}
@Test
public void concatenateSingleMessage() throws Descriptors.DescriptorValidationException {
long kafkaOffset = 21L;
DynamicMessage.Builder inMsg = createBodyBuilder();
Descriptors.Descriptor inMsgDesc = inMsg.getDescriptorForType();
inMsg.setField(inMsgDesc.findFieldByName("bodyInt"), 1);
inMsg.setField(inMsgDesc.findFieldByName("bodyString"), "one");
Map<String, Object> expectedValues = new HashMap<>();
expectedValues.put("bodyInt", 1);
expectedValues.put("bodyString", "one");
expectedValues.put(ProtoConcatenator.TIMESTAMP_FIELD_NAME, 21L);
expectedValues.put(ProtoConcatenator.KAFKA_OFFSET, kafkaOffset);
testAllOutTypesWith(21L, Collections.singletonList(inMsg.build()), expectedValues, kafkaOffset);
}
@Test
public void concatenateDifferentMessages() throws Descriptors.DescriptorValidationException {
long kafkaOffset = 0L;
DynamicMessage.Builder headerMessageBuilder = createHeaderMessageBuilder();
Descriptors.Descriptor headerMsgDesc = headerMessageBuilder.getDescriptorForType();
headerMessageBuilder.setField(headerMsgDesc.findFieldByName("id"), 1)
.setField(headerMsgDesc.findFieldByName("name"), "one");
DynamicMessage.Builder bodyMessageBuilder = createBodyBuilder();
Descriptors.Descriptor bodyMsgDesc = bodyMessageBuilder.getDescriptorForType();
bodyMessageBuilder.setField(bodyMsgDesc.findFieldByName("bodyInt"), 2)
.setField(bodyMsgDesc.findFieldByName("bodyString"), "two");
Map<String, Object> expectedValues = new HashMap<>();
expectedValues.put("id", 1);
expectedValues.put("name", "one");
expectedValues.put("bodyInt", 2);
expectedValues.put("bodyString", "two");
expectedValues.put(ProtoConcatenator.TIMESTAMP_FIELD_NAME, 0L);
expectedValues.put(ProtoConcatenator.KAFKA_OFFSET, kafkaOffset);
testAllOutTypesWith(0L, Arrays.asList(headerMessageBuilder.build(), bodyMessageBuilder.build()), expectedValues, kafkaOffset);
}
@Test
public void testProtoToSdcExtensionFields() throws Exception {
List<DynamicMessage> messages = ProtobufTestUtil.getMessages(
md,
extensionRegistry,
ProtobufTestUtil.getProtoBufData()
);
for (int i = 0; i < messages.size(); i++) {
DynamicMessage m = messages.get(i);
Record record = RecordCreator.create();
Field field = ProtobufTypeUtil.protobufToSdcField(record, "", md, typeToExtensionMap, m);
Assert.assertNotNull(field);
ProtobufTestUtil.checkProtobufRecordsForExtensions(field, i);
}
}
@Nonnull
@Override
public byte[] serialize(@Nonnull RecordMetaData metaData,
@Nonnull RecordType recordType,
@Nonnull Message record,
@Nullable StoreTimer timer) {
long startTime = System.nanoTime();
try {
// Wrap in union message, if needed.
Message storedRecord = record;
Descriptors.Descriptor unionDescriptor = metaData.getUnionDescriptor();
if (unionDescriptor != null) {
DynamicMessage.Builder unionBuilder = DynamicMessage.newBuilder(unionDescriptor);
Descriptors.FieldDescriptor unionField = metaData.getUnionFieldForRecordType(recordType);
unionBuilder.setField(unionField, record);
storedRecord = unionBuilder.build();
}
return serializeToBytes(storedRecord);
} finally {
if (timer != null) {
timer.recordSinceNanoTime(Events.SERIALIZE_PROTOBUF_RECORD, startTime);
}
}
}
@Nonnull
@Override
@SpotBugsSuppressWarnings("RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
public Message deserialize(@Nonnull final RecordMetaData metaData,
@Nonnull final Tuple primaryKey,
@Nonnull final byte[] serialized,
@Nullable StoreTimer timer) {
final long startTime = System.nanoTime();
try {
final Descriptors.Descriptor unionDescriptor = metaData.getUnionDescriptor();
final DynamicMessage unionMessage = deserializeUnion(unionDescriptor, primaryKey, serialized, metaData.getVersion());
return getUnionField(unionMessage, primaryKey).getRight();
} finally {
if (timer != null) {
timer.recordSinceNanoTime(Events.DESERIALIZE_PROTOBUF_RECORD, startTime);
}
}
}
public ProtobufDataParser(
ProtoConfigurableEntity.Context context,
String messageId,
Descriptors.Descriptor descriptor,
Map<String, Set<Descriptors.FieldDescriptor>> messageTypeToExtensionMap,
ExtensionRegistry extensionRegistry,
InputStream inputStream,
String readerOffset,
int maxObjectLength,
boolean isDelimited
) throws IOException, Descriptors.DescriptorValidationException, DataParserException {
this.context = context;
this.inputStream = new OverrunInputStream(inputStream, maxObjectLength, true);
this.messageId = messageId;
this.messageTypeToExtensionMap = messageTypeToExtensionMap;
this.extensionRegistry = extensionRegistry;
this.descriptor = descriptor;
this.builder = DynamicMessage.newBuilder(descriptor);
this.isDelimited = isDelimited;
// skip to the required location
if (readerOffset != null && !readerOffset.isEmpty() && !readerOffset.equals("0")) {
int offset = Integer.parseInt(readerOffset);
this.inputStream.skip(offset);
}
}
@Test
public void testNonEmptyRepeated() throws DataGeneratorException {
Record r = RecordCreator.create();
Map<String, Field> repeated = new HashMap<>();
repeated.put(
"samples",
Field.create(
Field.Type.LIST,
Arrays.asList(
Field.create(1),
Field.create(2),
Field.create(3),
Field.create(4),
Field.create(5)
)
)
);
r.set(Field.create(repeated));
Descriptors.Descriptor descriptor = RepeatedProto.getDescriptor().findMessageTypeByName("Repeated");
// repeated field samples is null and ignored
DynamicMessage dynamicMessage = ProtobufTypeUtil.sdcFieldToProtobufMsg(
r,
descriptor,
typeToExtensionMap,
defaultValueMap
);
// null repeated fields are treated as empty arrays
Object samples = dynamicMessage.getField(descriptor.findFieldByName("samples"));
Assert.assertNotNull(samples);
Assert.assertTrue(samples instanceof List);
Assert.assertEquals(5, ((List)samples).size());
}
@Test
public void testDynamicMessage() throws Exception {
DynamicMessage message =
DynamicMessage.newBuilder(MessageA.getDescriptor())
.setField(
MessageA.getDescriptor().findFieldByNumber(MessageA.FIELD1_FIELD_NUMBER), "foo")
.build();
Coder<DynamicMessage> coder = DynamicProtoCoder.of(message.getDescriptorForType());
// Special code to check the DynamicMessage equality (@see IsDynamicMessageEqual)
for (Coder.Context context : ALL_CONTEXTS) {
CoderProperties.coderDecodeEncodeInContext(
coder, context, message, IsDynamicMessageEqual.equalTo(message));
}
}
@Test
public void testRepeatedRowToProto() {
ProtoDynamicMessageSchema schemaProvider =
schemaFromDescriptor(RepeatPrimitive.getDescriptor());
SerializableFunction<Row, DynamicMessage> fromRow = schemaProvider.getFromRowFunction();
assertEquals(REPEATED_PROTO.toString(), fromRow.apply(REPEATED_ROW).toString());
}
private Map<String, Object> getMappings(DynamicMessage message, ColumnMapping columnMapping) {
if (message == null || columnMapping == null || columnMapping.isEmpty()) {
return Collections.emptyMap();
}
Descriptors.Descriptor descriptorForType = message.getDescriptorForType();
Map<String, Object> row = new HashMap<>(columnMapping.size());
columnMapping.forEach((key, value) -> {
String columnName = value.toString();
String column = key.toString();
if (column.equals(Config.RECORD_NAME)) {
return;
}
Integer protoIndex = Integer.valueOf(column);
Descriptors.FieldDescriptor fieldDesc = descriptorForType.findFieldByNumber(protoIndex);
if (fieldDesc != null && !message.getField(fieldDesc).toString().isEmpty()) {
Object field = message.getField(fieldDesc);
ProtoField protoField = FieldFactory.getField(fieldDesc, field);
Object fieldValue = protoField.getValue();
if (fieldValue instanceof List) {
addRepeatedFields(row, (String) key, value, (List<Object>) fieldValue);
return;
}
if (protoField.getClass().getName().equals(NestedField.class.getName())) {
try {
columnName = getNestedColumnName((ColumnMapping) value);
fieldValue = getMappings((DynamicMessage) field, (ColumnMapping) value);
} catch (Exception e) {
log.error("Exception::Handling nested field failure: {}", e.getMessage());
}
}
row.put(columnName, fieldValue);
}
});
return row;
}
/**
* Validate that it is legal to change the records descriptor from proto2 to proto3 as long as all of the records
* contained within that file are still the same syntax.
*/
@Test
public void onlyFileProto2ToProto3() throws InvalidProtocolBufferException {
assertNotEquals(TestRecords1Proto.getDescriptor().getSyntax(), TestRecords1ImportedProto.getDescriptor().getSyntax());
MetaDataEvolutionValidator.getDefaultInstance().validateUnion(
TestRecords1Proto.RecordTypeUnion.getDescriptor(),
TestRecords1ImportedProto.RecordTypeUnion.getDescriptor()
);
MetaDataEvolutionValidator.getDefaultInstance().validateUnion(
TestRecords1ImportedProto.RecordTypeUnion.getDescriptor(),
TestRecords1Proto.RecordTypeUnion.getDescriptor()
);
RecordMetaData metaData1 = RecordMetaData.build(TestRecords1Proto.getDescriptor());
RecordMetaData metaData2 = replaceRecordsDescriptor(metaData1, TestRecords1ImportedProto.getDescriptor());
MetaDataEvolutionValidator.getDefaultInstance().validate(metaData1, metaData2);
// Validate that the nested proto2 records in the proto3 file have proper nullability semantics
TestRecords1Proto.RecordTypeUnion unionRecordProto2 = TestRecords1Proto.RecordTypeUnion.newBuilder()
.setMySimpleRecord(TestRecords1Proto.MySimpleRecord.newBuilder().setRecNo(0).setStrValueIndexed(""))
.build();
assertThat(unionRecordProto2.getMySimpleRecord().hasNumValue2(), is(false));
assertThat(unionRecordProto2.getMySimpleRecord().hasStrValueIndexed(), is(true));
TestRecords1ImportedProto.RecordTypeUnion unionRecordProto3 = TestRecords1ImportedProto.RecordTypeUnion.parseFrom(unionRecordProto2.toByteString());
assertThat(unionRecordProto3.getMySimpleRecord().hasNumValue2(), is(false));
assertThat(unionRecordProto3.getMySimpleRecord().hasStrValueIndexed(), is(true));
final FieldDescriptor unionField = TestRecords1ImportedProto.RecordTypeUnion.getDescriptor().findFieldByName("_MySimpleRecord");
final FieldDescriptor numValue2Field = TestRecords1Proto.MySimpleRecord.getDescriptor().findFieldByName("num_value_2");
final FieldDescriptor strValueIndexedField = TestRecords1Proto.MySimpleRecord.getDescriptor().findFieldByName("str_value_indexed");
DynamicMessage dynamicUnionRecordProto3 = DynamicMessage.parseFrom(TestRecords1ImportedProto.RecordTypeUnion.getDescriptor(), unionRecordProto2.toByteString());
Message dynamicSimpleRecord = (Message)dynamicUnionRecordProto3.getField(unionField);
assertThat(dynamicSimpleRecord.hasField(numValue2Field), is(false));
assertThat(dynamicSimpleRecord.hasField(strValueIndexedField), is(true));
}
/**
* testBasic - basic usage
*/
@Test
public void testBasic() throws Exception {
log("--- testBasic ---");
// Create dynamic schema
DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
schemaBuilder.setName("PersonSchemaDynamic.proto");
MessageDefinition msgDef = MessageDefinition.newBuilder("Person") // message Person
.addField("required", "int32", "id", 1) // required int32 id = 1
.addField("required", "string", "name", 2) // required string name = 2
.addField("optional", "string", "email", 3) // optional string email = 3
.build();
schemaBuilder.addMessageDefinition(msgDef);
DynamicSchema schema = schemaBuilder.build();
log(schema);
// Create dynamic message from schema
DynamicMessage.Builder msgBuilder = schema.newMessageBuilder("Person");
Descriptor msgDesc = msgBuilder.getDescriptorForType();
DynamicMessage msg = msgBuilder
.setField(msgDesc.findFieldByName("id"), 1)
.setField(msgDesc.findFieldByName("name"), "Alan Turing")
.setField(msgDesc.findFieldByName("email"), "[email protected]")
.build();
log(msg);
// Create data object traditional way using generated code
PersonSchema.Person person = PersonSchema.Person.newBuilder()
.setId(1)
.setName("Alan Turing")
.setEmail("[email protected]")
.build();
// Should be equivalent
Assert.assertEquals(person.toString(), msg.toString());
}
@Test
public void shouldParseTestMessage() throws InvalidProtocolBufferException {
TestMessage testMessage = TestMessage.newBuilder().setOrderNumber("order").build();
DynamicMessage dynamicMessage = testMessageParser.parse(testMessage.toByteArray());
Descriptors.FieldDescriptor fieldDescriptor = dynamicMessage.getDescriptorForType().getFields().get(0);
assertEquals(dynamicMessage.getField(fieldDescriptor), "order");
assertEquals(dynamicMessage.toString(), testMessage.toString());
}
@Test
public void shouldNotParseRandomLogMessage() throws InvalidProtocolBufferException {
TestNestedMessage protoMessage = TestNestedMessage.newBuilder().build();
DynamicMessage message = testMessageParser.parse(protoMessage.toByteArray());
assertNotEquals(message.toByteArray(), "random".getBytes());
assertEquals(message.getAllFields().size(), 0);
}
public static List<DynamicMessage> parseToMessages(TypeRegistry registry, Descriptor descriptor,
List<String> jsonTexts) {
Parser parser = JsonFormat.parser().usingTypeRegistry(registry);
List<DynamicMessage> messages = new ArrayList<>();
try {
for (String jsonText : jsonTexts) {
DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(descriptor);
parser.merge(jsonText, messageBuilder);
messages.add(messageBuilder.build());
}
return messages;
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException("Unable to parse json text", e);
}
}
public CallResults invokeMethod(GrpcMethodDefinition definition, Channel channel, CallOptions callOptions,
List<String> requestJsonTexts) {
FileDescriptorSet fileDescriptorSet = GrpcReflectionUtils.resolveService(channel, definition.getFullServiceName());
if (fileDescriptorSet == null) {
return null;
}
ServiceResolver serviceResolver = ServiceResolver.fromFileDescriptorSet(fileDescriptorSet);
MethodDescriptor methodDescriptor = serviceResolver.resolveServiceMethod(definition);
TypeRegistry registry = TypeRegistry.newBuilder().add(serviceResolver.listMessageTypes()).build();
List<DynamicMessage> requestMessages = GrpcReflectionUtils.parseToMessages(registry, methodDescriptor.getInputType(),
requestJsonTexts);
CallResults results = new CallResults();
StreamObserver<DynamicMessage> streamObserver = MessageWriter.newInstance(registry, results);
CallParams callParams = CallParams.builder()
.methodDescriptor(methodDescriptor)
.channel(channel)
.callOptions(callOptions)
.requests(requestMessages)
.responseObserver(streamObserver)
.build();
try {
grpcClientService.call(callParams).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Caught exception while waiting for rpc", e);
}
return results;
}
@Override
public void packRow(FDBStore store, Session session,
FDBStoreData storeData, Row row) {
ensureRowConverter();
DynamicMessage msg = rowConverter.encode(row);
storeData.rawValue = msg.toByteArray();
}
@Override
public DynamicMessage parse(InputStream inputStream) {
try {
return DynamicMessage.newBuilder(messageDescriptor)
.mergeFrom(inputStream, ExtensionRegistryLite.getEmptyRegistry())
.build();
} catch (IOException e) {
throw new RuntimeException("Unable to merge from the supplied input stream", e);
}
}
@Test
public void exampleUsage() {
Message originalMessage = SimpleMessage.newBuilder().setName("bob").build();
DynamicMessage message = dynamic(originalMessage);
AddressResolver addressResolver =
fromAddressTemplate(
originalMessage.getDescriptorForType(), "com.ververica/python-function/{{$.name}}");
assertThat(
addressResolver.evaluate(message), is(address("com.ververica", "python-function", "bob")));
}
@Test
public void multipleReplacements() {
Message originalMessage = SimpleMessage.newBuilder().setName("bob").build();
DynamicMessage message = dynamic(originalMessage);
AddressResolver addressResolver =
fromAddressTemplate(
originalMessage.getDescriptorForType(), "com.{{$.name}}/python-{{$.name}}/{{$.name}}");
assertThat(addressResolver.evaluate(message), is(address("com.bob", "python-bob", "bob")));
}
@Override
public Record parse() throws IOException, DataParserException {
DynamicMessage message;
long pos = inputStream.getPos();
inputStream.resetCount();
if (!isDelimited) {
if (!eof) {
builder.mergeFrom(inputStream, extensionRegistry);
// Set EOF since non-delimited can only contain a single message.
eof = true;
} else {
return null;
}
} else {
if (!builder.mergeDelimitedFrom(inputStream, extensionRegistry)) {
// No more messages to process in this stream.
eof = true;
return null;
}
}
message = builder.build();
// If the message does not contain required fields then the above call throws UninitializedMessageException
// with a message similar to the following:
// com.google.protobuf.UninitializedMessageException: Message missing required fields: phone[0].type
builder.clear();
Record record = context.createRecord(messageId + OFFSET_SEPARATOR + pos);
record.set(ProtobufTypeUtil.protobufToSdcField(record, "", descriptor, messageTypeToExtensionMap, message));
return record;
}