The following examples show how to use the org.apache.spark.sql.types.DoubleType API class, or you can follow the links to view the full source code on GitHub.
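The test snippets below are drawn from Apache SystemML's MLContextTest and rely on a shared test fixture: a SparkSession named spark, a JavaSparkContext named sc, and an MLContext named ml (setExpectedStdOut is a helper from the test base class that asserts on captured stdout). A minimal sketch of that fixture, assuming SystemML's MLContext API is on the classpath, might look like this:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.sysml.api.mlcontext.MLContext;

public class MLContextTestFixture {
    // Shared handles that the tests below refer to as spark, sc, and ml.
    static SparkSession spark = SparkSession.builder()
            .appName("MLContextTest")
            .master("local[*]")
            .getOrCreate();
    static JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
    // Assumption: this SystemML version offers an MLContext(SparkSession)
    // constructor; older versions construct MLContext from a SparkContext.
    static MLContext ml = new MLContext(spark);
}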
@Test
public void testDataFrameSumDMLDoublesWithNoIDColumn() {
System.out.println("MLContextTest - DataFrame sum DML, doubles with no ID column");
List<String> list = new ArrayList<>();
list.add("10,20,30");
list.add("40,50,60");
list.add("70,80,90");
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES);
Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame, mm);
setExpectedStdOut("sum: 450.0");
ml.execute(script);
}
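The mapper CommaSeparatedValueStringToDoubleArrayRow used throughout these tests is not shown on this page. A plausible minimal implementation, assuming nothing beyond the Spark Java API, splits each CSV line and boxes the parsed values into a Spark SQL Row:

import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

// Sketch of the assumed helper: "10,20,30" -> Row(10.0, 20.0, 30.0).
public class CommaSeparatedValueStringToDoubleArrayRow implements Function<String, Row> {
    private static final long serialVersionUID = 1L;

    @Override
    public Row call(String line) {
        String[] tokens = line.split(",");
        Object[] values = new Object[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            values[i] = Double.parseDouble(tokens[i].trim());
        }
        return RowFactory.create(values);
    }
}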
@Test
public void testDataFrameSumDMLDoublesWithIDColumn() {
System.out.println("MLContextTest - DataFrame sum DML, doubles with ID column");
List<String> list = new ArrayList<>();
list.add("1,1,2,3");
list.add("2,4,5,6");
list.add("3,7,8,9");
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES_WITH_INDEX);
Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame, mm);
setExpectedStdOut("sum: 45.0");
ml.execute(script);
}
@Test
public void testDataFrameSumDMLDoublesWithIDColumnSortCheck() {
System.out.println("MLContextTest - DataFrame sum DML, doubles with ID column sort check");
List<String> list = new ArrayList<>();
list.add("3,7,8,9");
list.add("1,1,2,3");
list.add("2,4,5,6");
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES_WITH_INDEX);
Script script = dml("print('M[1,1]: ' + as.scalar(M[1,1]));").in("M", dataFrame, mm);
setExpectedStdOut("M[1,1]: 1.0");
ml.execute(script);
}
@Test
public void testDataFrameSumDMLVectorWithIDColumn() {
System.out.println("MLContextTest - DataFrame sum DML, vector with ID column");
List<Tuple2<Double, Vector>> list = new ArrayList<>();
list.add(new Tuple2<>(1.0, Vectors.dense(1.0, 2.0, 3.0)));
list.add(new Tuple2<>(2.0, Vectors.dense(4.0, 5.0, 6.0)));
list.add(new Tuple2<>(3.0, Vectors.dense(7.0, 8.0, 9.0)));
JavaRDD<Tuple2<Double, Vector>> javaRddTuple = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleVectorRow());
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", new VectorUDT(), true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR_WITH_INDEX);
Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame, mm);
setExpectedStdOut("sum: 45.0");
ml.execute(script);
}
@Test
public void testDataFrameSumDMLMllibVectorWithIDColumn() {
System.out.println("MLContextTest - DataFrame sum DML, mllib vector with ID column");
List<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>> list = new ArrayList<>();
list.add(new Tuple2<>(1.0, org.apache.spark.mllib.linalg.Vectors.dense(1.0, 2.0, 3.0)));
list.add(new Tuple2<>(2.0, org.apache.spark.mllib.linalg.Vectors.dense(4.0, 5.0, 6.0)));
list.add(new Tuple2<>(3.0, org.apache.spark.mllib.linalg.Vectors.dense(7.0, 8.0, 9.0)));
JavaRDD<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>> javaRddTuple = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleMllibVectorRow());
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", new org.apache.spark.mllib.linalg.VectorUDT(), true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR_WITH_INDEX);
Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame, mm);
setExpectedStdOut("sum: 45.0");
ml.execute(script);
}
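DoubleVectorRow and DoubleMllibVectorRow are likewise assumed rather than shown. A minimal sketch of the ml.linalg variant, which turns each (id, vector) tuple into a two-column Row matching the schema above, could be:

import org.apache.spark.api.java.function.Function;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import scala.Tuple2;

// Sketch: (1.0, [1.0, 2.0, 3.0]) -> Row(1.0, vector); the mllib variant is
// presumably identical apart from using org.apache.spark.mllib.linalg.Vector.
public class DoubleVectorRow implements Function<Tuple2<Double, Vector>, Row> {
    private static final long serialVersionUID = 1L;

    @Override
    public Row call(Tuple2<Double, Vector> tuple) {
        return RowFactory.create(tuple._1(), tuple._2());
    }
}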
@Test
public void testDataFrameGoodMetadataDML() {
System.out.println("MLContextTest - DataFrame good metadata DML");
List<String> list = new ArrayList<>();
list.add("10,20,30");
list.add("40,50,60");
list.add("70,80,90");
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(3, 3, 9);
Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame, mm);
setExpectedStdOut("sum: 450.0");
ml.execute(script);
}
@Test
public void testDataFrameSumDMLDoublesWithNoIDColumnNoFormatSpecified() {
System.out.println("MLContextTest - DataFrame sum DML, doubles with no ID column, no format specified");
List<String> list = new ArrayList<>();
list.add("2,2,2");
list.add("3,3,3");
list.add("4,4,4");
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema);
Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame);
setExpectedStdOut("sum: 27.0");
ml.execute(script);
}
@Test
public void testDataFrameSumDMLDoublesWithIDColumnNoFormatSpecified() {
System.out.println("MLContextTest - DataFrame sum DML, doubles with ID column, no format specified");
List<String> list = new ArrayList<>();
list.add("1,2,2,2");
list.add("2,3,3,3");
list.add("3,4,4,4");
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema);
Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame);
setExpectedStdOut("sum: 27.0");
ml.execute(script);
}
@Test
public void testDataFrameSumDMLVectorWithIDColumnNoFormatSpecified() {
System.out.println("MLContextTest - DataFrame sum DML, vector with ID column, no format specified");
List<Tuple2<Double, Vector>> list = new ArrayList<>();
list.add(new Tuple2<>(1.0, Vectors.dense(1.0, 2.0, 3.0)));
list.add(new Tuple2<>(2.0, Vectors.dense(4.0, 5.0, 6.0)));
list.add(new Tuple2<>(3.0, Vectors.dense(7.0, 8.0, 9.0)));
JavaRDD<Tuple2<Double, Vector>> javaRddTuple = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleVectorRow());
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", new VectorUDT(), true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema);
Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame);
setExpectedStdOut("sum: 45.0");
ml.execute(script);
}
@Test
public void testDataFrameSumPYDMLVectorWithIDColumnNoFormatSpecified() {
System.out.println("MLContextTest - DataFrame sum PYDML, vector with ID column, no format specified");
List<Tuple2<Double, Vector>> list = new ArrayList<>();
list.add(new Tuple2<>(1.0, Vectors.dense(1.0, 2.0, 3.0)));
list.add(new Tuple2<>(2.0, Vectors.dense(4.0, 5.0, 6.0)));
list.add(new Tuple2<>(3.0, Vectors.dense(7.0, 8.0, 9.0)));
JavaRDD<Tuple2<Double, Vector>> javaRddTuple = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleVectorRow());
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", new VectorUDT(), true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema);
Script script = pydml("print('sum: ' + sum(M))").in("M", dataFrame);
setExpectedStdOut("sum: 45.0");
ml.execute(script);
}
@Test
public void testGetTuple1DML() {
System.out.println("MLContextTest - Get Tuple1<Matrix> DML");
JavaRDD<String> javaRddString = sc
.parallelize(Stream.of("1,2,3", "4,5,6", "7,8,9").collect(Collectors.toList()));
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> df = spark.createDataFrame(javaRddRow, schema);
Script script = dml("N=M*2").in("M", df).out("N");
Tuple1<Matrix> tuple = ml.execute(script).getTuple("N");
double[][] n = tuple._1().to2DDoubleArray();
Assert.assertEquals(2.0, n[0][0], 0);
Assert.assertEquals(4.0, n[0][1], 0);
Assert.assertEquals(6.0, n[0][2], 0);
Assert.assertEquals(8.0, n[1][0], 0);
Assert.assertEquals(10.0, n[1][1], 0);
Assert.assertEquals(12.0, n[1][2], 0);
Assert.assertEquals(14.0, n[2][0], 0);
Assert.assertEquals(16.0, n[2][1], 0);
Assert.assertEquals(18.0, n[2][2], 0);
}
public static DataType translateDataType(org.apache.spark.sql.types.DataType sparkDataType){
if(sparkDataType instanceof StringType){
return DataType.STRING;
} else if(sparkDataType instanceof IntegralType){
return DataType.INTEGER;
} else if(sparkDataType instanceof DoubleType){
return DataType.DOUBLE;
} else if(sparkDataType instanceof BooleanType){
return DataType.BOOLEAN;
} else {
throw new IllegalArgumentException("Expected string, integral, double or boolean data type, got " + sparkDataType.typeName() + " data type");
}
}
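For example, under this mapping Spark's DoubleType translates to DOUBLE, IntegerType and LongType both fall under IntegralType and translate to INTEGER, and anything else (say, TimestampType) raises the IllegalArgumentException:

DataType d = translateDataType(DataTypes.DoubleType);  // DataType.DOUBLE
DataType i = translateDataType(DataTypes.LongType);    // DataType.INTEGER (LongType is an IntegralType)
// translateDataType(DataTypes.TimestampType)          -> IllegalArgumentException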
public static SegmentSchema sparkSchemaToIndexRSchema(List<StructField> sparkSchema, IsIndexed isIndexed) {
List<ColumnSchema> columns = new ArrayList<>();
for (StructField f : sparkSchema) {
SQLType type;
if (f.dataType() instanceof IntegerType) {
type = SQLType.INT;
} else if (f.dataType() instanceof LongType) {
type = SQLType.BIGINT;
} else if (f.dataType() instanceof FloatType) {
type = SQLType.FLOAT;
} else if (f.dataType() instanceof DoubleType) {
type = SQLType.DOUBLE;
} else if (f.dataType() instanceof StringType) {
type = SQLType.VARCHAR;
} else if (f.dataType() instanceof DateType) {
type = SQLType.DATE;
} else if (f.dataType() instanceof TimestampType) {
type = SQLType.DATETIME;
} else {
throw new IllegalStateException("Unsupported type: " + f.dataType());
}
columns.add(new ColumnSchema(f.name(), type, isIndexed.apply(f.name())));
}
return new SegmentSchema(columns);
}
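The IsIndexed argument is not defined on this page; judging from the apply(f.name()) call above, it is presumably a small functional interface mapping a column name to whether that column is indexed, along these lines:

// Hypothetical shape of the IsIndexed callback used above.
public interface IsIndexed {
    boolean apply(String columnName);
}

// Usage sketch: treat only the "id" column as indexed.
// SegmentSchema segSchema = sparkSchemaToIndexRSchema(sparkFields, name -> "id".equals(name));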
@Test
public void testOutputDataFrameOfVectorsDML() {
System.out.println("MLContextTest - output DataFrame of vectors DML");
String s = "m=matrix('1 2 3 4',rows=2,cols=2);";
Script script = dml(s).out("m");
MLResults results = ml.execute(script);
Dataset<Row> df = results.getDataFrame("m", true);
Dataset<Row> sortedDF = df.sort(RDDConverterUtils.DF_ID_COLUMN);
// verify column types
StructType schema = sortedDF.schema();
StructField[] fields = schema.fields();
StructField idColumn = fields[0];
StructField vectorColumn = fields[1];
Assert.assertTrue(idColumn.dataType() instanceof DoubleType);
Assert.assertTrue(vectorColumn.dataType() instanceof VectorUDT);
List<Row> list = sortedDF.collectAsList();
Row row1 = list.get(0);
Assert.assertEquals(1.0, row1.getDouble(0), 0.0);
Vector v1 = (DenseVector) row1.get(1);
double[] arr1 = v1.toArray();
Assert.assertArrayEquals(new double[] { 1.0, 2.0 }, arr1, 0.0);
Row row2 = list.get(1);
Assert.assertEquals(2.0, row2.getDouble(0), 0.0);
Vector v2 = (DenseVector) row2.get(1);
double[] arr2 = v2.toArray();
Assert.assertArrayEquals(new double[] { 3.0, 4.0 }, arr2, 0.0);
}
@Test
public void testDataFrameToBinaryBlocks() {
System.out.println("MLContextTest - DataFrame to binary blocks");
List<String> list = new ArrayList<>();
list.add("1,2,3");
list.add("4,5,6");
list.add("7,8,9");
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema);
JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlocks = MLContextConversionUtil
.dataFrameToMatrixBinaryBlocks(dataFrame);
Tuple2<MatrixIndexes, MatrixBlock> first = binaryBlocks.first();
MatrixBlock mb = first._2();
double[][] matrix = DataConverter.convertToDoubleMatrix(mb);
Assert.assertArrayEquals(new double[] { 1.0, 2.0, 3.0 }, matrix[0], 0.0);
Assert.assertArrayEquals(new double[] { 4.0, 5.0, 6.0 }, matrix[1], 0.0);
Assert.assertArrayEquals(new double[] { 7.0, 8.0, 9.0 }, matrix[2], 0.0);
}
@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public Object get(int ordinal, DataType dataType) {
if (dataType instanceof IntegerType) {
return getInt(ordinal);
} else if (dataType instanceof LongType) {
return getLong(ordinal);
} else if (dataType instanceof StringType) {
return getUTF8String(ordinal);
} else if (dataType instanceof FloatType) {
return getFloat(ordinal);
} else if (dataType instanceof DoubleType) {
return getDouble(ordinal);
} else if (dataType instanceof DecimalType) {
DecimalType decimalType = (DecimalType) dataType;
return getDecimal(ordinal, decimalType.precision(), decimalType.scale());
} else if (dataType instanceof BinaryType) {
return getBinary(ordinal);
} else if (dataType instanceof StructType) {
return getStruct(ordinal, ((StructType) dataType).size());
} else if (dataType instanceof ArrayType) {
return getArray(ordinal);
} else if (dataType instanceof MapType) {
return getMap(ordinal);
} else if (dataType instanceof BooleanType) {
return getBoolean(ordinal);
} else if (dataType instanceof ByteType) {
return getByte(ordinal);
} else if (dataType instanceof ShortType) {
return getShort(ordinal);
}
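// Types not matched above (e.g. CalendarIntervalType or NullType) fall through to null.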
return null;
}
public DataField createDataField(FieldName name){
StructType schema = getSchema();
StructField field = schema.apply(name.getValue());
org.apache.spark.sql.types.DataType sparkDataType = field.dataType();
if(sparkDataType instanceof StringType){
return createDataField(name, OpType.CATEGORICAL, DataType.STRING);
} else if(sparkDataType instanceof IntegralType){
return createDataField(name, OpType.CONTINUOUS, DataType.INTEGER);
} else if(sparkDataType instanceof DoubleType){
return createDataField(name, OpType.CONTINUOUS, DataType.DOUBLE);
} else if(sparkDataType instanceof BooleanType){
return createDataField(name, OpType.CATEGORICAL, DataType.BOOLEAN);
} else {
throw new IllegalArgumentException("Expected string, integral, double or boolean data type, got " + sparkDataType.typeName() + " data type");
}
}
public static List<StructField> indexrSchemaToSparkSchema(SegmentSchema schema) {
List<StructField> fields = new ArrayList<>();
for (ColumnSchema cs : schema.getColumns()) {
DataType dataType;
switch (cs.getSqlType()) {
case INT:
dataType = DataTypes.IntegerType;
break;
case BIGINT:
dataType = DataTypes.LongType;
break;
case FLOAT:
dataType = DataTypes.FloatType;
break;
case DOUBLE:
dataType = DataTypes.DoubleType;
break;
case VARCHAR:
dataType = DataTypes.StringType;
break;
case DATE:
dataType = DataTypes.DateType;
break;
case DATETIME:
dataType = DataTypes.TimestampType;
break;
default:
throw new IllegalStateException("Unsupported type: " + cs.getSqlType());
}
fields.add(new StructField(cs.getName(), dataType, scala.Boolean.box(false), Metadata.empty()));
}
return fields;
}
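This method is the inverse of sparkSchemaToIndexRSchema above; wrapping the returned field list yields a usable Spark schema, e.g.:

// Rebuild a Spark StructType from an IndexR SegmentSchema (usage sketch).
StructType sparkSchema = DataTypes.createStructType(indexrSchemaToSparkSchema(segmentSchema));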