类org.apache.spark.sql.sources.EqualTo源码实例Demo

下面列出了怎么用org.apache.spark.sql.sources.EqualTo的API类实例代码及写法,或者点击链接到github查看源代码。

@Test public void testValidFiltersForAvro() {
    ImmutableList<Filter> validFilters = ImmutableList.of(
            EqualTo.apply("foo", "manatee"),
            GreaterThan.apply("foo", "aardvark"),
            GreaterThanOrEqual.apply("bar", 2),
            LessThan.apply("foo", "zebra"),
            LessThanOrEqual.apply("bar", 1),
            In.apply("foo", new Object[] {1, 2, 3}),
            IsNull.apply("foo"),
            IsNotNull.apply("foo"),
            And.apply(IsNull.apply("foo"), IsNotNull.apply("bar")),
            Or.apply(IsNull.apply("foo"), IsNotNull.apply("foo")),
            Not.apply(IsNull.apply("foo")),
            StringStartsWith.apply("foo", "abc"),
            StringEndsWith.apply("foo", "def"),
            StringContains.apply("foo", "abcdef")
    );
    validFilters.forEach(f -> assertThat(SparkFilterUtils.unhandledFilters(AVRO, f)).isEmpty());
}
 
@Test public void testValidFiltersForArrow() {
    ImmutableList<Filter> validFilters = ImmutableList.of(
            EqualTo.apply("foo", "manatee"),
            GreaterThan.apply("foo", "aardvark"),
            GreaterThanOrEqual.apply("bar", 2),
            LessThan.apply("foo", "zebra"),
            LessThanOrEqual.apply("bar", 1),
            In.apply("foo", new Object[] {1, 2, 3}),
            IsNull.apply("foo"),
            IsNotNull.apply("foo"),
            And.apply(IsNull.apply("foo"), IsNotNull.apply("bar")),
            Not.apply(IsNull.apply("foo")),
            StringStartsWith.apply("foo", "abc"),
            StringEndsWith.apply("foo", "def"),
            StringContains.apply("foo", "abcdef")
    );
    validFilters.forEach(f -> assertThat(SparkFilterUtils.unhandledFilters(ARROW, f)).isEmpty());
}
 
private String generateWhereClause(List<Filter> pushed) {
  List<String> filterStr = Lists.newArrayList();
  for (Filter filter : pushed) {
    if (filter instanceof IsNotNull) {
      filterStr.add(String.format("isnotnull(\"%s\")", ((IsNotNull) filter).attribute()));
    } else if (filter instanceof EqualTo) {
      filterStr.add(String.format("\"%s\" = %s", ((EqualTo) filter).attribute(), valueToString(((EqualTo) filter).value())));
    } else if (filter instanceof GreaterThan) {
      filterStr.add(String.format("\"%s\" > %s", ((GreaterThan) filter).attribute(), valueToString(((GreaterThan) filter).value())));
    } else if (filter instanceof GreaterThanOrEqual) {
      filterStr.add(String.format("\"%s\" <= %s", ((GreaterThanOrEqual) filter).attribute(), valueToString(((GreaterThanOrEqual) filter).value())));
    } else if (filter instanceof LessThan) {
      filterStr.add(String.format("\"%s\" < %s", ((LessThan) filter).attribute(), valueToString(((LessThan) filter).value())));
    } else if (filter instanceof LessThanOrEqual) {
      filterStr.add(String.format("\"%s\" <= %s", ((LessThanOrEqual) filter).attribute(), valueToString(((LessThanOrEqual) filter).value())));
    }
    //todo fill out rest of Filter types
  }
  return WHERE_JOINER.join(filterStr);
}
 
private boolean canBePushed(Filter filter) {
  if (filter instanceof IsNotNull) {
    return true;
  } else if (filter instanceof EqualTo) {
    return true;
  }
  if (filter instanceof GreaterThan) {
    return true;
  }
  if (filter instanceof GreaterThanOrEqual) {
    return true;
  }
  if (filter instanceof LessThan) {
    return true;
  }
  if (filter instanceof LessThanOrEqual) {
    return true;
  }
  LOGGER.error("Cant push filter of type " + filter.toString());
  return false;
}
 
源代码5 项目: iceberg   文件: TestFilteredScan.java
@Test
public void testUnpartitionedIDFilters() {
  DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
      "path", unpartitioned.toString())
  );

  IcebergSource source = new IcebergSource();

  for (int i = 0; i < 10; i += 1) {
    DataSourceReader reader = source.createReader(options);

    pushFilters(reader, EqualTo.apply("id", i));

    List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
    Assert.assertEquals("Should only create one task for a small file", 1, tasks.size());

    // validate row filtering
    assertEqualsSafe(SCHEMA.asStruct(), expected(i),
        read(unpartitioned.toString(), "id = " + i));
  }
}
 
源代码6 项目: iceberg   文件: TestFilteredScan.java
@Test
public void testUnpartitionedIDFilters() {
  CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of(
      "path", unpartitioned.toString())
  );
  SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);

  for (int i = 0; i < 10; i += 1) {
    pushFilters(builder, EqualTo.apply("id", i));
    Batch scan = builder.build().toBatch();

    InputPartition[] partitions = scan.planInputPartitions();
    Assert.assertEquals("Should only create one task for a small file", 1, partitions.length);

    // validate row filtering
    assertEqualsSafe(SCHEMA.asStruct(), expected(i),
        read(unpartitioned.toString(), "id = " + i));
  }
}
 
源代码7 项目: iceberg   文件: TestFilteredScan.java
@Test
public void testBucketPartitionedIDFilters() {
  Table table = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID, "bucket4", "id");
  CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));

  Batch unfiltered = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options).build().toBatch();
  Assert.assertEquals("Unfiltered table should created 4 read tasks",
      4, unfiltered.planInputPartitions().length);

  for (int i = 0; i < 10; i += 1) {
    SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);

    pushFilters(builder, EqualTo.apply("id", i));
    Batch scan = builder.build().toBatch();

    InputPartition[] tasks = scan.planInputPartitions();

    // validate predicate push-down
    Assert.assertEquals("Should create one task for a single bucket", 1, tasks.length);

    // validate row filtering
    assertEqualsSafe(SCHEMA.asStruct(), expected(i), read(table.location(), "id = " + i));
  }
}
 
@Test public void testInvalidFiltersWithAvro() {
    Filter valid1 = EqualTo.apply("foo", "bar");
    Filter valid2 = EqualTo.apply("bar", 1);
    Filter invalid1 = EqualNullSafe.apply("foo", "bar");
    Filter invalid2 = And.apply(EqualTo.apply("foo", "bar"), Not.apply(EqualNullSafe.apply("bar", 1)));
    Iterable<Filter> unhandled = SparkFilterUtils.unhandledFilters(AVRO, valid1, valid2, invalid1, invalid2);
    assertThat(unhandled).containsExactly(invalid1, invalid2);
}
 
@Test public void testInvalidFiltersWithArrow() {
            Filter valid1 = EqualTo.apply("foo", "bar");
    Filter valid2 = EqualTo.apply("bar", 1);
    Filter invalid1 = EqualNullSafe.apply("foo", "bar");
    Filter invalid2 = And.apply(EqualTo.apply("foo", "bar"), Not.apply(EqualNullSafe.apply("bar", 1)));
    Filter invalid3 = Or.apply(IsNull.apply("foo"), IsNotNull.apply("foo"));
    Iterable<Filter> unhandled = SparkFilterUtils.unhandledFilters(ARROW, valid1, valid2, invalid1, invalid2, invalid3);
    assertThat(unhandled).containsExactly(invalid1, invalid2, invalid3);
}
 
源代码10 项目: iceberg   文件: TestFilteredScan.java
@Test
public void testUnpartitionedCaseInsensitiveIDFilters() {
  DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
      "path", unpartitioned.toString())
  );

  // set spark.sql.caseSensitive to false
  String caseSensitivityBeforeTest = TestFilteredScan.spark.conf().get("spark.sql.caseSensitive");
  TestFilteredScan.spark.conf().set("spark.sql.caseSensitive", "false");

  try {
    IcebergSource source = new IcebergSource();

    for (int i = 0; i < 10; i += 1) {
      DataSourceReader reader = source.createReader(options);

      pushFilters(reader, EqualTo.apply("ID", i)); // note lower(ID) == lower(id), so there must be a match

      List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
      Assert.assertEquals("Should only create one task for a small file", 1, tasks.size());

      // validate row filtering
      assertEqualsSafe(SCHEMA.asStruct(), expected(i),
          read(unpartitioned.toString(), "id = " + i));
    }
  } finally {
    // return global conf to previous state
    TestFilteredScan.spark.conf().set("spark.sql.caseSensitive", caseSensitivityBeforeTest);
  }
}
 
源代码11 项目: iceberg   文件: TestFilteredScan.java
@Test
public void testBucketPartitionedIDFilters() {
  File location = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID, "bucket4", "id");

  DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
      "path", location.toString())
  );

  IcebergSource source = new IcebergSource();
  DataSourceReader unfiltered = source.createReader(options);
  Assert.assertEquals("Unfiltered table should created 4 read tasks",
      4, unfiltered.planInputPartitions().size());

  for (int i = 0; i < 10; i += 1) {
    DataSourceReader reader = source.createReader(options);

    pushFilters(reader, EqualTo.apply("id", i));

    List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();

    // validate predicate push-down
    Assert.assertEquals("Should create one task for a single bucket", 1, tasks.size());

    // validate row filtering
    assertEqualsSafe(SCHEMA.asStruct(), expected(i), read(location.toString(), "id = " + i));
  }
}
 
源代码12 项目: iceberg   文件: TestFilteredScan.java
@Test
public void testUnpartitionedCaseInsensitiveIDFilters() {
  CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of(
      "path", unpartitioned.toString())
  );

  // set spark.sql.caseSensitive to false
  String caseSensitivityBeforeTest = TestFilteredScan.spark.conf().get("spark.sql.caseSensitive");
  TestFilteredScan.spark.conf().set("spark.sql.caseSensitive", "false");

  try {

    for (int i = 0; i < 10; i += 1) {
      SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options)
          .caseSensitive(false);

      pushFilters(builder, EqualTo.apply("ID", i)); // note lower(ID) == lower(id), so there must be a match
      Batch scan = builder.build().toBatch();

      InputPartition[] tasks = scan.planInputPartitions();
      Assert.assertEquals("Should only create one task for a small file", 1, tasks.length);

      // validate row filtering
      assertEqualsSafe(SCHEMA.asStruct(), expected(i),
          read(unpartitioned.toString(), "id = " + i));
    }
  } finally {
    // return global conf to previous state
    TestFilteredScan.spark.conf().set("spark.sql.caseSensitive", caseSensitivityBeforeTest);
  }
}
 
@Test public void testMultipleValidFiltersAreHandled() {
    Filter valid1 = EqualTo.apply("foo", "bar");
    Filter valid2 = EqualTo.apply("bar", 1);
    assertThat(SparkFilterUtils.unhandledFilters(AVRO, valid1, valid2)).isEmpty();
}
 
 类所在包
 类方法
 同包方法