Listed below are example usages of the org.apache.spark.sql.types.Decimal API class, with instance code and typical patterns; you can also click through to GitHub to view the full source code.
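Before the snippets, here is a minimal, self-contained sketch of the Decimal basics the examples below build on: construction, precision/scale, and conversion back to java.math.BigDecimal and to an unscaled long. The Decimal methods are Spark's own; the wrapper class and values are only illustrative.

import java.math.BigDecimal;
import org.apache.spark.sql.types.Decimal;

public class DecimalBasics {
  public static void main(String[] args) {
    // Wrap a java.math.BigDecimal as a Spark Decimal with precision 10, scale 2.
    Decimal d = Decimal.apply(new BigDecimal("1234.56"), 10, 2);

    System.out.println(d.precision()); // 10
    System.out.println(d.scale());     // 2

    // Convert back to a java.math.BigDecimal.
    BigDecimal jd = d.toJavaBigDecimal(); // 1234.56

    // Values with precision <= Decimal.MAX_LONG_DIGITS() (18) fit in a long.
    long unscaled = d.toUnscaledLong();   // 123456
    System.out.println(jd + " -> " + unscaled);
  }
}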
@Override
public void write(int repetitionLevel, Decimal decimal) {
  Preconditions.checkArgument(decimal.scale() == scale,
      "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
  Preconditions.checkArgument(decimal.precision() <= precision,
      "Cannot write value as decimal(%s,%s), too large: %s", precision, scale, decimal);

  BigDecimal bigDecimal = decimal.toJavaBigDecimal();

  // Sign-extend the minimal two's-complement encoding to the fixed length:
  // fill with 0xFF for negative values, 0x00 for non-negative ones.
  byte fillByte = (byte) (bigDecimal.signum() < 0 ? 0xFF : 0x00);
  byte[] unscaled = bigDecimal.unscaledValue().toByteArray();
  byte[] buf = bytes.get();
  int offset = length - unscaled.length;

  for (int i = 0; i < length; i += 1) {
    if (i < offset) {
      buf[i] = fillByte;
    } else {
      buf[i] = unscaled[i - offset];
    }
  }

  column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(buf));
}
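The fixed-length write above pads BigInteger.toByteArray()'s minimal big-endian two's-complement encoding out to the column's fixed byte width, sign-extending on the left. A standalone sketch of just that padding step; the helper name and the 8-byte width are illustrative, not from the source:

import java.math.BigDecimal;
import java.util.Arrays;

public class FixedPadding {
  // Hypothetical helper mirroring the padding loop in the writer above.
  static byte[] padToFixed(BigDecimal value, int length) {
    byte fillByte = (byte) (value.signum() < 0 ? 0xFF : 0x00);
    byte[] unscaled = value.unscaledValue().toByteArray();
    byte[] buf = new byte[length];
    int offset = length - unscaled.length;
    for (int i = 0; i < length; i += 1) {
      buf[i] = i < offset ? fillByte : unscaled[i - offset];
    }
    return buf;
  }

  public static void main(String[] args) {
    // -1.23 has unscaled value -123, i.e. a single byte 0x85 in two's
    // complement; sign extension fills the leading bytes with 0xFF.
    System.out.println(Arrays.toString(padToFixed(new BigDecimal("-1.23"), 8)));
    // [-1, -1, -1, -1, -1, -1, -1, -123]
  }
}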
@Override
public void write(Decimal d, Encoder encoder) throws IOException {
  Preconditions.checkArgument(d.scale() == scale,
      "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, d);
  Preconditions.checkArgument(d.precision() <= precision,
      "Cannot write value as decimal(%s,%s), too large: %s", precision, scale, d);

  BigDecimal decimal = d.toJavaBigDecimal();
  byte fillByte = (byte) (decimal.signum() < 0 ? 0xFF : 0x00);
  byte[] unscaled = decimal.unscaledValue().toByteArray();
  byte[] buf = bytes.get();
  int offset = length - unscaled.length;

  for (int i = 0; i < length; i += 1) {
    if (i < offset) {
      buf[i] = fillByte;
    } else {
      buf[i] = unscaled[i - offset];
    }
  }

  encoder.writeFixed(buf);
}
@Override
public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
                    int row) {
  if (vector.isRepeating) {
    row = 0;
  }
  if (!vector.noNulls && vector.isNull[row]) {
    writer.setNullAt(column);
  } else {
    BigDecimal v = ((DecimalColumnVector) vector).vector[row]
        .getHiveDecimal().bigDecimalValue();
    writer.write(column,
        new Decimal().set(new scala.math.BigDecimal(v), precision, scale),
        precision, scale);
  }
}
@Override
public void convert(UnsafeArrayWriter writer, int element,
                    ColumnVector vector, int row) {
  if (vector.isRepeating) {
    row = 0;
  }
  if (!vector.noNulls && vector.isNull[row]) {
    writer.setNull(element);
  } else {
    BigDecimal v = ((DecimalColumnVector) vector).vector[row]
        .getHiveDecimal().bigDecimalValue();
    writer.write(element,
        new Decimal().set(new scala.math.BigDecimal(v), precision, scale),
        precision, scale);
  }
}
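Both ORC converters funnel Hive's decimal representation through new Decimal().set(...), which rescales the value to the declared scale and enforces the precision bound. A minimal sketch of that round trip, with a made-up input standing in for DecimalColumnVector.vector[row].getHiveDecimal().bigDecimalValue():

import java.math.BigDecimal;
import org.apache.spark.sql.types.Decimal;

public class OrcDecimalRoundTrip {
  public static void main(String[] args) {
    // Pretend this came out of the ORC column vector.
    BigDecimal fromHive = new BigDecimal("19.9");

    // set(...) rescales to scale 2 and checks the value fits in precision 9.
    Decimal d = new Decimal().set(new scala.math.BigDecimal(fromHive), 9, 2);

    System.out.println(d); // 19.90
  }
}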
@Override
public BigDecimal getDecimal(int i) {
  if (isNullAt(i)) {
    return null;
  }
  DataType dt = structType.fields()[i].dataType();
  int precision = ((DecimalType) dt).precision();
  int scale = ((DecimalType) dt).scale();
  if (DecimalType.isByteArrayDecimalType(dt)) {
    byte[] bytes = row.getBinary(i);
    BigInteger bigInteger = new BigInteger(bytes);
    BigDecimal javaDecimal = new BigDecimal(bigInteger, scale);
    return Decimal.apply(javaDecimal, precision, scale).toJavaBigDecimal();
  } else {
    return Decimal.apply(DecimalType.is32BitDecimalType(dt) ? getInt(i) : getLong(i),
        precision, scale).toJavaBigDecimal();
  }
}
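The branch above mirrors how Spark physically stores decimals: small precisions are kept as an unscaled int or long, while anything wider is stored as the big-endian bytes of the unscaled BigInteger. A quick sketch of the reverse direction, rebuilding a BigDecimal from an unscaled long as the else branch does; the precisions are illustrative:

import java.math.BigDecimal;
import org.apache.spark.sql.types.Decimal;

public class UnscaledStorage {
  public static void main(String[] args) {
    // decimal(9,2) fits in an int and decimal(18,2) in a long; either way,
    // the unscaled value 123456 with scale 2 denotes 1234.56.
    BigDecimal narrow = Decimal.apply(123456L, 9, 2).toJavaBigDecimal();
    BigDecimal wide = Decimal.apply(123456L, 18, 2).toJavaBigDecimal();

    System.out.println(narrow); // 1234.56
    System.out.println(wide);   // 1234.56
  }
}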
@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
  if (isNullAt(rowId)) {
    return null;
  }
  return accessor.getDecimal(rowId, precision, scale);
}
@Override
final Decimal getDecimal(int rowId, int precision, int scale) {
  if (isNullAt(rowId)) {
    return null;
  }
  return Decimal.apply(accessor.getObject(rowId), precision, scale);
}
@Override
public void write(int repetitionLevel, Decimal decimal) {
  Preconditions.checkArgument(decimal.scale() == scale,
      "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
  Preconditions.checkArgument(decimal.precision() <= precision,
      "Cannot write value as decimal(%s,%s), too large: %s", precision, scale, decimal);

  column.writeInteger(repetitionLevel, (int) decimal.toUnscaledLong());
}
@Override
public void write(int repetitionLevel, Decimal decimal) {
  Preconditions.checkArgument(decimal.scale() == scale,
      "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
  Preconditions.checkArgument(decimal.precision() <= precision,
      "Cannot write value as decimal(%s,%s), too large: %s", precision, scale, decimal);

  column.writeLong(repetitionLevel, decimal.toUnscaledLong());
}
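These two writers cover the int- and long-backed encodings: for precision up to Decimal.MAX_LONG_DIGITS() the unscaled value fits in a long (and in an int for precision up to 9), so the Parquet column stores just that number. A small sketch of what they hand to the column; the input value is made up:

import org.apache.spark.sql.types.Decimal;

public class UnscaledWrite {
  public static void main(String[] args) {
    Decimal d = Decimal.apply("1234.56"); // precision 6, scale 2

    // What the writers above pass to writeLong / writeInteger.
    System.out.println(d.toUnscaledLong());        // 123456
    System.out.println((int) d.toUnscaledLong());  // 123456
  }
}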
@Override
final Decimal getDecimal(int rowId, int precision, int scale) {
  int dictId = offsetVector.get(rowId);
  if (cache[dictId] == null) {
    cache[dictId] = Decimal.apply(
        new BigInteger(parquetDictionary.decodeToBinary(dictId).getBytes()).longValue(),
        precision,
        scale);
  }
  return cache[dictId];
}
@Override
final Decimal getDecimal(int rowId, int precision, int scale) {
  int dictId = offsetVector.get(rowId);
  if (cache[dictId] == null) {
    cache[dictId] = Decimal.apply(parquetDictionary.decodeToLong(dictId), precision, scale);
  }
  return cache[dictId];
}
@Override
final Decimal getDecimal(int rowId, int precision, int scale) {
  int dictId = offsetVector.get(rowId);
  if (cache[dictId] == null) {
    cache[dictId] = Decimal.apply(parquetDictionary.decodeToInt(dictId), precision, scale);
  }
  return cache[dictId];
}
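All three dictionary accessors share the same memoization pattern: decode each dictionary id at most once and cache the resulting Decimal, since a dictionary-encoded column repeats a small set of distinct values across many rows. A stripped-down sketch of the pattern; the LongDictionary interface is a stand-in for Parquet's Dictionary, not its real API:

import org.apache.spark.sql.types.Decimal;

public class DictionaryCache {
  // Stand-in for a Parquet dictionary mapping an id to an unscaled long.
  interface LongDictionary {
    long decodeToLong(int id);
    int getMaxId();
  }

  private final LongDictionary dictionary;
  private final Decimal[] cache;

  DictionaryCache(LongDictionary dictionary) {
    this.dictionary = dictionary;
    this.cache = new Decimal[dictionary.getMaxId() + 1];
  }

  Decimal get(int dictId, int precision, int scale) {
    if (cache[dictId] == null) {
      // Decode and convert once; later rows with the same id reuse the result.
      cache[dictId] = Decimal.apply(dictionary.decodeToLong(dictId), precision, scale);
    }
    return cache[dictId];
  }
}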
private ArrayData collectionToArrayData(Type elementType, Collection<?> values) {
  switch (elementType.typeId()) {
    case BOOLEAN:
    case INTEGER:
    case DATE:
    case TIME:
    case LONG:
    case TIMESTAMP:
    case FLOAT:
    case DOUBLE:
      return fillArray(values, array -> (pos, value) -> array[pos] = value);
    case STRING:
      return fillArray(values, array ->
          (BiConsumer<Integer, CharSequence>) (pos, seq) ->
              array[pos] = UTF8String.fromString(seq.toString()));
    case FIXED:
    case BINARY:
      return fillArray(values, array ->
          (BiConsumer<Integer, ByteBuffer>) (pos, buf) -> array[pos] = ByteBuffers.toByteArray(buf));
    case DECIMAL:
      return fillArray(values, array ->
          (BiConsumer<Integer, BigDecimal>) (pos, dec) -> array[pos] = Decimal.apply(dec));
    case STRUCT:
      return fillArray(values, array -> (BiConsumer<Integer, StructLike>) (pos, tuple) ->
          array[pos] = new StructInternalRow(elementType.asStructType(), tuple));
    case LIST:
      return fillArray(values, array -> (BiConsumer<Integer, Collection<?>>) (pos, list) ->
          array[pos] = collectionToArrayData(elementType.asListType(), list));
    case MAP:
      return fillArray(values, array -> (BiConsumer<Integer, Map<?, ?>>) (pos, map) ->
          array[pos] = mapToMapData(elementType.asMapType(), map));
    default:
      throw new UnsupportedOperationException("Unsupported array element type: " + elementType);
  }
}
private static Object convertConstant(Type type, Object value) {
  if (value == null) {
    return null;
  }

  switch (type.typeId()) {
    case DECIMAL:
      return Decimal.apply((BigDecimal) value);
    case STRING:
      if (value instanceof Utf8) {
        Utf8 utf8 = (Utf8) value;
        return UTF8String.fromBytes(utf8.getBytes(), 0, utf8.getByteLength());
      }
      return UTF8String.fromString(value.toString());
    case FIXED:
      if (value instanceof byte[]) {
        return value;
      } else if (value instanceof GenericData.Fixed) {
        return ((GenericData.Fixed) value).bytes();
      }
      return ByteBuffers.toByteArray((ByteBuffer) value);
    case BINARY:
      return ByteBuffers.toByteArray((ByteBuffer) value);
    default:
  }
  return value;
}
@Override
public Object get(InternalRow row) {
  if (row.isNullAt(position())) {
    return null;
  }
  return ((Decimal) row.get(position(), type())).toJavaBigDecimal();
}
private static Object valueFromSpark(Literal lit) {
  if (lit.value() instanceof UTF8String) {
    return lit.value().toString();
  } else if (lit.value() instanceof Decimal) {
    return ((Decimal) lit.value()).toJavaBigDecimal();
  }
  return lit.value();
}
@Override
public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
                    int row) {
  if (vector.isRepeating) {
    row = 0;
  }
  if (!vector.noNulls && vector.isNull[row]) {
    writer.setNullAt(column);
  } else {
    HiveDecimalWritable v = ((DecimalColumnVector) vector).vector[row];
    writer.write(column,
        new Decimal().set(hack.unscaledLong(v), precision, v.scale()),
        precision, scale);
  }
}
@Override
public void convert(UnsafeArrayWriter writer, int element,
                    ColumnVector vector, int row) {
  if (vector.isRepeating) {
    row = 0;
  }
  if (!vector.noNulls && vector.isNull[row]) {
    writer.setNull(element);
  } else {
    HiveDecimalWritable v = ((DecimalColumnVector) vector).vector[row];
    writer.write(element,
        new Decimal().set(hack.unscaledLong(v), precision, v.scale()),
        precision, scale);
  }
}
static Converter buildConverter(BufferHolder holder, TypeDescription schema) {
  switch (schema.getCategory()) {
    case BOOLEAN:
      return new BooleanConverter();
    case BYTE:
      return new ByteConverter();
    case SHORT:
      return new ShortConverter();
    case DATE:
    case INT:
      return new IntConverter();
    case LONG:
      return new LongConverter();
    case FLOAT:
      return new FloatConverter();
    case DOUBLE:
      return new DoubleConverter();
    case TIMESTAMP:
      return new TimestampConverter();
    case DECIMAL:
      if (schema.getPrecision() <= Decimal.MAX_LONG_DIGITS()) {
        return new Decimal18Converter(schema.getPrecision(), schema.getScale());
      } else {
        return new Decimal38Converter(schema.getPrecision(), schema.getScale());
      }
    case BINARY:
    case STRING:
    case CHAR:
    case VARCHAR:
      return new BinaryConverter(holder);
    case STRUCT:
      return new StructConverter(holder, schema);
    case LIST:
      return new ListConverter(holder, schema);
    case MAP:
      return new MapConverter(holder, schema);
    default:
      throw new IllegalArgumentException("Unhandled type " + schema);
  }
}
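The DECIMAL branch splits on Decimal.MAX_LONG_DIGITS() (18): up to 18 digits the unscaled value always fits in a long, so the cheaper Decimal18Converter applies; wider values take the BigDecimal-backed Decimal38Converter. The threshold itself is easy to check:

import org.apache.spark.sql.types.Decimal;

public class DecimalThreshold {
  public static void main(String[] args) {
    System.out.println(Decimal.MAX_LONG_DIGITS()); // 18

    // Long.MAX_VALUE has 19 digits, so 18 is the largest precision whose
    // unscaled value is always representable as a long.
    System.out.println(Long.toString(Long.MAX_VALUE).length()); // 19
  }
}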
/**
* Converts the objects into instances used by Spark's InternalRow.
*
* @param value a data value
* @param type the Spark data type
* @return the value converted to the representation expected by Spark's InternalRow.
*/
private static Object convert(Object value, DataType type) {
  if (type instanceof StringType) {
    return UTF8String.fromString(value.toString());
  } else if (type instanceof BinaryType) {
    // Copy the buffer's remaining bytes into a byte[]; returning the result of
    // ByteBuffer.get(byte[]) directly would hand back the buffer, not the array.
    ByteBuffer buffer = (ByteBuffer) value;
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return bytes;
  } else if (type instanceof DecimalType) {
    return Decimal.fromDecimal(value);
  }
  return value;
}
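A quick illustration of the three conversions this helper performs, using made-up values; Decimal.apply(BigDecimal) is used here in place of Decimal.fromDecimal, which additionally accepts already-wrapped Decimal values:

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;

public class ConvertDemo {
  public static void main(String[] args) {
    // Strings become UTF8String, Spark's internal string representation.
    UTF8String s = UTF8String.fromString("iceberg");

    // Binary values are copied out of the ByteBuffer into a byte[].
    ByteBuffer buffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);

    // Decimals become Spark's Decimal wrapper.
    Decimal d = Decimal.apply(new BigDecimal("9.99"));

    System.out.println(s + " " + bytes.length + " " + d);
  }
}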
@Override
public Object get(InternalRow row) {
  if (row.isNullAt(p)) {
    return null;
  }
  return ((Decimal) row.get(p, type)).toJavaBigDecimal();
}
Decimal getDecimal(int rowId, int precision, int scale) {
  throw new UnsupportedOperationException();
}
private static PrimitiveWriter<Decimal> decimalAsInteger(ColumnDescriptor desc,
                                                         int precision, int scale) {
  return new IntegerDecimalWriter(desc, precision, scale);
}

private static PrimitiveWriter<Decimal> decimalAsLong(ColumnDescriptor desc,
                                                      int precision, int scale) {
  return new LongDecimalWriter(desc, precision, scale);
}

private static PrimitiveWriter<Decimal> decimalAsFixed(ColumnDescriptor desc,
                                                       int precision, int scale) {
  return new FixedDecimalWriter(desc, precision, scale);
}

static ValueWriter<Decimal> decimal(int precision, int scale) {
  return new DecimalWriter(precision, scale);
}
@Override
final Decimal getDecimal(int rowId, int precision, int scale) {
  return Decimal.apply(vector.getObject(rowId), precision, scale);
}
private DictionaryDecimalAccessor(IntVector vector, Dictionary dictionary) {
  super(vector);
  this.offsetVector = vector;
  this.parquetDictionary = dictionary;
  this.cache = new Decimal[dictionary.getMaxId() + 1];
}