The examples below show how to use the org.apache.spark.sql.types.DataType API class; you can also follow the links to view the source code on GitHub.
@Override
public void nextShortVector(DataType type, int items, ColumnVector columnVector, boolean[] isNull)
        throws IOException
{
    for (int i = 0, j = 0; i < items; i++) {
        while (columnVector.isNullAt(i + j)) {
            columnVector.appendNull();
            j++;
        }
        if (isNull[i]) {
            columnVector.appendNull();
        }
        else {
            columnVector.appendShort((short) next());
        }
    }
}
private static DataType convertType(org.apache.kylin.metadata.datatype.DataType type) {
    if (type.isTimeFamily())
        return DataTypes.TimestampType;
    if (type.isDateTimeFamily())
        return DataTypes.DateType;
    if (type.isIntegerFamily())
        return DataTypes.LongType;
    if (type.isNumberFamily())
        return DataTypes.createDecimalType(19, 4);
    if (type.isStringFamily())
        return DataTypes.StringType;
    if (type.isBoolean())
        return DataTypes.BooleanType;
    throw new IllegalArgumentException("KAP data type: " + type + " can not be converted to spark's type.");
}
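A minimal usage sketch for a converter like this: assembling a Spark StructType from Kylin column metadata. The column collection and its accessors are assumptions for illustration, not the project's actual API.

// Hypothetical usage: build a Spark schema via convertType (column accessors assumed)
List<StructField> fields = new ArrayList<>();
for (TblColRef column : columns) {
    fields.add(DataTypes.createStructField(column.getName(), convertType(column.getType()), true));
}
StructType sparkSchema = DataTypes.createStructType(fields);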
@Test
public void testToRowValueBoolean() {
    DataType field = DataTypes.BooleanType;
    assertEquals("Invalid Boolean", true, RowUtils.toRowValue(true, field));
    assertEquals("Invalid 'true'", true, RowUtils.toRowValue("true", field));
    assertEquals("Invalid 'true'", true, RowUtils.toRowValue("TrUe", field));
    assertEquals("Invalid 'false'", false, RowUtils.toRowValue("false", field));
    assertEquals("Invalid 'false'", false, RowUtils.toRowValue("FaLsE", field));
    try {
        RowUtils.toRowValue(123, field);
        fail("Expected a RuntimeException for invalid type");
    } catch (RuntimeException e) {
        assertThat(e.getMessage(), CoreMatchers.containsString("Invalid or unrecognized input format"));
    }
}
/**
 * Examine the DataFrame schema to determine whether the data appears to be
 * a matrix.
 *
 * @param df
 *            the DataFrame
 * @return {@code true} if the DataFrame appears to be a matrix,
 *         {@code false} otherwise
 */
public static boolean doesDataFrameLookLikeMatrix(Dataset<Row> df) {
    StructType schema = df.schema();
    StructField[] fields = schema.fields();
    if (fields == null) {
        return true;
    }
    for (StructField field : fields) {
        DataType dataType = field.dataType();
        if ((dataType != DataTypes.DoubleType) && (dataType != DataTypes.IntegerType)
                && (dataType != DataTypes.LongType) && (!(dataType instanceof org.apache.spark.ml.linalg.VectorUDT))
                && (!(dataType instanceof org.apache.spark.mllib.linalg.VectorUDT))) {
            // uncomment if we support arrays of doubles for matrices
            // if (dataType instanceof ArrayType) {
            //     ArrayType arrayType = (ArrayType) dataType;
            //     if (arrayType.elementType() == DataTypes.DoubleType) {
            //         continue;
            //     }
            // }
            return false;
        }
    }
    return true;
}
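A brief sketch of how this check might be used to gate matrix-style processing; the data source path is a placeholder.

// Illustrative usage: gate matrix-style processing on the schema check above
Dataset<Row> df = spark.read().parquet("/path/to/data.parquet"); // path is a placeholder
if (doesDataFrameLookLikeMatrix(df)) {
    // every column is numeric or a vector UDT, so matrix conversion is plausible
}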
private DataType getKeyDataType() {
    if (Arrays.asList(expectedSchema.fieldNames()).contains("key")) {
        DataType keyDataType = expectedSchema.fields()[expectedSchema.fieldIndex("key")].dataType();
        if (convertToClass(keyDataType) == null) {
            throw new RuntimeException("Translator for filesystem input's input format is not compatible"
                + " because it does not use a supported type for the 'key' field");
        }
        return keyDataType;
    }
    else {
        // If the translator doesn't want the key field we don't specify a type so as to signal
        // that we want the key to be discarded. This is important for the 'input-format' format
        // because it is not known at runtime what the data type of an input format key should be.
        // We don't do the same for the value field because that is mandatory and so the translator
        // would typically be specifying which data type to use.
        return null;
    }
}
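The convertToClass helper used above is not shown in this snippet. A plausible sketch of such a mapping from Spark types to Hadoop Writable classes, assumed for illustration only:

// Assumed sketch of convertToClass: map a Spark DataType to a Hadoop Writable class,
// returning null for unsupported types (not the project's actual implementation).
private Class<? extends org.apache.hadoop.io.Writable> convertToClass(DataType dataType) {
    if (DataTypes.StringType.equals(dataType)) {
        return org.apache.hadoop.io.Text.class;
    }
    if (DataTypes.LongType.equals(dataType)) {
        return org.apache.hadoop.io.LongWritable.class;
    }
    if (DataTypes.IntegerType.equals(dataType)) {
        return org.apache.hadoop.io.IntWritable.class;
    }
    return null;
}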
private static <T> T visitArray(DataType type, Schema array, AvroWithSparkSchemaVisitor<T> visitor) {
    if (array.getLogicalType() instanceof LogicalMap || type instanceof MapType) {
        Preconditions.checkState(
            AvroSchemaUtil.isKeyValueSchema(array.getElementType()),
            "Cannot visit invalid logical map type: %s", array);
        Preconditions.checkArgument(type instanceof MapType, "Invalid map: %s is not a map", type);
        MapType map = (MapType) type;
        List<Schema.Field> keyValueFields = array.getElementType().getFields();
        return visitor.map(map, array,
            visit(map.keyType(), keyValueFields.get(0).schema(), visitor),
            visit(map.valueType(), keyValueFields.get(1).schema(), visitor));
    } else {
        Preconditions.checkArgument(type instanceof ArrayType, "Invalid array: %s is not an array", type);
        ArrayType list = (ArrayType) type;
        return visitor.array(list, array, visit(list.elementType(), array.getElementType(), visitor));
    }
}
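For orientation, these are the Spark-side types each branch above expects: an Avro array carrying a logical-map annotation pairs with a Spark MapType, while a plain Avro array pairs with an ArrayType. The element types here are illustrative.

// Illustrative Spark types for the two branches:
MapType mapCase = DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType); // Avro array of key/value records
ArrayType listCase = DataTypes.createArrayType(DataTypes.IntegerType);               // plain Avro array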
@Test
public void convertToRowValidNullValue(
    final @Mocked RowUtils utils
) throws Exception {
    Record record = new Record();
    record.put("field1", null);
    StructType schema = DataTypes.createStructType(Lists.newArrayList(
        DataTypes.createStructField("field1", DataTypes.StringType, true))
    );
    assertEquals("Invalid conversion", null, MorphlineUtils.convertToRow(schema, record).get(0));
    new Verifications() {{
        RowUtils.toRowValue(any, (DataType) any); times = 0;
    }};
}
@Override
public void nextIntVector(DataType type, int items, ColumnVector columnVector, boolean[] isNull)
        throws IOException {
    for (int i = 0, j = 0; i < items; i++) {
        while (columnVector.isNullAt(i + j)) {
            columnVector.appendNull();
            j++;
        }
        if (isNull[i]) {
            columnVector.appendNull();
        }
        else {
            columnVector.appendInt((int) next());
        }
    }
}
@Override
public Iterable<Row> translate(Row message) {
    String value = message.getAs(Translator.VALUE_FIELD_NAME);
    String[] stringValues = value.split((delimiterRegex) ?
        delimiter : Pattern.quote(delimiter), schema.length());
    values.clear();
    for (int valuePos = 0; valuePos < schema.length(); valuePos++) {
        Object rowVal = null;
        if (valuePos < stringValues.length) {
            String fieldValue = stringValues[valuePos];
            DataType fieldType = schema.fields()[valuePos].dataType();
            if (fieldValue.length() > 0) {
                rowVal = RowUtils.toRowValue(fieldValue, fieldType, rowValueMetadata);
            }
        }
        values.add(rowVal);
    }
    Row row = new RowWithSchema(schema, values.toArray());
    return Collections.singleton(row);
}
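To make the behavior concrete: with a two-field schema and a comma delimiter, a raw message value of "alice,30" would yield a single row. The schema below is assumed for the example.

// Illustrative input/output for the translate() above (schema assumed):
StructType schema = DataTypes.createStructType(Lists.newArrayList(
    DataTypes.createStructField("name", DataTypes.StringType, true),
    DataTypes.createStructField("age", DataTypes.IntegerType, true)));
// value "alice,30" with delimiter "," -> Row("alice", 30)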
private DataType getValueDataType() {
    DataType valueDataType;
    if (Arrays.asList(expectedSchema.fieldNames()).contains(Translator.VALUE_FIELD_NAME)) {
        valueDataType = expectedSchema.fields()[expectedSchema.fieldIndex(Translator.VALUE_FIELD_NAME)].dataType();
    }
    else {
        // In the rare situation that the translator does not expect a specific data type for the
        // value column then we are in a tricky situation because there is no way to inspect the
        // input format class to see what data types it will return. Instead, we compromise and assume
        // it is text, since it seems reasonable that a schemaless translator would probably be used
        // with the input-format format so that raw lines of text from the file could be retrieved.
        valueDataType = DataTypes.StringType;
    }
    if (convertToClass(valueDataType) == null) {
        throw new RuntimeException("Translator for filesystem input's input format is not compatible"
            + " because it does not use a supported type for the '" + Translator.VALUE_FIELD_NAME + "' field");
    }
    return valueDataType;
}
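To illustrate the fallback: when the expected schema lacks the value field entirely, the method above returns StringType. The schema here is assumed for the example.

// Illustrative: an expected schema with no value field falls back to text
StructType expectedSchema = DataTypes.createStructType(Lists.newArrayList(
    DataTypes.createStructField("other", DataTypes.IntegerType, true)));
// getValueDataType() returns DataTypes.StringType for this schema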
@Override
public Column getColumnExpression(Dataset<Row> leftDF,
                                  Dataset<Row> rightDF,
                                  Function<String, DataType> convertStringToDataTypeFunction) throws UnsupportedOperationException {
    Column leftExpr = getLeftChild().getColumnExpression(leftDF, rightDF, convertStringToDataTypeFunction);
    Column rightExpr = getRightChild().getColumnExpression(leftDF, rightDF, convertStringToDataTypeFunction);
    if (opKind == PLUS)
        return leftExpr.plus(rightExpr);
    else if (opKind == MINUS)
        return leftExpr.minus(rightExpr);
    else if (opKind == TIMES)
        return leftExpr.multiply(rightExpr);
    else if (opKind == DIVIDE)
        return leftExpr.divide(rightExpr);
    else
        throw new UnsupportedOperationException();
}
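For example, if the two children resolve to columns "a" and "b" and opKind is PLUS, the returned expression is equivalent to the following (column names assumed):

// Equivalent hand-written expression for the PLUS case (illustrative names):
Column sum = leftDF.col("a").plus(rightDF.col("b"));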
@Test
public void convertToRowMissingColumnNotNullable(
    final @Mocked RowUtils utils
) throws Exception {
    Record record = new Record();
    record.put("foo", "one");
    StructType schema = DataTypes.createStructType(Lists.newArrayList(
        DataTypes.createStructField("field1", DataTypes.StringType, false))
    );
    try {
        MorphlineUtils.convertToRow(schema, record);
        fail("Did not throw a RuntimeException");
    } catch (Exception e) {
        assertThat(e.getMessage(), CoreMatchers.containsString("DataType cannot contain 'null'"));
    }
    new Verifications() {{
        RowUtils.toRowValue(any, (DataType) any); times = 0;
    }};
}
@Override
public Validations getValidations() {
    return Validations.builder()
        .mandatoryPath(KVP_DELIMITER_CONFIG_NAME, ConfigValueType.STRING)
        .mandatoryPath(FIELD_DELIMITER_CONFIG_NAME, ConfigValueType.STRING)
        .mandatoryPath(SCHEMA_CONFIG, ConfigValueType.OBJECT)
        .add(new SupportedFieldTypesValidation(SCHEMA_CONFIG,
            new HashSet<DataType>(Arrays.asList(new DecimalType(), DataTypes.StringType,
                DataTypes.FloatType, DataTypes.DoubleType,
                DataTypes.ShortType, DataTypes.IntegerType,
                DataTypes.LongType, DataTypes.BooleanType,
                DataTypes.BinaryType, DataTypes.DateType,
                DataTypes.TimestampType))))
        .optionalPath(TIMESTAMP_FORMAT_CONFIG_NAME, ConfigValueType.LIST)
        .handlesOwnValidationPath(SCHEMA_CONFIG)
        .build();
}
@Override
public void nextLongVector(DataType type, int items, ColumnVector columnVector, boolean[] isNull)
        throws IOException
{
    for (int i = 0, j = 0; i < items; i++) {
        while (columnVector.isNullAt(i + j)) {
            columnVector.appendNull();
            j++;
        }
        if (isNull[i]) {
            columnVector.appendNull();
        }
        else {
            columnVector.appendLong(next());
        }
    }
}
@Test
public void testToRowValueMapNested() {
    DataType field = DataTypes.createMapType(DataTypes.StringType,
        DataTypes.createMapType(DataTypes.LongType, DataTypes.IntegerType, true)
    );
    Map<Object, Object> expectedInnerMap = Maps.newHashMap();
    expectedInnerMap.put(9L, 1);
    expectedInnerMap.put(8L, 2);
    Map<Object, Object> expectedOuterMap = Maps.newHashMap();
    expectedOuterMap.put("outer", expectedInnerMap);
    Map<Object, Object> innerMap = Maps.newHashMap();
    innerMap.put(9L, 1);
    innerMap.put(8L, 2);
    Map<Object, Object> outerMap = Maps.newHashMap();
    outerMap.put("outer", innerMap);
    assertEquals("Invalid list of values", expectedOuterMap, RowUtils.toRowValue(outerMap, field));
}
/**
 * Construct a {@code Dataset} schema from a {@code Descriptor}
 * <p>
 * This iterates and recurses through a {@link com.google.protobuf.Descriptors.Descriptor} and produces a
 * {@link StructType} for {@link org.apache.spark.sql.Dataset<Row>}.
 * Protobuf {@code oneof} fields are flattened into discrete {@link StructField} instances.
 * <p>
 * This will pass the value of {@link Descriptors.FieldDescriptor#isRequired()} to the associated {@link StructField}.
 *
 * @param descriptor the Descriptor to convert
 * @return the converted StructType
 */
public static StructType buildSchema(Descriptors.Descriptor descriptor) {
    List<StructField> members = new ArrayList<>();
    List<Descriptors.FieldDescriptor> protoFields = descriptor.getFields();
    for (Descriptors.FieldDescriptor fieldDescriptor : protoFields) {
        DataType fieldType = convertType(fieldDescriptor);
        StructField structField = DataTypes.createStructField(fieldDescriptor.getName(), fieldType,
            !fieldDescriptor.isRequired());
        members.add(structField);
        LOG.debug("FieldDescriptor[{}] => StructField[{}]", fieldDescriptor.getFullName(), structField);
    }
    if (members.isEmpty()) {
        throw new RuntimeException("No FieldDescriptors found");
    }
    return DataTypes.createStructType(members.toArray(new StructField[0]));
}
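A minimal sketch of deriving and using such a schema; MyEvent stands in for a hypothetical generated protobuf class.

// Hypothetical usage: build a Spark schema from a generated protobuf message type
StructType schema = buildSchema(MyEvent.getDescriptor());
// e.g. hand the schema to a DataFrame construction step such as
// spark.createDataFrame(javaRddOfRows, schema)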
/**
 * Sets the vector element to true if the bit is set, skipping the null values.
 */
public void getSetBits(DataType type, int batchSize, ColumnVector columnVector, boolean[] isNull)
        throws IOException {
    for (int i = 0, j = 0; i < batchSize; i++) {
        while (columnVector.isNullAt(i + j)) {
            columnVector.appendNull();
            j++;
        }
        if (isNull[i]) {
            columnVector.appendNull();
        }
        else {
            // read more data if necessary
            if (bitsInData == 0) {
                readByte();
            }
            // read bit
            columnVector.appendBoolean((data & HIGH_BIT_MASK) != 0);
            // mark bit consumed
            data <<= 1;
            bitsInData--;
        }
    }
}
PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) {
    StructType partitionType = SparkSchemaUtil.convert(partitionSchema);
    StructField[] fields = partitionType.fields();
    this.types = new DataType[fields.length];
    this.positions = new int[types.length];
    this.javaTypes = new Class<?>[types.length];
    this.reusedRow = new GenericInternalRow(types.length);
    List<PartitionField> partitionFields = spec.fields();
    for (int rowIndex = 0; rowIndex < fields.length; rowIndex += 1) {
        this.types[rowIndex] = fields[rowIndex].dataType();
        int sourceId = partitionSchema.columns().get(rowIndex).fieldId();
        for (int specIndex = 0; specIndex < partitionFields.size(); specIndex += 1) {
            PartitionField field = spec.fields().get(specIndex);
            if (field.sourceId() == sourceId && "identity".equals(field.transform().toString())) {
                positions[rowIndex] = specIndex;
                javaTypes[rowIndex] = spec.javaClasses()[specIndex];
                break;
            }
        }
    }
}
/**
 * Sets the vector element to true if the bit is set.
 */
public void getSetBits(DataType type, int batchSize, ColumnVector columnVector)
        throws IOException {
    for (int i = 0, j = 0; i < batchSize; i++) {
        while (columnVector.isNullAt(i + j)) {
            columnVector.appendNull();
            j++;
        }
        // read more data if necessary
        if (bitsInData == 0) {
            readByte();
        }
        // read bit
        columnVector.appendBoolean((data & HIGH_BIT_MASK) != 0);
        // mark bit consumed
        data <<= 1;
        bitsInData--;
    }
}
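The two getSetBits variants above lean on a small amount of bit-reader state that the snippets do not show. A plausible sketch of that supporting state, assumed for illustration only:

// Assumed supporting state for the bit-reading loops (illustrative, not the original source):
private static final int HIGH_BIT_MASK = 0b1000_0000; // top bit of the current byte
private int data;        // byte currently being consumed, high bit first
private int bitsInData;  // unread bits remaining in 'data'

private void readByte() throws IOException {
    data = input.read(); // 'input' is an assumed underlying InputStream
    bitsInData = 8;
}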
private DataType getDataType(int type) {
    switch (type) {
        case LiteralType.BOOLEAN:
            return DataTypes.BooleanType;
        case LiteralType.STRING:
            return DataTypes.StringType;
        case LiteralType.FLOAT:
            return DataTypes.FloatType;
        case LiteralType.DOUBLE:
            return DataTypes.DoubleType;
        case LiteralType.INTEGER:
            return DataTypes.IntegerType;
        case LiteralType.LONG:
            return DataTypes.LongType;
        case LiteralType.DATETIME:
            // datetime not supported due to timezone issues with java.sql.Timestamp
            // check the InstanceAggregator for more info
            return DataTypes.StringType;
    }
    throw new NotImplementedException("Not able to write literal type " + type);
}
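A one-line usage sketch: wrap the resolved type in a StructField. The field name is illustrative.

// Illustrative usage of getDataType (field name assumed):
StructField scoreField = DataTypes.createStructField("score", getDataType(LiteralType.DOUBLE), true);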
@Test
public void testToRowValueMapRowNested(
    final @Mocked Row inputRow,
    final @Mocked StructType innerSchema,
    final @Mocked StructType outerSchema
) {
    DataType field = DataTypes.createMapType(DataTypes.StringType,
        DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType, true)
    );
    Map<Object, Object> expectedInnerMap = Maps.newHashMap();
    expectedInnerMap.put("field1", 1);
    expectedInnerMap.put("field2", 2);
    Map<Object, Object> expectedOuterMap = Maps.newHashMap();
    expectedOuterMap.put("outer", expectedInnerMap);
    new Expectations() {{
        inputRow.schema(); returns(outerSchema, innerSchema);
        outerSchema.fieldNames(); result = new String[] {"outer"};
        innerSchema.fieldNames(); result = new String[] {"field1", "field2"};
        inputRow.get(0); returns(inputRow, 1);
        inputRow.get(1); result = 2;
    }};
    assertEquals("Invalid list of values", expectedOuterMap, RowUtils.toRowValue(inputRow, field));
}
private static void readDictionary(
        @Nullable ByteArrayStream dictionaryDataStream,
        int dictionarySize,
        int[] dictionaryLength,
        int dictionaryOutputOffset,
        Slice[] dictionary,
        DataType type)
        throws IOException
{
    // build dictionary slices
    for (int i = 0; i < dictionarySize; i++) {
        int length = dictionaryLength[i];
        if (length == 0) {
            dictionary[dictionaryOutputOffset + i] = Slices.EMPTY_SLICE;
        }
        else {
            Slice value = Slices.wrappedBuffer(dictionaryDataStream.next(length));
            /* DO WE NEED THIS?
            if (isVarcharType(type)) {
                value = truncateToLength(value, type);
            }
            if (isCharType(type)) {
                value = trimSpacesAndTruncateToLength(value, type);
            }
            */
            dictionary[dictionaryOutputOffset + i] = value;
        }
    }
}
/**
 * This function will convert Frame schema into DataFrame schema
 *
 * @param fschema frame schema
 * @param containsID true if contains ID column
 * @return Spark StructType of StructFields representing schema
 */
public static StructType convertFrameSchemaToDFSchema(ValueType[] fschema, boolean containsID)
{
    // generate the schema based on the string of schema
    List<StructField> fields = new ArrayList<>();
    // add id column type
    if( containsID )
        fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN,
            DataTypes.DoubleType, true));
    // add remaining types
    int col = 1;
    for (ValueType schema : fschema) {
        DataType dt = null;
        switch(schema) {
            case STRING:  dt = DataTypes.StringType; break;
            case FP64:    dt = DataTypes.DoubleType; break;
            case INT64:   dt = DataTypes.LongType; break;
            case BOOLEAN: dt = DataTypes.BooleanType; break;
            default:
                dt = DataTypes.StringType;
                LOG.warn("Using default type String for " + schema.toString());
        }
        fields.add(DataTypes.createStructField("C" + col++, dt, true));
    }
    return DataTypes.createStructType(fields);
}
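A quick sketch of the resulting schema for a small frame; the ValueType values are drawn from the switch above.

// Illustrative: a two-column frame with an ID column
ValueType[] fschema = new ValueType[] { ValueType.STRING, ValueType.FP64 };
StructType dfSchema = convertFrameSchemaToDFSchema(fschema, true);
// -> [<DF_ID_COLUMN>: double, C1: string, C2: double]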
@Test
public void testToRowValueBinary() {
    DataType field = DataTypes.BinaryType;
    byte[] byteArray = "Test".getBytes();
    ByteBuffer byteBuffer = ByteBuffer.wrap(byteArray);
    assertEquals("Invalid byte[]", byteArray, RowUtils.toRowValue(byteArray, field));
    assertEquals("Invalid ByteBuffer", byteArray, RowUtils.toRowValue(byteBuffer, field));
    thrown.expect(RuntimeException.class);
    RowUtils.toRowValue(123, field);
}
@Override
public void nextIntVector(DataType type, int items, ColumnVector columnVector, boolean[] isNull)
        throws IOException
{
    for (int i = 0; i < items; i++) {
        if (isNull[i]) {
            columnVector.appendNull();
        }
        else {
            columnVector.appendInt((int) next());
        }
    }
}
@Override
public HapiConverter<DataType> visitLeafExtension(String elementName,
                                                  String extensionUri,
                                                  HapiConverter elementConverter) {
    return new LeafExtensionConverter<>(extensionUri, elementConverter);
}
public static Row append(Row row, String fieldName, DataType fieldType, boolean nullable, Object value) {
    StructType appendedSchema = row.schema().add(fieldName, fieldType, nullable);
    Object[] appendedValues = ObjectArrays.concat(valuesFor(row), value);
    Row appendedRow = new RowWithSchema(appendedSchema, appendedValues);
    return appendedRow;
}
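For example, appending a derived field to an existing row (the field name and value are illustrative):

// Illustrative usage of append:
Row enriched = append(row, "score", DataTypes.DoubleType, true, 0.95);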
@Override
public Column getColumnExpression(Dataset<Row> leftDF,
                                  Dataset<Row> rightDF,
                                  Function<String, DataType> convertStringToDataTypeFunction) throws UnsupportedOperationException {
    Dataset<Row> df = leftDataFrame ? leftDF : rightDF;
    return df.col(columnName);
}