类org.apache.spark.sql.api.java.UDF1源码实例Demo

下面列出了 org.apache.spark.sql.api.java.UDF1 类的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: iceberg   文件: TestFilteredScan.java
/**
 * Creates the shared local Spark session (two local threads) and registers
 * the UDFs that the partition tests refer to by name.
 */
@BeforeClass
public static void startSpark() {
  TestFilteredScan.spark = SparkSession.builder().master("local[2]").getOrCreate();

  // "bucket4": applies the 4-bucket transform for long values
  Transform<Long, Integer> bucketTransform = Transforms.bucket(Types.LongType.get(), 4);
  UDF1<Long, Integer> bucketUdf = bucketTransform::apply;
  spark.udf().register("bucket4", bucketUdf, IntegerType$.MODULE$);

  // "ts_day": applies the day transform to the value produced by fromJavaTimestamp
  Transform<Long, Integer> dayTransform = Transforms.day(Types.TimestampType.withZone());
  UDF1<Timestamp, Integer> dayUdf = ts -> dayTransform.apply((Long) fromJavaTimestamp(ts));
  spark.udf().register("ts_day", dayUdf, IntegerType$.MODULE$);

  // "ts_hour": applies the hour transform to the value produced by fromJavaTimestamp
  Transform<Long, Integer> hourTransform = Transforms.hour(Types.TimestampType.withZone());
  UDF1<Timestamp, Integer> hourUdf = ts -> hourTransform.apply((Long) fromJavaTimestamp(ts));
  spark.udf().register("ts_hour", hourUdf, IntegerType$.MODULE$);

  // identity UDFs: return their argument unchanged
  UDF1<String, String> dataIdentity = value -> value;
  spark.udf().register("data_ident", dataIdentity, StringType$.MODULE$);

  UDF1<Long, Long> idIdentity = value -> value;
  spark.udf().register("id_ident", idIdentity, LongType$.MODULE$);
}
 
源代码2 项目: iceberg   文件: TestFilteredScan.java
/**
 * Starts the local Spark session shared by the tests and registers the
 * named UDFs used by the partition tests.
 */
@BeforeClass
public static void startSpark() {
  TestFilteredScan.spark = SparkSession.builder().master("local[2]").getOrCreate();

  // UDFs backed by transforms
  Transform<Long, Integer> toBucket = Transforms.bucket(Types.LongType.get(), 4);
  spark.udf().register(
      "bucket4",
      (UDF1<Long, Integer>) toBucket::apply,
      IntegerType$.MODULE$);

  Transform<Long, Integer> toDay = Transforms.day(Types.TimestampType.withZone());
  spark.udf().register(
      "ts_day",
      (UDF1<Timestamp, Integer>) ts -> toDay.apply((Long) fromJavaTimestamp(ts)),
      IntegerType$.MODULE$);

  Transform<Long, Integer> toHour = Transforms.hour(Types.TimestampType.withZone());
  spark.udf().register(
      "ts_hour",
      (UDF1<Timestamp, Integer>) ts -> toHour.apply((Long) fromJavaTimestamp(ts)),
      IntegerType$.MODULE$);

  // pass-through UDFs that return their input as-is
  spark.udf().register(
      "data_ident",
      (UDF1<String, String>) text -> text,
      StringType$.MODULE$);
  spark.udf().register(
      "id_ident",
      (UDF1<Long, Long>) number -> number,
      LongType$.MODULE$);
}
 
源代码3 项目: iceberg   文件: TestFilteredScan.java
/**
 * Creates the shared local Spark session and registers the bucket, day,
 * hour and truncate UDFs that the partition tests refer to by name.
 */
@BeforeClass
public static void startSpark() {
  TestFilteredScan.spark = SparkSession.builder().master("local[2]").getOrCreate();

  // "bucket4": applies the 4-bucket transform for long values
  Transform<Long, Integer> bucketTransform = Transforms.bucket(Types.LongType.get(), 4);
  UDF1<Long, Integer> bucketUdf = bucketTransform::apply;
  spark.udf().register("bucket4", bucketUdf, IntegerType$.MODULE$);

  // "ts_day": applies the day transform to the value produced by fromJavaTimestamp
  Transform<Long, Integer> dayTransform = Transforms.day(Types.TimestampType.withZone());
  UDF1<Timestamp, Integer> dayUdf = ts -> dayTransform.apply(fromJavaTimestamp(ts));
  spark.udf().register("ts_day", dayUdf, IntegerType$.MODULE$);

  // "ts_hour": applies the hour transform to the value produced by fromJavaTimestamp
  Transform<Long, Integer> hourTransform = Transforms.hour(Types.TimestampType.withZone());
  UDF1<Timestamp, Integer> hourUdf = ts -> hourTransform.apply(fromJavaTimestamp(ts));
  spark.udf().register("ts_hour", hourUdf, IntegerType$.MODULE$);

  // "trunc1": applies the width-1 string truncate transform to the input's toString()
  Transform<CharSequence, CharSequence> truncateTransform =
      Transforms.truncate(Types.StringType.get(), 1);
  UDF1<CharSequence, CharSequence> truncateUdf =
      text -> truncateTransform.apply(text.toString());
  spark.udf().register("trunc1", truncateUdf, StringType$.MODULE$);
}
 
源代码4 项目: net.jgp.labs.spark   文件: BasicUdfFromTextFile.java
/**
 * Loads a headerless CSV file, exposes its first two columns as
 * {@code label} and {@code value}, and appends an {@code x2} column computed
 * by the {@code x2Multiplier} UDF before printing the result.
 */
private void start() {
  SparkSession spark = SparkSession.builder().appName("CSV to Dataset")
      .master("local").getOrCreate();

  // registers a new internal UDF
  spark.udf().register("x2Multiplier", new UDF1<Integer, Integer>() {
    private static final long serialVersionUID = -5372447039252716846L;

    /**
     * Doubles the given value.
     *
     * @param x value to double, may be null
     * @return {@code x * 2}, or null when {@code x} is null
     */
    @Override
    public Integer call(Integer x) {
      // Spark hands SQL NULLs to Java UDFs as null references (e.g. an empty
      // CSV cell, or a value the IntegerType cast below could not convert).
      // Propagate the null instead of failing on auto-unboxing.
      if (x == null) {
        return null;
      }
      return x * 2;
    }
  }, DataTypes.IntegerType);

  String filename = "data/tuple-data-file.csv";
  Dataset<Row> df = spark.read().format("csv").option("inferSchema", "true")
      .option("header", "false").load(filename);
  // copy the positional columns to named ones, then drop the originals
  df = df.withColumn("label", df.col("_c0")).drop("_c0");
  df = df.withColumn("value", df.col("_c1")).drop("_c1");
  // cast explicitly so the UDF always receives an Integer argument
  df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(
      DataTypes.IntegerType)));
  df.show();
}
 
源代码5 项目: bpmn.ai   文件: SparkRunner.java
/**
 * Registers the custom aggregation functions and the scalar UDFs
 * ({@code isALong}, {@code timestampStringToLong},
 * {@code activityBeforeTimestamp}) on the Spark session.
 */
private void registerUDFs() {
    // custom aggregation functions
    sparkSession.udf().register("AllButEmptyString", new AllButEmptyStringAggregationFunction());
    sparkSession.udf().register("ProcessState", new ProcessStatesAggregationFunction());

    // isALong: true for a Long, or for a String that Longs.tryParse accepts
    sparkSession.udf().register("isALong", (UDF1<Object, Boolean>) value ->
            value instanceof Long
                    || (value instanceof String && Longs.tryParse((String) value) != null),
            DataTypes.BooleanType);

    // timestampStringToLong: parses a String as a long and divides it by 1000;
    // yields null for non-String input or text that does not parse
    sparkSession.udf().register("timestampStringToLong", (UDF1<Object, Long>) value -> {
        if (!(value instanceof String)) {
            return null;
        }
        Long parsed = Longs.tryParse((String) value);
        if (parsed == null) {
            return null;
        }
        return parsed / 1000;
    }, DataTypes.LongType);

    // activityBeforeTimestamp: "TRUE" when the second argument (epoch value as text)
    // is not after the timestamp stored in the broadcast map under the first argument
    sparkSession.udf().register("activityBeforeTimestamp", (UDF2<String, String, String>) (key, millisText) -> {
        // look up the broadcast map
        Map<String, String> timestampsByKey = (Map<String, String>) SparkBroadcastHelper.getInstance()
                .getBroadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_INSTANCE_TIMESTAMP_MAP);
        if (timestampsByKey == null || timestampsByKey.isEmpty()) {
            return "Error: Broadcast not found";
        }
        // key not present in the broadcast
        if (!timestampsByKey.containsKey(key)) {
            return "FALSE";
        }
        // the stored value is parsed before the second argument is inspected
        Timestamp storedTime = new Timestamp(Long.parseLong(timestampsByKey.get(key)));
        if (millisText == null || millisText.isEmpty()) {
            return "FALSE";
        }
        Timestamp candidateTime = new Timestamp(Long.parseLong(millisText));
        return candidateTime.after(storedTime) ? "FALSE" : "TRUE";
    }, DataTypes.StringType);
}
 
源代码6 项目: mmtf-spark   文件: ProteinSequenceEncoder.java
/**
 * One-hot encodes a protein sequence. The encoding covers the 20 natural
 * amino acids plus X for any other residue, for a total of 21 elements
 * per residue.
 *
 * @return dataset with feature vector appended
 */
public Dataset<Row> oneHotEncode() {
	SparkSession session = data.sparkSession();
	int maxLength = getMaxSequenceLength(data);

	session.udf().register("encoder", new UDF1<String, Vector>() {
		private static final long serialVersionUID = -6095318836772114908L;

		@Override
		public Vector call(String sequence) throws Exception {
			int alphabetSize = AMINO_ACIDS21.size();
			// one slot per alphabet entry for every position up to maxLength
			double[] encoded = new double[alphabetSize * maxLength];

			for (int pos = 0; pos < sequence.length(); pos++) {
				int code = AMINO_ACIDS21.indexOf(sequence.charAt(pos));
				if (code < 0) {
					// any code that is not in the alphabet, e.g., U, is treated as X
					code = AMINO_ACIDS21.indexOf('X');
				}
				encoded[pos * alphabetSize + code] = 1;
			}

			return Vectors.dense(encoded);
		}
	}, new VectorUDT());

	// append feature column
	data.createOrReplaceTempView("table");
	String query = "SELECT *, encoder(" + inputCol + ") AS " + outputCol + " from table";
	data = session.sql(query);

	return data;
}
 
源代码7 项目: mmtf-spark   文件: ProteinSequenceEncoder.java
/**
 * Encodes a protein sequence by 7 physicochemical properties.
 *
 * <p> See:  Meiler, J., Müller, M., Zeidler, A. et al. J Mol Model (2001) 7: 360. doi:
 * <a href="https://link.springer.com/article/10.1007/s008940100038">10.1007/s008940100038</a>
 *
 * @return dataset with feature vector appended
 */
public Dataset<Row> propertyEncode() {
	SparkSession session = data.sparkSession();
	int maxLength = getMaxSequenceLength(data);

	session.udf().register("encoder", new UDF1<String, Vector>() {
		private static final long serialVersionUID = 1L;

		@Override
		public Vector call(String sequence) throws Exception {
			double[] encoded = new double[7 * maxLength];
			int offset = 0;

			for (int pos = 0; pos < sequence.length(); pos++) {
				double[] features = properties.get(sequence.charAt(pos));
				if (features == null) {
					// residues without an entry are skipped; the offset does not advance
					continue;
				}
				for (double feature : features) {
					encoded[offset] = feature;
					offset++;
				}
			}
			return Vectors.dense(encoded);
		}
	}, new VectorUDT());

	// append feature column
	data.createOrReplaceTempView("table");
	String query = "SELECT *, encoder(" + inputCol + ") AS " + outputCol + " from table";
	data = session.sql(query);

	return data;
}
 
源代码8 项目: mmtf-spark   文件: ProteinSequenceEncoder.java
/**
 * Encodes a protein sequence by a Blosum62 matrix.
 *
 * <p> See: <a href="https://ftp.ncbi.nih.gov/repository/blocks/unix/blosum/BLOSUM/blosum62.blast.new">BLOSUM62 Matrix</a>
 *
 * @return dataset with feature vector appended
 */
public Dataset<Row> blosum62Encode() {
	SparkSession session = data.sparkSession();
	int maxLength = getMaxSequenceLength(data);

	session.udf().register("encoder", new UDF1<String, Vector>() {
		private static final long serialVersionUID = 1L;

		@Override
		public Vector call(String sequence) throws Exception {
			double[] encoded = new double[20 * maxLength];
			int offset = 0;

			for (int pos = 0; pos < sequence.length(); pos++) {
				double[] scores = blosum62.get(sequence.charAt(pos));
				if (scores == null) {
					// residues without a matrix entry are skipped; the offset does not advance
					continue;
				}
				for (double score : scores) {
					encoded[offset] = score;
					offset++;
				}
			}
			return Vectors.dense(encoded);
		}
	}, new VectorUDT());

	// append feature column
	data.createOrReplaceTempView("table");
	String query = "SELECT *, encoder(" + inputCol + ") AS " + outputCol + " from table";
	data = session.sql(query);

	return data;
}
 
源代码9 项目: mmtf-spark   文件: SequenceNgrammer.java
/**
 * Splits a one-letter sequence column (e.g., protein sequence) into an
 * array of non-overlapping n-grams, starting at the given shift. To
 * generate all possible n-grams, call this method n times with shift
 * parameters {0, ..., n-1}.
 *
 * <p> Example 3-gram(shift=0) : IDCGHTVEDQR ... =&gt; [IDC, GHT, VED, ...]
 * <p> Example 3-gram(shift=1) : IDCGHTVEDQR ... =&gt; [DCG, HTV, EDQ, ...]
 * <p> Example 3-gram(shift=2) : IDCGHTVEDQR ... =&gt; [CGH, TVE, DQR, ...]
 *
 * <p>For an application of shifted n-grams see:
 * E Asgari, MRK Mofrad, PLoS One. 2015; 10(11): e0141287, doi:
 * <a href="https://dx.doi.org/10.1371/journal.pone.0141287">10.1371/journal.pone.0141287</a>
 *
 * @param data input dataset with column "sequence"
 * @param n size of the n-gram
 * @param shift start index for the n-gram
 * @param outputCol name of the output column
 * @return output dataset with appended ngram column
 */
public static Dataset<Row> shiftedNgram(Dataset<Row> data, int n, int shift, String outputCol) {
	SparkSession session = data.sparkSession();

	session.udf().register("encoder", new UDF1<String, String[]>() {
		private static final long serialVersionUID = 4844644794982507954L;

		@Override
		public String[] call(String sequence) throws Exception {
			// nothing left to split once the shift runs past the sequence
			if (shift > sequence.length()) {
				return new String[0];
			}
			String shifted = sequence.substring(shift);
			// a trailing remainder shorter than n is dropped
			int count = shifted.length() / n;

			String[] ngrams = new String[count];
			for (int j = 0; j < count; j++) {
				int start = j * n;
				ngrams[j] = shifted.substring(start, start + n);
			}
			return ngrams;
		}
	}, DataTypes.createArrayType(DataTypes.StringType));

	data.createOrReplaceTempView("table");

	// append shifted ngram column
	String query = "SELECT *, encoder(sequence) AS " + outputCol + " from table";
	return session.sql(query);
}
 
源代码10 项目: SparkDemo   文件: JavaTokenizerExample.java
/**
 * Tokenizes three sample sentences with {@code Tokenizer} and
 * {@code RegexTokenizer} and shows, for each, the words together with a
 * token count computed by the {@code countTokens} UDF.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
  SparkSession spark = SparkSession
    .builder()
    .appName("JavaTokenizerExample")
    .getOrCreate();

  // $example on$
  StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
  });

  List<Row> rows = Arrays.asList(
    RowFactory.create(0, "Hi I heard about Spark"),
    RowFactory.create(1, "I wish Java could use case classes"),
    RowFactory.create(2, "Logistic,regression,models,are,neat")
  );

  Dataset<Row> sentenceDataFrame = spark.createDataFrame(rows, schema);

  Tokenizer tokenizer = new Tokenizer()
      .setInputCol("sentence")
      .setOutputCol("words");

  RegexTokenizer regexTokenizer = new RegexTokenizer()
      .setInputCol("sentence")
      .setOutputCol("words")
      .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);

  // countTokens: size of the words array
  spark.udf().register(
      "countTokens",
      (UDF1<WrappedArray, Integer>) words -> words.size(),
      DataTypes.IntegerType);

  Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
  tokenized
      .select("sentence", "words")
      .withColumn("tokens", callUDF("countTokens", col("words")))
      .show(false);

  Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
  regexTokenized
      .select("sentence", "words")
      .withColumn("tokens", callUDF("countTokens", col("words")))
      .show(false);
  // $example off$

  spark.stop();
}
 
 类所在包
 类方法
 同包方法