org.openjdk.jmh.annotations.Threads # org.apache.spark.sql.Dataset source code examples

The following lists example code that uses org.openjdk.jmh.annotations.Threads together with org.apache.spark.sql.Dataset. Each example names its project and file; the full source can be viewed on GitHub in the corresponding repository.
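
Before the per-project examples, here is a minimal, self-contained sketch of creating a Dataset<Row> in plain Java. It is illustrative only: the class name, schema, and data are assumptions, not taken from any of the projects below.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class DatasetQuickStart {
    public static void main(String[] args) {
        // Local session for illustration only
        SparkSession spark = SparkSession.builder()
                .appName("Dataset quick start")
                .master("local[*]")
                .getOrCreate();

        // Explicit schema: one string column and one integer column
        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, false),
                DataTypes.createStructField("age", DataTypes.IntegerType, false)));

        List<Row> rows = Arrays.asList(
                RowFactory.create("alice", 30),
                RowFactory.create("bob", 25));

        Dataset<Row> df = spark.createDataFrame(rows, schema);
        df.show();

        spark.stop();
    }
}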

Example 1  Project: spliceengine  File: SparkDataSet.java
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public DataSet<V> intersect(DataSet<V> dataSet, String name, OperationContext context, boolean pushScope, String scopeDetail) throws StandardException {
    pushScopeIfNeeded(context, pushScope, scopeDetail);
    try {
        //Convert this rdd backed iterator to a Spark untyped dataset
        Dataset<Row> left = SpliceSpark.getSession()
                .createDataFrame(
                    rdd.map(
                        new LocatedRowToRowFunction()),
                    context.getOperation()
                           .getExecRowDefinition()
                           .schema());

        return new NativeSparkDataSet(left, context).intersect(dataSet, name, context, pushScope, scopeDetail);
    } finally {
        if (pushScope) context.popScope();
    }
}
 
Example 2  Project: envelope  File: TestImpalaMetadataTask.java
@Test
public void testDeriveDropRangePartitionBoundariesQuery() {
  Map<String, Object> configMap = new HashMap<>();
  configMap.put(HOST_CONFIG, "testhost");
  configMap.put(QUERY_TYPE_CONFIG, "drop_partition");
  configMap.put(QUERY_TABLE_CONFIG, "testtable");
  configMap.put(QUERY_PART_RANGE_START_CONFIG, "20190122");
  configMap.put(QUERY_PART_RANGE_END_CONFIG, "20190123");
  configMap.put(AUTH_CONFIG, "none");
  Config config = ConfigFactory.parseMap(configMap);
  ImpalaMetadataTask metadataTask = new ImpalaMetadataTask();
  metadataTask.configure(config);

  Map<String, Dataset<Row>> dependencies = Maps.newHashMap();
  String query = metadataTask.deriveQuery(dependencies);

  assertEquals("ALTER TABLE testtable DROP IF EXISTS RANGE PARTITION 20190122 <= VALUES < 20190123", query);
}
 
Example 3  Project: envelope  File: TestInListDeriver.java
@Test
public void testWrongField() throws Exception {
  thrown.expect(RuntimeException.class);
  thrown.expectMessage("Error executing IN list filtering");

  Dataset<Row> source = createTestDataframe();
  List<String> inListLiteral = Arrays.asList("1", "2", "3");

  Map<String, Dataset<Row>> dependencies = new HashMap<>();
  dependencies.put("df1", source);

  Config config = ConfigFactory.empty()
      .withValue(InListDeriver.INLIST_STEP_CONFIG, ConfigValueFactory.fromAnyRef("df1"))
      .withValue(InListDeriver.INLIST_FIELD_CONFIG, ConfigValueFactory.fromAnyRef("non_existing_field"))
      .withValue(InListDeriver.INLIST_VALUES_CONFIG, ConfigValueFactory.fromIterable(inListLiteral));

  InListDeriver deriver = new InListDeriver();

  assertNoValidationFailures(deriver, config);
  deriver.configure(config);

  deriver.derive(dependencies);
}
 
Example 4  Project: envelope  File: TestTranslateFunction.java
@Test
public void testExplicitDontAppendRaw() {
  Map<String, Object> configMap = Maps.newHashMap();
  configMap.put(ComponentFactory.TYPE_CONFIG_NAME, DummyTranslator.class.getName());
  configMap.put(TranslateFunction.APPEND_RAW_ENABLED_CONFIG, false);
  Config config = ConfigFactory.parseMap(configMap);

  TranslateFunction tf = new TranslateFunction(config);
  tf.receiveProvidedSchema(tf.getExpectingSchema());
  Dataset<Row> raw = Contexts.getSparkSession().createDataFrame(
      Lists.newArrayList(RowFactory.create("hello?")), tf.getExpectingSchema());
  Dataset<Row> translated = raw.flatMap(tf, RowEncoder.apply(tf.getProvidingSchema()));

  assertEquals(1, translated.schema().size());
  assertNotEquals("_value", translated.schema().fields()[0].name());
}
 
Example 5  Project: envelope  File: TestSQLDeriver.java
@Test
public void testQueryFile() throws Exception {
  Contexts.getSparkSession().createDataset(Lists.newArrayList(1), Encoders.INT()).createOrReplaceTempView("literaltable");

  Map<String, Object> configMap = Maps.newHashMap();
  configMap.put(SQLDeriver.QUERY_FILE_CONFIG_NAME, getClass().getResource("/sql/query_without_parameters.sql").getPath());
  Config config = ConfigFactory.parseMap(configMap);

  SQLDeriver deriver = new SQLDeriver();
  assertNoValidationFailures(deriver, config);
  deriver.configure(config);

  Object result = deriver.derive(Maps.<String, Dataset<Row>>newHashMap()).collectAsList().get(0).get(0);

  assertEquals(1, result);
}
 
Example 6  Project: systemds  File: MLContextTest.java
@Test
public void testDataFrameSumDMLMllibVectorWithIDColumn() {
	System.out.println("MLContextTest - DataFrame sum DML, mllib vector with ID column");

	List<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>> list = new ArrayList<>();
	list.add(new Tuple2<>(1.0, org.apache.spark.mllib.linalg.Vectors.dense(1.0, 2.0, 3.0)));
	list.add(new Tuple2<>(2.0, org.apache.spark.mllib.linalg.Vectors.dense(4.0, 5.0, 6.0)));
	list.add(new Tuple2<>(3.0, org.apache.spark.mllib.linalg.Vectors.dense(7.0, 8.0, 9.0)));
	JavaRDD<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>> javaRddTuple = sc.parallelize(list);

	JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleMllibVectorRow());
	List<StructField> fields = new ArrayList<>();
	fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
	fields.add(DataTypes.createStructField("C1", new org.apache.spark.mllib.linalg.VectorUDT(), true));
	StructType schema = DataTypes.createStructType(fields);
	Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema);

	MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR_WITH_INDEX);

	Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame, mm);
	setExpectedStdOut("sum: 45.0");
	ml.execute(script);
}
 
Example 7  Project: iceberg  File: RewriteManifestsAction.java
private List<ManifestFile> writeManifestsForUnpartitionedTable(Dataset<Row> manifestEntryDF, int numManifests) {
  Broadcast<FileIO> io = sparkContext.broadcast(fileIO);
  StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();

  // we rely only on the target number of manifests for unpartitioned tables
  // as we should not worry about having too much metadata per partition
  long maxNumManifestEntries = Long.MAX_VALUE;

  return manifestEntryDF
      .repartition(numManifests)
      .mapPartitions(
          toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
          manifestEncoder
      )
      .collectAsList();
}
 
Example 8  Project: hudi  File: ITTestHDFSParquetImportCommand.java
/**
 * Method to verify that the result equals the expected data.
 */
private void verifyResultData(List<GenericRecord> expectData) {
  Dataset<Row> ds = HoodieClientTestUtils.read(jsc, tablePath, sqlContext, fs, tablePath + "/*/*/*/*");

  List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
  List<HoodieTripModel> result = readData.stream().map(row ->
      new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
          row.getDouble(5), row.getDouble(6), row.getDouble(7)))
      .collect(Collectors.toList());

  List<HoodieTripModel> expected = expectData.stream().map(g ->
      new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
          g.get("_row_key").toString(),
          g.get("rider").toString(),
          g.get("driver").toString(),
          Double.parseDouble(g.get("begin_lat").toString()),
          Double.parseDouble(g.get("begin_lon").toString()),
          Double.parseDouble(g.get("end_lat").toString()),
          Double.parseDouble(g.get("end_lon").toString())))
      .collect(Collectors.toList());

  assertAll("Result list equals",
      () -> assertEquals(expected.size(), result.size()),
      () -> assertTrue(result.containsAll(expected) && expected.containsAll(result)));
}
 
Example 9  Project: systemds  File: MLContextTest.java
@Test
public void testOutputDataFrameFromMatrixDML() {
	System.out.println("MLContextTest - output DataFrame from matrix DML");

	String s = "M = matrix('1 2 3 4', rows=2, cols=2);";
	Script script = dml(s).out("M");
	Dataset<Row> df = ml.execute(script).getMatrix("M").toDF();
	Dataset<Row> sortedDF = df.sort(RDDConverterUtils.DF_ID_COLUMN);
	List<Row> list = sortedDF.collectAsList();
	Row row1 = list.get(0);
	Assert.assertEquals(1.0, row1.getDouble(0), 0.0);
	Assert.assertEquals(1.0, row1.getDouble(1), 0.0);
	Assert.assertEquals(2.0, row1.getDouble(2), 0.0);

	Row row2 = list.get(1);
	Assert.assertEquals(2.0, row2.getDouble(0), 0.0);
	Assert.assertEquals(3.0, row2.getDouble(1), 0.0);
	Assert.assertEquals(4.0, row2.getDouble(2), 0.0);
}
 
Example 10  Project: envelope  File: TestPassthroughDeriver.java
@Test (expected = RuntimeException.class)
public void testDifferentSchemas() throws Exception {
  StructType schema1 = DataTypes.createStructType(Lists.<StructField>newArrayList(
      DataTypes.createStructField("col1", DataTypes.StringType, false)));
  StructType schema2 = DataTypes.createStructType(Lists.<StructField>newArrayList(
      DataTypes.createStructField("col2", DataTypes.StringType, false)));
  Dataset<Row> dep1 = Contexts.getSparkSession().createDataFrame(
      Lists.newArrayList(RowFactory.create("a")), schema1);
  Dataset<Row> dep2 = Contexts.getSparkSession().createDataFrame(
      Lists.newArrayList(RowFactory.create("b")), schema2);
  Map<String, Dataset<Row>> dependencies = Maps.newHashMap();
  dependencies.put("dep1", dep1);
  dependencies.put("dep2", dep2);

  Deriver deriver = new PassthroughDeriver();

  deriver.derive(dependencies).collectAsList();
}
 
Example 11  Project: kylin-on-parquet-v2  File: NExecAndComp.java
public static Dataset<Row> sql(String prj, String sqlText, List<String> parameters) {
    if (sqlText == null)
        throw new RuntimeException("Sorry your SQL is null...");

    try {
        logger.info("Try to query from cube....");
        long startTs = System.currentTimeMillis();
        Dataset<Row> dataset = queryCubeAndSkipCompute(prj, sqlText, parameters);
        logger.info("Cool! This sql hits cube...");
        logger.info("Duration(ms): {}", (System.currentTimeMillis() - startTs));
        return dataset;
    } catch (Throwable e) {
        logger.error("There is no cube can be used for query [{}]", sqlText);
        logger.error("Reasons:", e);
        throw new RuntimeException("Error in running query [ " + sqlText.trim() + " ]", e);
    }
}
 
Example 12  Project: spliceengine  File: DataFrameIT.java
public static void testResultSetToDF(String table, ResultSet[] resultSets) throws SQLException {

    try {
        Connection conn = DriverManager.getConnection("jdbc:default:connection");
        PreparedStatement pstmt = conn.prepareStatement("select * from " + table.toUpperCase());
        ResultSet res = pstmt.executeQuery();
        // Convert result set to Dataframe
        Dataset<Row> resultSetDF = SparkUtils.resultSetToDF(res);
        resultSets[0] = res;

        // Construct Stored Procedure Result
        List<ExecRow> rows = Lists.newArrayList();
        ExecRow row = new ValueRow(1);
        // System.out.println(resultSetDF.dataset().count());
        row.setColumn(1, new SQLLongint(resultSetDF.count()));
        rows.add(row);
        IteratorNoPutResultSet resultsToWrap = wrapResults((EmbedConnection) conn, rows, DATAFRAME_COUNT_STORED_PROCEDURE_COLUMN_DECSRIPTOR);
        resultSets[0] = new EmbedResultSet40((EmbedConnection) conn, resultsToWrap, false, null, true);

        conn.close();
    } catch (StandardException e) {
        throw new SQLException(Throwables.getRootCause(e));
    }
}
 
private void assertCorrectness(Dataset<Row> rowDataset, Transformer transformer) {
	List<Row> sparkOutput = rowDataset.collectAsList();

	for (Row row : sparkOutput) {
		Map<String, Object> data = new HashMap<>();
		data.put("mergedAddress", row.get(0));

		List<Object> list = row.getList(1);
		String[] sanitizedAddress = new String[list.size()];
		for (int j = 0; j < sanitizedAddress.length; j++) {
			sanitizedAddress[j] = (String) list.get(j);
		}

		data.put("sanitizedAddress", sanitizedAddress);
		transformer.transform(data);

		assertEquals("number of words should be equals", row.get(2), data.get("numWords"));
		assertEquals("number of commas should be equals", row.get(3), data.get("numCommas"));
		assertEquals("numericPresent should be equals", row.get(4), data.get("numericPresent"));
		assertEquals("addressLength should be equals", row.get(5), data.get("addressLength"));
		assertEquals("favouredStart should be equals", row.get(6), data.get("favouredStart"));
		assertEquals("unfavouredStart should be equals", row.get(7), data.get("unfavouredStart"));
	}
}
 
Example 14  Project: deeplearning4j  File: Normalization.java
/**
 * Scale based on min,max
 *
 * @param dataFrame the dataframe to scale
 * @param min       the minimum value
 * @param max       the maximum value
 * @return the normalized dataframe per column
 */
public static Dataset<Row> normalize(Dataset<Row> dataFrame, double min, double max, List<String> skipColumns) {
    List<String> columnsList = DataFrames.toList(dataFrame.columns());
    columnsList.removeAll(skipColumns);
    String[] columnNames = DataFrames.toArray(columnsList);
    //first row is min second row is max, each column in a row is for a particular column
    List<Row> minMax = minMaxColumns(dataFrame, columnNames);
    for (int i = 0; i < columnNames.length; i++) {
        String columnName = columnNames[i];
        double dMin = ((Number) minMax.get(0).get(i)).doubleValue();
        double dMax = ((Number) minMax.get(1).get(i)).doubleValue();
        double maxSubMin = (dMax - dMin);
        if (maxSubMin == 0)
            maxSubMin = 1;

        Column newCol = dataFrame.col(columnName).minus(dMin).divide(maxSubMin).multiply(max - min).plus(min);
        dataFrame = dataFrame.withColumn(columnName, newCol);
    }


    return dataFrame;
}
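
A minimal usage sketch for the method above. It assumes an existing SparkSession named spark, the usual java.util and Spark SQL imports, and that Normalization is the DataVec class shown in this example; the column names and values are made up for illustration.

// Build a small numeric DataFrame (illustrative schema and values)
StructType schema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("f1", DataTypes.DoubleType, false),
        DataTypes.createStructField("f2", DataTypes.DoubleType, false)));
Dataset<Row> df = spark.createDataFrame(Arrays.asList(
        RowFactory.create(1.0, 10.0),
        RowFactory.create(2.0, 20.0),
        RowFactory.create(3.0, 30.0)), schema);

// Scale every column into [0, 1]; list column names in the last argument to leave them untouched
Dataset<Row> scaled = Normalization.normalize(df, 0.0, 1.0, Collections.<String>emptyList());
scaled.show();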
 
Example 15  Project: spliceengine  File: SparkRelationalOperator.java
@Override
public Column getColumnExpression(Dataset<Row> leftDF,
                                  Dataset<Row> rightDF,
                                  Function<String, DataType> convertStringToDataTypeFunction) throws UnsupportedOperationException {
    Column leftExpr  = getLeftChild().getColumnExpression(leftDF, rightDF, convertStringToDataTypeFunction);
    Column rightExpr = getRightChild().getColumnExpression(leftDF, rightDF, convertStringToDataTypeFunction);

    if (relOpKind == EQUALS_RELOP)
        return leftExpr.equalTo(rightExpr);
    else if (relOpKind == NOT_EQUALS_RELOP)
        return leftExpr.notEqual(rightExpr);
    else if (relOpKind == GREATER_THAN_RELOP)
        return leftExpr.gt(rightExpr);
    else if (relOpKind == GREATER_EQUALS_RELOP)
        return leftExpr.geq(rightExpr);
    else if (relOpKind == LESS_THAN_RELOP)
        return leftExpr.lt(rightExpr);
    else if (relOpKind == LESS_EQUALS_RELOP)
        return leftExpr.leq(rightExpr);
    else if (relOpKind == IS_NULL_RELOP)
        return leftExpr.isNull();
    else if (relOpKind == IS_NOT_NULL_RELOP)
        return leftExpr.isNotNull();
    else
        throw new UnsupportedOperationException();
}
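
The branches above delegate to the standard comparison methods on org.apache.spark.sql.Column. A short stand-alone sketch of the same operators applied directly to a DataFrame; df and the column names are assumptions for illustration.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

// Filters built from the same Column operators used above
Dataset<Row> eq        = df.filter(col("a").equalTo(col("b")));
Dataset<Row> ne        = df.filter(col("a").notEqual(col("b")));
Dataset<Row> gt        = df.filter(col("a").gt(lit(5)));
Dataset<Row> geq       = df.filter(col("a").geq(lit(5)));
Dataset<Row> lt        = df.filter(col("a").lt(lit(5)));
Dataset<Row> leq       = df.filter(col("a").leq(lit(5)));
Dataset<Row> isNull    = df.filter(col("a").isNull());
Dataset<Row> isNotNull = df.filter(col("a").isNotNull());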
 
Example 16  Project: systemds  File: MLContextTest.java
@Test
public void testOutputDataFrameDoublesWithIDColumnFromMatrixDML() {
	System.out.println("MLContextTest - output DataFrame of doubles with ID column from matrix DML");

	String s = "M = matrix('1 2 3 4', rows=2, cols=2);";
	Script script = dml(s).out("M");
	Dataset<Row> df = ml.execute(script).getMatrix("M").toDFDoubleWithIDColumn();
	Dataset<Row> sortedDF = df.sort(RDDConverterUtils.DF_ID_COLUMN);
	List<Row> list = sortedDF.collectAsList();

	Row row1 = list.get(0);
	Assert.assertEquals(1.0, row1.getDouble(0), 0.0);
	Assert.assertEquals(1.0, row1.getDouble(1), 0.0);
	Assert.assertEquals(2.0, row1.getDouble(2), 0.0);

	Row row2 = list.get(1);
	Assert.assertEquals(2.0, row2.getDouble(0), 0.0);
	Assert.assertEquals(3.0, row2.getDouble(1), 0.0);
	Assert.assertEquals(4.0, row2.getDouble(2), 0.0);
}
 
Example 17  Project: beam  File: WindowAssignTranslatorBatch.java
@Override
public void translateTransform(
    PTransform<PCollection<T>, PCollection<T>> transform, TranslationContext context) {

  Window.Assign<T> assignTransform = (Window.Assign<T>) transform;
  @SuppressWarnings("unchecked")
  final PCollection<T> input = (PCollection<T>) context.getInput();
  @SuppressWarnings("unchecked")
  final PCollection<T> output = (PCollection<T>) context.getOutput();

  Dataset<WindowedValue<T>> inputDataset = context.getDataset(input);
  if (WindowingHelpers.skipAssignWindows(assignTransform, context)) {
    context.putDataset(output, inputDataset);
  } else {
    WindowFn<T, ?> windowFn = assignTransform.getWindowFn();
    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
        WindowedValue.FullWindowedValueCoder.of(input.getCoder(), windowFn.windowCoder());
    Dataset<WindowedValue<T>> outputDataset =
        inputDataset.map(
            WindowingHelpers.assignWindowsMapFunction(windowFn),
            EncoderHelpers.fromBeamCoder(windowedValueCoder));
    context.putDataset(output, outputDataset);
  }
}
 
Example 18  Project: iceberg  File: TestWriteMetricsConfig.java
@Test
public void testCountMetricsCollectionForParquet() throws IOException {
  String tableLocation = temp.newFolder("iceberg-table").toString();

  HadoopTables tables = new HadoopTables(CONF);
  PartitionSpec spec = PartitionSpec.unpartitioned();
  Map<String, String> properties = Maps.newHashMap();
  properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "counts");
  Table table = tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation);

  List<SimpleRecord> expectedRecords = Lists.newArrayList(
      new SimpleRecord(1, "a"),
      new SimpleRecord(2, "b"),
      new SimpleRecord(3, "c")
  );
  Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
  df.select("id", "data")
      .coalesce(1)
      .write()
      .format("iceberg")
      .option("write-format", "parquet")
      .mode("append")
      .save(tableLocation);

  for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
    DataFile file = task.file();
    Assert.assertEquals(2, file.nullValueCounts().size());
    Assert.assertEquals(2, file.valueCounts().size());
    Assert.assertTrue(file.lowerBounds().isEmpty());
    Assert.assertTrue(file.upperBounds().isEmpty());
  }
}
 
Example 19  Project: envelope  File: TestAppendPlanner.java
@Test
public void testPlansInserts() {
  Config config = ConfigFactory.empty();
  AppendPlanner ap = new AppendPlanner();
  assertNoValidationFailures(ap, config);
  ap.configure(config);

  List<Tuple2<MutationType, Dataset<Row>>> planned = ap.planMutationsForSet(dataFrame);

  assertEquals(planned.size(), 1);
  assertEquals(planned.get(0)._1(), MutationType.INSERT);
  assertEquals(planned.get(0)._2().count(), 1);
}
 
Example 20  Project: net.jgp.labs.spark  File: ArrayToDatasetApp.java
private void start() {
  SparkSession spark = SparkSession.builder()
      .appName("Array to Dataset<String>")
      .master("local")
      .getOrCreate();

  String[] l = new String[] { "a", "b", "c", "d" };
  List<String> data = Arrays.asList(l);
  Dataset<String> df = spark.createDataset(data, Encoders.STRING());
  df.show();
}
 
Example 21  Project: MegaSparkDiff  File: VisualizerTest.java
@Test
public void basicVisualizerTest()
{
    Pair<Dataset<Row>,Dataset<Row>> pair = getAppleTablePair("Test6", "Test7");
    String html = generateString(pair.getLeft(), pair.getRight(), "FRUIT", 100);
    if (html.isEmpty())
    {
        Assert.fail("html was empty");
    }
}
 
Example 22  Project: MegaSparkDiff  File: VisualizerTest.java
@Test
public void nullRightDfTest()
{
    Pair<Dataset<Row>,Dataset<Row>> pair = getAppleTablePair("Test1", "Test4");
    String html = generateString(pair.getLeft(), null, "FRUIT", 100);
    Assert.assertEquals("<h3>Error message: Right dataframe is null</h3>", html);
}
 
private static Dataset<Row> withDateColumnDictEncoded(Dataset<Row> df) {
  return df.withColumn(
      "dateCol",
      when(modColumn(9, 0), to_date(lit("04/12/2019"), "MM/dd/yyyy"))
          .when(modColumn(9, 1), to_date(lit("04/13/2019"), "MM/dd/yyyy"))
          .when(modColumn(9, 2), to_date(lit("04/14/2019"), "MM/dd/yyyy"))
          .when(modColumn(9, 3), to_date(lit("04/15/2019"), "MM/dd/yyyy"))
          .when(modColumn(9, 4), to_date(lit("04/16/2019"), "MM/dd/yyyy"))
          .when(modColumn(9, 5), to_date(lit("04/17/2019"), "MM/dd/yyyy"))
          .when(modColumn(9, 6), to_date(lit("04/18/2019"), "MM/dd/yyyy"))
          .when(modColumn(9, 7), to_date(lit("04/19/2019"), "MM/dd/yyyy"))
          .when(modColumn(9, 8), to_date(lit("04/20/2019"), "MM/dd/yyyy")));
}
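
The modColumn(...) helper is not included in this listing. A plausible stand-in built only from public Spark SQL functions is sketched below, assuming the usual org.apache.spark.sql imports; the id column and the pmod-based condition are assumptions for illustration, not the project's actual implementation.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.pmod;
import static org.apache.spark.sql.functions.to_date;
import static org.apache.spark.sql.functions.when;

// Hypothetical stand-in: true when id % divisor == remainder
private static Column modColumn(int divisor, int remainder) {
    return pmod(col("id"), lit(divisor)).equalTo(lit(remainder));
}

// Shortened version of the pattern above: pick a date literal per bucket
private static Dataset<Row> withTwoDateBuckets(Dataset<Row> df) {
    return df.withColumn("dateCol",
        when(modColumn(2, 0), to_date(lit("04/12/2019"), "MM/dd/yyyy"))
            .otherwise(to_date(lit("04/13/2019"), "MM/dd/yyyy")));
}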
 
Example 24  Project: mmtf-spark  File: MyVariantDatasetTest.java
@Test
public void test2() throws IOException {
    List<String> uniprotIds = Arrays.asList("P15056"); // BRAF
    String query = "clinvar.rcv.clinical_significance:pathogenic OR clinvar.rcv.clinical_significance:likely pathogenic";
    Dataset<Row> ds = MyVariantDataset.getVariations(uniprotIds, query);
    assertEquals(1, ds.filter(
            "variationId = 'chr7:g.140501287T>C'"
            + " AND uniprotId = 'P15056'").count());
}
 
Example 25  Project: envelope  File: TestInListDeriver.java
@Test
public void testBatchSize() throws Exception {
  Dataset<Row> source = createTestDataframe();
  Dataset<Row> ref = createTestDataframe().withColumnRenamed("id", "fk").filter("value < 6");

  List<String> ids = Arrays.asList("A", "B", "C", "D", "E");

  Map<String, Dataset<Row>> dependencies = new HashMap<>();
  dependencies.put("df1", source);
  dependencies.put("df2", ref);

  Config config = ConfigFactory.empty()
      .withValue(InListDeriver.INLIST_BATCH_SIZE, ConfigValueFactory.fromAnyRef(1))
      .withValue(InListDeriver.INLIST_STEP_CONFIG, ConfigValueFactory.fromAnyRef("df1"))
      .withValue(InListDeriver.INLIST_FIELD_CONFIG, ConfigValueFactory.fromAnyRef("id"))
      .withValue(InListDeriver.INLIST_REFSTEP_CONFIG, ConfigValueFactory.fromAnyRef("df2"))
      .withValue(InListDeriver.INLIST_REFFIELD_CONFIG, ConfigValueFactory.fromAnyRef("fk"));

  InListDeriver deriver = new InListDeriver();

  assertNoValidationFailures(deriver, config);
  deriver.configure(config);

  List<Row> results = deriver.derive(dependencies).select("id").collectAsList();
  assertThat(results.size(), is(5));

  for (Row row : results) {
    assertThat(row.getString(0), in(ids));
  }
}
 
@Test
public void testKafkaStreamingImportActivityLevel() throws Exception {
    //run main class
    String args[] = {"-kb", KAFKA_HOST + ":" + KAFKA_PORT, "-fd", IMPORT_TEST_OUTPUT_DIRECTORY_ACTIVITY, "-bm", "true", "-sr", "false", "-dl", "activity", "-wd", "./src/test/resources/config/kafka_import_activity/","-sm", "overwrite"};
    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster("local[*]");
    SparkSession.builder().config(sparkConf).getOrCreate();
    KafkaImportApplication.main(args);

    //start Spark session
    SparkSession sparkSession = SparkSession.builder()
            .master("local[*]")
            .appName("IntegrationTest")
            .getOrCreate();

    //generate Dataset and create hash to compare
    Dataset<Row> importedDataset = sparkSession.read().load(IMPORT_TEST_OUTPUT_DIRECTORY_ACTIVITY);

    //check that dataset contains 55 lines
    assertEquals(55, importedDataset.count());

    //check hash of dataset
    String hash = BpmnaiUtils.getInstance().md5CecksumOfObject(importedDataset.collect());
    assertEquals("9CEE92C16D7803E0ECF57666FDAC60D7", hash);

    //close Spark session
    sparkSession.close();
}
 
@Benchmark
@Threads(1)
public void readIceberg() {
  Map<String, String> tableProperties = Maps.newHashMap();
  tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
  withTableProperties(tableProperties, () -> {
    String tableLocation = table().location();
    Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
    materialize(df);
  });
}
 
Example 28  Project: iceberg  File: SnapshotFunctionalityTest.java
@Test
public void rollbackToPreviousSnapshotAndReadData() {
  long oldId = table.history().get(0).snapshotId();

  table.rollback().toSnapshotId(oldId).commit();
  table.refresh();

  Dataset<Row> results = spark.read()
      .format("iceberg")
      .load(tableLocation.toString());

  results.createOrReplaceTempView("table");
  spark.sql("select * from table").show();
}
 
@Test(expected = EsHadoopIllegalArgumentException.class)
public void test0FailOnIndexCreationDisabled() throws Exception {
    String target = wrapIndex(resource("test-nonexisting", "data"));
    JavaStreamingQueryTestHarness<RecordBean> test = new JavaStreamingQueryTestHarness<>(spark, Encoders.bean(RecordBean.class));

    RecordBean doc1 = new RecordBean();
    doc1.setId(1);
    doc1.setName("Spark");

    RecordBean doc2 = new RecordBean();
    doc2.setId(2);
    doc2.setName("Hadoop");

    RecordBean doc3 = new RecordBean();
    doc3.setId(3);
    doc3.setName("YARN");

    Dataset<RecordBean> dataset = test
            .withInput(doc1)
            .withInput(doc2)
            .withInput(doc3)
            .expectingToThrow(EsHadoopIllegalArgumentException.class)
            .stream();

    test.run(
            dataset.writeStream()
                    .option("checkpointLocation", checkpoint(target))
                    .option(ES_INDEX_AUTO_CREATE, "no")
                    .format("es"),
            target
    );

    assertTrue(!RestUtils.exists(target));
}
 
@Benchmark
@Threads(1)
public void readWithProjectionFileSourceNonVectorized() {
  Map<String, String> conf = Maps.newHashMap();
  conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
  conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
  conf.put(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), "true");
  withSQLConf(conf, () -> {
    Dataset<Row> df = spark().read().parquet(dataLocation()).selectExpr("nested.col3");
    materialize(df);
  });
}
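
The withSQLConf(...) helper used in these benchmarks belongs to the benchmark harness, not to Spark itself. Outside the harness, the same settings can be applied on the session before reading; below is a sketch assuming a SparkSession named spark and a placeholder input path.

// Apply the same SQL options at the session level, then read with a nested-field projection
spark.conf().set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
spark.conf().set(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
spark.conf().set(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), "true");

Dataset<Row> df = spark.read().parquet("/path/to/parquet/data").selectExpr("nested.col3");
df.show();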