The following lists example code and usage patterns for the org.apache.spark.sql.api.java.UDF2 API class; you can also follow the link to view the source code on GitHub.
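Before the full examples, here is a minimal sketch of the UDF2 pattern, using only Spark's public API (the function name "plus" and the literal query are illustrative, not taken from the examples below): implement the two-argument call as a lambda, register it together with a return DataType, then invoke it from SQL.

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;

SparkSession spark = SparkSession.builder().master("local").appName("udf2-demo").getOrCreate();
// UDF2<T1, T2, R>: two input columns in, one value out
spark.udf().register("plus", (UDF2<Integer, Integer, Integer>) (a, b) -> a + b, DataTypes.IntegerType);
spark.sql("SELECT plus(1, 2) AS total").show(); // prints 3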
private void registerUDFs() {
    // register our own aggregation functions
    sparkSession.udf().register("AllButEmptyString", new AllButEmptyStringAggregationFunction());
    sparkSession.udf().register("ProcessState", new ProcessStatesAggregationFunction());

    // Longs.tryParse is Guava's null-returning parser
    sparkSession.udf().register("isALong", (UDF1<Object, Boolean>) o -> {
        if (o instanceof Long)
            return true;
        if (o instanceof String && Longs.tryParse((String) o) != null)
            return true;
        return false;
    }, DataTypes.BooleanType);

    sparkSession.udf().register("timestampStringToLong", (UDF1<Object, Long>) o -> {
        if (o instanceof String && Longs.tryParse((String) o) != null) {
            return Longs.tryParse((String) o) / 1000;
        }
        return null;
    }, DataTypes.LongType);

    sparkSession.udf().register("activityBeforeTimestamp", (UDF2<String, String, String>) (s, s2) -> {
        // get broadcast
        Map<String, String> activities = (Map<String, String>) SparkBroadcastHelper.getInstance()
                .getBroadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_INSTANCE_TIMESTAMP_MAP);
        // is the pid contained in the broadcast?
        if (activities == null || activities.isEmpty()) {
            return "Error: Broadcast not found";
        } else if (activities.containsKey(s)) {
            Timestamp tsAct = new Timestamp(Long.parseLong(activities.get(s)));
            if (s2 == null || s2.isEmpty()) {
                return "FALSE";
            }
            Timestamp tsObject = new Timestamp(Long.parseLong(s2));
            return tsObject.after(tsAct) ? "FALSE" : "TRUE";
        }
        return "FALSE";
    }, DataTypes.StringType);
}
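Once registered, these functions can be called from Spark SQL like built-ins. A minimal usage sketch, assuming a temp view named events with string columns pid and ts (all three names are hypothetical, not from the source project):

Dataset<Row> flagged = sparkSession.sql(
        "SELECT pid, " +
        "       isALong(ts) AS ts_is_long, " +
        "       timestampStringToLong(ts) AS ts_seconds, " +
        "       activityBeforeTimestamp(pid, ts) AS before_activity " +
        "FROM events");
flagged.show();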
/**
 * Reads the input GeoJSON files into a Spark Dataset.
 * GeoJSON attributes are nested under the "properties" column and the geometry under
 * the "geometry" column, so this method expands both into top-level columns and then
 * converts the GeoJSON geometry into WKT using a UDF.
 *
 * @return a Spark Dataset containing the data.
 */
private Dataset<Row> readGeoJSON() {
    Dataset<Row> dataset = spark.read()
            .option("multiLine", true)
            .json(filenames);

    // Expand the nested fields into top-level columns
    dataset = dataset.drop("_corrupt_record").filter(dataset.col("geometry").isNotNull());
    StructType schema = dataset.schema();
    StructField[] gj_fields = schema.fields();
    for (StructField sf : gj_fields) {
        DataType dt = sf.dataType();
        if (dt instanceof StructType) {
            StructType st = (StructType) dt;
            if (st.fields().length > 0) {
                String column_name = sf.name();
                for (String field : st.fieldNames())
                    dataset = dataset.withColumn(field, functions.explode(functions.array(column_name + "." + field)));
                dataset = dataset.drop(column_name);
            }
        }
    }

    // Convert the GeoJSON geometry into WKT
    UDF2<String, WrappedArray, String> coords2WKT =
            (String type, WrappedArray coords) -> Coordinates2WKT.convert.apply(type, coords);
    spark.udf().register("coords2WKT", coords2WKT, DataTypes.StringType);
    dataset = dataset.withColumn("geometry",
            functions.callUDF("coords2WKT", dataset.col("type"), dataset.col("coordinates")));
    dataset = dataset.drop(dataset.col("type")).drop(dataset.col("coordinates"));
    return dataset;
}
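Coordinates2WKT.convert is project-specific code not shown here. As a rough, hypothetical sketch of what such a conversion involves, the following handles only GeoJSON Point geometries; a real converter must also cover LineString, Polygon, and the Multi* types:

import org.apache.spark.sql.api.java.UDF2;
import scala.collection.mutable.WrappedArray;

// Hypothetical simplification: Point only.
UDF2<String, WrappedArray<Double>, String> pointToWKT = (type, coords) -> {
    if ("Point".equals(type) && coords != null && coords.length() == 2) {
        // GeoJSON stores [lon, lat]; WKT writes "POINT (x y)"
        return "POINT (" + coords.apply(0) + " " + coords.apply(1) + ")";
    }
    return null; // geometry types not covered by this sketch
};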
public static void main(String[] args) {
    // Windows-specific property if Hadoop is not installed or HADOOP_HOME is not set
    System.setProperty("hadoop.home.dir", "E:\\hadoop");

    // Build a Spark Session
    SparkSession sparkSession = SparkSession
            .builder()
            .master("local")
            .config("spark.sql.warehouse.dir", "file:///E:/hadoop/warehouse")
            .appName("EdgeBuilder")
            .getOrCreate();
    Logger rootLogger = LogManager.getRootLogger();
    rootLogger.setLevel(Level.WARN);

    // Read the CSV data
    Dataset<Row> emp_ds = sparkSession.read()
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load("src/main/resources/employee.txt");

    // Register the UDF in the Spark Session created above
    UDF2 calcDays = new CalcDaysUDF();
    sparkSession.udf().register("calcDays", calcDays, DataTypes.LongType);

    emp_ds.createOrReplaceTempView("emp_ds");
    emp_ds.printSchema();
    emp_ds.show();
    sparkSession.sql("select calcDays(hiredate,'dd-MM-yyyy') from emp_ds").show();

    // Instantiate the UDAF, register it with the SparkSession, and use it in SQL
    AverageUDAF calcAvg = new AverageUDAF();
    sparkSession.udf().register("calAvg", calcAvg);
    sparkSession.sql("select deptno, calAvg(salary) from emp_ds group by deptno").show();

    // Type-safe aggregation over a typed Dataset
    TypeSafeUDAF typeSafeUDAF = new TypeSafeUDAF();
    Dataset<Employee> emf = emp_ds.as(Encoders.bean(Employee.class));
    emf.printSchema();
    emf.show();
    TypedColumn<Employee, Double> averageSalary = typeSafeUDAF.toColumn().name("averageTypeSafe");
    Dataset<Double> result = emf.select(averageSalary);
    result.show();
}
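CalcDaysUDF is referenced above but its source is not shown. A plausible minimal reconstruction, assuming it returns the number of days elapsed since the given date string (the body below is an illustration, not the original class):

import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.spark.sql.api.java.UDF2;

// Hypothetical reconstruction: days elapsed since a date string,
// parsed with the pattern passed as the second argument.
public class CalcDaysUDF implements UDF2<String, String, Long> {
    @Override
    public Long call(String dateString, String format) throws Exception {
        SimpleDateFormat formatter = new SimpleDateFormat(format);
        long parsedMillis = formatter.parse(dateString).getTime();
        long nowMillis = new Date().getTime();
        return (nowMillis - parsedMillis) / (1000L * 60 * 60 * 24);
    }
}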