下面列出了 com.google.protobuf.Descriptors#DescriptorValidationException() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Decodes a dynamic protobuf message from a slice of {@code buffer}.
 *
 * <p>The schema bytes are parsed into a {@code Serde.Schema} and converted to a file
 * descriptor via {@code toFileDescriptor}. The payload slice is expected to start with a
 * length-delimited {@code Serde.Ref} whose name selects the message type, followed by the
 * serialized message itself.
 *
 * @param schema serialized {@code Serde.Schema}
 * @param buffer buffer holding the payload; it must be array-backed because
 *               {@link ByteBuffer#array()} is used
 * @param start  index of the first payload byte in the backing array
 * @param length number of payload bytes
 * @return the decoded message
 * @throws IllegalStateException if parsing fails, the payload carries no type reference,
 *                               or the referenced type is not declared in the schema
 */
@Override
protected DynamicMessage readData(byte[] schema, ByteBuffer buffer, int start, int length) {
    try {
        Serde.Schema s = Serde.Schema.parseFrom(schema);
        Descriptors.FileDescriptor fileDescriptor = toFileDescriptor(s);
        // NOTE(review): start indexes the backing array directly (arrayOffset() is not added);
        // confirm callers pass an absolute array index.
        byte[] bytes = new byte[length];
        System.arraycopy(buffer.array(), start, bytes, 0, length);
        ByteArrayInputStream is = new ByteArrayInputStream(bytes);
        // parseDelimitedFrom returns null when the stream is already at EOF (empty payload).
        Serde.Ref ref = Serde.Ref.parseDelimitedFrom(is);
        if (ref == null) {
            throw new IllegalStateException("Payload of length " + length + " contains no message type reference");
        }
        // findMessageTypeByName returns null for a name the schema does not declare.
        Descriptors.Descriptor descriptor = fileDescriptor.findMessageTypeByName(ref.getName());
        if (descriptor == null) {
            throw new IllegalStateException("Message type '" + ref.getName() + "' not found in schema");
        }
        return DynamicMessage.parseFrom(descriptor, is);
    } catch (IOException | Descriptors.DescriptorValidationException e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Concatenating a single body message must yield its own fields plus the
 * timestamp and Kafka offset fields.
 */
@Test
public void concatenateSingleMessage() throws Descriptors.DescriptorValidationException {
    final long timestamp = 21L;
    final long kafkaOffset = 21L;

    // Input message with both body fields populated.
    DynamicMessage.Builder body = createBodyBuilder();
    Descriptors.Descriptor bodyDescriptor = body.getDescriptorForType();
    body.setField(bodyDescriptor.findFieldByName("bodyInt"), 1)
            .setField(bodyDescriptor.findFieldByName("bodyString"), "one");

    // Expected field values of the concatenated output.
    Map<String, Object> expected = new HashMap<>();
    expected.put("bodyInt", 1);
    expected.put("bodyString", "one");
    expected.put(ProtoConcatenator.TIMESTAMP_FIELD_NAME, timestamp);
    expected.put(ProtoConcatenator.KAFKA_OFFSET, kafkaOffset);

    testAllOutTypesWith(timestamp, Collections.singletonList(body.build()), expected, kafkaOffset);
}
/**
 * Concatenates a header message with a second header-like message and checks that
 * the fields of both, plus the timestamp and Kafka offset, appear in the output.
 */
@Test
public void concatenateMessagesWithClashingNamesAndIds() throws Descriptors.DescriptorValidationException {
    final long timestamp = 0L;
    final long kafkaOffset = 0L;

    // First input: the regular header.
    DynamicMessage.Builder header = createHeaderMessageBuilder();
    Descriptors.Descriptor headerDescriptor = header.getDescriptorForType();
    header.setField(headerDescriptor.findFieldByName("id"), 1);
    header.setField(headerDescriptor.findFieldByName("name"), "one");

    // Second input: the "other" header.
    DynamicMessage.Builder otherHeader = createOtherHeaderMessageBuilder();
    Descriptors.Descriptor otherHeaderDescriptor = otherHeader.getDescriptorForType();
    otherHeader.setField(otherHeaderDescriptor.findFieldByName("otherId"), 2);
    otherHeader.setField(otherHeaderDescriptor.findFieldByName("otherName"), "two");

    Map<String, Object> expected = new HashMap<>();
    expected.put("id", 1);
    expected.put("name", "one");
    expected.put("otherId", 2);
    expected.put("otherName", "two");
    expected.put(ProtoConcatenator.TIMESTAMP_FIELD_NAME, timestamp);
    expected.put(ProtoConcatenator.KAFKA_OFFSET, kafkaOffset);

    testAllOutTypesWith(timestamp, Arrays.asList(header.build(), otherHeader.build()), expected, kafkaOffset);
}
/**
 * Builds a file descriptor from {@code protoBuilder}, with the {@code Option}
 * descriptor as its only dependency, and delegates to the file-descriptor overload.
 */
private void testOutput(
        DescriptorProtos.FileDescriptorProto protoBuilder, ProtoDomain container, String expected)
        throws Descriptors.DescriptorValidationException {
    Descriptors.FileDescriptor[] dependencies = {Option.getDescriptor()};
    testOutput(Descriptors.FileDescriptor.buildFrom(protoBuilder, dependencies), container, expected);
}
/**
 * Writes {@code fileDescriptor} through {@code ProtoLanguageFileWriter} into an
 * in-memory stream and asserts that the produced text equals {@code expected}.
 */
private void testOutput(
        Descriptors.FileDescriptor fileDescriptor, ProtoDomain container, String expected)
        throws Descriptors.DescriptorValidationException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    ProtoLanguageFileWriter.write(fileDescriptor, container, sink);
    String actual = sink.toString();
    Assert.assertEquals(expected, actual);
}
/**
 * Renames a record type in the serialized meta-data, rebuilds a file descriptor from it,
 * and checks that meta-data built with that descriptor as its local file descriptor
 * resolves the record type under the new name only.
 */
@Test
public void localMetaDataWithRenamed() throws Descriptors.DescriptorValidationException {
    final String oldName = "MySimpleRecord";
    final String newName = "MyNewSimpleRecord";

    // Rename the record type inside the meta-data proto.
    RecordMetaData original = RecordMetaData.build(TestRecords1Proto.getDescriptor());
    RecordMetaDataProto.MetaData.Builder editedProto = original.toProto().toBuilder();
    MetaDataProtoEditor.renameRecordType(editedProto, oldName, newName);
    Descriptors.FileDescriptor[] dependencies =
            TestRecords1Proto.getDescriptor().getDependencies().toArray(new Descriptors.FileDescriptor[0]);
    Descriptors.FileDescriptor updatedFile = Descriptors.FileDescriptor.buildFrom(editedProto.getRecords(), dependencies);

    // The old name is gone, the new one exists, and the union field points at it.
    assertNull(updatedFile.findMessageTypeByName(oldName));
    Descriptors.Descriptor renamedDescriptor = updatedFile.findMessageTypeByName(newName);
    assertNotNull(renamedDescriptor);
    Descriptors.Descriptor unionInFile = updatedFile.findMessageTypeByName(RecordMetaDataBuilder.DEFAULT_UNION_NAME);
    assertSame(renamedDescriptor, unionInFile.findFieldByName("_MyNewSimpleRecord").getMessageType());

    // Build meta-data from the original proto, using the renamed file as the local descriptor.
    RecordMetaData localMetaData = RecordMetaData.newBuilder()
            .setLocalFileDescriptor(updatedFile)
            .setRecords(original.toProto())
            .build();
    assertThrows(MetaDataException.class, () -> localMetaData.getRecordType(oldName));
    assertNotNull(localMetaData.getRecordType(newName));
    assertSame(renamedDescriptor, localMetaData.getRecordType(newName).getDescriptor());
    assertEquals(
            Collections.singletonList(localMetaData.getRecordType(newName)),
            localMetaData.recordTypesForIndex(localMetaData.getIndex("MySimpleRecord$str_value_indexed")));
    assertSame(
            updatedFile.findMessageTypeByName(RecordMetaDataBuilder.DEFAULT_UNION_NAME),
            localMetaData.getUnionDescriptor());

    // Meta-data carrying a local descriptor cannot be serialized back to a proto.
    MetaDataException serializationError = assertThrows(MetaDataException.class, localMetaData::toProto);
    assertEquals("cannot serialize meta-data with a local records descriptor to proto", serializationError.getMessage());
}
/**
 * Renames the nested {@code MiddleRecord} to {@code OuterRecord}, so that an outer type
 * and its nested type share a simple name, and checks that a later rename of
 * {@code OuterRecord} is rejected as ambiguous.
 */
@Test
public void renameOuterTypeWithNestedTypeWithSameName() throws Descriptors.DescriptorValidationException {
    final String outerName = "OuterRecord";
    final String qualifiedMiddle = ".com.apple.foundationdb.record.test.doublenested.OuterRecord.MiddleRecord";
    final String qualifiedNestedOuter = ".com.apple.foundationdb.record.test.doublenested.OuterRecord.OuterRecord";

    final DescriptorProtos.FileDescriptorProto.Builder fileBuilder = TestRecordsDoubleNestedProto.getDescriptor().toProto().toBuilder();
    for (DescriptorProtos.DescriptorProto.Builder message : fileBuilder.getMessageTypeBuilderList()) {
        if (!outerName.equals(message.getName())) {
            // Other top-level messages refer to the renamed type by its fully qualified name.
            renameFieldTypes(message, qualifiedMiddle, qualifiedNestedOuter);
            continue;
        }
        for (DescriptorProtos.DescriptorProto.Builder nested : message.getNestedTypeBuilderList()) {
            if ("MiddleRecord".equals(nested.getName())) {
                nested.setName(outerName);
            }
        }
        // Inside OuterRecord the fields use the unqualified name.
        renameFieldTypes(message, qualifiedMiddle, outerName);
    }

    // Make sure the types were renamed in a way that preserves type, etc.
    Descriptors.FileDescriptor[] dependencies =
            TestRecordsDoubleNestedProto.getDescriptor().getDependencies().toArray(new Descriptors.FileDescriptor[0]);
    Descriptors.FileDescriptor modifiedFile = Descriptors.FileDescriptor.buildFrom(fileBuilder.build(), dependencies);
    Descriptors.Descriptor topLevelOuter = modifiedFile.findMessageTypeByName(outerName);
    assertNotNull(topLevelOuter);
    Descriptors.Descriptor nestedOuter = topLevelOuter.findNestedTypeByName(outerName);
    assertNotNull(nestedOuter);
    assertNotSame(topLevelOuter, nestedOuter);
    assertSame(topLevelOuter, nestedOuter.findNestedTypeByName("InnerRecord").findFieldByName("outer").getMessageType());
    assertSame(nestedOuter, topLevelOuter.findFieldByName("middle").getMessageType());
    assertSame(nestedOuter, topLevelOuter.findFieldByName("inner").getMessageType().getContainingType());
    assertSame(nestedOuter, modifiedFile.findMessageTypeByName("MiddleRecord").findFieldByName("other_middle").getMessageType());

    // Renaming OuterRecord must now fail because the field type name is ambiguous.
    RecordMetaData metaData = RecordMetaData.build(modifiedFile);
    RecordMetaDataProto.MetaData.Builder metaDataProtoBuilder = metaData.toProto().toBuilder();
    MetaDataProtoEditor.AmbiguousTypeNameException ambiguity = assertThrows(
            MetaDataProtoEditor.AmbiguousTypeNameException.class,
            () -> MetaDataProtoEditor.renameRecordType(metaDataProtoBuilder, "OuterRecord", "OtterRecord"));
    assertEquals("Field middle in message .com.apple.foundationdb.record.test.doublenested.OuterRecord of type OuterRecord might be of type .com.apple.foundationdb.record.test.doublenested.OuterRecord", ambiguity.getMessage());
}
/**
 * Materializes {@code selectStatement} as a temporary view and loads that view through
 * {@code fromTable}.
 *
 * <p>The view name is built from the key names joined by {@code _} plus the incremented
 * {@code VIEW_NUMBER} counter. The statement is closed even when view creation or loading
 * fails.
 *
 * @param connection           open database connection
 * @param selectStatement      SELECT statement to wrap in a temporary view
 * @param keySet               names of the key columns
 * @param sorted               passed through unchanged to {@code fromTable}
 * @param keyValueStoreBuilder builder receiving the key/value pairs
 * @return the result of {@code fromTable} for the temporary view
 */
private boolean fromSelectStatement(Connection connection, String selectStatement, Set<String> keySet, boolean sorted,
        KeyValueStoreBuilder keyValueStoreBuilder) throws SQLException, Descriptors.DescriptorValidationException, IOException, InterruptedException {
    try (Statement statement = connection.createStatement()) {
        // NOTE(review): ++VIEW_NUMBER is not atomic; confirm this is never called concurrently.
        String viewName = "temp_" + StringUtils.join(keySet.toArray(), "_") + "_" + ++VIEW_NUMBER;
        // NOTE(review): the SQL is built by concatenation; selectStatement and keySet must come
        // from trusted configuration, never from untrusted input.
        statement.execute("CREATE TEMP VIEW " + viewName + " AS " + selectStatement + ";");
        return fromTable(connection, viewName, keySet, sorted, keyValueStoreBuilder);
    }
}
/**
 * Creates a builder for a dynamic {@code Header} message with a required int32
 * {@code id} (field 1) and an optional string {@code name} (field 2).
 */
private static DynamicMessage.Builder createHeaderMessageBuilder() throws Descriptors.DescriptorValidationException {
    final String headerTypeName = "Header";
    MessageDefinition headerDefinition = MessageDefinition.newBuilder(headerTypeName)
            .addField("required", "int32", "id", 1)
            .addField("optional", "string", "name", 2)
            .build();

    DynamicSchema.Builder dynamicSchemaBuilder = DynamicSchema.newBuilder();
    dynamicSchemaBuilder.addMessageDefinition(headerDefinition);
    return dynamicSchemaBuilder.build().newMessageBuilder(headerTypeName);
}
/**
 * Returns the descriptor of an {@code OffsetResetter} message whose fields are the event
 * header fields followed by the fields of {@code classDescriptor}.
 */
private static Descriptors.Descriptor descriptorForTypeWithHeader(Descriptors.Descriptor classDescriptor)
        throws Descriptors.DescriptorValidationException {
    // Header fields first, then the fields of the wrapped type.
    final Collection<Descriptors.FieldDescriptor> combinedFields =
            new ArrayList<>(EventHeaderProtos.Header.getDescriptor().getFields());
    combinedFields.addAll(classDescriptor.getFields());
    return ProtoConcatenator.buildMessageBuilder("OffsetResetter", combinedFields).getDescriptorForType();
}
/**
 * Concatenating two empty messages must yield only the timestamp and Kafka offset fields.
 */
@Test
public void concatenateEmptyWithEmpty() throws Descriptors.DescriptorValidationException {
    final long timestamp = 0L;
    final long kafkaOffset = 0L;

    HashMap<String, Object> expected = new HashMap<>();
    expected.put(ProtoConcatenator.TIMESTAMP_FIELD_NAME, timestamp);
    expected.put(ProtoConcatenator.KAFKA_OFFSET, kafkaOffset);

    testAllOutTypesWith(timestamp, Arrays.asList(createEmptyMessage(), createEmptyMessage()), expected, kafkaOffset);
}
/**
 * Builds a {@code FileDescriptor} for every file in a descriptor set that is not yet
 * present in {@code fileDescriptorMap}, and stores it there under the file's name.
 *
 * <p>Dependencies already recorded in {@code dependenciesMap} are reused; otherwise they
 * are resolved through {@code getDependencies} and recorded.
 *
 * @param set FileDescriptorSet to load
 * @param dependenciesMap file name to resolved dependencies; updated in place
 * @param fileDescriptorMap file name to built descriptor; updated in place
 * @throws StageException if a descriptor fails protobuf validation
 */
public static void getAllFileDescriptors(
        DescriptorProtos.FileDescriptorSet set,
        Map<String, Set<Descriptors.FileDescriptor>> dependenciesMap,
        Map<String, Descriptors.FileDescriptor> fileDescriptorMap
) throws StageException {
    List<DescriptorProtos.FileDescriptorProto> files = set.getFileList();
    try {
        for (DescriptorProtos.FileDescriptorProto fdp : files) {
            final String fileName = fdp.getName();
            if (fileDescriptorMap.containsKey(fileName)) {
                // Already built.
                continue;
            }

            Set<Descriptors.FileDescriptor> resolved = dependenciesMap.get(fileName);
            if (resolved == null) {
                // Register the (still empty) set before resolving, exactly as before.
                resolved = new LinkedHashSet<>();
                dependenciesMap.put(fileName, resolved);
                resolved.addAll(getDependencies(dependenciesMap, fileDescriptorMap, fdp, set));
            }

            Descriptors.FileDescriptor[] dependencyArray = resolved.toArray(new Descriptors.FileDescriptor[0]);
            fileDescriptorMap.put(fileName, Descriptors.FileDescriptor.buildFrom(fdp, dependencyArray));
        }
    } catch (Descriptors.DescriptorValidationException e) {
        throw new StageException(Errors.PROTOBUF_07, e.getDescription(), e);
    }
}
/**
 * Creates a builder for a dynamic {@code Repeated} message with a single repeated int32
 * field named {@code repeated_field} (field 1).
 */
private static DynamicMessage.Builder createMessageWithRepeatedField()
        throws Descriptors.DescriptorValidationException {
    MessageDefinition repeatedDefinition = MessageDefinition.newBuilder("Repeated")
            .addField("repeated", "int32", "repeated_field", 1)
            .build();

    DynamicSchema.Builder dynamicSchemaBuilder = DynamicSchema.newBuilder();
    dynamicSchemaBuilder.addMessageDefinition(repeatedDefinition);
    return dynamicSchemaBuilder.build().newMessageBuilder("Repeated");
}
/**
 * Returns the descriptor produced by {@code descriptorForTypeWithHeader} for the
 * {@code JobManagerEvent} message type.
 */
public static Descriptors.Descriptor getDescriptor() throws Descriptors.DescriptorValidationException {
    final Descriptors.Descriptor eventDescriptor = FlinkEventProtos.JobManagerEvent.getDescriptor();
    return descriptorForTypeWithHeader(eventDescriptor);
}
/**
 * Returns the descriptor produced by {@code descriptorForTypeWithHeader} for the
 * {@code OperatorEvent} message type.
 */
public static Descriptors.Descriptor getDescriptor() throws Descriptors.DescriptorValidationException {
    final Descriptors.Descriptor eventDescriptor = FlinkEventProtos.OperatorEvent.getDescriptor();
    return descriptorForTypeWithHeader(eventDescriptor);
}
/**
 * Checks how {@code fieldIsType} classifies the {@code inner} field of {@code OuterRecord}
 * when its type name is written without a leading dot, first as
 * {@code MiddleRecord.InnerRecord} and then as {@code OuterRecord.MiddleRecord.InnerRecord},
 * and that renaming {@code OuterRecord} is then rejected as ambiguous.
 */
@Test
public void nestedFieldIsTypeUnqualified() throws Descriptors.DescriptorValidationException {
    final String outer = "OuterRecord";
    final String innerField = "inner";
    final String otherRecord = ".com.apple.foundationdb.record.test.doublenested.OtherRecord";

    final DescriptorProtos.FileDescriptorProto.Builder fileBuilder = TestRecordsDoubleNestedProto.getDescriptor().toProto().toBuilder();
    final DescriptorProtos.FieldDescriptorProto.Builder innerBuilder = fileBuilder.getMessageTypeBuilderList().stream()
            .filter(candidate -> "OuterRecord".equals(candidate.getName()))
            .flatMap(candidate -> candidate.getFieldBuilderList().stream())
            .filter(candidateField -> "inner".equals(candidateField.getName()))
            .findAny()
            .get();
    final Descriptors.FileDescriptor[] dependencies =
            TestRecordsDoubleNestedProto.getDescriptor().getDependencies().toArray(new Descriptors.FileDescriptor[0]);

    // Case 1: type name relative to the enclosing message.
    innerBuilder.setTypeName("MiddleRecord.InnerRecord");
    // The relative name must still resolve to the same nested type.
    Descriptors.FileDescriptor modifiedFileDescriptor = Descriptors.FileDescriptor.buildFrom(fileBuilder.build(), dependencies);
    Descriptors.Descriptor innerRecordDescriptor = modifiedFileDescriptor.findMessageTypeByName(outer)
            .findNestedTypeByName("MiddleRecord")
            .findNestedTypeByName("InnerRecord");
    assertNotNull(innerRecordDescriptor);
    assertSame(innerRecordDescriptor, modifiedFileDescriptor.findMessageTypeByName(outer).findFieldByName(innerField).getMessageType());
    assertEquals(FieldTypeMatch.MIGHT_MATCH, fieldIsType(fileBuilder, outer, innerField, "OuterRecord.MiddleRecord.InnerRecord"));
    assertEquals(FieldTypeMatch.MIGHT_MATCH_AS_NESTED, fieldIsType(fileBuilder, outer, innerField, "OuterRecord.MiddleRecord"));
    assertEquals(FieldTypeMatch.MIGHT_MATCH_AS_NESTED, fieldIsType(fileBuilder, outer, innerField, "OuterRecord"));
    assertEquals(FieldTypeMatch.MIGHT_MATCH, fieldIsType(fileBuilder, outer, innerField, "MiddleRecord.InnerRecord"));
    assertEquals(FieldTypeMatch.MIGHT_MATCH_AS_NESTED, fieldIsType(fileBuilder, outer, innerField, "MiddleRecord"));
    assertEquals(FieldTypeMatch.DOES_NOT_MATCH, fieldIsType(fileBuilder, outer, innerField, otherRecord));

    // Case 2: type name starting at the outer message, still without a leading dot.
    innerBuilder.setTypeName("OuterRecord.MiddleRecord.InnerRecord");
    modifiedFileDescriptor = Descriptors.FileDescriptor.buildFrom(fileBuilder.build(), dependencies);
    innerRecordDescriptor = modifiedFileDescriptor.findMessageTypeByName(outer)
            .findNestedTypeByName("MiddleRecord")
            .findNestedTypeByName("InnerRecord");
    assertNotNull(innerRecordDescriptor);
    assertSame(innerRecordDescriptor, modifiedFileDescriptor.findMessageTypeByName(outer).findFieldByName(innerField).getMessageType());
    assertEquals(FieldTypeMatch.MIGHT_MATCH, fieldIsType(fileBuilder, outer, innerField, "OuterRecord.MiddleRecord.InnerRecord"));
    assertEquals(FieldTypeMatch.MIGHT_MATCH_AS_NESTED, fieldIsType(fileBuilder, outer, innerField, "OuterRecord.MiddleRecord"));
    assertEquals(FieldTypeMatch.MIGHT_MATCH_AS_NESTED, fieldIsType(fileBuilder, outer, innerField, "OuterRecord"));
    assertEquals(FieldTypeMatch.DOES_NOT_MATCH, fieldIsType(fileBuilder, outer, innerField, "MiddleRecord.InnerRecord"));
    assertEquals(FieldTypeMatch.DOES_NOT_MATCH, fieldIsType(fileBuilder, outer, innerField, "MiddleRecord"));
    assertEquals(FieldTypeMatch.DOES_NOT_MATCH, fieldIsType(fileBuilder, outer, innerField, otherRecord));

    // Renaming OuterRecord must be refused because the field type might refer to it.
    RecordMetaData metaData = RecordMetaData.build(modifiedFileDescriptor);
    RecordMetaDataProto.MetaData.Builder metaDataProtoBuilder = metaData.toProto().toBuilder();
    MetaDataProtoEditor.AmbiguousTypeNameException ambiguity = assertThrows(
            MetaDataProtoEditor.AmbiguousTypeNameException.class,
            () -> MetaDataProtoEditor.renameRecordType(metaDataProtoBuilder, "OuterRecord", "OtterRecord"));
    assertEquals("Field inner in message .com.apple.foundationdb.record.test.doublenested.OuterRecord of type OuterRecord.MiddleRecord.InnerRecord might be of type .com.apple.foundationdb.record.test.doublenested.OuterRecord", ambiguity.getMessage());
}
/**
 * Asserts that writing {@code proto} produces the expected proto3 source for a service
 * with service-level options, four RPC methods (two of them carrying method options),
 * and the two empty request/response messages.
 */
private void assertService(DescriptorProtos.FileDescriptorProto proto, ProtoDomain domain)
        throws Descriptors.DescriptorValidationException {
    final String expected =
            "syntax = \"proto3\";\n"
                    + "\n"
                    + "import \"test/v1/option.proto\";\n"
                    + "\n"
                    + "\n"
                    + "\n"
                    + "service Service {\n"
                    + "\toption deprecated = true;\n"
                    + "\toption (test.v1.service_option) = {\n"
                    + "\t\tsingle_string: \"testString\"\n"
                    + "\t\trepeated_string: [\"test1\",\"test2\"]\n"
                    + "\t\tsingle_int32: 2\n"
                    + "\t\trepeated_int32: [3,4]\n"
                    + "\t\tsingle_int64: 10\n"
                    + "\t\tsingle_enum: ENUM2\n"
                    + "\t\tsingle_message: {\n"
                    + "\t\t\tsingle_string: \"minimal\"\n"
                    + "\t\t\trepeated_string: [\"test1\",\"test2\"]\n"
                    + "\t\t\tsingle_int32: 2\n"
                    + "\t\t\trepeated_int32: [3]\n"
                    + "\t\t\tsingle_enum: ENUM2\n"
                    + "\t\t}\n"
                    + "\t};\n"
                    + "\toption (test.v1.service_option_1) = 12;\n"
                    + "\toption (test.v1.service_option_2) = \"String\";\n"
                    + "\toption (test.v1.service_option_n) = \"Value I\";\n"
                    + "\toption (test.v1.service_option_n) = \"Value II\";\n"
                    + "\toption (test.v1.service_option_n) = \"Value III\";\n"
                    + "\n"
                    + "\trpc FirstMethod(MethodRequest) returns (MethodResponse) {}\n"
                    + "\trpc ClientStreamingMethod(stream MethodRequest) returns (MethodResponse) {}\n"
                    + "\trpc ServerStreamingMethod(MethodRequest) returns (stream MethodResponse) {\n"
                    + "\t\toption (test.v1.method_option) = {\n"
                    + "\t\t\tsingle_string: \"minimal\"\n"
                    + "\t\t\trepeated_string: [\"test1\",\"test2\"]\n"
                    + "\t\t\tsingle_int32: 2\n"
                    + "\t\t\trepeated_int32: [3]\n"
                    + "\t\t\tsingle_enum: ENUM2\n"
                    + "\t\t};\n"
                    + "\n"
                    + "\t}\n"
                    + "\trpc BiStreamingMethod(stream MethodRequest) returns (stream MethodResponse) {\n"
                    + "\t\toption deprecated = true;\n"
                    + "\t\toption (test.v1.method_option) = {\n"
                    + "\t\t\tsingle_string: \"testString\"\n"
                    + "\t\t\trepeated_string: [\"test1\",\"test2\"]\n"
                    + "\t\t\tsingle_int32: 2\n"
                    + "\t\t\trepeated_int32: [3,4]\n"
                    + "\t\t\tsingle_int64: 10\n"
                    + "\t\t\tsingle_enum: ENUM2\n"
                    + "\t\t\tsingle_message: {\n"
                    + "\t\t\t\tsingle_string: \"minimal\"\n"
                    + "\t\t\t\trepeated_string: [\"test1\",\"test2\"]\n"
                    + "\t\t\t\tsingle_int32: 2\n"
                    + "\t\t\t\trepeated_int32: [3]\n"
                    + "\t\t\t\tsingle_enum: ENUM2\n"
                    + "\t\t\t}\n"
                    + "\t\t};\n"
                    + "\t\toption (test.v1.method_option_1) = 12;\n"
                    + "\t\toption (test.v1.method_option_2) = \"String\";\n"
                    + "\t\toption (test.v1.method_option_n) = \"Value I\";\n"
                    + "\t\toption (test.v1.method_option_n) = \"Value II\";\n"
                    + "\t\toption (test.v1.method_option_n) = \"Value III\";\n"
                    + "\n"
                    + "\t}\n"
                    + "}\n"
                    + "\n"
                    + "message MethodRequest {\n"
                    + "\n"
                    + "}\n"
                    + "\n"
                    + "message MethodResponse {\n"
                    + "\n"
                    + "}\n";
    testOutput(proto, domain, expected);
}
/**
 * Returns the descriptor produced by {@code descriptorForTypeWithHeader} for the
 * {@code TaskEvent} message type.
 */
public static Descriptors.Descriptor getDescriptor() throws Descriptors.DescriptorValidationException {
    final Descriptors.Descriptor eventDescriptor = SparkEventProtos.TaskEvent.getDescriptor();
    return descriptorForTypeWithHeader(eventDescriptor);
}
/**
 * Reads every row of {@code tableName} and feeds the rows, grouped by key, into
 * {@code keyValueStoreBuilder}.
 *
 * <p>Steps: look up the key columns (those named in {@code keySet}) and the remaining value
 * columns in {@code information_schema.columns}, fetch a row-count estimate for logging,
 * then stream {@code SELECT keys, values FROM table}. Consecutive rows with an equal key
 * (as returned by {@code getKey}) are accumulated into one value through {@code addValue};
 * each completed pair goes to {@code addPair}. Rows whose key is {@code null} are skipped.
 *
 * <p>The statement and all result sets are closed even if an exception is thrown.
 *
 * @param connection           open database connection; auto-commit is switched off on it
 * @param tableName            table (or view) to read; lower-cased before use
 * @param keySet               names of the key columns
 * @param sorted               when {@code false}, an {@code ORDER BY} on the key columns is added
 * @param keyValueStoreBuilder builder receiving the key/value pairs
 * @return {@code false} if {@code keyValueStoreBuilder.setup()} declined, otherwise {@code true}
 */
private boolean fromTable(Connection connection, String tableName, Set<String> keySet, boolean sorted, KeyValueStoreBuilder keyValueStoreBuilder)
        throws SQLException, IOException, Descriptors.DescriptorValidationException, InterruptedException {
    if (!keyValueStoreBuilder.setup()) {
        logger.info("Skipping KeyValueStore creation...");
        return false;
    }

    List<String> keys = new ArrayList<>();
    List<String> values = new ArrayList<>();
    int count = 0;

    try (Statement statement = connection.createStatement()) {
        // NOTE(review): all SQL here is built by string concatenation; tableName and keySet
        // must come from trusted configuration, never from untrusted input.
        final String table = tableName.toLowerCase();
        final String escapedKeys = PostgresUtil.getPostgresEscapedConcatenatedQuery(keySet);
        final String columnQuery = "SELECT column_name, ordinal_position, udt_name, is_nullable, column_default "
                + "FROM information_schema.columns "
                + "WHERE table_name='" + table + "' ";

        logger.info("SQL statement: " + columnQuery + "AND column_name IN (" + escapedKeys + ");");

        // Key columns, in table order.
        try (ResultSet resultSetKeys = statement.executeQuery(
                columnQuery + "AND column_name IN (" + escapedKeys + ") ORDER BY ordinal_position;")) {
            while (resultSetKeys.next()) {
                keys.add(resultSetKeys.getString(1));
            }
        }

        // Every other column is treated as a value column.
        try (ResultSet resultSetValues = statement.executeQuery(
                columnQuery + "AND column_name NOT IN (" + escapedKeys + ") ORDER BY ordinal_position;")) {
            while (resultSetValues.next()) {
                values.add(resultSetValues.getString(1));
            }
        }

        // Row-count estimate, used for logging only.
        long rowCountEstimated = 0;
        try (ResultSet rowCountResultSet = statement.executeQuery(
                "SELECT reltuples::bigint AS estimate FROM pg_class where relname='" + table + "';")) {
            if (rowCountResultSet.next()) {
                rowCountEstimated = rowCountResultSet.getLong(1);
            }
        }

        // SELECT <keys>, <values> FROM <table> [ORDER BY <keys>];
        String keyString = String.join(", ", keys);
        StringBuilder sql = new StringBuilder();
        sql.append("SELECT ").append(keyString);
        for (String value : values) {
            sql.append(", ").append(value);
        }
        sql.append(" FROM ").append(table);
        if (!sorted) {
            sql.append(" ORDER BY ").append(keyString);
        }
        sql.append(";");

        connection.setAutoCommit(false);
        statement.setFetchSize(FETCH_SIZE);
        logger.info("Requesting database: " + sql.toString());

        Codec codec;
        Object key = null;
        Object value;
        try (ResultSet rs = statement.executeQuery(sql.toString())) {
            logger.info("Reading Database (estimated rows: " + rowCountEstimated + ")");
            codec = Codec.CodecFactory.getCodec(databaseKeyValueStore);
            value = getValue(databaseKeyValueStore.getValueClass());
            logger.info("Starting conversion");
            while (rs.next()) {
                Object nkey = getKey(keys, rs, 0);
                if (nkey == null) {
                    continue;
                }
                if (key == null) {
                    key = nkey;
                } else if (!nkey.equals(key)) {
                    // Key changed: flush the finished group and start a new one.
                    addPair(key, value, keyValueStoreBuilder, codec);
                    key = nkey;
                    value = clearValue(value, databaseKeyValueStore.getValueClass());
                }
                value = addValue(value, rs, databaseKeyValueStore.getValueClass(), keys.size(), values);
                if (++count % 1_000_000 == 0) {
                    logger.info("Read " + count / 1_000_000 + " mio rows");
                }
            }
        }
        // Flush the last group.
        // NOTE(review): when no row had a non-null key, key is still null here — confirm
        // addPair tolerates that.
        addPair(key, value, keyValueStoreBuilder, codec);
    }

    logger.info("Finished reading " + count + " rows.");
    keyValueStoreBuilder.build();
    return true;
}
/**
 * Returns the descriptor produced by {@code descriptorForTypeWithHeader} for the
 * {@code RDDStorageStatus} message type.
 */
public static Descriptors.Descriptor getDescriptor() throws Descriptors.DescriptorValidationException {
    final Descriptors.Descriptor eventDescriptor = SparkEventProtos.RDDStorageStatus.getDescriptor();
    return descriptorForTypeWithHeader(eventDescriptor);
}