The following examples show how to use the org.apache.spark.sql.types.ArrayType API class; each snippet is taken from an open-source project, and you can follow the links to GitHub to view the full source code.
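Before the project snippets, here is a minimal, self-contained sketch (not taken from any of the projects below) showing how an ArrayType is typically constructed and inspected; the class and column names are illustrative.

import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ArrayTypeSketch {
  public static void main(String[] args) {
    // Two equivalent ways to declare an array-of-string column type.
    ArrayType tags = DataTypes.createArrayType(DataTypes.StringType, true);
    ArrayType tagsExplicit = new ArrayType(DataTypes.StringType, true);

    // The element type and the null-element flag are what the visitors below dispatch on.
    DataType elementType = tags.elementType();   // StringType
    boolean containsNull = tags.containsNull();  // true

    // An ArrayType is usually embedded in a StructType that describes a row schema.
    StructType schema = new StructType()
        .add("id", DataTypes.LongType, false)
        .add("tags", tags, true);

    System.out.println(schema.simpleString());                            // struct<id:bigint,tags:array<string>>
    System.out.println(elementType.simpleString() + " " + containsNull);  // string true
    System.out.println(tags.equals(tagsExplicit));                        // true: ArrayType is a Scala case class
  }
}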
private static <T> T visitArray(DataType type, Schema array, AvroWithSparkSchemaVisitor<T> visitor) {
if (array.getLogicalType() instanceof LogicalMap || type instanceof MapType) {
Preconditions.checkState(
AvroSchemaUtil.isKeyValueSchema(array.getElementType()),
"Cannot visit invalid logical map type: %s", array);
Preconditions.checkArgument(type instanceof MapType, "Invalid map: %s is not a map", type);
MapType map = (MapType) type;
List<Schema.Field> keyValueFields = array.getElementType().getFields();
return visitor.map(map, array,
visit(map.keyType(), keyValueFields.get(0).schema(), visitor),
visit(map.valueType(), keyValueFields.get(1).schema(), visitor));
} else {
Preconditions.checkArgument(type instanceof ArrayType, "Invalid array: %s is not an array", type);
ArrayType list = (ArrayType) type;
return visitor.array(list, array, visit(list.elementType(), array.getElementType(), visitor));
}
}
@Override
public HapiConverter visitContained(String elementPath,
String elementTypeUrl,
Map<String, StructureField<HapiConverter<DataType>>> contained) {
StructField[] fields = contained.values()
.stream()
.map(containedEntry -> new StructField(containedEntry.fieldName(),
containedEntry.result().getDataType(),
true,
Metadata.empty()))
.toArray(StructField[]::new);
ArrayType container = new ArrayType(new StructType(fields), true);
return new HapiContainedToSparkConverter(contained, container);
}
@Override
public ParquetValueWriter<?> list(ArrayType sArray, GroupType array, ParquetValueWriter<?> elementWriter) {
GroupType repeated = array.getFields().get(0).asGroupType();
String[] repeatedPath = currentPath();
int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
return new ArrayDataWriter<>(repeatedD, repeatedR,
newOption(repeated.getType(0), elementWriter),
sArray.elementType());
}
static <T> T visit(DataType type, SparkTypeVisitor<T> visitor) {
if (type instanceof StructType) {
StructField[] fields = ((StructType) type).fields();
List<T> fieldResults = Lists.newArrayListWithExpectedSize(fields.length);
for (StructField field : fields) {
fieldResults.add(visitor.field(
field,
visit(field.dataType(), visitor)));
}
return visitor.struct((StructType) type, fieldResults);
} else if (type instanceof MapType) {
return visitor.map((MapType) type,
visit(((MapType) type).keyType(), visitor),
visit(((MapType) type).valueType(), visitor));
} else if (type instanceof ArrayType) {
return visitor.array(
(ArrayType) type,
visit(((ArrayType) type).elementType(), visitor));
} else if (type instanceof UserDefinedType) {
throw new UnsupportedOperationException(
"User-defined types are not supported");
} else {
return visitor.atomic(type);
}
}
@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public Object get(int ordinal, DataType dataType) {
if (dataType instanceof IntegerType) {
return getInt(ordinal);
} else if (dataType instanceof LongType) {
return getLong(ordinal);
} else if (dataType instanceof StringType) {
return getUTF8String(ordinal);
} else if (dataType instanceof FloatType) {
return getFloat(ordinal);
} else if (dataType instanceof DoubleType) {
return getDouble(ordinal);
} else if (dataType instanceof DecimalType) {
DecimalType decimalType = (DecimalType) dataType;
return getDecimal(ordinal, decimalType.precision(), decimalType.scale());
} else if (dataType instanceof BinaryType) {
return getBinary(ordinal);
} else if (dataType instanceof StructType) {
return getStruct(ordinal, ((StructType) dataType).size());
} else if (dataType instanceof ArrayType) {
return getArray(ordinal);
} else if (dataType instanceof MapType) {
return getMap(ordinal);
} else if (dataType instanceof BooleanType) {
return getBoolean(ordinal);
} else if (dataType instanceof ByteType) {
return getByte(ordinal);
} else if (dataType instanceof ShortType) {
return getShort(ordinal);
}
return null;
}
private static void assertEquals(String context, DataType type, Object expected, Object actual) {
if (expected == null && actual == null) {
return;
}
if (type instanceof StructType) {
Assert.assertTrue("Expected should be an InternalRow: " + context,
expected instanceof InternalRow);
Assert.assertTrue("Actual should be an InternalRow: " + context,
actual instanceof InternalRow);
assertEquals(context, (StructType) type, (InternalRow) expected, (InternalRow) actual);
} else if (type instanceof ArrayType) {
Assert.assertTrue("Expected should be an ArrayData: " + context,
expected instanceof ArrayData);
Assert.assertTrue("Actual should be an ArrayData: " + context,
actual instanceof ArrayData);
assertEquals(context, (ArrayType) type, (ArrayData) expected, (ArrayData) actual);
} else if (type instanceof MapType) {
Assert.assertTrue("Expected should be a MapData: " + context,
expected instanceof MapData);
Assert.assertTrue("Actual should be a MapData: " + context,
actual instanceof MapData);
assertEquals(context, (MapType) type, (MapData) expected, (MapData) actual);
} else if (type instanceof BinaryType) {
assertEqualBytes(context, (byte[]) expected, (byte[]) actual);
} else {
Assert.assertEquals("Value should match expected: " + context, expected, actual);
}
}
private static void assertEquals(String context, ArrayType array, ArrayData expected, ArrayData actual) {
Assert.assertEquals("Should have the same number of elements",
expected.numElements(), actual.numElements());
DataType type = array.elementType();
for (int i = 0; i < actual.numElements(); i += 1) {
assertEquals(context + ".element", type, expected.get(i, type), actual.get(i, type));
}
}
/**
* Returns the type of a nested field.
*/
DataType getField(DataType dataType, boolean isNullable, String... names) {
StructType schema = dataType instanceof ArrayType
? (StructType) ((ArrayType) dataType).elementType()
: (StructType) dataType;
StructField field = Arrays.stream(schema.fields())
.filter(sf -> sf.name().equalsIgnoreCase(names[0]))
.findFirst()
.get();
DataType child = field.dataType();
// Recurse through children if there are more names.
if (names.length == 1) {
// Check the nullability.
Assert.assertEquals("Unexpected nullability of field " + field.name(),
isNullable,
field.nullable());
return child;
} else {
return getField(child, isNullable, Arrays.copyOfRange(names, 1, names.length));
}
}
@Test
public void codeableConceptToStruct() {
DataType codeableType = getField(conditionSchema, true, "severity");
Assert.assertTrue(codeableType instanceof StructType);
Assert.assertTrue(getField(codeableType, true, "coding") instanceof ArrayType);
Assert.assertTrue(getField(codeableType, true, "text") instanceof StringType);
}
public T list(ArrayType sArray, GroupType array, T element) {
return null;
}
public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> map = MAPPER.readValue(message, Map.class);
String[] names = rowTypeInfo.names();
Object[] values = new Object[names.length];
for (int i = 0; i < names.length; i++) {
String key = names[i];
switch (key) {
case "_topic":
values[i] = topic;
continue;
case "_message":
values[i] = new String(message, UTF_8);
continue;
case "_key":
values[i] = new String(messageKey, UTF_8);
continue;
case "_partition":
values[i] = partition;
continue;
case "_offset":
values[i] = offset;
continue;
}
Object value = map.get(key);
if (value == null) {
continue;
}
DataType type = rowTypeInfo.apply(i).dataType();
if (type instanceof MapType && ((MapType) type).valueType() == DataTypes.StringType) {
scala.collection.mutable.Map convertValue = new scala.collection.mutable.HashMap(); // must be a Scala map
for (Map.Entry entry : ((Map<?, ?>) value).entrySet()) {
convertValue.put(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
}
values[i] = convertValue;
}
else if (type instanceof ArrayType) { // declared column type is an array; value is a deserialized List
//Class<?> aClass = type.getTypeClass();
//values[i] = MAPPER.convertValue(value, aClass);
//todo: Spark List to Array
values[i] = value;
}
else if (type == DataTypes.LongType) {
values[i] = ((Number) value).longValue();
}
else {
values[i] = value;
}
}
return new GenericRowWithSchema(values, rowTypeInfo);
}
public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> map = MAPPER.readValue(message, Map.class);
String[] names = rowTypeInfo.names();
Object[] values = new Object[names.length];
for (int i = 0; i < names.length; i++) {
String key = names[i];
switch (key) {
case "_topic":
values[i] = topic;
continue;
case "_message":
values[i] = new String(message, UTF_8);
continue;
case "_key":
values[i] = new String(messageKey, UTF_8);
continue;
case "_partition":
values[i] = partition;
continue;
case "_offset":
values[i] = offset;
continue;
}
Object value = map.get(key);
if (value == null) {
continue;
}
DataType type = rowTypeInfo.apply(i).dataType();
if (type instanceof MapType && ((MapType) type).valueType() == DataTypes.StringType) {
scala.collection.mutable.Map convertValue = new scala.collection.mutable.HashMap(); // must be a Scala map
for (Map.Entry entry : ((Map<?, ?>) value).entrySet()) {
convertValue.put(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
}
values[i] = convertValue;
}
else if (type instanceof ArrayType) { // declared column type is an array; value is a deserialized List
//Class<?> aClass = type.getTypeClass();
//values[i] = MAPPER.convertValue(value, aClass);
//todo: Spark List to Array
values[i] = value;
}
else if (type == DataTypes.LongType) {
values[i] = ((Number) value).longValue();
}
else {
values[i] = value;
}
}
return new GenericRow(values);
}
@Override
protected boolean isMultiValued(DataType schemaType) {
return schemaType instanceof ArrayType;
}
private static Schema typeFor(DataType dataType, boolean isOptional, int recordCount) {
LOG.trace("Converting {} to Avro, optional[{}]", dataType, isOptional);
Schema typeSchema;
SchemaBuilder.BaseTypeBuilder<Schema> typeBuilder = SchemaBuilder.builder();
switch (dataType.typeName()) {
case "binary":
// bytes
typeSchema = typeBuilder.bytesType();
break;
case "boolean":
typeSchema = typeBuilder.booleanType();
break;
case "date":
// int (logical)
typeSchema = LogicalTypes.date().addToSchema(typeBuilder.intType());
break;
case "timestamp":
// long (logical)
typeSchema = LogicalTypes.timestampMillis().addToSchema(typeBuilder.longType());
break;
case "double":
typeSchema = typeBuilder.doubleType();
break;
case "float":
typeSchema = typeBuilder.floatType();
break;
case "integer":
case "byte":
case "short":
typeSchema = typeBuilder.intType();
break;
case "long":
typeSchema = typeBuilder.longType();
break;
case "null":
typeSchema = typeBuilder.nullType();
break;
case "string":
typeSchema = typeBuilder.stringType();
break;
case "array":
ArrayType arrayType = (ArrayType) dataType;
typeSchema = typeBuilder.array().items(typeFor(arrayType.elementType(), arrayType.containsNull(), recordCount));
break;
case "map":
MapType mapType = (MapType) dataType;
// Keys must be strings: mapType.keyType()
typeSchema = typeBuilder.map().values(typeFor(mapType.valueType(), mapType.valueContainsNull(), recordCount));
break;
case "struct":
StructType structType = (StructType) dataType;
// Nested "anonymous" records
typeSchema = schemaFor(structType, null, null, recordCount);
break;
default:
if (dataType.typeName().startsWith("decimal")) {
// byte (logical)
DecimalType decimalType = (DecimalType) dataType;
typeSchema = LogicalTypes.decimal(decimalType.precision(), decimalType.scale()).addToSchema(typeBuilder.bytesType());
} else {
throw new RuntimeException(String.format("DataType[%s] - DataType unrecognized or not yet implemented",
dataType));
}
}
if (isOptional && !typeSchema.getType().equals(NULL)) {
return SchemaBuilder.builder().nullable().type(typeSchema);
}
return typeSchema;
}
@Test (expected = RuntimeException.class)
public void testGetConfigurationDataTypeInvalid() {
ConfigurationDataTypes.getConfigurationDataType(new ArrayType(DataTypes.StringType, true));
}
public ArrayColumnBlock(ColumnVector columnVector, DataType type) {
super(columnVector, type);
arrayType = (ArrayType) type;
columnBlock = BlockFactory.getColumnBlock(columnVector.getChildColumn(0), arrayType.elementType());
}
private HapiContainedToSparkConverter(
Map<String, StructureField<HapiConverter<DataType>>> contained,
DataType dataType) {
super(contained, dataType);
this.containerType = (StructType) ((ArrayType) dataType).elementType();
for (int i = 0; i < containerType.fields().length; i++) {
structTypeHashToIndex.put(containerType.apply(i).dataType().hashCode(), i);
}
}
/**
* Recursively walks the schema to ensure there are no struct fields that are empty.
*/
private void checkNoEmptyStructs(StructType schema, String fieldName) {
Assert.assertNotEquals("Struct field " + fieldName + " is empty",
0,
schema.fields().length);
for (StructField field : schema.fields()) {
if (field.dataType() instanceof StructType) {
checkNoEmptyStructs((StructType) field.dataType(), field.name());
} else if (field.dataType() instanceof ArrayType) {
ArrayType arrayType = (ArrayType) field.dataType();
if (arrayType.elementType() instanceof StructType) {
if (!field.name().equals("contained")) {
checkNoEmptyStructs((StructType) arrayType.elementType(), field.name());
}
}
}
}
}
/**
* Returns a row converter for the given resource type. The resource type can
* either be a relative URL for a base resource (e.g. "Condition" or "Observation"),
* or a URL identifying the structure definition for a given profile, such as
* "http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient".
* <p>
* Resources that would be contained must be statically declared through this method
* via similar URLs.
* </p>
* @param context the FHIR context
* @param resourceTypeUrl the URL of the resource type
* @param containedResourceTypeUrls the list of URLs of contained resource types
* @return a Spark row converter instance.
*/
public static SparkRowConverter forResource(FhirContext context,
String resourceTypeUrl,
List<String> containedResourceTypeUrls) {
StructureDefinitions structureDefinitions = StructureDefinitions.create(context);
Map<String, HapiConverter<DataType>> converters = new HashMap<>();
String basePackage;
FhirVersionEnum fhirVersion = context.getVersion().getVersion();
if (FhirVersionEnum.DSTU3.equals(fhirVersion)) {
basePackage = "com.cerner.bunsen.stu3.spark";
} else if (FhirVersionEnum.R4.equals(fhirVersion)) {
basePackage = "com.cerner.bunsen.r4.spark";
} else {
throw new IllegalArgumentException("Unsupported FHIR version " + fhirVersion.toString());
}
DefinitionToSparkVisitor visitor = new DefinitionToSparkVisitor(
structureDefinitions.conversionSupport(), basePackage, converters);
HapiConverter<DataType> converter = structureDefinitions.transform(visitor,
resourceTypeUrl,
containedResourceTypeUrls);
RuntimeResourceDefinition[] resources =
new RuntimeResourceDefinition[1 + containedResourceTypeUrls.size()];
resources[0] = context.getResourceDefinition(converter.getElementType());
for (int i = 0; i < containedResourceTypeUrls.size(); i++) {
StructType parentType = (StructType) converter.getDataType();
ArrayType containerArrayType = (ArrayType) parentType.apply("contained").dataType();
StructType containerType = (StructType) containerArrayType.elementType();
resources[i + 1] = context.getResourceDefinition(containerType.apply(i).name());
}
return new SparkRowConverter(converter, resources);
}
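As a follow-up to the forResource method above, here is a minimal, hypothetical usage sketch: the resource names and the contained-resource declaration are illustrative, and it assumes a DSTU3 FhirContext from HAPI FHIR plus SparkRowConverter on the classpath.

import java.util.Arrays;
import java.util.Collections;
import ca.uhn.fhir.context.FhirContext;

public class ForResourceUsageSketch {
  public static void main(String[] args) {
    FhirContext context = FhirContext.forDstu3();

    // Converter for a base resource with no statically declared contained resources.
    SparkRowConverter conditionConverter =
        SparkRowConverter.forResource(context, "Condition", Collections.emptyList());

    // Converter for a resource that may carry contained entries; the contained resource
    // type ("Provenance") is illustrative and must be declared up front, as the Javadoc notes.
    SparkRowConverter observationConverter =
        SparkRowConverter.forResource(context, "Observation", Arrays.asList("Provenance"));
  }
}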