The following examples show how to use the org.apache.spark.sql.AnalysisException API class; you can also click through to GitHub to view the full source code.
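Before the project-specific snippets, here is a minimal sketch of the basic pattern (the session settings and table names are placeholders): AnalysisException is a checked exception on APIs such as Dataset.createGlobalTempView and Catalog.listColumns, so Java callers must either catch it or declare it with throws.
SparkSession spark = SparkSession.builder().master("local").appName("analysis-exception-demo").getOrCreate();
try {
    // Fails at analysis time because the database and table do not exist.
    spark.catalog().listColumns("no_such_db", "no_such_table");
} catch (AnalysisException e) {
    System.err.println("Analysis failed: " + e.getMessage());
}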
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder().master("local")
.appName("Java Spark SQL")
.getOrCreate();
Dataset<Row> dataset = sparkSession.read().json("URL");
try {
// Create a global temporary view
dataset.createGlobalTempView("user");
// The global temporary view is tied to the system-preserved database `global_temp`
Dataset<Row> globalUser = sparkSession.sql("SELECT * FROM global_temp.user");
sparkSession.newSession().sql("SELECT * FROM global_temp.user");
} catch (AnalysisException e) {
e.printStackTrace();
}
}
/**
* Converts checked exceptions to unchecked exceptions.
*
* @param cause a checked exception object which is to be converted to its unchecked equivalent.
* @param message exception message as a format string
* @param args format specifiers
* @return unchecked exception.
*/
public static RuntimeException toUncheckedException(Throwable cause, String message, Object... args) {
if (cause instanceof RuntimeException) {
return (RuntimeException) cause;
} else if (cause instanceof org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException) {
return new NoSuchNamespaceException(cause, message, args);
} else if (cause instanceof org.apache.spark.sql.catalyst.analysis.NoSuchTableException) {
return new NoSuchTableException(cause, message, args);
} else if (cause instanceof AnalysisException) {
return new ValidationException(cause, message, args);
} else if (cause instanceof IOException) {
return new RuntimeIOException((IOException) cause, message, args);
} else {
return new RuntimeException(String.format(message, args), cause);
}
}
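A minimal usage sketch for toUncheckedException (not part of the original snippet; the dataset, path, and view name are placeholders): wrap the checked AnalysisException that Dataset.createTempView can throw into its unchecked equivalent. SparkExceptionUtil is the utility class that hosts this method in the later importSparkTable example.
Dataset<Row> df = spark.read().json("people.json"); // any existing Dataset; path is a placeholder
try {
    df.createTempView("existing_view");
} catch (AnalysisException e) {
    // Rethrown as an unchecked ValidationException by the AnalysisException branch above.
    throw SparkExceptionUtil.toUncheckedException(e, "Cannot create temporary view %s", "existing_view");
}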
public static void main(String[] args) throws AnalysisException {
// $example on:init_session$
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
// $example off:init_session$
runBasicDataFrameExample(spark);
runDatasetCreationExample(spark);
runInferSchemaExample(spark);
runProgrammaticSchemaExample(spark);
spark.stop();
}
/**
* Import files from an existing Spark table to an Iceberg table.
*
* The import uses the Spark session to get table metadata. It assumes that no
* operations are in progress on the source and target tables and is therefore
* not thread-safe.
*
* @param spark a Spark session
* @param sourceTableIdent an identifier of the source Spark table
* @param targetTable an Iceberg table where to import the data
* @param stagingDir a staging directory to store temporary manifest files
*/
public static void importSparkTable(
SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable, String stagingDir) {
SessionCatalog catalog = spark.sessionState().catalog();
String db = sourceTableIdent.database().nonEmpty() ?
sourceTableIdent.database().get() :
catalog.getCurrentDatabase();
TableIdentifier sourceTableIdentWithDB = new TableIdentifier(sourceTableIdent.table(), Some.apply(db));
if (!catalog.tableExists(sourceTableIdentWithDB)) {
throw new org.apache.iceberg.exceptions.NoSuchTableException(
String.format("Table %s does not exist", sourceTableIdentWithDB));
}
try {
PartitionSpec spec = SparkSchemaUtil.specForTable(spark, sourceTableIdentWithDB.unquotedString());
if (spec == PartitionSpec.unpartitioned()) {
importUnpartitionedSparkTable(spark, sourceTableIdentWithDB, targetTable);
} else {
List<SparkPartition> sourceTablePartitions = getPartitions(spark, sourceTableIdent);
importSparkPartitions(spark, sourceTablePartitions, targetTable, spec, stagingDir);
}
} catch (AnalysisException e) {
throw SparkExceptionUtil.toUncheckedException(
e, "Unable to get partition spec for table: %s", sourceTableIdentWithDB);
}
}
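A hedged invocation sketch: import an existing Spark table db.source_table into an Iceberg table. Here targetTable is assumed to be an org.apache.iceberg.Table already loaded from a catalog, and the staging directory is a placeholder path.
SparkSession spark = SparkSession.builder().master("local").getOrCreate();
// Identifier of the source Spark/Hive table; Some.apply supplies the database explicitly.
TableIdentifier source = new TableIdentifier("source_table", Some.apply("db"));
// targetTable is obtained elsewhere (assumption for this sketch), e.g. from an Iceberg catalog.
importSparkTable(spark, source, targetTable, "/tmp/iceberg-staging");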
@Test
public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndWithTransformer() throws Exception {
// The CSV files do not have a header; the columns are separated by '\t'
// No schema provider is specified, and the transformer is applied
// In this case, the source schema comes from the inferred schema of the CSV files
// The target schema is determined based on the DataFrame after transformation
// Using no CSV header and no schema provider at the same time is not recommended,
// as the transformer behavior may be unexpected
Exception e = assertThrows(AnalysisException.class, () -> {
testCsvDFSSource(false, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}, "Should error out when doing the transformation.");
LOG.debug("Expected error during transformation", e);
assertTrue(e.getMessage().contains("cannot resolve '`begin_lat`' given input columns:"));
}
@Test(expected = AnalysisException.class)
public void checkApplyBulkMutations_Exception_TableExist() {
JdbcOutput jdbcOutput = new JdbcOutput();
ValidationAssert.assertNoValidationFailures(jdbcOutput, config);
jdbcOutput.configure(config);
ArrayList<Tuple2<MutationType, Dataset<Row>>> planned = new ArrayList<>();
Dataset<Row> o = Contexts.getSparkSession().read().json(JdbcInput.class.getResource(SAMPLE_DATA_PATH).getPath());
Tuple2<MutationType, Dataset<Row>> input = new Tuple2<>(MutationType.INSERT, o);
planned.add(input);
jdbcOutput.applyBulkMutations(planned);
}
@Test (expected = AnalysisException.class)
public void testHiveDisabledConfiguration() {
Map<String, Object> sparamMap = new HashMap<>();
sparamMap.put(Contexts.APPLICATION_SECTION_PREFIX + "." +
Contexts.SPARK_SESSION_ENABLE_HIVE_SUPPORT, false);
sparamMap.putAll(getTestConfigMap());
Contexts.initialize(ConfigFactory.parseMap(sparamMap), Contexts.ExecutionMode.BATCH);
Contexts.getSparkSession().sql("CREATE TABLE testHiveDisabled(d int)");
try {
Contexts.getSparkSession().sql("SELECT count(*) from testHiveDisabled");
} finally {
Contexts.getSparkSession().sql("DROP TABLE testHiveDisabled");
}
}
@Test (expected = AnalysisException.class)
public void testDefaultHiveDisabledForUnitTestsConfiguration() {
Contexts.getSparkSession().sql("CREATE TABLE testHiveDisabled(d int)");
try {
Contexts.getSparkSession().sql("SELECT count(*) from testHiveDisabled");
} finally {
Contexts.getSparkSession().sql("DROP TABLE testHiveDisabled");
}
}
@Test (expected = AnalysisException.class)
public void testNoUDFs() throws Throwable {
Contexts.closeSparkSession();
Config config = ConfigUtils.configFromResource("/udf/udf_none.conf");
new Runner().initializeUDFs(config);
Deriver deriver = ComponentFactory.create(
Deriver.class, config.getConfig("steps.runudf.deriver"), true);
deriver.derive(Maps.<String, Dataset<Row>>newHashMap());
}
@Test (expected = AnalysisException.class)
public void testInputRepartitionInvalidColumn() throws Exception {
Map<String, Object> configMap = Maps.newHashMap();
configMap.put(DataStep.INPUT_TYPE + "." + ComponentFactory.TYPE_CONFIG_NAME, DummyInput.class.getName());
configMap.put(DataStep.INPUT_TYPE + "." + "starting.partitions", 10);
configMap.put(BatchStep.REPARTITION_COLUMNS_PROPERTY, Lists.newArrayList("modulo == 0"));
configMap.put(BatchStep.REPARTITION_NUM_PARTITIONS_PROPERTY, 5);
Config config = ConfigFactory.parseMap(configMap);
BatchStep batchStep = new BatchStep("test");
batchStep.configure(config);
batchStep.submit(Sets.<Step>newHashSet());
batchStep.getData();
}
public static void main(String[] args) throws AnalysisException {
//Windows-specific property if Hadoop is not installed or HADOOP_HOME is not set
System.setProperty("hadoop.home.dir", "E:\\hadoop");
//Build a Spark Session
SparkSession sparkSession = SparkSession
.builder()
.master("local")
.config("spark.sql.warehouse.dir","file:///E:/hadoop/warehouse")
.appName("DatasetOperations")
//.enableHiveSupport()
.getOrCreate();
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.WARN);
//Create a RDD
JavaRDD<String> deptRDD = sparkSession.sparkContext()
.textFile("src/main/resources/dept.txt", 1)
.toJavaRDD();
//Convert the RDD to RDD<Rows>
JavaRDD<Row> deptRows = deptRDD.filter(str-> !str.contains("deptno")).map(new Function<String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Row call(String rowString) throws Exception {
String[] cols = rowString.split(",");
return RowFactory.create(cols[0].trim(), cols[1].trim(),cols[2].trim());
}
});
//Create schema
String[] schemaArr=deptRDD.first().split(",");
List<StructField> structFieldList = new ArrayList<>();
for (String fieldName : schemaArr) {
StructField structField = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
structFieldList.add(structField);
}
StructType schema = DataTypes.createStructType(structFieldList);
Dataset<Row> deptDf = sparkSession.createDataFrame(deptRows, schema);
deptDf.printSchema();
deptDf.show();
deptDf.createOrReplaceTempView("dept");
Dataset<Row> result = sparkSession.sql("select loc,count(loc) from dept where deptno > 10 group by loc" );
result.show();
// sparkSession.newSession().sql("SELECT * FROM dept").show();
deptDf.createGlobalTempView("dept_global_view");
sparkSession.newSession().sql("SELECT deptno,dname,loc, rank() OVER (PARTITION BY loc ORDER BY deptno ) FROM global_temp.dept_global_view").show();
// sparkSession.newSession().sql("SELECT * FROM dept_global_view").show();
deptDf.write().mode(SaveMode.Overwrite).json("src/main/resources/output/dept");
deptDf.write().mode(SaveMode.Overwrite).format("csv").save("src/main/resources/output/deptText");
deptDf.write().mode("overwrite").format("csv").save("src/main/resources/output/deptText");
deptDf.write().mode(SaveMode.Overwrite).format("csv").saveAsTable("Department");
deptDf.write().mode(SaveMode.Overwrite).format("csv").option("path", "file:///E:/hadoop/bin").saveAsTable("Department");
// Read the CSV data
Dataset<Row> emp_ds = sparkSession.read()
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("src/main/resources/employee.txt");
emp_ds.printSchema();
emp_ds.show();
emp_ds.select("empName" ,"empId").show();
emp_ds.select(col("empName").name("Employee Name") ,col("empId").cast(DataTypes.IntegerType).name("Employee Id")).show();
emp_ds.sort(col("empId").asc()).filter(col("salary").gt("2500"));
emp_ds.select("job").groupBy(col("job")).count().show();
//emp_ds.as("A").join(deptDf.as("B"),col("deptno"),"left").printSchema();
emp_ds.as("A").join(deptDf.as("B"),emp_ds.col("deptno").equalTo(deptDf.col("deptno")),"left").select("A.empId","A.empName","A.job","A.manager","A.hiredate","A.salary","A.comm","A.deptno","B.dname","B.loc").show();
emp_ds.join(deptDf,emp_ds.col("deptno").equalTo(deptDf.col("deptno")),"right").show();
emp_ds.join(deptDf,emp_ds.col("deptno").equalTo(deptDf.col("deptno")),"right").logicalPlan();
emp_ds.join(deptDf,emp_ds.col("deptno").equalTo(deptDf.col("deptno")),"right").explain();
sparkSession.sql("show functions").show(false);
sparkSession.sql("DESCRIBE FUNCTION add_months").show(false);
sparkSession.sql("DESCRIBE FUNCTION EXTENDED add_months").show(false);
}
private static void runBasicDataFrameExample(SparkSession spark) throws AnalysisException {
// $example on:create_df$
Dataset<Row> df = spark.read().json(Constant.LOCAL_FILE_PREX +"/data/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:create_df$
// $example on:untyped_ops$
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
// $example off:untyped_ops$
// $example on:run_sql$
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:run_sql$
// $example on:global_temp_view$
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:global_temp_view$
}
/**
* Helper to write new records using one of HoodieWriteClient's write API and use ReadClient to test filterExists()
* API works correctly.
*
* @param config Hoodie Write Config
* @param writeFn Write Function for writing records
* @throws Exception in case of error
*/
private void testReadFilterExist(HoodieWriteConfig config,
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception {
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
JavaRDD<HoodieRecord> filteredRDD = readClient.filterExists(recordsRDD);
// Should not find any files
assertEquals(100, filteredRDD.collect().size());
JavaRDD<HoodieRecord> smallRecordsRDD = jsc.parallelize(records.subList(0, 75), 1);
// We create three parquet files, each having one record (3 different partitions)
List<WriteStatus> statuses = writeFn.apply(writeClient, smallRecordsRDD, newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());
filteredRDD = anotherReadClient.filterExists(recordsRDD);
List<HoodieRecord> result = filteredRDD.collect();
// Check results
assertEquals(25, result.size());
// check path exists for written keys
JavaPairRDD<HoodieKey, Option<String>> keyToPathPair =
anotherReadClient.checkExists(recordsRDD.map(r -> r.getKey()));
JavaRDD<HoodieKey> keysWithPaths = keyToPathPair.filter(keyPath -> keyPath._2.isPresent())
.map(keyPath -> keyPath._1);
assertEquals(75, keysWithPaths.count());
// verify rows match inserted records
Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, 1);
assertEquals(75, rows.count());
JavaRDD<HoodieKey> keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent())
.map(keyPath -> keyPath._1);
assertThrows(AnalysisException.class, () -> {
anotherReadClient.readROView(keysWithoutPaths, 1);
});
// Actual tests of getPendingCompactions method are in TestAsyncCompaction
// This is just testing empty list
assertEquals(0, anotherReadClient.getPendingCompactions().size());
}
}
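A possible call site for this helper (a sketch: it assumes Function3 is a functional interface whose apply takes the write client, the record RDD, and the commit time, and that getConfig() is a test helper that builds a HoodieWriteConfig):
@Test
public void testReadFilterExistAfterInsert() throws Exception {
    // Exercise filterExists() using the client's insert API as the write function.
    testReadFilterExist(getConfig(), (writeClient, recordRDD, commitTime) -> writeClient.insert(recordRDD, commitTime));
}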
/**
* Returns a {@link PartitionSpec} for the given table.
* <p>
* This creates a partition spec for an existing table by looking up the table's schema and
* creating a spec with identity partitions for each partition column.
*
* @param spark a Spark session
* @param name a table name and (optional) database
* @return a PartitionSpec for the table
* @throws AnalysisException if thrown by the Spark catalog
*/
public static PartitionSpec specForTable(SparkSession spark, String name) throws AnalysisException {
List<String> parts = Lists.newArrayList(Splitter.on('.').limit(2).split(name));
String db = parts.size() == 1 ? "default" : parts.get(0);
String table = parts.get(parts.size() == 1 ? 0 : 1);
PartitionSpec spec = identitySpec(
schemaForTable(spark, name),
spark.catalog().listColumns(db, table).collectAsList());
return spec == null ? PartitionSpec.unpartitioned() : spec;
}
/**
* Returns a {@link PartitionSpec} for the given table.
* <p>
* This creates a partition spec for an existing table by looking up the table's schema and
* creating a spec with identity partitions for each partition column.
*
* @param spark a Spark session
* @param name a table name and (optional) database
* @return a PartitionSpec for the table, if found
* @throws AnalysisException if thrown by the Spark catalog
*/
public static PartitionSpec specForTable(SparkSession spark, String name) throws AnalysisException {
List<String> parts = Lists.newArrayList(Splitter.on('.').limit(2).split(name));
String db = parts.size() == 1 ? "default" : parts.get(0);
String table = parts.get(parts.size() == 1 ? 0 : 1);
return identitySpec(
schemaForTable(spark, name),
spark.catalog().listColumns(db, table).collectAsList());
}
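A usage sketch for specForTable ("db.sample_table" is a placeholder table name, and spark is an active SparkSession; the checked AnalysisException must be caught or declared, here by reusing the toUncheckedException helper shown earlier):
try {
    PartitionSpec spec = SparkSchemaUtil.specForTable(spark, "db.sample_table");
    // With the first variant above, a table without partition columns resolves to PartitionSpec.unpartitioned().
    boolean isPartitioned = spec != PartitionSpec.unpartitioned();
} catch (AnalysisException e) {
    throw SparkExceptionUtil.toUncheckedException(e, "Unable to get partition spec for table: %s", "db.sample_table");
}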
/** */
public static void main(String args[]) throws AnalysisException {
setupServerAndData();
//Creating Ignite-specific implementation of Spark session.
IgniteSparkSession igniteSession = IgniteSparkSession.builder()
.appName("Spark Ignite catalog example")
.master("local")
.config("spark.executor.instances", "2")
.igniteConfig(CONFIG)
.getOrCreate();
//Adjust the logger to exclude the logs of no interest.
Logger.getRootLogger().setLevel(Level.ERROR);
Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);
System.out.println("List of available tables:");
//Showing existing tables.
igniteSession.catalog().listTables().show();
System.out.println("PERSON table description:");
//Showing `person` schema.
igniteSession.catalog().listColumns("person").show();
System.out.println("CITY table description:");
//Showing `city` schema.
igniteSession.catalog().listColumns("city").show();
println("Querying all persons from city with ID=2.");
//Selecting data through Spark SQL engine.
Dataset<Row> df = igniteSession.sql("SELECT * FROM person WHERE CITY_ID = 2");
System.out.println("Result schema:");
df.printSchema();
System.out.println("Result content:");
df.show();
System.out.println("Querying all persons living in Denver.");
//Selecting data through Spark SQL engine.
Dataset<Row> df2 = igniteSession.sql("SELECT * FROM person p JOIN city c ON c.ID = p.CITY_ID WHERE c.NAME = 'Denver'");
System.out.println("Result schema:");
df2.printSchema();
System.out.println("Result content:");
df2.show();
Ignition.stop(false);
}