The following examples show how to use org.apache.spark.sql.Dataset in Java. They are collected from open-source projects on GitHub.
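For orientation, here is a minimal, self-contained sketch of building and querying a Dataset (the class, app, and column names are hypothetical and not taken from any of the projects below):
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class DatasetQuickStart {
    public static void main(String[] args) {
        // Local session only for this sketch; real applications configure this differently
        SparkSession spark = SparkSession.builder()
            .appName("Dataset quick start")
            .master("local[*]")
            .getOrCreate();
        // Typed Dataset<String> built from an in-memory list
        List<String> data = Arrays.asList("a", "b", "c");
        Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
        // Untyped view (Dataset<Row>) via toDF, followed by a simple filter
        Dataset<Row> df = ds.toDF("letter");
        df.filter("letter <> 'b'").show();
        spark.stop();
    }
}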
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public DataSet<V> intersect(DataSet<V> dataSet, String name, OperationContext context, boolean pushScope, String scopeDetail) throws StandardException {
pushScopeIfNeeded(context, pushScope, scopeDetail);
try {
// Convert this RDD-backed iterator to a Spark untyped dataset
Dataset<Row> left = SpliceSpark.getSession()
.createDataFrame(
rdd.map(
new LocatedRowToRowFunction()),
context.getOperation()
.getExecRowDefinition()
.schema());
return new NativeSparkDataSet(left, context).intersect(dataSet, name, context, pushScope, scopeDetail);
} finally {
if (pushScope) context.popScope();
}
}
@Test
public void testDeriveDropRangePartitionBoundariesQuery() {
Map<String, Object> configMap = new HashMap<>();
configMap.put(HOST_CONFIG, "testhost");
configMap.put(QUERY_TYPE_CONFIG, "drop_partition");
configMap.put(QUERY_TABLE_CONFIG, "testtable");
configMap.put(QUERY_PART_RANGE_START_CONFIG, "20190122");
configMap.put(QUERY_PART_RANGE_END_CONFIG, "20190123");
configMap.put(AUTH_CONFIG, "none");
Config config = ConfigFactory.parseMap(configMap);
ImpalaMetadataTask metadataTask = new ImpalaMetadataTask();
metadataTask.configure(config);
Map<String, Dataset<Row>> dependencies = Maps.newHashMap();
String query = metadataTask.deriveQuery(dependencies);
assertEquals("ALTER TABLE testtable DROP IF EXISTS RANGE PARTITION 20190122 <= VALUES < 20190123", query);
}
@Test
public void testWrongField() throws Exception {
thrown.expect(RuntimeException.class);
thrown.expectMessage("Error executing IN list filtering");
Dataset<Row> source = createTestDataframe();
List<String> inListLiteral = Arrays.asList("1", "2", "3");
Map<String, Dataset<Row>> dependencies = new HashMap<>();
dependencies.put("df1", source);
Config config = ConfigFactory.empty()
.withValue(InListDeriver.INLIST_STEP_CONFIG, ConfigValueFactory.fromAnyRef("df1"))
.withValue(InListDeriver.INLIST_FIELD_CONFIG, ConfigValueFactory.fromAnyRef("non_existing_field"))
.withValue(InListDeriver.INLIST_VALUES_CONFIG, ConfigValueFactory.fromIterable(inListLiteral));
InListDeriver deriver = new InListDeriver();
assertNoValidationFailures(deriver, config);
deriver.configure(config);
deriver.derive(dependencies);
}
@Test
public void testExplicitDontAppendRaw() {
Map<String, Object> configMap = Maps.newHashMap();
configMap.put(ComponentFactory.TYPE_CONFIG_NAME, DummyTranslator.class.getName());
configMap.put(TranslateFunction.APPEND_RAW_ENABLED_CONFIG, false);
Config config = ConfigFactory.parseMap(configMap);
TranslateFunction tf = new TranslateFunction(config);
tf.receiveProvidedSchema(tf.getExpectingSchema());
Dataset<Row> raw = Contexts.getSparkSession().createDataFrame(
Lists.newArrayList(RowFactory.create("hello?")), tf.getExpectingSchema());
Dataset<Row> translated = raw.flatMap(tf, RowEncoder.apply(tf.getProvidingSchema()));
assertEquals(1, translated.schema().size());
assertNotEquals("_value", translated.schema().fields()[0].name());
}
@Test
public void testQueryFile() throws Exception {
Contexts.getSparkSession().createDataset(Lists.newArrayList(1), Encoders.INT()).createOrReplaceTempView("literaltable");
Map<String, Object> configMap = Maps.newHashMap();
configMap.put(SQLDeriver.QUERY_FILE_CONFIG_NAME, getClass().getResource("/sql/query_without_parameters.sql").getPath());
Config config = ConfigFactory.parseMap(configMap);
SQLDeriver deriver = new SQLDeriver();
assertNoValidationFailures(deriver, config);
deriver.configure(config);
Object result = deriver.derive(Maps.<String, Dataset<Row>>newHashMap()).collectAsList().get(0).get(0);
assertEquals(1, result);
}
@Test
public void testDataFrameSumDMLMllibVectorWithIDColumn() {
System.out.println("MLContextTest - DataFrame sum DML, mllib vector with ID column");
List<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>> list = new ArrayList<>();
list.add(new Tuple2<>(1.0, org.apache.spark.mllib.linalg.Vectors.dense(1.0, 2.0, 3.0)));
list.add(new Tuple2<>(2.0, org.apache.spark.mllib.linalg.Vectors.dense(4.0, 5.0, 6.0)));
list.add(new Tuple2<>(3.0, org.apache.spark.mllib.linalg.Vectors.dense(7.0, 8.0, 9.0)));
JavaRDD<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>> javaRddTuple = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleMllibVectorRow());
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", new org.apache.spark.mllib.linalg.VectorUDT(), true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR_WITH_INDEX);
Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame, mm);
setExpectedStdOut("sum: 45.0");
ml.execute(script);
}
private List<ManifestFile> writeManifestsForUnpartitionedTable(Dataset<Row> manifestEntryDF, int numManifests) {
Broadcast<FileIO> io = sparkContext.broadcast(fileIO);
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
// we rely only on the target number of manifests for unpartitioned tables
// as we should not worry about having too much metadata per partition
long maxNumManifestEntries = Long.MAX_VALUE;
return manifestEntryDF
.repartition(numManifests)
.mapPartitions(
toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
manifestEncoder
)
.collectAsList();
}
/**
* Method to verify that the result equals the expected data.
*/
private void verifyResultData(List<GenericRecord> expectData) {
Dataset<Row> ds = HoodieClientTestUtils.read(jsc, tablePath, sqlContext, fs, tablePath + "/*/*/*/*");
List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
List<HoodieTripModel> result = readData.stream().map(row ->
new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
row.getDouble(5), row.getDouble(6), row.getDouble(7)))
.collect(Collectors.toList());
List<HoodieTripModel> expected = expectData.stream().map(g ->
new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
g.get("_row_key").toString(),
g.get("rider").toString(),
g.get("driver").toString(),
Double.parseDouble(g.get("begin_lat").toString()),
Double.parseDouble(g.get("begin_lon").toString()),
Double.parseDouble(g.get("end_lat").toString()),
Double.parseDouble(g.get("end_lon").toString())))
.collect(Collectors.toList());
assertAll("Result list equals",
() -> assertEquals(expected.size(), result.size()),
() -> assertTrue(result.containsAll(expected) && expected.containsAll(result)));
}
@Test
public void testOutputDataFrameFromMatrixDML() {
System.out.println("MLContextTest - output DataFrame from matrix DML");
String s = "M = matrix('1 2 3 4', rows=2, cols=2);";
Script script = dml(s).out("M");
Dataset<Row> df = ml.execute(script).getMatrix("M").toDF();
Dataset<Row> sortedDF = df.sort(RDDConverterUtils.DF_ID_COLUMN);
List<Row> list = sortedDF.collectAsList();
Row row1 = list.get(0);
Assert.assertEquals(1.0, row1.getDouble(0), 0.0);
Assert.assertEquals(1.0, row1.getDouble(1), 0.0);
Assert.assertEquals(2.0, row1.getDouble(2), 0.0);
Row row2 = list.get(1);
Assert.assertEquals(2.0, row2.getDouble(0), 0.0);
Assert.assertEquals(3.0, row2.getDouble(1), 0.0);
Assert.assertEquals(4.0, row2.getDouble(2), 0.0);
}
@Test (expected = RuntimeException.class)
public void testDifferentSchemas() throws Exception {
StructType schema1 = DataTypes.createStructType(Lists.<StructField>newArrayList(
DataTypes.createStructField("col1", DataTypes.StringType, false)));
StructType schema2 = DataTypes.createStructType(Lists.<StructField>newArrayList(
DataTypes.createStructField("col2", DataTypes.StringType, false)));
Dataset<Row> dep1 = Contexts.getSparkSession().createDataFrame(
Lists.newArrayList(RowFactory.create("a")), schema1);
Dataset<Row> dep2 = Contexts.getSparkSession().createDataFrame(
Lists.newArrayList(RowFactory.create("b")), schema2);
Map<String, Dataset<Row>> dependencies = Maps.newHashMap();
dependencies.put("dep1", dep1);
dependencies.put("dep2", dep2);
Deriver deriver = new PassthroughDeriver();
deriver.derive(dependencies).collectAsList();
}
public static Dataset<Row> sql(String prj, String sqlText, List<String> parameters) {
if (sqlText == null)
throw new RuntimeException("Sorry your SQL is null...");
try {
logger.info("Try to query from cube....");
long startTs = System.currentTimeMillis();
Dataset<Row> dataset = queryCubeAndSkipCompute(prj, sqlText, parameters);
logger.info("Cool! This sql hits cube...");
logger.info("Duration(ms): {}", (System.currentTimeMillis() - startTs));
return dataset;
} catch (Throwable e) {
logger.error("There is no cube can be used for query [{}]", sqlText);
logger.error("Reasons:", e);
throw new RuntimeException("Error in running query [ " + sqlText.trim() + " ]", e);
}
}
public static void testResultSetToDF(String table, ResultSet[] resultSets) throws SQLException {
try{
Connection conn = DriverManager.getConnection("jdbc:default:connection");
PreparedStatement pstmt = conn.prepareStatement("select * from " + table.toUpperCase());
ResultSet res = pstmt.executeQuery();
// Convert result set to Dataframe
Dataset<Row> resultSetDF = SparkUtils.resultSetToDF(res);
resultSets[0] = res;
// Construct Stored Procedure Result
List<ExecRow> rows = Lists.newArrayList();
ExecRow row = new ValueRow(1);
// System.out.println(resultSetDF.dataset().count());
row.setColumn(1, new SQLLongint(resultSetDF.count()));
rows.add(row);
IteratorNoPutResultSet resultsToWrap = wrapResults((EmbedConnection) conn, rows, DATAFRAME_COUNT_STORED_PROCEDURE_COLUMN_DECSRIPTOR);
resultSets[0] = new EmbedResultSet40((EmbedConnection)conn, resultsToWrap, false, null, true);
conn.close();
}
catch (StandardException e) {
throw new SQLException(Throwables.getRootCause(e));
}
}
private void assertCorrectness(Dataset<Row> rowDataset, Transformer transformer) {
List<Row> sparkOutput = rowDataset.collectAsList();
for (Row row : sparkOutput) {
Map<String, Object> data = new HashMap<>();
data.put("mergedAddress", row.get(0));
List<Object> list = row.getList(1);
String[] sanitizedAddress = new String[list.size()];
for (int j = 0; j < sanitizedAddress.length; j++) {
sanitizedAddress[j] = (String) list.get(j);
}
data.put("sanitizedAddress", sanitizedAddress);
transformer.transform(data);
assertEquals("number of words should be equals", row.get(2), data.get("numWords"));
assertEquals("number of commas should be equals", row.get(3), data.get("numCommas"));
assertEquals("numericPresent should be equals", row.get(4), data.get("numericPresent"));
assertEquals("addressLength should be equals", row.get(5), data.get("addressLength"));
assertEquals("favouredStart should be equals", row.get(6), data.get("favouredStart"));
assertEquals("unfavouredStart should be equals", row.get(7), data.get("unfavouredStart"));
}
}
/**
* Scale each column into the range [min, max] based on its observed minimum and maximum.
*
* @param dataFrame the dataframe to scale
* @param min the minimum value of the target range
* @param max the maximum value of the target range
* @param skipColumns columns to leave unscaled
* @return the dataframe normalized per column
*/
public static Dataset<Row> normalize(Dataset<Row> dataFrame, double min, double max, List<String> skipColumns) {
List<String> columnsList = DataFrames.toList(dataFrame.columns());
columnsList.removeAll(skipColumns);
String[] columnNames = DataFrames.toArray(columnsList);
// first row is min, second row is max; each column in a row corresponds to a particular column
List<Row> minMax = minMaxColumns(dataFrame, columnNames);
for (int i = 0; i < columnNames.length; i++) {
String columnName = columnNames[i];
double dMin = ((Number) minMax.get(0).get(i)).doubleValue();
double dMax = ((Number) minMax.get(1).get(i)).doubleValue();
double maxSubMin = (dMax - dMin);
if (maxSubMin == 0)
maxSubMin = 1;
Column newCol = dataFrame.col(columnName).minus(dMin).divide(maxSubMin).multiply(max - min).plus(min);
dataFrame = dataFrame.withColumn(columnName, newCol);
}
return dataFrame;
}
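As a hedged, standalone illustration of the same min-max scaling without the project's DataFrames/minMaxColumns helpers (the single column name "value" and the class name are hypothetical):
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.max;
import static org.apache.spark.sql.functions.min;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class MinMaxNormalizeSketch {
    // Scale a single numeric "value" column into [newMin, newMax] using (x - min) / (max - min)
    public static Dataset<Row> normalizeValue(Dataset<Row> df, double newMin, double newMax) {
        Row minMax = df.agg(min("value"), max("value")).first();
        double dMin = ((Number) minMax.get(0)).doubleValue();
        double dMax = ((Number) minMax.get(1)).doubleValue();
        double range = (dMax - dMin) == 0 ? 1 : (dMax - dMin); // avoid division by zero, as above
        Column scaled = col("value").minus(dMin).divide(range)
            .multiply(newMax - newMin).plus(newMin);
        return df.withColumn("value", scaled);
    }
}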
@Override
public Column getColumnExpression(Dataset<Row> leftDF,
Dataset<Row> rightDF,
Function<String, DataType> convertStringToDataTypeFunction) throws UnsupportedOperationException {
Column leftExpr = getLeftChild().getColumnExpression(leftDF, rightDF, convertStringToDataTypeFunction);
Column rightExpr = getRightChild().getColumnExpression(leftDF, rightDF, convertStringToDataTypeFunction);
if (relOpKind == EQUALS_RELOP)
return leftExpr.equalTo(rightExpr);
else if (relOpKind == NOT_EQUALS_RELOP)
return leftExpr.notEqual(rightExpr);
else if (relOpKind == GREATER_THAN_RELOP)
return leftExpr.gt(rightExpr);
else if (relOpKind == GREATER_EQUALS_RELOP)
return leftExpr.geq(rightExpr);
else if (relOpKind == LESS_THAN_RELOP)
return leftExpr.lt(rightExpr);
else if (relOpKind == LESS_EQUALS_RELOP)
return leftExpr.leq(rightExpr);
else if (relOpKind == IS_NULL_RELOP)
return leftExpr.isNull();
else if (relOpKind == IS_NOT_NULL_RELOP)
return leftExpr.isNotNull();
else
throw new UnsupportedOperationException();
}
@Test
public void testOutputDataFrameDoublesWithIDColumnFromMatrixDML() {
System.out.println("MLContextTest - output DataFrame of doubles with ID column from matrix DML");
String s = "M = matrix('1 2 3 4', rows=2, cols=2);";
Script script = dml(s).out("M");
Dataset<Row> df = ml.execute(script).getMatrix("M").toDFDoubleWithIDColumn();
Dataset<Row> sortedDF = df.sort(RDDConverterUtils.DF_ID_COLUMN);
List<Row> list = sortedDF.collectAsList();
Row row1 = list.get(0);
Assert.assertEquals(1.0, row1.getDouble(0), 0.0);
Assert.assertEquals(1.0, row1.getDouble(1), 0.0);
Assert.assertEquals(2.0, row1.getDouble(2), 0.0);
Row row2 = list.get(1);
Assert.assertEquals(2.0, row2.getDouble(0), 0.0);
Assert.assertEquals(3.0, row2.getDouble(1), 0.0);
Assert.assertEquals(4.0, row2.getDouble(2), 0.0);
}
@Override
public void translateTransform(
PTransform<PCollection<T>, PCollection<T>> transform, TranslationContext context) {
Window.Assign<T> assignTransform = (Window.Assign<T>) transform;
@SuppressWarnings("unchecked")
final PCollection<T> input = (PCollection<T>) context.getInput();
@SuppressWarnings("unchecked")
final PCollection<T> output = (PCollection<T>) context.getOutput();
Dataset<WindowedValue<T>> inputDataset = context.getDataset(input);
if (WindowingHelpers.skipAssignWindows(assignTransform, context)) {
context.putDataset(output, inputDataset);
} else {
WindowFn<T, ?> windowFn = assignTransform.getWindowFn();
WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
WindowedValue.FullWindowedValueCoder.of(input.getCoder(), windowFn.windowCoder());
Dataset<WindowedValue<T>> outputDataset =
inputDataset.map(
WindowingHelpers.assignWindowsMapFunction(windowFn),
EncoderHelpers.fromBeamCoder(windowedValueCoder));
context.putDataset(output, outputDataset);
}
}
@Test
public void testCountMetricsCollectionForParquet() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> properties = Maps.newHashMap();
properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "counts");
Table table = tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation);
List<SimpleRecord> expectedRecords = Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(2, "b"),
new SimpleRecord(3, "c")
);
Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
df.select("id", "data")
.coalesce(1)
.write()
.format("iceberg")
.option("write-format", "parquet")
.mode("append")
.save(tableLocation);
for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
DataFile file = task.file();
Assert.assertEquals(2, file.nullValueCounts().size());
Assert.assertEquals(2, file.valueCounts().size());
Assert.assertTrue(file.lowerBounds().isEmpty());
Assert.assertTrue(file.upperBounds().isEmpty());
}
}
@Test
public void testPlansInserts() {
Config config = ConfigFactory.empty();
AppendPlanner ap = new AppendPlanner();
assertNoValidationFailures(ap, config);
ap.configure(config);
List<Tuple2<MutationType, Dataset<Row>>> planned = ap.planMutationsForSet(dataFrame);
assertEquals(planned.size(), 1);
assertEquals(planned.get(0)._1(), MutationType.INSERT);
assertEquals(planned.get(0)._2().count(), 1);
}
private void start() {
SparkSession spark = SparkSession.builder()
.appName("Array to Dataset<String>")
.master("local")
.getOrCreate();
String[] l = new String[] { "a", "b", "c", "d" };
List<String> data = Arrays.asList(l);
Dataset<String> df = spark.createDataset(data, Encoders.STRING());
df.show();
}
@Test
public void basicVisualizerTest()
{
Pair<Dataset<Row>,Dataset<Row>> pair = getAppleTablePair("Test6", "Test7");
String html = generateString(pair.getLeft(), pair.getRight(), "FRUIT", 100);
if (html.isEmpty())
{
Assert.fail("html was empty");
}
}
@Test
public void nullRightDfTest()
{
Pair<Dataset<Row>,Dataset<Row>> pair = getAppleTablePair("Test1", "Test4");
String html = generateString(pair.getLeft(), null, "FRUIT", 100);
Assert.assertEquals("<h3>Error message: Right dataframe is null</h3>", html);
}
private static Dataset<Row> withDateColumnDictEncoded(Dataset<Row> df) {
return df.withColumn(
"dateCol",
when(modColumn(9, 0), to_date(lit("04/12/2019"), "MM/dd/yyyy"))
.when(modColumn(9, 1), to_date(lit("04/13/2019"), "MM/dd/yyyy"))
.when(modColumn(9, 2), to_date(lit("04/14/2019"), "MM/dd/yyyy"))
.when(modColumn(9, 3), to_date(lit("04/15/2019"), "MM/dd/yyyy"))
.when(modColumn(9, 4), to_date(lit("04/16/2019"), "MM/dd/yyyy"))
.when(modColumn(9, 5), to_date(lit("04/17/2019"), "MM/dd/yyyy"))
.when(modColumn(9, 6), to_date(lit("04/18/2019"), "MM/dd/yyyy"))
.when(modColumn(9, 7), to_date(lit("04/19/2019"), "MM/dd/yyyy"))
.when(modColumn(9, 8), to_date(lit("04/20/2019"), "MM/dd/yyyy")));
}
@Test
public void test2() throws IOException {
List<String> uniprotIds = Arrays.asList("P15056"); // BRAF
String query = "clinvar.rcv.clinical_significance:pathogenic OR clinvar.rcv.clinical_significance:likely pathogenic";
Dataset<Row> ds = MyVariantDataset.getVariations(uniprotIds, query);
assertEquals(1, ds.filter(
"variationId = 'chr7:g.140501287T>C'"
+ " AND uniprotId = 'P15056'").count());
}
@Test
public void testBatchSize() throws Exception {
Dataset<Row> source = createTestDataframe();
Dataset<Row> ref = createTestDataframe().withColumnRenamed("id", "fk").filter("value < 6");
List<String> ids = Arrays.asList("A", "B", "C", "D", "E");
Map<String, Dataset<Row>> dependencies = new HashMap<>();
dependencies.put("df1", source);
dependencies.put("df2", ref);
Config config = ConfigFactory.empty()
.withValue(InListDeriver.INLIST_BATCH_SIZE, ConfigValueFactory.fromAnyRef(1))
.withValue(InListDeriver.INLIST_STEP_CONFIG, ConfigValueFactory.fromAnyRef("df1"))
.withValue(InListDeriver.INLIST_FIELD_CONFIG, ConfigValueFactory.fromAnyRef("id"))
.withValue(InListDeriver.INLIST_REFSTEP_CONFIG, ConfigValueFactory.fromAnyRef("df2"))
.withValue(InListDeriver.INLIST_REFFIELD_CONFIG, ConfigValueFactory.fromAnyRef("fk"));
InListDeriver deriver = new InListDeriver();
assertNoValidationFailures(deriver, config);
deriver.configure(config);
List<Row> results = deriver.derive(dependencies).select("id").collectAsList();
assertThat(results.size(), is(5));
for (Row row : results) {
assertThat(row.getString(0), in(ids));
}
}
@Test
public void testKafkaStreamingImportActivityLevel() throws Exception {
//run main class
String args[] = {"-kb", KAFKA_HOST + ":" + KAFKA_PORT, "-fd", IMPORT_TEST_OUTPUT_DIRECTORY_ACTIVITY, "-bm", "true", "-sr", "false", "-dl", "activity", "-wd", "./src/test/resources/config/kafka_import_activity/","-sm", "overwrite"};
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[*]");
SparkSession.builder().config(sparkConf).getOrCreate();
KafkaImportApplication.main(args);
//start Spark session
SparkSession sparkSession = SparkSession.builder()
.master("local[*]")
.appName("IntegrationTest")
.getOrCreate();
//generate Dataset and create hash to compare
Dataset<Row> importedDataset = sparkSession.read().load(IMPORT_TEST_OUTPUT_DIRECTORY_ACTIVITY);
//check that dataset contains 55 lines
assertEquals(55, importedDataset.count());
//check hash of dataset
String hash = BpmnaiUtils.getInstance().md5CecksumOfObject(importedDataset.collect());
assertEquals("9CEE92C16D7803E0ECF57666FDAC60D7", hash);
//close Spark session
sparkSession.close();
}
@Benchmark
@Threads(1)
public void readIceberg() {
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
withTableProperties(tableProperties, () -> {
String tableLocation = table().location();
Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
materialize(df);
});
}
@Test
public void rollbackToPreviousSnapshotAndReadData() {
long oldId = table.history().get(0).snapshotId();
table.rollback().toSnapshotId(oldId).commit();
table.refresh();
Dataset<Row> results = spark.read()
.format("iceberg")
.load(tableLocation.toString());
results.createOrReplaceTempView("table");
spark.sql("select * from table").show();
}
@Test(expected = EsHadoopIllegalArgumentException.class)
public void test0FailOnIndexCreationDisabled() throws Exception {
String target = wrapIndex(resource("test-nonexisting", "data"));
JavaStreamingQueryTestHarness<RecordBean> test = new JavaStreamingQueryTestHarness<>(spark, Encoders.bean(RecordBean.class));
RecordBean doc1 = new RecordBean();
doc1.setId(1);
doc1.setName("Spark");
RecordBean doc2 = new RecordBean();
doc2.setId(2);
doc2.setName("Hadoop");
RecordBean doc3 = new RecordBean();
doc3.setId(3);
doc3.setName("YARN");
Dataset<RecordBean> dataset = test
.withInput(doc1)
.withInput(doc2)
.withInput(doc3)
.expectingToThrow(EsHadoopIllegalArgumentException.class)
.stream();
test.run(
dataset.writeStream()
.option("checkpointLocation", checkpoint(target))
.option(ES_INDEX_AUTO_CREATE, "no")
.format("es"),
target
);
assertTrue(!RestUtils.exists(target));
}
@Benchmark
@Threads(1)
public void readWithProjectionFileSourceNonVectorized() {
Map<String, String> conf = Maps.newHashMap();
conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
conf.put(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), "true");
withSQLConf(conf, () -> {
Dataset<Row> df = spark().read().parquet(dataLocation()).selectExpr("nested.col3");
materialize(df);
});
}