Code Examples for the org.apache.spark.sql.SparkSession Class

Listed below are examples showing how to use the org.apache.spark.sql.SparkSession API; each example is taken from an open-source project whose full source code can be viewed on GitHub.
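Most of the examples that follow share the same lifecycle: build a SparkSession, load or create a Dataset, run SQL or DataFrame operations, and stop the session. As a point of reference, here is a minimal sketch of that lifecycle; it is not taken from any of the projects below, and the application name and master are placeholders.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MinimalSparkSessionDemo {
    public static void main(String[] args) {
        // Reuse an active session if one exists, otherwise create a new one.
        SparkSession spark = SparkSession.builder()
                .appName("minimal-demo")   // placeholder application name
                .master("local[*]")        // placeholder; usually omitted when submitting to a cluster
                .getOrCreate();

        // Run a trivial query to exercise the session.
        Dataset<Row> df = spark.sql("SELECT 1 AS id");
        df.show();

        spark.stop();
    }
}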

Example 1
public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
            .appName("spark-bigquery-demo")
            .getOrCreate();

    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector. This assumes the Cloud Storage connector for
    // Hadoop is configured.
    String bucket = spark.sparkContext().hadoopConfiguration().get("fs.gs.system.bucket");
    spark.conf().set("temporaryGcsBucket", bucket);

    // Load data in from BigQuery.
    Dataset<Row> wordsDF = spark.read().format("bigquery")
            .option("table", "bigquery-public-data.samples.shakespeare").load().cache();
    wordsDF.show();
    wordsDF.printSchema();
    wordsDF.createOrReplaceTempView("words");

    // Perform word count.
    Dataset<Row> wordCountDF = spark.sql(
            "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word");

    // Saving the data to BigQuery
    wordCountDF.write().format("bigquery").option("table", "wordcount_dataset.wordcount_output")
            .save();
}
 
Example 2  Project: SparkDemo  File: JavaSparkSQLExample.java
public static void main(String[] args) throws AnalysisException {
  // $example on:init_session$
  SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate();
  // $example off:init_session$

  runBasicDataFrameExample(spark);
  runDatasetCreationExample(spark);
  runInferSchemaExample(spark);
  runProgrammaticSchemaExample(spark);

  spark.stop();
}
 
Example 3  Project: sparkResearch  File: global.java
public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder().master("local")
            .appName("Java Spark SQL")
            .getOrCreate();

    Dataset<Row> dataset = sparkSession.read().json("URL");
    try {
        //create a global temporary view
        dataset.createGlobalTempView("user");
        //a global temporary view is tied to the system-preserved database "global_temp"
        Dataset<Row> globalUser = sparkSession.sql("SELECT * FROM global_temp.user");
        sparkSession.newSession().sql("SELECT * FROM global_temp.user");
    } catch (AnalysisException e) {
        e.printStackTrace();
    }
}
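For comparison with the global view above, here is a hedged sketch (continuing inside the try block of the same example) of how a session-scoped temporary view differs from a global one; the view names are placeholders.

// Session-scoped view: visible only in the session that created it.
dataset.createOrReplaceTempView("user_local");
sparkSession.sql("SELECT * FROM user_local").show();
// sparkSession.newSession().sql("SELECT * FROM user_local"); // would fail: view not found

// Global view: registered in the reserved "global_temp" database and
// visible to other sessions of the same Spark application.
dataset.createGlobalTempView("user_global");
sparkSession.newSession().sql("SELECT * FROM global_temp.user_global").show();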
 
Example 4  Project: stocator  File: TestSuite.java
public void test16(SparkSession spark, Dataset<Row> schemaFlights, String containerOut, String type)
    throws Exception {
  System.out.println("*********************************");
  System.out.println("T16: Non overwrite mode " + containerOut);
  String o1 = containerOut + "myData/123";
  StructType schema = DataTypes
      .createStructType(new StructField[] { DataTypes.createStructField("NAME", DataTypes.StringType, false),
          DataTypes.createStructField("STRING_VALUE", DataTypes.StringType, false),
          DataTypes.createStructField("NUM_VALUE", DataTypes.IntegerType, false), });
  Row r1 = RowFactory.create("name1", "value1", 1);
  Row r2 = RowFactory.create("name2", "value2", 2);
  List<Row> rowList = ImmutableList.of(r1, r2);
  Dataset<Row> rows = spark.createDataFrame(rowList, schema);
  try {
    if (type.equals(Constants.PARQUET_TYPE)) {
      rows.write().mode(SaveMode.Overwrite).parquet(o1);
    } else if (type.equals(Constants.JSON_TYPE)) {
      rows.write().mode(SaveMode.Overwrite).json(o1);
    }
  } catch (Exception e) {
    deleteData(o1, spark.sparkContext().hadoopConfiguration(), dataCreate);
    throw e;
  } finally {
    deleteData(o1, spark.sparkContext().hadoopConfiguration(), dataCreate);
  }
}
 
Example 5  Project: stocator  File: TestSuite.java
public void test14(SparkSession spark, Dataset<Row> schemaFlights, String containerOut, String type)
    throws Exception {
  System.out.println("*********************************");
  System.out.println("T14: Append mode " + containerOut);
  String o1 = containerOut + "myData";
  try {
    createAppendObject("T14 - first append", schemaFlights, o1, type);
    long baseCount = schemaFlights.count();
    System.out
        .println("***T14-1 : Reading " + o1 + " from " + containerOut + ", base unit " + baseCount + " type " + type);
    readAndTest("T14-1-" + type, type, o1, spark, baseCount, 1);
    createAppendObject("T14 - second append", schemaFlights, o1, type);
    baseCount = schemaFlights.count();
    System.out
        .println("***T14-2 : Reading " + o1 + " from " + containerOut + ", base unit " + baseCount + " type " + type);
    readAndTest("T14-2-" + type, type, o1, spark, baseCount, 2);
  } catch (Exception e) {
    throw e;
  } finally {
    deleteData(o1, spark.sparkContext().hadoopConfiguration(), true);
  }
}
 
Example 6  Project: mmtf-spark  File: MyVariantDataset.java
/**
 * Returns a dataset of missense variations for a list of Uniprot Ids and a MyVariant.info query.
 * See <a href="http://myvariant.info/docs/">query syntax</a>.
 * <p> Example:
 * <pre>
 * String query = "clinvar.rcv.clinical_significance:pathogenic " 
 *                + "OR clinvar.rcv.clinical_significance:likely pathogenic";
 * </pre>
 * 
 * @param uniprotIds list of Uniprot Ids
 * @param query MyVariant.info query string
 * @return dataset with variation Ids and Uniprot Ids or null if no data are found
 * @throws IOException
 */
public static Dataset<Row> getVariations(List<String> uniprotIds, String query) throws IOException {
    // get a spark context
    SparkSession spark = SparkSession.builder().getOrCreate();
    @SuppressWarnings("resource") // sc will be closed elsewhere
    JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

    // download data in parallel
    JavaRDD<String> data = sc.parallelize(uniprotIds).flatMap(m -> getData(m, query));

    // convert from JavaRDD to Dataset
    Dataset<String> jsonData = spark.createDataset(JavaRDD.toRDD(data), Encoders.STRING());

    // parse json strings and return as a dataset
    Dataset<Row> dataset = spark.read().json(jsonData);

    // return null if dataset contains no results
    if (!Arrays.asList(dataset.columns()).contains("hits")) {
        System.out.println("MyVariantDataset: no matches found");
        return null;
    }

    return flattenDataset(dataset);
}
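A hedged usage sketch of the method above, reusing the query from its Javadoc; the Uniprot Id is a placeholder, not a value taken from the project.

List<String> uniprotIds = Arrays.asList("P00533");   // placeholder Uniprot Id
String query = "clinvar.rcv.clinical_significance:pathogenic "
        + "OR clinvar.rcv.clinical_significance:likely pathogenic";

Dataset<Row> variations = MyVariantDataset.getVariations(uniprotIds, query);
if (variations != null) {                             // null means no matches were found
    variations.show();
}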
 
Example 7  Project: ignite  File: JavaIgniteDataFrameExample.java
/** */
private static void nativeSparkSqlExample(SparkSession spark) {
    System.out.println("Querying using Spark SQL.");

    Dataset<Row> df = spark.read()
            .format(IgniteDataFrameSettings.FORMAT_IGNITE()) //Data source type.
            .option(IgniteDataFrameSettings.OPTION_TABLE(), "person") //Table to read.
            .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG) //Ignite config.
            .load();

    //Registering DataFrame as Spark view.
    df.createOrReplaceTempView("person");

    //Selecting data from Ignite through Spark SQL Engine.
    Dataset<Row> igniteDF = spark.sql("SELECT * FROM person WHERE id >= 2 AND name = 'Mary Major'");

    System.out.println("Result schema:");

    igniteDF.printSchema(); //Printing query schema to console.

    System.out.println("Result content:");

    igniteDF.show(); //Printing query results to console.
}
 
Example 8
private void start() {
  SparkSession spark = SparkSession.builder().appName("CSV to Dataset")
      .master("local").getOrCreate();

  spark.udf().register("x2Multiplier", new Multiplier2(),
      DataTypes.IntegerType);

  String filename = "data/tuple-data-file.csv";
  Dataset<Row> df = spark.read().format("csv").option("inferSchema", "true")
      .option("header", "false").load(filename);
  df = df.withColumn("label", df.col("_c0")).drop("_c0");
  df = df.withColumn("value", df.col("_c1")).drop("_c1");
  df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(
      DataTypes.IntegerType)));
  df.show();
}
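The callUDF call in this example relies on a static import from org.apache.spark.sql.functions, and the Multiplier2 class is not shown in the listing. Below is a hedged sketch of the missing pieces; the Multiplier2 body is a guess based on its name and the registered return type.

import static org.apache.spark.sql.functions.callUDF;

import org.apache.spark.sql.api.java.UDF1;

// Hypothetical reconstruction of Multiplier2: doubles an integer value.
class Multiplier2 implements UDF1<Integer, Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public Integer call(Integer value) {
        return value * 2;
    }
}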
 
Example 9  Project: mmtf-spark  File: CustomReportService.java
/**
 * Returns a dataset with the specified columns for all current PDB entries.
 * See the <a href="https://www.rcsb.org/pdb/results/reportField.do">list
 * of supported field names</a>.
 * 
 * @param columnNames
 *            names of the columns for the dataset
 * @return dataset with the specified columns
 * @throws IOException
 *             when temporary csv file cannot be created
 */
public static Dataset<Row> getDataset(String... columnNames) throws IOException {
	// form query URL
	String query = CURRENT_URL + columNamesString(columnNames);

	// run tabular report query
	InputStream input = postQuery(query);

	// save as a temporary CSV file
	Path tempFile = saveTempFile(input);

	SparkSession spark = SparkSession.builder().getOrCreate();

	// load temporary CSV file into Spark dataset
	Dataset<Row> dataset = readCsv(spark, tempFile.toString());

	return concatIds(spark, dataset, columnNames);
}
 
Example 10  Project: tutorials  File: GraphLoader.java
public GraphFrame getGraphFrameUserRelationship() throws IOException {
    Path temp = Files.createTempDirectory("sparkGraphFrames");
    SparkSession session = SparkSession.builder()
        .appName("SparkGraphFrameSample")
        .config("spark.sql.warehouse.dir", temp.toString())
        .sparkContext(getSparkContext().sc())
        .master("local[*]")
        .getOrCreate();
    List<User> users = loadUsers();

    Dataset<Row> userDataset = session.createDataFrame(users, User.class);

    List<Relationship> relationshipsList = getRelations();
    Dataset<Row> relationshipDataset = session.createDataFrame(relationshipsList, Relationship.class);

    GraphFrame graphFrame = new GraphFrame(userDataset, relationshipDataset);

    return graphFrame;
}
 
Example 11  Project: spliceengine  File: ExternalTableIT.java
@Test
public void testParquetColumnName() throws Exception {
    String tablePath = getExternalResourceDirectory()+"parquet_colname";
    methodWatcher.execute(String.format("create external table t_parquet (col1 int, col2 varchar(5))" +
            " STORED AS PARQUET LOCATION '%s'", tablePath));
    methodWatcher.execute("insert into t_parquet values (1, 'A')");
    SparkSession spark = SparkSession.builder()
            .master("local")
            .appName("ExternaltableIT")
            .getOrCreate();

    Dataset dataset = spark
            .read()
            .parquet(tablePath);
    String actual = dataset.schema().toString();
    String expected = "StructType(StructField(COL1,IntegerType,true), StructField(COL2,StringType,true))";
    Assert.assertEquals(actual, expected, actual);
}
 
Example 12  Project: ExecDashboard  File: DefaultMetricCollector.java
private List<String> getCollectorItemListForLobs(List<Lob> lobList, SparkSession sparkSession, JavaSparkContext javaSparkContext) {
    dashboardCollectorItemsMap
            = DashBoardCollectorItemMapBuilder.getDashboardNameCollectorItemsMapById(getCollectorType(), sparkSession, javaSparkContext);

    List<String> collectorItemList = new ArrayList<>();
    Optional.ofNullable(lobList).orElseGet(Collections::emptyList).stream()
            .map(Lob::getProducts)
            .forEach(products -> products.stream()
                    .map(Product::getProductComponentList)
                    .forEach(productComponents -> productComponents
                            .stream()
                            .map(ProductComponent::getProductComponentDashboardId)
                            .filter(Objects::nonNull)
                            .<List<String>>map(dashboardId -> dashboardCollectorItemsMap.get(dashboardId.toString()) != null ? dashboardCollectorItemsMap.get(dashboardId.toString()) : new ArrayList<>())
                            .forEach(collectorItemList::addAll)));
    return collectorItemList;
}
 
Example 13  Project: systemds  File: RDDConverterUtilsExtTest.java
@Test
public void testStringDataFrameToVectorDataFrameNull() {
	List<String> list = new ArrayList<>();
	list.add("[1.2, 3.4]");
	list.add(null);
	JavaRDD<String> javaRddString = sc.parallelize(list);
	JavaRDD<Row> javaRddRow = javaRddString.map(new StringToRow());
	SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
	List<StructField> fields = new ArrayList<>();
	fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true));
	StructType schema = DataTypes.createStructType(fields);
	Dataset<Row> inDF = sparkSession.createDataFrame(javaRddRow, schema);
	Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sparkSession, inDF);

	List<String> expectedResults = new ArrayList<>();
	expectedResults.add("[[1.2,3.4]]");
	expectedResults.add("[null]");

	List<Row> outputList = outDF.collectAsList();
	for (Row row : outputList) {
		assertTrue("Expected results don't contain: " + row, expectedResults.contains(row.toString()));
	}
}
 
Example 14  Project: systemds  File: AutomatedTestBase.java
/**
 * Create a SystemDS-preferred Spark Session.
 *
 * @param appName the application name
 * @param master the master value (ie, "local", etc)
 * @return Spark Session
 */
public static SparkSession createSystemDSSparkSession(String appName, String master) {
	Builder builder = SparkSession.builder();
	if (appName != null) {
		builder.appName(appName);
	}
	if (master != null) {
		builder.master(master);
	}
	builder.config("spark.driver.maxResultSize", "0");
	if (SparkExecutionContext.FAIR_SCHEDULER_MODE) {
		builder.config("spark.scheduler.mode", "FAIR");
	}
	builder.config("spark.locality.wait", "5s");
	SparkSession spark = builder.getOrCreate();
	return spark;
}
 
Example 15  Project: net.jgp.labs.spark  File: BooksCsvToDataset.java
private void start() {
  SparkSession spark = SparkSession.builder().appName("Book CSV to Dataset")
      .master("local").getOrCreate();

  String filename = "data/books.csv";
  // @formatter:off
Dataset<Row> df = spark
		.read()
		.format("csv")
		.option("inferSchema", "false") // We are not inferring the schema for now
		.option("header", "true")
		.load(filename);
// @formatter:on
  df.show();

  // In this case everything is a string
  df.printSchema();
}
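Since inferSchema is set to "false" above, every column in books.csv is read as a string. A hedged sketch of supplying an explicit schema instead follows; the column names are hypothetical and not taken from the actual file.

// Requires imports of org.apache.spark.sql.types.DataTypes, StructField, and StructType.
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("id", DataTypes.IntegerType, false),
    DataTypes.createStructField("title", DataTypes.StringType, true),
    DataTypes.createStructField("releaseDate", DataTypes.StringType, true) });

Dataset<Row> typedDf = spark.read()
    .format("csv")
    .option("header", "true")
    .schema(schema)            // explicit schema instead of inference
    .load(filename);
typedDf.printSchema();         // columns now carry the declared types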
 
Example 16  Project: systemds  File: MLContextScratchCleanupTest.java
private static void runMLContextTestMultipleScript(ExecMode platform, boolean wRead) 
{
	ExecMode oldplatform = DMLScript.getGlobalExecMode();
	DMLScript.setGlobalExecMode(platform);
	
	//create mlcontext
	SparkSession spark = createSystemDSSparkSession("MLContextScratchCleanupTest", "local");
	MLContext ml = new MLContext(spark);
	ml.setExplain(true);

	String dml1 = baseDirectory + File.separator + "ScratchCleanup1.dml";
	String dml2 = baseDirectory + File.separator + (wRead?"ScratchCleanup2b.dml":"ScratchCleanup2.dml");
	
	try
	{
		Script script1 = dmlFromFile(dml1).in("$rows", rows).in("$cols", cols).out("X");
		Matrix X = ml.execute(script1).getMatrix("X");
		
		//clear in-memory/cached data to emulate on-disk storage
		X.toMatrixObject().clearData();
		
		Script script2 = dmlFromFile(dml2).in("X", X).out("z");
		String z = ml.execute(script2).getString("z");
		
		System.out.println(z);
	}
	catch(Exception ex) {
		throw new RuntimeException(ex);
	}
	finally {
		DMLScript.setGlobalExecMode(oldplatform);
		
		// stop underlying spark context to allow single jvm tests (otherwise the
		// next test that tries to create a SparkContext would fail)
		spark.stop();
		// clear status mlcontext and spark exec context
		ml.close();
	}
}
 
Example 17  Project: metron  File: TextEncodedTelemetryReader.java
@Override
public Dataset<String> read(SparkSession spark, Properties profilerProps, Properties readerProps) {
  String inputPath = TELEMETRY_INPUT_PATH.get(profilerProps, String.class);
  if(inputFormat == null) {
    inputFormat = TELEMETRY_INPUT_FORMAT.get(profilerProps, String.class);
  }
  LOG.debug("Loading telemetry; inputPath={}, inputFormat={}", inputPath, inputFormat);

  return spark
          .read()
          .options(Maps.fromProperties(readerProps))
          .format(inputFormat)
          .load(inputPath)
          .as(Encoders.STRING());
}
 
Example 18  Project: bunsen  File: ConceptMaps.java
protected ConceptMaps newInstance(SparkSession spark,
    Dataset<UrlAndVersion> members,
    Dataset<ConceptMap> conceptMaps,
    Dataset<Mapping> mappings) {

  return new ConceptMaps(spark, members, conceptMaps, mappings);
}
 
Example 19  Project: Quicksql  File: SparkPipeline.java
@Override
public void show() {
    try {
        compileRequirement(buildWrapper().show(), session(), SparkSession.class).execute();
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
 
Example 20  Project: mmtf-spark  File: ProteinSequenceEncoder.java
/**
 * Encodes a protein sequence by a Blosum62 matrix.
 * 
 * <p> See: <a href="https://ftp.ncbi.nih.gov/repository/blocks/unix/blosum/BLOSUM/blosum62.blast.new">BLOSUM62 Matrix</a>
 *
 * @return dataset with feature vector appended
 */
public Dataset<Row> blosum62Encode() {
	SparkSession session = data.sparkSession();
	int maxLength = getMaxSequenceLength(data);

	session.udf().register("encoder", new UDF1<String, Vector>() {
		private static final long serialVersionUID = 1L;

		@Override
		public Vector call(String s) throws Exception {
			double[] values = new double[20 * maxLength];
			for (int i = 0, k = 0; i < s.length(); i++) {
				double[] property = blosum62.get(s.charAt(i));
				if (property != null) {
					for (double p : property) {
						values[k++] = p;
					}
				}
			}
			return Vectors.dense(values);
		}
	}, new VectorUDT());

	// append feature column
	data.createOrReplaceTempView("table");
	data = session.sql("SELECT *, encoder(" + inputCol + ") AS " + outputCol + " from table");

	return data;
}
 
Example 21  Project: mmtf-spark  File: ProteinSequenceEncoder.java
private static Dataset<Row> averageFeatureVectors(Dataset<Row> data, String outputCol) {
	SparkSession session = data.sparkSession();

	session.udf().register("averager", new UDF3<Vector, Vector, Vector, Vector>() {
		private static final long serialVersionUID = -8190379199020903671L;

		@Override
		public Vector call(Vector v1, Vector v2, Vector v3) throws Exception {
			double[] f1 = v1.toArray();
			double[] f2 = v2.toArray();
			double[] f3 = v3.toArray();
			
			// arrays may be of different length
			int len = Math.min(Math.min(f1.length, f2.length), f3.length);
			double[] average = new double[len];

			for (int i = 0; i < len; i++) {
				average[i] = (f1[i] + f2[i] + f3[i]) / 3.0;
			}
			return Vectors.dense(average);
		}
	}, new VectorUDT());

	data.createOrReplaceTempView("table");
	// append new feature column with average values
	return session.sql("SELECT *, averager(features0,features1,features2) AS " + outputCol + " from table");
}
 
Example 22  Project: beakerx  File: SparkEngineBase.java
protected TryResult createSparkSession() {
  try {
    SparkSession sparkSession = getOrCreate();
    return TryResult.createResult(sparkSession);
  } catch (Exception e) {
    return TryResult.createError(errorPrinter.print(e));
  }
}
 
Example 23  Project: bunsen  File: ValueSets.java
/**
 * Returns an empty ValueSets instance.
 *
 * @param spark the spark session
 * @return an empty ValueSets instance.
 */
public static ValueSets getEmpty(SparkSession spark) {

  Dataset<ValueSet> emptyValueSets = spark.emptyDataset(VALUE_SET_ENCODER)
      .withColumn("timestamp", lit(null).cast("timestamp"))
      .as(VALUE_SET_ENCODER);

  return new ValueSets(spark,
      spark.emptyDataset(URL_AND_VERSION_ENCODER),
      emptyValueSets,
      spark.emptyDataset(getValueEncoder()));
}
 
Example 24  Project: mmtf-spark  File: PdbMetadataDemo.java
public static void main(String[] args) throws IOException {
 SparkSession spark = SparkSession.builder().master("local[*]").appName(PdbMetadataDemo.class.getSimpleName())
            .getOrCreate();

 // query the following fields from the _citation category using PDBj's Mine2 web service:
 // journal_abbrev, pdbx_database_id_PubMed, year.   
 // Note, mixed case column names must be quoted and escaped with \".
 String sqlQuery = "SELECT pdbid, journal_abbrev, \"pdbx_database_id_PubMed\", year from citation WHERE id = 'primary'";
 Dataset<Row>ds = PdbjMineDataset.getDataset(sqlQuery);
 
 System.out.println("First 10 results from query: " + sqlQuery);
 ds.show(10, false);
  
 // filter out unpublished entries (they contain the word "published" in various upper/lower case combinations)
 ds = ds.filter("UPPER(journal_abbrev) NOT LIKE '%PUBLISHED%'");
 
 // print the top 10 journals
 System.out.println("Top 10 journals that publish PDB structures:");
 ds.groupBy("journal_abbrev").count().sort(col("count").desc()).show(10, false);
	
 // filter out entries without a PubMed Id (is -1 if PubMed Id is not available)
 ds = ds.filter("pdbx_database_id_PubMed > 0");
 System.out.println("Entries with PubMed Ids: " + ds.count());
 
 // show growth of papers in PubMed
 System.out.println("PubMed Ids per year: ");
 ds.groupBy("year").count().sort(col("year").desc()).show(10, false);

 spark.close();
}
 
Example 25  Project: ExecDashboard  File: DefaultDataCollector.java
DefaultDataCollector(String collectionName, String query, List<String> collectorItemIds, SparkSession sparkSession, JavaSparkContext javaSparkContext, PortfolioCollectorSetting portfolioCollectorSetting) {
    this.collectionName = collectionName;
    this.query = query;
    this.collectorItemIds = collectorItemIds;
    this.sparkSession = sparkSession;
    this.javaSparkContext = javaSparkContext;
    this.portfolioCollectorSetting = portfolioCollectorSetting;
}
 
Example 26  Project: bunsen  File: Bundles.java
/**
 * Extracts the given resource type from the RDD of bundles and returns
 * it as a Dataset of that type.
 *
 * @param spark the spark session
 * @param bundles an RDD of FHIR Bundles
 * @param resourceClass the type of resource to extract.
 * @return a dataset of the given resource
 */
public Dataset<Row> extractEntry(SparkSession spark,
    JavaRDD<BundleContainer> bundles,
    Class resourceClass) {

  RuntimeResourceDefinition definition = FhirContexts.contextFor(fhirVersion)
      .getResourceDefinition(resourceClass);

  return extractEntry(spark, bundles, definition.getName());
}
 
Example 27  Project: ExecDashboard  File: DefaultMetricCollector.java
public void collect(SparkSession sparkSession, JavaSparkContext javaSparkContext, List<?> objectList) {
    if ((sparkSession == null) || (javaSparkContext == null) || CollectionUtils.isEmpty(objectList)) { return; }

    if (objectList.get(0) instanceof Portfolio){
        collectPortFolioMetrics(sparkSession, javaSparkContext, (List<Portfolio>) objectList);
        return;
    }
    if (objectList.get(0) instanceof Lob){
        collectLobMetrics(sparkSession, javaSparkContext, (List<Lob>) objectList);
        return;
    }
}
 
Example 28  Project: mmtf-spark  File: DrugBankDataset.java
/**
 * Reads CSV file into a Spark dataset
 * 
 * @param fileName
 * @throws IOException
 */
private static Dataset<Row> readCsv(String inputFileName) throws IOException {
    SparkSession spark = SparkSession.builder().getOrCreate();

    Dataset<Row> dataset = spark.read().format("csv").option("header", "true").option("inferSchema", "true")
            .load(inputFileName);

    return dataset;
}
 
Example 29  Project: bunsen  File: ValueSets.java
private ValueSets(SparkSession spark,
    Dataset<UrlAndVersion> members,
    Dataset<Row> valueSets,
    Dataset<Value> values) {

  super(spark, FhirVersionEnum.DSTU3, members, valueSets,values, valuesetRowConverter);
}
 
Example 30  Project: bunsen  File: ConceptMaps.java
/**
 * Returns an empty ConceptMaps instance.
 *
 * @param spark the spark session
 * @return an empty ConceptMaps instance.
 */
public static ConceptMaps getEmpty(SparkSession spark) {

  Dataset<ConceptMap> emptyConceptMaps = spark.emptyDataset(CONCEPT_MAP_ENCODER)
      .withColumn("timestamp", lit(null).cast("timestamp"))
      .as(CONCEPT_MAP_ENCODER);

  return new ConceptMaps(spark,
      spark.emptyDataset(URL_AND_VERSION_ENCODER),
      emptyConceptMaps,
      spark.emptyDataset(MAPPING_ENCODER));
}
 