The examples below show how to use the org.apache.spark.sql.SparkSession API class, or you can follow the link to GitHub to view the full source code.
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("spark-bigquery-demo")
.getOrCreate();
// Use the Cloud Storage bucket for temporary BigQuery export data used
// by the connector. This assumes the Cloud Storage connector for
// Hadoop is configured.
String bucket = spark.sparkContext().hadoopConfiguration().get("fs.gs.system.bucket");
spark.conf().set("temporaryGcsBucket", bucket);
// Load data in from BigQuery.
Dataset<Row> wordsDF = spark.read().format("bigquery")
.option("table", "bigquery-public-data.samples.shakespeare").load().cache();
wordsDF.show();
wordsDF.printSchema();
wordsDF.createOrReplaceTempView("words");
// Perform word count.
Dataset<Row> wordCountDF = spark.sql(
"SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word");
// Saving the data to BigQuery
wordCountDF.write().format("bigquery").option("table", "wordcount_dataset.wordcount_output")
.save();
}
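The snippet above stages results in a temporary Cloud Storage bucket before loading them into BigQuery. As a hedged variation, assuming a recent spark-bigquery-connector release that supports the "direct" write method, the save step could skip the temporary bucket; the option name and value below are assumptions about that connector, not part of the original example.
// Sketch only: "writeMethod" = "direct" is assumed to be supported by the connector version in use;
// with it, no temporaryGcsBucket configuration is needed.
wordCountDF.write().format("bigquery")
    .option("writeMethod", "direct")
    .option("table", "wordcount_dataset.wordcount_output")
    .save();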
public static void main(String[] args) throws AnalysisException {
// $example on:init_session$
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
// $example off:init_session$
runBasicDataFrameExample(spark);
runDatasetCreationExample(spark);
runInferSchemaExample(spark);
runProgrammaticSchemaExample(spark);
spark.stop();
}
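The helper methods called above are not included in this snippet. A minimal sketch of what runBasicDataFrameExample might look like, assuming a people.json file with "name" and "age" fields similar to the one shipped with the Spark examples:
private static void runBasicDataFrameExample(SparkSession spark) throws AnalysisException {
    // Assumption: a small JSON file with "name" and "age" fields is available at this path.
    Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
    df.show();
    df.printSchema();
    df.select("name").show();
    // Register the DataFrame as a temporary view and query it with SQL.
    df.createOrReplaceTempView("people");
    spark.sql("SELECT * FROM people WHERE age > 21").show();
}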
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder().master("local")
.appName("Java Spark SQL")
.getOrCreate();
Dataset<Row> dataset = sparkSession.read().json("URL");
try {
// Create a global temporary view
dataset.createGlobalTempView("user");
// Global temporary views are bound to the system-preserved database "global_temp"
Dataset<Row> globalUser = sparkSession.sql("SELECT * FROM global_temp.user");
sparkSession.newSession().sql("SELECT * FROM global_temp.user");
} catch (AnalysisException e) {
e.printStackTrace();
}
}
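For contrast, a view registered with createOrReplaceTempView is scoped to a single session and is not visible from a new session. A minimal sketch (the view name "user_local" is made up):
// Sketch: session-scoped views disappear outside the session that created them.
dataset.createOrReplaceTempView("user_local");
sparkSession.sql("SELECT * FROM user_local").show();          // works in this session
// sparkSession.newSession().sql("SELECT * FROM user_local"); // would fail: table or view not found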
public void test16(SparkSession spark, Dataset<Row> schemaFlights, String containerOut, String type)
throws Exception {
System.out.println("*********************************");
System.out.println("T16: Non overwrite mode " + containerOut);
String o1 = containerOut + "myData/123";
StructType schema = DataTypes
.createStructType(new StructField[] { DataTypes.createStructField("NAME", DataTypes.StringType, false),
DataTypes.createStructField("STRING_VALUE", DataTypes.StringType, false),
DataTypes.createStructField("NUM_VALUE", DataTypes.IntegerType, false), });
Row r1 = RowFactory.create("name1", "value1", 1);
Row r2 = RowFactory.create("name2", "value2", 2);
List<Row> rowList = ImmutableList.of(r1, r2);
Dataset<Row> rows = spark.createDataFrame(rowList, schema);
try {
if (type.equals(Constants.PARQUET_TYPE)) {
rows.write().mode(SaveMode.Overwrite).parquet(o1);
} else if (type.equals(Constants.JSON_TYPE)) {
rows.write().mode(SaveMode.Overwrite).json(o1);
}
} catch (Exception e) {
deleteData(o1, spark.sparkContext().hadoopConfiguration(), dataCreate);
throw e;
} finally {
deleteData(o1, spark.sparkContext().hadoopConfiguration(), dataCreate);
}
}
public void test14(SparkSession spark, Dataset<Row> schemaFlights, String containerOut, String type)
throws Exception {
System.out.println("*********************************");
System.out.println("T14: Append mode " + containerOut);
String o1 = containerOut + "myData";
try {
createAppendObject("T14 - first append", schemaFlights, o1, type);
long baseCount = schemaFlights.count();
System.out
.println("***T14-1 : Reading " + o1 + " from " + containerOut + ", base unit " + baseCount + " type " + type);
readAndTest("T14-1-" + type, type, o1, spark, baseCount, 1);
createAppendObject("T14 - second append", schemaFlights, o1, type);
baseCount = schemaFlights.count();
System.out
.println("***T14-2 : Reading " + o1 + " from " + containerOut + ", base unit " + baseCount + " type " + type);
readAndTest("T14-2-" + type, type, o1, spark, baseCount, 2);
} catch (Exception e) {
throw e;
} finally {
deleteData(o1, spark.sparkContext().hadoopConfiguration(), true);
}
}
/**
* Returns a dataset of missense variations for a list of Uniprot Ids and a MyVariant.info query.
* See <a href="http://myvariant.info/docs/">query syntax</a>.
* <p> Example:
* <pre>
* String query = "clinvar.rcv.clinical_significance:pathogenic "
* + "OR clinvar.rcv.clinical_significance:likely pathogenic";
* </pre>
*
* @param uniprotIds list of Uniprot Ids
* @param query MyVariant.info query string
* @return dataset with variation Ids and Uniprot Ids or null if no data are found
* @throws IOException
*/
public static Dataset<Row> getVariations(List<String> uniprotIds, String query) throws IOException {
// get a spark context
SparkSession spark = SparkSession.builder().getOrCreate();
@SuppressWarnings("resource") // sc will be closed elsewhere
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
// download data in parallel
JavaRDD<String> data = sc.parallelize(uniprotIds).flatMap(m -> getData(m, query));
// convert from JavaRDD to Dataset
Dataset<String> jsonData = spark.createDataset(JavaRDD.toRDD(data), Encoders.STRING());
// parse json strings and return as a dataset
Dataset<Row> dataset = spark.read().json(jsonData);
// return null if dataset contains no results
if (!Arrays.asList(dataset.columns()).contains("hits")) {
System.out.println("MyVariantDataset: no matches found");
return null;
}
return flattenDataset(dataset);
}
/** */
private static void nativeSparkSqlExample(SparkSession spark) {
System.out.println("Querying using Spark SQL.");
Dataset<Row> df = spark.read()
.format(IgniteDataFrameSettings.FORMAT_IGNITE()) //Data source type.
.option(IgniteDataFrameSettings.OPTION_TABLE(), "person") //Table to read.
.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG) //Ignite config.
.load();
//Registering DataFrame as Spark view.
df.createOrReplaceTempView("person");
//Selecting data from Ignite through Spark SQL Engine.
Dataset<Row> igniteDF = spark.sql("SELECT * FROM person WHERE id >= 2 AND name = 'Mary Major'");
System.out.println("Result schema:");
igniteDF.printSchema(); //Printing query schema to console.
System.out.println("Result content:");
igniteDF.show(); //Printing query results to console.
}
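Writing results back to Ignite follows the same data-source pattern. A hedged sketch: the OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS option is an assumption about the Ignite Spark module, and the table name "person_copy" is made up.
// Sketch only: save the filtered rows to another Ignite table.
igniteDF.write()
    .format(IgniteDataFrameSettings.FORMAT_IGNITE())
    .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG)
    .option(IgniteDataFrameSettings.OPTION_TABLE(), "person_copy")
    .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id") // assumed option
    .mode(SaveMode.Append)
    .save();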
private void start() {
SparkSession spark = SparkSession.builder().appName("CSV to Dataset")
.master("local").getOrCreate();
spark.udf().register("x2Multiplier", new Multiplier2(),
DataTypes.IntegerType);
String filename = "data/tuple-data-file.csv";
Dataset<Row> df = spark.read().format("csv").option("inferSchema", "true")
.option("header", "false").load(filename);
df = df.withColumn("label", df.col("_c0")).drop("_c0");
df = df.withColumn("value", df.col("_c1")).drop("_c1");
df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(
DataTypes.IntegerType)));
df.show();
}
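The Multiplier2 class registered above is not shown in the snippet. A minimal sketch, assuming it simply doubles its integer input to match the IntegerType return type used in the registration; the real implementation may differ.
import org.apache.spark.sql.api.java.UDF1;

// Hypothetical Multiplier2: a UDF1 that doubles its input.
public class Multiplier2 implements UDF1<Integer, Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public Integer call(Integer value) throws Exception {
        return value * 2;
    }
}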
/**
* Returns a dataset with the specified columns for all current PDB entries.
* See the <a href="https://www.rcsb.org/pdb/results/reportField.do">list
* of supported field names</a>.
*
* @param columnNames
* names of the columns for the dataset
* @return dataset with the specified columns
* @throws IOException
* when temporary csv file cannot be created
*/
public static Dataset<Row> getDataset(String... columnNames) throws IOException {
// form query URL
String query = CURRENT_URL + columNamesString(columnNames);
// run tabular report query
InputStream input = postQuery(query);
// save as a temporary CSV file
Path tempFile = saveTempFile(input);
SparkSession spark = SparkSession.builder().getOrCreate();
// load temporary CSV file into Spark dataset
Dataset<Row> dataset = readCsv(spark, tempFile.toString());
return concatIds(spark, dataset, columnNames);
}
public GraphFrame getGraphFrameUserRelationship() throws IOException {
Path temp = Files.createTempDirectory("sparkGraphFrames");
SparkSession session = SparkSession.builder()
.appName("SparkGraphFrameSample")
.config("spark.sql.warehouse.dir", temp.toString())
.sparkContext(getSparkContext().sc())
.master("local[*]")
.getOrCreate();
List<User> users = loadUsers();
Dataset<Row> userDataset = session.createDataFrame(users, User.class);
List<Relationship> relationshipsList = getRelations();
Dataset<Row> relationshipDataset = session.createDataFrame(relationshipsList, Relationship.class);
GraphFrame graphFrame = new GraphFrame(userDataset, relationshipDataset);
return graphFrame;
}
@Test
public void testParquetColumnName() throws Exception {
String tablePath = getExternalResourceDirectory()+"parquet_colname";
methodWatcher.execute(String.format("create external table t_parquet (col1 int, col2 varchar(5))" +
" STORED AS PARQUET LOCATION '%s'", tablePath));
methodWatcher.execute("insert into t_parquet values (1, 'A')");
SparkSession spark = SparkSession.builder()
.master("local")
.appName("ExternaltableIT")
.getOrCreate();
Dataset dataset = spark
.read()
.parquet(tablePath);
String actual = dataset.schema().toString();
String expected = "StructType(StructField(COL1,IntegerType,true), StructField(COL2,StringType,true))";
Assert.assertEquals(actual, expected, actual);
}
private List<String> getCollectorItemListForLobs(List<Lob> lobList, SparkSession sparkSession, JavaSparkContext javaSparkContext) {
dashboardCollectorItemsMap
= DashBoardCollectorItemMapBuilder.getDashboardNameCollectorItemsMapById(getCollectorType(), sparkSession, javaSparkContext);
List<String> collectorItemList = new ArrayList<>();
Optional.ofNullable(lobList).orElseGet(Collections::emptyList).stream()
.map(Lob::getProducts)
.forEach(products -> products.stream()
.map(Product::getProductComponentList)
.forEach(productComponents -> productComponents
.stream()
.map(ProductComponent::getProductComponentDashboardId)
.filter(Objects::nonNull)
.<List<String>>map(dashboardId -> dashboardCollectorItemsMap.get(dashboardId.toString()) != null ? dashboardCollectorItemsMap.get(dashboardId.toString()) : new ArrayList<>())
.forEach(collectorItemList::addAll)));
return collectorItemList;
}
@Test
public void testStringDataFrameToVectorDataFrameNull() {
List<String> list = new ArrayList<>();
list.add("[1.2, 3.4]");
list.add(null);
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new StringToRow());
SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> inDF = sparkSession.createDataFrame(javaRddRow, schema);
Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sparkSession, inDF);
List<String> expectedResults = new ArrayList<>();
expectedResults.add("[[1.2,3.4]]");
expectedResults.add("[null]");
List<Row> outputList = outDF.collectAsList();
for (Row row : outputList) {
assertTrue("Expected results don't contain: " + row, expectedResults.contains(row.toString()));
}
}
/**
* Create a SystemDS-preferred Spark Session.
*
* @param appName the application name
* @param master the master value (e.g., "local")
* @return Spark Session
*/
public static SparkSession createSystemDSSparkSession(String appName, String master) {
Builder builder = SparkSession.builder();
if (appName != null) {
builder.appName(appName);
}
if (master != null) {
builder.master(master);
}
builder.config("spark.driver.maxResultSize", "0");
if (SparkExecutionContext.FAIR_SCHEDULER_MODE) {
builder.config("spark.scheduler.mode", "FAIR");
}
builder.config("spark.locality.wait", "5s");
SparkSession spark = builder.getOrCreate();
return spark;
}
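A small usage sketch of the helper above; the application name and master value are arbitrary:
// Hypothetical usage: create a session with the SystemDS-preferred settings, then stop it.
SparkSession spark = createSystemDSSparkSession("SystemDSDemo", "local[*]");
System.out.println("Spark version: " + spark.version());
spark.stop();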
private void start() {
SparkSession spark = SparkSession.builder().appName("Book CSV to Dataset")
.master("local").getOrCreate();
String filename = "data/books.csv";
// @formatter:off
Dataset<Row> df = spark
.read()
.format("csv")
.option("inferSchema", "false") // We are not inferring the schema for now
.option("header", "true")
.load(filename);
// @formatter:on
df.show();
// In this case everything is a string
df.printSchema();
}
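Because the schema is not inferred, every column is loaded as a string. A hedged sketch of the same load with an explicit schema; the column names below are assumptions about books.csv, not taken from the original example.
// Sketch only: declare column types up front instead of treating everything as a string.
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("id", DataTypes.IntegerType, false),
    DataTypes.createStructField("authorId", DataTypes.IntegerType, true),
    DataTypes.createStructField("title", DataTypes.StringType, true),
    DataTypes.createStructField("releaseDate", DataTypes.StringType, true),
    DataTypes.createStructField("link", DataTypes.StringType, true) });
Dataset<Row> typedDf = spark.read()
    .format("csv")
    .option("header", "true")
    .schema(schema)
    .load(filename);
typedDf.printSchema(); // columns now carry the declared types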
private static void runMLContextTestMultipleScript(ExecMode platform, boolean wRead)
{
ExecMode oldplatform = DMLScript.getGlobalExecMode();
DMLScript.setGlobalExecMode(platform);
//create mlcontext
SparkSession spark = createSystemDSSparkSession("MLContextScratchCleanupTest", "local");
MLContext ml = new MLContext(spark);
ml.setExplain(true);
String dml1 = baseDirectory + File.separator + "ScratchCleanup1.dml";
String dml2 = baseDirectory + File.separator + (wRead?"ScratchCleanup2b.dml":"ScratchCleanup2.dml");
try
{
Script script1 = dmlFromFile(dml1).in("$rows", rows).in("$cols", cols).out("X");
Matrix X = ml.execute(script1).getMatrix("X");
//clear in-memory/cached data to emulate on-disk storage
X.toMatrixObject().clearData();
Script script2 = dmlFromFile(dml2).in("X", X).out("z");
String z = ml.execute(script2).getString("z");
System.out.println(z);
}
catch(Exception ex) {
throw new RuntimeException(ex);
}
finally {
DMLScript.setGlobalExecMode(oldplatform);
// stop underlying spark context to allow single jvm tests (otherwise the
// next test that tries to create a SparkContext would fail)
spark.stop();
// clear status mlcontext and spark exec context
ml.close();
}
}
@Override
public Dataset<String> read(SparkSession spark, Properties profilerProps, Properties readerProps) {
String inputPath = TELEMETRY_INPUT_PATH.get(profilerProps, String.class);
if(inputFormat == null) {
inputFormat = TELEMETRY_INPUT_FORMAT.get(profilerProps, String.class);
}
LOG.debug("Loading telemetry; inputPath={}, inputFormat={}", inputPath, inputFormat);
return spark
.read()
.options(Maps.fromProperties(readerProps))
.format(inputFormat)
.load(inputPath)
.as(Encoders.STRING());
}
protected ConceptMaps newInstance(SparkSession spark,
Dataset<UrlAndVersion> members,
Dataset<ConceptMap> conceptMaps,
Dataset<Mapping> mappings) {
return new ConceptMaps(spark, members, conceptMaps, mappings);
}
@Override
public void show() {
try {
compileRequirement(buildWrapper().show(), session(), SparkSession.class).execute();
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* Encodes a protein sequence by a Blosum62 matrix.
*
* <p> See: <a href="https://ftp.ncbi.nih.gov/repository/blocks/unix/blosum/BLOSUM/blosum62.blast.new">BLOSUM62 Matrix</a>
*
* @return dataset with feature vector appended
*/
public Dataset<Row> blosum62Encode() {
SparkSession session = data.sparkSession();
int maxLength = getMaxSequenceLength(data);
session.udf().register("encoder", new UDF1<String, Vector>(){
private static final long serialVersionUID = 1L;
@Override
public Vector call(String s) throws Exception {
double[] values = new double[20*maxLength];
for (int i = 0, k = 0; i < s.length(); i++) {
double[] property = blosum62.get(s.charAt(i));
if (property != null) {
for (double p: property) {
values[k++] = p;
}
}
}
return Vectors.dense(values);
}
}, new VectorUDT());
// append feature column
data.createOrReplaceTempView("table");
data = session.sql("SELECT *, encoder("
+ inputCol + ") AS "
+ outputCol + " from table");
return data;
}
private static Dataset<Row> averageFeatureVectors(Dataset<Row> data, String outputCol) {
SparkSession session = data.sparkSession();
session.udf().register("averager", new UDF3<Vector, Vector, Vector, Vector>() {
private static final long serialVersionUID = -8190379199020903671L;
@Override
public Vector call(Vector v1, Vector v2, Vector v3) throws Exception {
double[] f1 = v1.toArray();
double[] f2 = v2.toArray();
double[] f3 = v3.toArray();
// arrays may be of different length
int len = Math.min(Math.min(f1.length, f2.length), f3.length);
double[] average = new double[len];
for (int i = 0; i < len; i++) {
average[i] = (f1[i] + f2[i] + f3[i]) / 3.0;
}
return Vectors.dense(average);
}
}, new VectorUDT());
data.createOrReplaceTempView("table");
// append new feature column with average values
return session.sql("SELECT *, averager(features0,features1,features2) AS " + outputCol + " from table");
}
protected TryResult createSparkSession() {
try {
SparkSession sparkSession = getOrCreate();
return TryResult.createResult(sparkSession);
} catch (Exception e) {
return TryResult.createError(errorPrinter.print(e));
}
}
/**
* Returns an empty ValueSets instance.
*
* @param spark the spark session
* @return an empty ValueSets instance.
*/
public static ValueSets getEmpty(SparkSession spark) {
Dataset<ValueSet> emptyValueSets = spark.emptyDataset(VALUE_SET_ENCODER)
.withColumn("timestamp", lit(null).cast("timestamp"))
.as(VALUE_SET_ENCODER);
return new ValueSets(spark,
spark.emptyDataset(URL_AND_VERSION_ENCODER),
emptyValueSets,
spark.emptyDataset(getValueEncoder()));
}
public static void main(String[] args) throws IOException {
SparkSession spark = SparkSession.builder().master("local[*]").appName(PdbMetadataDemo.class.getSimpleName())
.getOrCreate();
// query the following fields from the _citation category using PDBj's Mine2 web service:
// journal_abbrev, pdbx_database_id_PubMed, year.
// Note, mixed case column names must be quoted and escaped with \".
String sqlQuery = "SELECT pdbid, journal_abbrev, \"pdbx_database_id_PubMed\", year from citation WHERE id = 'primary'";
Dataset<Row> ds = PdbjMineDataset.getDataset(sqlQuery);
System.out.println("First 10 results from query: " + sqlQuery);
ds.show(10, false);
// filter out unpublished entries (they contain the word "published" in various upper/lower case combinations)
ds = ds.filter("UPPER(journal_abbrev) NOT LIKE '%PUBLISHED%'");
// print the top 10 journals
System.out.println("Top 10 journals that publish PDB structures:");
ds.groupBy("journal_abbrev").count().sort(col("count").desc()).show(10, false);
// filter out entries without a PubMed Id (the value is -1 when no PubMed Id is available)
ds = ds.filter("pdbx_database_id_PubMed > 0");
System.out.println("Entries with PubMed Ids: " + ds.count());
// show growth of papers in PubMed
System.out.println("PubMed Ids per year: ");
ds.groupBy("year").count().sort(col("year").desc()).show(10, false);
spark.close();
}
DefaultDataCollector(String collectionName, String query, List<String> collectorItemIds, SparkSession sparkSession, JavaSparkContext javaSparkContext, PortfolioCollectorSetting portfolioCollectorSetting) {
this.collectionName = collectionName;
this.query = query;
this.collectorItemIds = collectorItemIds;
this.sparkSession = sparkSession;
this.javaSparkContext = javaSparkContext;
this.portfolioCollectorSetting = portfolioCollectorSetting;
}
/**
* Extracts the given resource type from the RDD of bundles and returns
* it as a Dataset of that type.
*
* @param spark the spark session
* @param bundles an RDD of FHIR Bundles
* @param resourceClass the type of resource to extract.
* @return a dataset of the given resource
*/
public Dataset<Row> extractEntry(SparkSession spark,
JavaRDD<BundleContainer> bundles,
Class resourceClass) {
RuntimeResourceDefinition definition = FhirContexts.contextFor(fhirVersion)
.getResourceDefinition(resourceClass);
return extractEntry(spark, bundles, definition.getName());
}
public void collect(SparkSession sparkSession, JavaSparkContext javaSparkContext, List<?> objectList) {
if ((sparkSession == null) || (javaSparkContext == null) || CollectionUtils.isEmpty(objectList)) { return; }
if (objectList.get(0) instanceof Portfolio){
collectPortFolioMetrics(sparkSession, javaSparkContext, (List<Portfolio>) objectList);
return;
}
if (objectList.get(0) instanceof Lob){
collectLobMetrics(sparkSession, javaSparkContext, (List<Lob>) objectList);
return;
}
}
/**
* Reads a CSV file into a Spark dataset.
*
* @param inputFileName path to the input CSV file
* @return dataset with the parsed CSV contents
* @throws IOException if the input file cannot be read
*/
private static Dataset<Row> readCsv(String inputFileName) throws IOException {
SparkSession spark = SparkSession.builder().getOrCreate();
Dataset<Row> dataset = spark.read().format("csv").option("header", "true").option("inferSchema", "true")
.load(inputFileName);
return dataset;
}
private ValueSets(SparkSession spark,
Dataset<UrlAndVersion> members,
Dataset<Row> valueSets,
Dataset<Value> values) {
super(spark, FhirVersionEnum.DSTU3, members, valueSets, values, valuesetRowConverter);
}
/**
* Returns an empty ConceptMaps instance.
*
* @param spark the spark session
* @return an empty ConceptMaps instance.
*/
public static ConceptMaps getEmpty(SparkSession spark) {
Dataset<ConceptMap> emptyConceptMaps = spark.emptyDataset(CONCEPT_MAP_ENCODER)
.withColumn("timestamp", lit(null).cast("timestamp"))
.as(CONCEPT_MAP_ENCODER);
return new ConceptMaps(spark,
spark.emptyDataset(URL_AND_VERSION_ENCODER),
emptyConceptMaps,
spark.emptyDataset(MAPPING_ENCODER));
}