The following examples show how to use the org.apache.spark.sql.sources.EqualTo API class, or follow the link to GitHub to view the full source code.
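EqualTo is one of Spark's data source Filter predicates: it pairs a column name with a literal value the column must equal. Before the larger test examples, here is a minimal, self-contained sketch of constructing one and reading it back (the column name and value are illustrative only; attribute() and value() are the accessors generated for the Scala case class, as also used in the snippets below):

import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;

public class EqualToExample {
  public static void main(String[] args) {
    // Build an "id = 5" predicate; apply(...) is the case-class factory callable from Java.
    Filter filter = EqualTo.apply("id", 5);

    // attribute() and value() expose the column name and the literal.
    EqualTo eq = (EqualTo) filter;
    System.out.println(eq.attribute() + " = " + eq.value()); // prints: id = 5
  }
}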
@Test public void testValidFiltersForAvro() {
  ImmutableList<Filter> validFilters = ImmutableList.of(
      EqualTo.apply("foo", "manatee"),
      GreaterThan.apply("foo", "aardvark"),
      GreaterThanOrEqual.apply("bar", 2),
      LessThan.apply("foo", "zebra"),
      LessThanOrEqual.apply("bar", 1),
      In.apply("foo", new Object[] {1, 2, 3}),
      IsNull.apply("foo"),
      IsNotNull.apply("foo"),
      And.apply(IsNull.apply("foo"), IsNotNull.apply("bar")),
      Or.apply(IsNull.apply("foo"), IsNotNull.apply("foo")),
      Not.apply(IsNull.apply("foo")),
      StringStartsWith.apply("foo", "abc"),
      StringEndsWith.apply("foo", "def"),
      StringContains.apply("foo", "abcdef")
  );
  validFilters.forEach(f -> assertThat(SparkFilterUtils.unhandledFilters(AVRO, f)).isEmpty());
}
@Test public void testValidFiltersForArrow() {
  ImmutableList<Filter> validFilters = ImmutableList.of(
      EqualTo.apply("foo", "manatee"),
      GreaterThan.apply("foo", "aardvark"),
      GreaterThanOrEqual.apply("bar", 2),
      LessThan.apply("foo", "zebra"),
      LessThanOrEqual.apply("bar", 1),
      In.apply("foo", new Object[] {1, 2, 3}),
      IsNull.apply("foo"),
      IsNotNull.apply("foo"),
      And.apply(IsNull.apply("foo"), IsNotNull.apply("bar")),
      Not.apply(IsNull.apply("foo")),
      StringStartsWith.apply("foo", "abc"),
      StringEndsWith.apply("foo", "def"),
      StringContains.apply("foo", "abcdef")
  );
  validFilters.forEach(f -> assertThat(SparkFilterUtils.unhandledFilters(ARROW, f)).isEmpty());
}
// Translates pushed-down Spark Filters into a SQL WHERE clause fragment.
private String generateWhereClause(List<Filter> pushed) {
  List<String> filterStr = Lists.newArrayList();
  for (Filter filter : pushed) {
    if (filter instanceof IsNotNull) {
      filterStr.add(String.format("isnotnull(\"%s\")", ((IsNotNull) filter).attribute()));
    } else if (filter instanceof EqualTo) {
      filterStr.add(String.format("\"%s\" = %s", ((EqualTo) filter).attribute(), valueToString(((EqualTo) filter).value())));
    } else if (filter instanceof GreaterThan) {
      filterStr.add(String.format("\"%s\" > %s", ((GreaterThan) filter).attribute(), valueToString(((GreaterThan) filter).value())));
    } else if (filter instanceof GreaterThanOrEqual) {
      filterStr.add(String.format("\"%s\" >= %s", ((GreaterThanOrEqual) filter).attribute(), valueToString(((GreaterThanOrEqual) filter).value())));
    } else if (filter instanceof LessThan) {
      filterStr.add(String.format("\"%s\" < %s", ((LessThan) filter).attribute(), valueToString(((LessThan) filter).value())));
    } else if (filter instanceof LessThanOrEqual) {
      filterStr.add(String.format("\"%s\" <= %s", ((LessThanOrEqual) filter).attribute(), valueToString(((LessThanOrEqual) filter).value())));
    }
    // TODO: fill out rest of Filter types
  }
  return WHERE_JOINER.join(filterStr);
}
// Only simple comparison and null-check predicates are supported for push-down.
private boolean canBePushed(Filter filter) {
  if (filter instanceof IsNotNull) {
    return true;
  } else if (filter instanceof EqualTo) {
    return true;
  } else if (filter instanceof GreaterThan) {
    return true;
  } else if (filter instanceof GreaterThanOrEqual) {
    return true;
  } else if (filter instanceof LessThan) {
    return true;
  } else if (filter instanceof LessThanOrEqual) {
    return true;
  }
  LOGGER.error("Can't push filter of type " + filter.toString());
  return false;
}
@Test
public void testUnpartitionedIDFilters() {
  DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
      "path", unpartitioned.toString())
  );
  IcebergSource source = new IcebergSource();
  for (int i = 0; i < 10; i += 1) {
    DataSourceReader reader = source.createReader(options);
    pushFilters(reader, EqualTo.apply("id", i));
    List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
    Assert.assertEquals("Should only create one task for a small file", 1, tasks.size());
    // validate row filtering
    assertEqualsSafe(SCHEMA.asStruct(), expected(i),
        read(unpartitioned.toString(), "id = " + i));
  }
}
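The pushFilters(reader, ...) call in these Iceberg tests is a private test helper that the snippet does not include. One plausible sketch, assuming the reader implements Spark's SupportsPushDownFilters mix-in (the helper name, assertion, and structure are assumptions):

// Hypothetical test helper: hand filters to the reader via the push-down interface.
private void pushFilters(DataSourceReader reader, Filter... filters) {
  Assert.assertTrue("Reader should support filter push-down",
      reader instanceof SupportsPushDownFilters);
  ((SupportsPushDownFilters) reader).pushFilters(filters);
}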
@Test
public void testUnpartitionedIDFilters() {
  CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of(
      "path", unpartitioned.toString())
  );
  SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
  for (int i = 0; i < 10; i += 1) {
    pushFilters(builder, EqualTo.apply("id", i));
    Batch scan = builder.build().toBatch();
    InputPartition[] partitions = scan.planInputPartitions();
    Assert.assertEquals("Should only create one task for a small file", 1, partitions.length);
    // validate row filtering
    assertEqualsSafe(SCHEMA.asStruct(), expected(i),
        read(unpartitioned.toString(), "id = " + i));
  }
}
@Test
public void testBucketPartitionedIDFilters() {
  Table table = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID, "bucket4", "id");
  CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
  Batch unfiltered = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options).build().toBatch();
  Assert.assertEquals("Unfiltered table should create 4 read tasks",
      4, unfiltered.planInputPartitions().length);
  for (int i = 0; i < 10; i += 1) {
    SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
    pushFilters(builder, EqualTo.apply("id", i));
    Batch scan = builder.build().toBatch();
    InputPartition[] tasks = scan.planInputPartitions();
    // validate predicate push-down
    Assert.assertEquals("Should create one task for a single bucket", 1, tasks.length);
    // validate row filtering
    assertEqualsSafe(SCHEMA.asStruct(), expected(i), read(table.location(), "id = " + i));
  }
}
@Test public void testInvalidFiltersWithAvro() {
  Filter valid1 = EqualTo.apply("foo", "bar");
  Filter valid2 = EqualTo.apply("bar", 1);
  Filter invalid1 = EqualNullSafe.apply("foo", "bar");
  Filter invalid2 = And.apply(EqualTo.apply("foo", "bar"), Not.apply(EqualNullSafe.apply("bar", 1)));
  Iterable<Filter> unhandled = SparkFilterUtils.unhandledFilters(AVRO, valid1, valid2, invalid1, invalid2);
  assertThat(unhandled).containsExactly(invalid1, invalid2);
}
@Test public void testInvalidFiltersWithArrow() {
  Filter valid1 = EqualTo.apply("foo", "bar");
  Filter valid2 = EqualTo.apply("bar", 1);
  Filter invalid1 = EqualNullSafe.apply("foo", "bar");
  Filter invalid2 = And.apply(EqualTo.apply("foo", "bar"), Not.apply(EqualNullSafe.apply("bar", 1)));
  Filter invalid3 = Or.apply(IsNull.apply("foo"), IsNotNull.apply("foo"));
  Iterable<Filter> unhandled = SparkFilterUtils.unhandledFilters(ARROW, valid1, valid2, invalid1, invalid2, invalid3);
  assertThat(unhandled).containsExactly(invalid1, invalid2, invalid3);
}
@Test
public void testUnpartitionedCaseInsensitiveIDFilters() {
  DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
      "path", unpartitioned.toString())
  );
  // set spark.sql.caseSensitive to false
  String caseSensitivityBeforeTest = TestFilteredScan.spark.conf().get("spark.sql.caseSensitive");
  TestFilteredScan.spark.conf().set("spark.sql.caseSensitive", "false");
  try {
    IcebergSource source = new IcebergSource();
    for (int i = 0; i < 10; i += 1) {
      DataSourceReader reader = source.createReader(options);
      pushFilters(reader, EqualTo.apply("ID", i)); // note lower(ID) == lower(id), so there must be a match
      List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
      Assert.assertEquals("Should only create one task for a small file", 1, tasks.size());
      // validate row filtering
      assertEqualsSafe(SCHEMA.asStruct(), expected(i),
          read(unpartitioned.toString(), "id = " + i));
    }
  } finally {
    // return global conf to previous state
    TestFilteredScan.spark.conf().set("spark.sql.caseSensitive", caseSensitivityBeforeTest);
  }
}
@Test
public void testBucketPartitionedIDFilters() {
  File location = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID, "bucket4", "id");
  DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
      "path", location.toString())
  );
  IcebergSource source = new IcebergSource();
  DataSourceReader unfiltered = source.createReader(options);
  Assert.assertEquals("Unfiltered table should create 4 read tasks",
      4, unfiltered.planInputPartitions().size());
  for (int i = 0; i < 10; i += 1) {
    DataSourceReader reader = source.createReader(options);
    pushFilters(reader, EqualTo.apply("id", i));
    List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
    // validate predicate push-down
    Assert.assertEquals("Should create one task for a single bucket", 1, tasks.size());
    // validate row filtering
    assertEqualsSafe(SCHEMA.asStruct(), expected(i), read(location.toString(), "id = " + i));
  }
}
@Test
public void testUnpartitionedCaseInsensitiveIDFilters() {
  CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of(
      "path", unpartitioned.toString())
  );
  // set spark.sql.caseSensitive to false
  String caseSensitivityBeforeTest = TestFilteredScan.spark.conf().get("spark.sql.caseSensitive");
  TestFilteredScan.spark.conf().set("spark.sql.caseSensitive", "false");
  try {
    for (int i = 0; i < 10; i += 1) {
      SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options)
          .caseSensitive(false);
      pushFilters(builder, EqualTo.apply("ID", i)); // note lower(ID) == lower(id), so there must be a match
      Batch scan = builder.build().toBatch();
      InputPartition[] tasks = scan.planInputPartitions();
      Assert.assertEquals("Should only create one task for a small file", 1, tasks.length);
      // validate row filtering
      assertEqualsSafe(SCHEMA.asStruct(), expected(i),
          read(unpartitioned.toString(), "id = " + i));
    }
  } finally {
    // return global conf to previous state
    TestFilteredScan.spark.conf().set("spark.sql.caseSensitive", caseSensitivityBeforeTest);
  }
}
@Test public void testMultipleValidFiltersAreHandled() {
  Filter valid1 = EqualTo.apply("foo", "bar");
  Filter valid2 = EqualTo.apply("bar", 1);
  assertThat(SparkFilterUtils.unhandledFilters(AVRO, valid1, valid2)).isEmpty();
}