Source code examples for the class org.apache.spark.sql.api.java.UDF2

The following examples show how the org.apache.spark.sql.api.java.UDF2 API is used in real projects; you can also follow the project links to view the full source code on GitHub.
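Before diving into project code, the sketch below shows the basic pattern in isolation: a UDF2 receives two column values per row and returns a single result, and the result DataType must be supplied explicitly at registration time. The class name Udf2Demo and the UDF name concatWithSep are illustrative only and do not come from the projects listed below.

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;

public class Udf2Demo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("Udf2Demo")
                .getOrCreate();

        // A UDF2 maps two inputs to one output; the return DataType is passed explicitly.
        spark.udf().register("concatWithSep",
                (UDF2<String, String, String>) (a, b) -> a + "-" + b,
                DataTypes.StringType);

        // Once registered, the UDF can be used in SQL or via functions.callUDF.
        spark.sql("SELECT concatWithSep('2023', '01') AS yearMonth").show();

        spark.stop();
    }
}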

Example 1 — Project: bpmn.ai, File: SparkRunner.java
private void registerUDFs() {
    // register our own aggregation functions
    sparkSession.udf().register("AllButEmptyString", new AllButEmptyStringAggregationFunction());
    sparkSession.udf().register("ProcessState", new ProcessStatesAggregationFunction());
    sparkSession.udf().register("isALong", (UDF1<Object, Boolean>) o -> {
        if(o instanceof Long)
            return true;
        if(o instanceof String && Longs.tryParse((String) o) != null)
            return true;
        return false;
    }, DataTypes.BooleanType);
    sparkSession.udf().register("timestampStringToLong", (UDF1<Object, Long>) o -> {
        if(o instanceof String && Longs.tryParse((String) o) != null) {
            return Longs.tryParse((String) o) / 1000;
        }
        return null;
    }, DataTypes.LongType);
    sparkSession.udf().register("activityBeforeTimestamp", (UDF2<String, String, String>) (s, s2) -> {
        // get broadcast
        Map<String, String> activities = (Map<String, String>) SparkBroadcastHelper.getInstance().getBroadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_INSTANCE_TIMESTAMP_MAP);
        // is pid contained in broadcast?
        if (activities == null || activities.isEmpty()){
            return "Error: Broadcast not found";
        } else {
            if (activities.containsKey(s)) {
                Timestamp tsAct = new Timestamp(Long.parseLong(activities.get(s)));
                if(s2 == null || s2.isEmpty()){
                    return "FALSE";
                }
                Timestamp tsObject = new Timestamp(Long.parseLong(s2));
                if (tsObject.after(tsAct)) {
                    return "FALSE";
                } else {
                    return "TRUE";
                }
            }
        }
        return "FALSE";
    }, DataTypes.StringType);
}
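As a hypothetical follow-up (not part of bpmn.ai), the sketch below applies two of the UDFs registered above through functions.callUDF; the column name timestampRaw and the wrapper class TimestampNormalizer are assumptions made for illustration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.spark.sql.functions.callUDF;

public class TimestampNormalizer {
    public static Dataset<Row> normalize(Dataset<Row> dataset) {
        return dataset
                // keep only rows whose raw value can be parsed as a long
                .filter(callUDF("isALong", dataset.col("timestampRaw")))
                // add a column with the parsed value divided by 1000, as timestampStringToLong does
                .withColumn("timestampSeconds",
                        callUDF("timestampStringToLong", dataset.col("timestampRaw")));
    }
}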
 
Example 2 — Project: GeoTriples, File: SparkReader.java
/**
 * Reads the input GeoJSON files into a Spark Dataset.
 * GeoJSON attributes are nested under the "properties" column and the geometry under the
 * "geometry" column, so this method expands them into top-level columns and then converts
 * the GeoJSON geometry into WKT using a UDF.
 *
 * @return a Spark Dataset containing the data.
 */
private Dataset<Row> readGeoJSON(){
    Dataset<Row> dataset = spark.read()
            .option("multiLine", true)
            .format("json")
            .json(filenames);

    //Expand the fields
    dataset = dataset.drop("_corrupt_record").filter(dataset.col("geometry").isNotNull());
    StructType schema = dataset.schema();
    StructField[] gj_fields =  schema.fields();
    for (StructField sf : gj_fields){
        DataType dt =  sf.dataType();
        if (dt instanceof  StructType) {
            StructType st = (StructType) dt;
            if (st.fields().length > 0) {
                String column_name = sf.name();
                for (String field : st.fieldNames())
                    dataset = dataset.withColumn(field, functions.explode(functions.array(column_name + "." + field)));
                dataset = dataset.drop(column_name);
            }
        }
    }
    // Convert GeoJSON geometry into WKT
    UDF2<String, WrappedArray, String> coords2WKT =
            (String type, WrappedArray coords) -> Coordinates2WKT.convert.apply(type, coords);

    spark.udf().register("coords2WKT", coords2WKT, DataTypes.StringType);
    dataset = dataset.withColumn("geometry",
            functions.callUDF("coords2WKT", dataset.col("type"), dataset.col("coordinates")));
    dataset = dataset.drop(dataset.col("type")).drop(dataset.col("coordinates"));

    return dataset;
}
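Note that coords2WKT declares its second parameter as a Scala WrappedArray: on Scala 2.12 builds of Spark, array-typed columns are handed to Java UDFs as WrappedArray. The standalone sketch below shows the same pattern with made-up names (ArrayUdfDemo, joinWith); it is not part of GeoTriples.

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import scala.collection.mutable.WrappedArray;

public class ArrayUdfDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("ArrayUdfDemo")
                .getOrCreate();

        // An array column arrives in a Java UDF as a Scala WrappedArray.
        spark.udf().register("joinWith",
                (UDF2<String, WrappedArray<String>, String>) (sep, parts) -> {
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < parts.size(); i++) {
                        if (i > 0) sb.append(sep);
                        sb.append(parts.apply(i));
                    }
                    return sb.toString();
                },
                DataTypes.StringType);

        spark.sql("SELECT joinWith('-', array('a', 'b', 'c')) AS joined").show();
        spark.stop();
    }
}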
 
Example 3

public static void main(String[] args) {
    // Windows-specific property if Hadoop is not installed or HADOOP_HOME is not set
    System.setProperty("hadoop.home.dir", "E:\\hadoop");

    // Build a Spark session
    SparkSession sparkSession = SparkSession
            .builder()
            .master("local")
            .config("spark.sql.warehouse.dir", "file:///E:/hadoop/warehouse")
            .appName("EdgeBuilder")
            .getOrCreate();
    Logger rootLogger = LogManager.getRootLogger();
    rootLogger.setLevel(Level.WARN);

    // Read the CSV data
    Dataset<Row> emp_ds = sparkSession.read()
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load("src/main/resources/employee.txt");

    // Instantiate the UDF2 and register it in the Spark session created above
    UDF2 calcDays = new CalcDaysUDF();
    sparkSession.udf().register("calcDays", calcDays, DataTypes.LongType);

    emp_ds.createOrReplaceTempView("emp_ds");

    emp_ds.printSchema();
    emp_ds.show();

    // Use the UDF2 in a SQL query
    sparkSession.sql("select calcDays(hiredate,'dd-MM-yyyy') from emp_ds").show();

    // Instantiate the UDAF and register it with the SparkSession
    AverageUDAF calcAvg = new AverageUDAF();
    sparkSession.udf().register("calAvg", calcAvg);
    // Use the UDAF
    sparkSession.sql("select deptno,calAvg(salary) from emp_ds group by deptno").show();

    // Type-safe UDAF applied to a typed Dataset
    TypeSafeUDAF typeSafeUDAF = new TypeSafeUDAF();

    Dataset<Employee> emf = emp_ds.as(Encoders.bean(Employee.class));
    emf.printSchema();
    emf.show();

    TypedColumn<Employee, Double> averageSalary = typeSafeUDAF.toColumn().name("averageTypeSafe");
    Dataset<Double> result = emf.select(averageSalary);
    result.show();
}
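The implementation of CalcDaysUDF is not shown above. A minimal sketch of a UDF2 that would be compatible with the registration and the calcDays(hiredate,'dd-MM-yyyy') call (the project's actual class may differ) could look like this:

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

import org.apache.spark.sql.api.java.UDF2;

// Sketch only: parses the date string with the given pattern and returns
// the number of days elapsed since that date.
public class CalcDaysUDF implements UDF2<String, String, Long> {
    @Override
    public Long call(String dateString, String pattern) throws Exception {
        if (dateString == null || pattern == null) {
            return null;
        }
        LocalDate date = LocalDate.parse(dateString, DateTimeFormatter.ofPattern(pattern));
        return ChronoUnit.DAYS.between(date, LocalDate.now());
    }
}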
 