The following examples show how to use the org.apache.spark.sql.api.java.UDF1 API class, with sample code and usage patterns; you can also follow the links to view the full source on GitHub.
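Before the project examples, here is a minimal, self-contained sketch of the pattern they all share: implement UDF1&lt;IN, OUT&gt;, register it with spark.udf().register(...) together with the Spark SQL return type, and invoke it either via functions.callUDF or from SQL. The class name Udf1Sketch, the function name strLen, and the sample data are illustrative assumptions, not taken from any of the projects listed below.

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

public class Udf1Sketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("UDF1 sketch")
        .master("local[*]")
        .getOrCreate();

    // Register a UDF1 that maps a string column to its length.
    spark.udf().register("strLen",
        (UDF1<String, Integer>) s -> s == null ? null : s.length(),
        DataTypes.IntegerType);

    Dataset<Row> df = spark
        .createDataset(Arrays.asList("spark", "udf"), Encoders.STRING())
        .toDF("value");

    // Invoke the registered UDF through the DataFrame API ...
    df.withColumn("len", callUDF("strLen", col("value"))).show();

    // ... or through SQL.
    df.createOrReplaceTempView("t");
    spark.sql("SELECT value, strLen(value) AS len FROM t").show();

    spark.stop();
  }
}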
@BeforeClass
public static void startSpark() {
  TestFilteredScan.spark = SparkSession.builder().master("local[2]").getOrCreate();

  // define UDFs used by partition tests
  Transform<Long, Integer> bucket4 = Transforms.bucket(Types.LongType.get(), 4);
  spark.udf().register("bucket4", (UDF1<Long, Integer>) bucket4::apply, IntegerType$.MODULE$);

  Transform<Long, Integer> day = Transforms.day(Types.TimestampType.withZone());
  spark.udf().register("ts_day",
      (UDF1<Timestamp, Integer>) timestamp -> day.apply((Long) fromJavaTimestamp(timestamp)),
      IntegerType$.MODULE$);

  Transform<Long, Integer> hour = Transforms.hour(Types.TimestampType.withZone());
  spark.udf().register("ts_hour",
      (UDF1<Timestamp, Integer>) timestamp -> hour.apply((Long) fromJavaTimestamp(timestamp)),
      IntegerType$.MODULE$);

  spark.udf().register("data_ident", (UDF1<String, String>) data -> data, StringType$.MODULE$);
  spark.udf().register("id_ident", (UDF1<Long, Long>) id -> id, LongType$.MODULE$);
}
@BeforeClass
public static void startSpark() {
  TestFilteredScan.spark = SparkSession.builder().master("local[2]").getOrCreate();

  // define UDFs used by partition tests
  Transform<Long, Integer> bucket4 = Transforms.bucket(Types.LongType.get(), 4);
  spark.udf().register("bucket4", (UDF1<Long, Integer>) bucket4::apply, IntegerType$.MODULE$);

  Transform<Long, Integer> day = Transforms.day(Types.TimestampType.withZone());
  spark.udf().register("ts_day",
      (UDF1<Timestamp, Integer>) timestamp -> day.apply(fromJavaTimestamp(timestamp)),
      IntegerType$.MODULE$);

  Transform<Long, Integer> hour = Transforms.hour(Types.TimestampType.withZone());
  spark.udf().register("ts_hour",
      (UDF1<Timestamp, Integer>) timestamp -> hour.apply(fromJavaTimestamp(timestamp)),
      IntegerType$.MODULE$);

  Transform<CharSequence, CharSequence> trunc1 = Transforms.truncate(Types.StringType.get(), 1);
  spark.udf().register("trunc1",
      (UDF1<CharSequence, CharSequence>) str -> trunc1.apply(str.toString()),
      StringType$.MODULE$);
}
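For context, a hedged sketch of how such registered transform UDFs are typically invoked once startSpark() has run: they compute the partition value a given row would fall into. The view name unpartitioned and the columns id (long) and ts (timestamp) are assumptions for illustration, not taken from the test class.

// Assumes startSpark() above has run and a temporary view "unpartitioned"
// with columns id (long) and ts (timestamp) exists; both names are hypothetical.
spark.sql("SELECT id, bucket4(id) AS id_bucket, ts_day(ts) AS day, ts_hour(ts) AS hour "
    + "FROM unpartitioned").show();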
private void start() {
  SparkSession spark = SparkSession.builder().appName("CSV to Dataset")
      .master("local").getOrCreate();

  // registers a new internal UDF
  spark.udf().register("x2Multiplier", new UDF1<Integer, Integer>() {
    private static final long serialVersionUID = -5372447039252716846L;

    @Override
    public Integer call(Integer x) {
      return x * 2;
    }
  }, DataTypes.IntegerType);

  String filename = "data/tuple-data-file.csv";
  Dataset<Row> df = spark.read().format("csv").option("inferSchema", "true")
      .option("header", "false").load(filename);
  df = df.withColumn("label", df.col("_c0")).drop("_c0");
  df = df.withColumn("value", df.col("_c1")).drop("_c1");
  df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
  df.show();
}
private void registerUDFs() {
  // register our own aggregation function
  sparkSession.udf().register("AllButEmptyString", new AllButEmptyStringAggregationFunction());
  sparkSession.udf().register("ProcessState", new ProcessStatesAggregationFunction());

  sparkSession.udf().register("isALong", (UDF1<Object, Boolean>) o -> {
    if (o instanceof Long) {
      return true;
    }
    if (o instanceof String && Longs.tryParse((String) o) != null) {
      return true;
    }
    return false;
  }, DataTypes.BooleanType);

  sparkSession.udf().register("timestampStringToLong", (UDF1<Object, Long>) o -> {
    if (o instanceof String && Longs.tryParse((String) o) != null) {
      return Longs.tryParse((String) o) / 1000;
    }
    return null;
  }, DataTypes.LongType);

  sparkSession.udf().register("activityBeforeTimestamp", (UDF2<String, String, String>) (s, s2) -> {
    // get broadcast
    Map<String, String> activities = (Map<String, String>) SparkBroadcastHelper.getInstance()
        .getBroadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_INSTANCE_TIMESTAMP_MAP);
    // is pid contained in broadcast?
    if (activities == null || activities.isEmpty()) {
      return "Error: Broadcast not found";
    } else {
      if (activities.containsKey(s)) {
        Timestamp tsAct = new Timestamp(Long.parseLong(activities.get(s)));
        if (s2 == null || s2.isEmpty()) {
          return "FALSE";
        }
        Timestamp tsObject = new Timestamp(Long.parseLong(s2));
        if (tsObject.after(tsAct)) {
          return "FALSE";
        } else {
          return "TRUE";
        }
      }
    }
    return "FALSE";
  }, DataTypes.StringType);
}
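A hedged sketch of how these functions might be combined in a query after registerUDFs() has run; the view name events and the column names processInstanceId and timestamp_ are assumptions for illustration, not taken from the project.

// Hedged usage sketch; "events", "processInstanceId" and "timestamp_" are assumed names.
sparkSession.sql(
    "SELECT processInstanceId, "
    + "isALong(processInstanceId) AS has_long_id, "
    + "timestampStringToLong(timestamp_) AS ts_seconds, "
    + "activityBeforeTimestamp(processInstanceId, timestamp_) AS before_activity "
    + "FROM events").show();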
/**
 * One-hot encodes a protein sequence. The one-hot encoding
 * encodes the 20 natural amino acids, plus X for any other
 * residue, for a total of 21 elements per residue.
 *
 * @return dataset with feature vector appended
 */
public Dataset<Row> oneHotEncode() {
  SparkSession session = data.sparkSession();
  int maxLength = getMaxSequenceLength(data);

  session.udf().register("encoder", new UDF1<String, Vector>() {
    private static final long serialVersionUID = -6095318836772114908L;

    @Override
    public Vector call(String s) throws Exception {
      int len = AMINO_ACIDS21.size();
      double[] values = new double[len * maxLength];
      char[] seq = s.toCharArray();
      for (int i = 0; i < seq.length; i++) {
        int index = AMINO_ACIDS21.indexOf(seq[i]);
        // replace any non-matching code, e.g., U, with X
        if (index == -1) {
          index = AMINO_ACIDS21.indexOf('X');
        }
        values[i * len + index] = 1;
      }
      return Vectors.dense(values);
    }
  }, new VectorUDT());

  // append feature column
  data.createOrReplaceTempView("table");
  data = session.sql("SELECT *, encoder(" + inputCol + ") AS " + outputCol + " from table");
  return data;
}
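To make the indexing scheme concrete, here is a hedged, standalone sketch of the same one-hot layout using a tiny three-letter alphabet in place of AMINO_ACIDS21; the alphabet, maxLength, and sequence are illustrative assumptions only.

import java.util.Arrays;
import java.util.List;

public class OneHotSketch {
  public static void main(String[] args) {
    // Toy alphabet standing in for AMINO_ACIDS21; 'X' is the catch-all code.
    List<Character> alphabet = Arrays.asList('A', 'C', 'X');
    int len = alphabet.size();
    int maxLength = 4;                  // longest sequence in the (toy) dataset
    String s = "ACU";                   // 'U' is not in the alphabet -> mapped to 'X'

    double[] values = new double[len * maxLength];
    for (int i = 0; i < s.length(); i++) {
      int index = alphabet.indexOf(s.charAt(i));
      if (index == -1) {
        index = alphabet.indexOf('X');  // same fallback as the UDF above
      }
      values[i * len + index] = 1;
    }
    // Prints [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0]:
    // one block of size len per residue, zero-padded up to maxLength residues.
    System.out.println(Arrays.toString(values));
  }
}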
/**
 * Encodes a protein sequence by 7 physicochemical
 * properties.
 *
 * <p> See: Meiler, J., Müller, M., Zeidler, A. et al. J Mol Model (2001) 7: 360. doi:
 * <a href="https://link.springer.com/article/10.1007/s008940100038">10.1007/s008940100038</a>
 *
 * @return dataset with feature vector appended
 */
public Dataset<Row> propertyEncode() {
  SparkSession session = data.sparkSession();
  int maxLength = getMaxSequenceLength(data);

  session.udf().register("encoder", new UDF1<String, Vector>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Vector call(String s) throws Exception {
      double[] values = new double[7 * maxLength];
      for (int i = 0, k = 0; i < s.length(); i++) {
        double[] property = properties.get(s.charAt(i));
        if (property != null) {
          for (double p : property) {
            values[k++] = p;
          }
        }
      }
      return Vectors.dense(values);
    }
  }, new VectorUDT());

  // append feature column
  data.createOrReplaceTempView("table");
  data = session.sql("SELECT *, encoder(" + inputCol + ") AS " + outputCol + " from table");
  return data;
}
/**
 * Encodes a protein sequence by a Blosum62 matrix.
 *
 * <p> See: <a href="https://ftp.ncbi.nih.gov/repository/blocks/unix/blosum/BLOSUM/blosum62.blast.new">BLOSUM62 Matrix</a>
 *
 * @return dataset with feature vector appended
 */
public Dataset<Row> blosum62Encode() {
  SparkSession session = data.sparkSession();
  int maxLength = getMaxSequenceLength(data);

  session.udf().register("encoder", new UDF1<String, Vector>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Vector call(String s) throws Exception {
      double[] values = new double[20 * maxLength];
      for (int i = 0, k = 0; i < s.length(); i++) {
        double[] property = blosum62.get(s.charAt(i));
        if (property != null) {
          for (double p : property) {
            values[k++] = p;
          }
        }
      }
      return Vectors.dense(values);
    }
  }, new VectorUDT());

  // append feature column
  data.createOrReplaceTempView("table");
  data = session.sql("SELECT *, encoder(" + inputCol + ") AS " + outputCol + " from table");
  return data;
}
/**
 * Splits a one-letter sequence column (e.g., protein sequence)
 * into an array of non-overlapping n-grams. To generate all possible n-grams,
 * this method needs to be called n times with shift parameters {0, ..., n-1}.
 *
 * <p> Example 3-gram(shift=0) : IDCGHTVEDQR ... => [IDC, GHT, VED, ...]
 * <p> Example 3-gram(shift=1) : IDCGHTVEDQR ... => [DCG, HTV, EDQ, ...]
 * <p> Example 3-gram(shift=2) : IDCGHTVEDQR ... => [CGH, TVE, DQR, ...]
 *
 * <p>For an application of shifted n-grams see:
 * E Asgari, MRK Mofrad, PLoS One. 2015; 10(11): e0141287, doi:
 * <a href="https://dx.doi.org/10.1371/journal.pone.0141287">10.1371/journal.pone.0141287</a>
 *
 * @param data input dataset with column "sequence"
 * @param n size of the n-gram
 * @param shift start index for the n-gram
 * @param outputCol name of the output column
 * @return output dataset with appended ngram column
 */
public static Dataset<Row> shiftedNgram(Dataset<Row> data, int n, int shift, String outputCol) {
  SparkSession session = data.sparkSession();

  session.udf().register("encoder", new UDF1<String, String[]>() {
    private static final long serialVersionUID = 4844644794982507954L;

    @Override
    public String[] call(String s) throws Exception {
      if (shift > s.length()) {
        return new String[0];
      }
      s = s.substring(shift);

      int t = s.length() / n;
      String[] ngram = new String[t];
      for (int i = 0, j = 0; j < t; i += n) {
        ngram[j++] = s.substring(i, i + n);
      }
      return ngram;
    }
  }, DataTypes.createArrayType(DataTypes.StringType));

  data.createOrReplaceTempView("table");
  // append shifted ngram column
  return session.sql("SELECT *, encoder(sequence) AS " + outputCol + " from table");
}
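A hedged usage sketch mirroring the javadoc: call the method n times with shifts 0..n-1 to produce every 3-gram variant of the sequence column. Each call re-registers the encoder UDF and overwrites the temporary view "table", so successive calls are safe. The output column names ngram0 to ngram2 are illustrative assumptions.

// Assumes a Dataset<Row> named data with a string column "sequence" (see @param above).
Dataset<Row> withNgrams = data;
for (int shift = 0; shift < 3; shift++) {
  withNgrams = shiftedNgram(withNgrams, 3, shift, "ngram" + shift);
}
withNgrams.select("sequence", "ngram0", "ngram1", "ngram2").show(false);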
public static void main(String[] args) {
  SparkSession spark = SparkSession
      .builder()
      .appName("JavaTokenizerExample")
      .getOrCreate();

  // $example on$
  List<Row> data = Arrays.asList(
      RowFactory.create(0, "Hi I heard about Spark"),
      RowFactory.create(1, "I wish Java could use case classes"),
      RowFactory.create(2, "Logistic,regression,models,are,neat")
  );

  StructType schema = new StructType(new StructField[]{
      new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
      new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
  });

  Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

  Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");

  RegexTokenizer regexTokenizer = new RegexTokenizer()
      .setInputCol("sentence")
      .setOutputCol("words")
      .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);

  spark.udf().register("countTokens", new UDF1<WrappedArray, Integer>() {
    @Override
    public Integer call(WrappedArray words) {
      return words.size();
    }
  }, DataTypes.IntegerType);

  Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
  tokenized.select("sentence", "words")
      .withColumn("tokens", callUDF("countTokens", col("words")))
      .show(false);

  Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
  regexTokenized.select("sentence", "words")
      .withColumn("tokens", callUDF("countTokens", col("words")))
      .show(false);
  // $example off$

  spark.stop();
}