The examples below show how to use the org.apache.beam.sdk.transforms.SerializableFunction API, with typical code patterns; each snippet is taken from source code that can be viewed on GitHub.
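Before the excerpts, a minimal self-contained sketch of the interface in use. A SerializableFunction&lt;InputT, OutputT&gt; is a Serializable functional interface with a single apply method, which is what lets Beam ship it to workers; the lengthFn name and the MapElements wiring here are illustrative, not drawn from the examples below.
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

class SerializableFunctionSketch {
  // A lambda is serializable when its target type is SerializableFunction.
  static final SerializableFunction<String, Integer> lengthFn = input -> input.length();

  static PCollection<Integer> wordLengths(PCollection<String> words) {
    // MapElements accepts a SerializableFunction through into(...).via(...).
    return words.apply(MapElements.into(TypeDescriptors.integers()).via(lengthFn));
  }
}
The excerpts from real projects follow.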
@Test
public void testOutputIsHashed() {
String clientId = "client_id";
String clientIp = "client_ip";
Map<String, String> attributes = ImmutableMap.<String, String>builder()
.put(Attribute.CLIENT_ID, clientId).put(Attribute.CLIENT_IP, clientIp).build();
PubsubMessage input = new PubsubMessage("{}".getBytes(StandardCharsets.UTF_8), attributes);
PCollection<PubsubMessage> output = pipeline.apply(Create.of(input)).apply(HashClientInfo
.of(pipeline.newProvider(ID_HASH_KEY_PATH), pipeline.newProvider(IP_HASH_KEY_PATH)));
PAssert.that(output).satisfies((SerializableFunction<Iterable<PubsubMessage>, Void>) input1 -> {
for (PubsubMessage message : input1) {
Assert.assertNotEquals(message.getAttribute(Attribute.CLIENT_ID), clientId);
Assert.assertNotEquals(message.getAttribute(Attribute.CLIENT_IP), clientIp);
Assert.assertTrue(HashClientInfo.isHashed(message.getAttribute(Attribute.CLIENT_ID)));
Assert.assertTrue(HashClientInfo.isHashed(message.getAttribute(Attribute.CLIENT_IP)));
}
return null;
});
pipeline.run();
}
@Test
public void allPanesMultiplePanes() {
SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor =
PaneExtractors.allPanes();
Iterable<ValueInSingleWindow<Integer>> multiplePanes =
ImmutableList.of(
ValueInSingleWindow.of(
8,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(false, false, Timing.LATE, 2L, 1L)),
ValueInSingleWindow.of(
4,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)),
ValueInSingleWindow.of(
1,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(true, false, Timing.EARLY)));
assertThat(extractor.apply(multiplePanes), containsInAnyOrder(4, 8, 1));
}
private StreamingWriteTables(
BigQueryServices bigQueryServices,
InsertRetryPolicy retryPolicy,
boolean extendedErrorInfo,
boolean skipInvalidRows,
boolean ignoreUnknownValues,
boolean ignoreInsertIds,
Coder<ElementT> elementCoder,
SerializableFunction<ElementT, TableRow> toTableRow) {
this.bigQueryServices = bigQueryServices;
this.retryPolicy = retryPolicy;
this.extendedErrorInfo = extendedErrorInfo;
this.skipInvalidRows = skipInvalidRows;
this.ignoreUnknownValues = ignoreUnknownValues;
this.ignoreInsertIds = ignoreInsertIds;
this.elementCoder = elementCoder;
this.toTableRow = toTableRow;
}
public static <T> BigQueryStorageTableSource<T> create(
ValueProvider<TableReference> tableRefProvider,
@Nullable TableReadOptions readOptions,
@Nullable ValueProvider<List<String>> selectedFields,
@Nullable ValueProvider<String> rowRestriction,
SerializableFunction<SchemaAndRecord, T> parseFn,
Coder<T> outputCoder,
BigQueryServices bqServices) {
return new BigQueryStorageTableSource<>(
tableRefProvider,
readOptions,
selectedFields,
rowRestriction,
parseFn,
outputCoder,
bqServices);
}
@Nullable
@Override
public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> typeDescriptor) {
TypeDescriptor<?> type = typeDescriptor;
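// Walk up the superclass chain until a type with a registered SchemaProvider is found; stop at Object.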
do {
SchemaProvider schemaProvider = providers.get(type);
if (schemaProvider != null) {
return (SerializableFunction<Row, T>) schemaProvider.fromRowFunction(type);
}
Class<?> superClass = type.getRawType().getSuperclass();
if (superClass == null || superClass.equals(Object.class)) {
return null;
}
type = TypeDescriptor.of(superClass);
} while (true);
}
/**
* Returns a function mapping from {@code BillingEvent} to filename {@code Params}.
*
* <p>Beam uses this to determine which file a given {@code BillingEvent} should get placed into.
*
* @param outputBucket the GCS bucket we're outputting reports to
* @param yearMonthProvider a runtime provider for the yyyy-MM we're generating the invoice for
*/
static SerializableFunction<BillingEvent, Params> makeDestinationFunction(
String outputBucket, ValueProvider<String> yearMonthProvider) {
return billingEvent ->
new Params()
.withShardTemplate("")
.withSuffix(".csv")
.withBaseFilename(
NestedValueProvider.of(
yearMonthProvider,
yearMonth ->
FileBasedSink.convertToFileResourceIfPossible(
String.format(
"%s/%s/%s",
outputBucket, yearMonth, billingEvent.toFilename(yearMonth)))));
}
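For context, a destination function like this is typically handed to a dynamic-destination text write. A minimal sketch, assuming a PCollection&lt;BillingEvent&gt; named billingEvents, an emptyParams default, and a BillingEvent::toCsv formatter (all three names are illustrative, not from this file):
// Hypothetical wiring; billingEvents, emptyParams, and toCsv are assumed names.
billingEvents.apply(
    TextIO.<BillingEvent>writeCustomType()
        .withFormatFunction(BillingEvent::toCsv)
        .to(makeDestinationFunction(outputBucket, yearMonthProvider), emptyParams));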
@Test
public void testWriteDataToDynamo() {
final List<WriteRequest> writeRequests = DynamoDBIOTestHelper.generateWriteRequests(numOfItems);
final PCollection<Void> output =
pipeline
.apply(Create.of(writeRequests))
.apply(
DynamoDBIO.<WriteRequest>write()
.withWriteRequestMapperFn(
(SerializableFunction<WriteRequest, KV<String, WriteRequest>>)
writeRequest -> KV.of(tableName, writeRequest))
.withRetryConfiguration(
DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1)))
.withAwsClientsProvider(
AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
final PCollection<Long> publishedResultsSize = output.apply(Count.globally());
PAssert.that(publishedResultsSize).containsInAnyOrder(0L);
pipeline.run().waitUntilFinish();
}
@Test
public void shouldAdvanceWatermarkWithCustomTimePolicy() {
SerializableFunction<KinesisRecord, Instant> timestampFn =
(record) -> record.getApproximateArrivalTimestamp().plus(Duration.standardMinutes(1));
WatermarkPolicy policy =
WatermarkPolicyFactory.withCustomWatermarkPolicy(
WatermarkParameters.create().withTimestampFn(timestampFn))
.createWatermarkPolicy();
KinesisRecord a = mock(KinesisRecord.class);
KinesisRecord b = mock(KinesisRecord.class);
Instant time1 = NOW.minus(Duration.standardSeconds(30L));
Instant time2 = NOW.minus(Duration.standardSeconds(20L));
when(a.getApproximateArrivalTimestamp()).thenReturn(time1);
when(b.getApproximateArrivalTimestamp()).thenReturn(time2);
policy.update(a);
assertThat(policy.getWatermark()).isEqualTo(time1.plus(Duration.standardMinutes(1)));
policy.update(b);
assertThat(policy.getWatermark()).isEqualTo(time2.plus(Duration.standardMinutes(1)));
}
@Test
public void testGetSchemaCoder() throws NoSuchSchemaException {
SchemaRegistry registry = SchemaRegistry.createDefault();
registry.registerJavaBean(SimpleBean.class);
Schema schema = registry.getSchema(SimpleBean.class);
SerializableFunction<SimpleBean, Row> toRowFunction =
registry.getToRowFunction(SimpleBean.class);
SerializableFunction<Row, SimpleBean> fromRowFunction =
registry.getFromRowFunction(SimpleBean.class);
SchemaCoder schemaCoder = registry.getSchemaCoder(SimpleBean.class);
assertTrue(schema.equivalent(schemaCoder.getSchema()));
assertEquals(toRowFunction, schemaCoder.getToRowFunction());
assertEquals(fromRowFunction, schemaCoder.getFromRowFunction());
thrown.expect(NoSuchSchemaException.class);
registry.getSchemaCoder(Double.class);
}
@Test
public void testNestedValueProviderStatic() throws Exception {
ValueProvider<String> xvp = StaticValueProvider.of("foo");
ValueProvider<Integer> yvp = StaticValueProvider.of(1);
ValueProvider<String> zvp =
DualInputNestedValueProvider.of(
xvp,
yvp,
new SerializableFunction<TranslatorInput<String, Integer>, String>() {
@Override
public String apply(TranslatorInput<String, Integer> input) {
return input.getX() + (input.getY() + 1);
}
});
assertTrue(zvp.isAccessible());
assertEquals("foo2", zvp.get());
}
@Nullable
@Override
public <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor) {
TypeDescriptor<?> type = typeDescriptor;
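// Same superclass walk as fromRowFunction: return the nearest registered provider's toRowFunction.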
do {
SchemaProvider schemaProvider = providers.get(type);
if (schemaProvider != null) {
return (SerializableFunction<T, Row>) schemaProvider.toRowFunction(type);
}
Class<?> superClass = type.getRawType().getSuperclass();
if (superClass == null || superClass.equals(Object.class)) {
return null;
}
type = TypeDescriptor.of(superClass);
} while (true);
}
@Test
public void testCustomMapperImplDelete() {
counter.set(0);
SerializableFunction<Session, Mapper> factory = new NOOPMapperFactory();
pipeline
.apply(Create.of(""))
.apply(
CassandraIO.<String>delete()
.withHosts(Collections.singletonList(CASSANDRA_HOST))
.withPort(cassandraPort)
.withKeyspace(CASSANDRA_KEYSPACE)
.withMapperFactoryFn(factory)
.withEntity(String.class));
pipeline.run();
assertEquals(1, counter.intValue());
}
@Test
public void onlyPaneMultiplePanesFails() {
SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor =
PaneExtractors.onlyPane(PAssert.PAssertionSite.capture(""));
Iterable<ValueInSingleWindow<Integer>> multipleFiring =
ImmutableList.of(
ValueInSingleWindow.of(
4,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(true, false, Timing.EARLY)),
ValueInSingleWindow.of(
2,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)),
ValueInSingleWindow.of(
1,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(false, false, Timing.LATE, 2L, 1L)));
thrown.expectMessage("trigger that fires at most once");
extractor.apply(multipleFiring);
}
@Test
public void nonLatePanesSingleEarly() {
SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor =
PaneExtractors.nonLatePanes();
Iterable<ValueInSingleWindow<Integer>> earlyPanes =
ImmutableList.of(
ValueInSingleWindow.of(
8,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(true, false, Timing.EARLY)),
ValueInSingleWindow.of(
4,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(true, false, Timing.EARLY)));
assertThat(extractor.apply(earlyPanes), containsInAnyOrder(4, 8));
}
@Test
public void onTimePane() {
SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor =
PaneExtractors.onTimePane();
Iterable<ValueInSingleWindow<Integer>> onlyOnTime =
ImmutableList.of(
ValueInSingleWindow.of(
4,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)),
ValueInSingleWindow.of(
2,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)));
assertThat(extractor.apply(onlyOnTime), containsInAnyOrder(2, 4));
}
/** Constructor. */
public StreamingInserts(
CreateDisposition createDisposition,
DynamicDestinations<?, DestinationT> dynamicDestinations,
Coder<ElementT> elementCoder,
SerializableFunction<ElementT, TableRow> toTableRow) {
this(
createDisposition,
dynamicDestinations,
new BigQueryServicesImpl(),
InsertRetryPolicy.alwaysRetry(),
false,
false,
false,
false,
elementCoder,
toTableRow,
null);
}
@Test
public void testMissingTotalSegments() {
thrown.expectMessage("TotalSegments is required with withScanRequestFn()");
pipeline.apply(
DynamoDBIO.read()
.withScanRequestFn(
(SerializableFunction<Void, ScanRequest>) input -> new ScanRequest(tableName))
.withAwsClientsProvider(
AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
try {
pipeline.run().waitUntilFinish();
fail("TotalSegments is required with withScanRequestFn()");
} catch (IllegalArgumentException ex) {
assertEquals("TotalSegments is required with withScanRequestFn()", ex.getMessage());
}
}
public SnowflakeServiceConfig(
SerializableFunction<Void, DataSource> dataSourceProviderFn,
String table,
String query,
String storageIntegration,
String stagingBucketDir) {
this.dataSourceProviderFn = dataSourceProviderFn;
this.table = table;
this.query = query;
this.storageIntegrationName = storageIntegration;
this.stagingBucketDir = stagingBucketDir;
}
/**
* Returns a {@link DynamicAvroDestinations} that always returns the same {@link FilenamePolicy},
* schema, metadata, and codec.
*/
public static <UserT, OutputT> DynamicAvroDestinations<UserT, Void, OutputT> constantDestinations(
FilenamePolicy filenamePolicy,
Schema schema,
Map<String, Object> metadata,
CodecFactory codec,
SerializableFunction<UserT, OutputT> formatFunction) {
return new ConstantAvroDestination<>(filenamePolicy, schema, metadata, codec, formatFunction);
}
@Test
public void testOneOfProtoToRow() {
SerializableFunction<OneOf, Row> toRow =
new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(OneOf.class));
assertEquals(ONEOF_ROW_INT32, toRow.apply(ONEOF_PROTO_INT32));
assertEquals(ONEOF_ROW_BOOL, toRow.apply(ONEOF_PROTO_BOOL));
assertEquals(ONEOF_ROW_STRING, toRow.apply(ONEOF_PROTO_STRING));
assertEquals(ONEOF_ROW_PRIMITIVE, toRow.apply(ONEOF_PROTO_PRIMITIVE));
}
/**
* Applies a {@link SerializableMatcher} to check the elements of the {@code Iterable}.
*
* <p>Returns this {@code PCollectionContentsAssert}.
*/
PCollectionContentsAssert<T> satisfies(
final SerializableMatcher<Iterable<? extends T>> matcher) {
// Safe covariant cast. Could be elided by changing a lot of this file to use
// more flexible bounds.
@SuppressWarnings({"rawtypes", "unchecked"})
SerializableFunction<Iterable<T>, Void> checkerFn =
(SerializableFunction) new MatcherCheckerFn<>(matcher);
actual.apply(
"PAssert$" + (assertCount++),
new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor, site));
return this;
}
@Override
public <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor) {
if (typeDescriptor.equals(TypeDescriptor.of(TestDefaultSchemaClass.class))) {
return v -> Row.withSchema(EMPTY_SCHEMA).build();
}
return null;
}
private static <T> SerializableFunction<T, PubsubMessage> formatPayloadUsingCoder(
Coder<T> coder) {
return input -> {
try {
return new PubsubMessage(CoderUtils.encodeToByteArray(coder, input), ImmutableMap.of());
} catch (CoderException e) {
throw new RuntimeException("Could not encode Pubsub message", e);
}
};
}
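Since the helper is private, a caller in the same class might wire it up roughly like this; events, eventCoder, and the surrounding names are assumptions, not part of the source:
// Sketch: encode each element with its coder and wrap it in an attribute-free PubsubMessage.
PCollection<PubsubMessage> messages =
    events.apply(
        MapElements.into(TypeDescriptor.of(PubsubMessage.class))
            .via(formatPayloadUsingCoder(eventCoder)));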
/**
* A function to assign a timestamp to a record. Default is processing timestamp.
*
* @deprecated as of version 2.4. Use {@link
* #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
*/
@Deprecated
public Read<K, V> withTimestampFn2(
SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
checkArgument(timestampFn != null, "timestampFn can not be null");
return toBuilder()
.setTimestampPolicyFactory(TimestampPolicyFactory.withTimestampFn(timestampFn))
.build();
}
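Since withTimestampFn2 is deprecated, here is a minimal sketch of the replacement path the Javadoc points to, via TimestampPolicyFactory (broker address, topic, and deserializers are placeholders):
// Illustrative: derive each element's event time from the Kafka record metadata.
KafkaIO.<Long, String>read()
    .withBootstrapServers("broker:9092")          // placeholder address
    .withTopic("events")                          // placeholder topic
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withTimestampPolicyFactory(
        TimestampPolicyFactory.withTimestampFn(
            record -> new Instant(record.getTimestamp())));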
@Test
public void testWriteWithoutPreparedStatementWithReadRows() throws Exception {
SerializableFunction<Void, DataSource> dataSourceProvider = ignored -> dataSource;
PCollection<Row> rows =
pipeline.apply(
JdbcIO.readRows()
.withDataSourceProviderFn(dataSourceProvider)
.withQuery(String.format("select name,id from %s where name = ?", readTableName))
.withStatementPreparator(
preparedStatement ->
preparedStatement.setString(1, TestRow.getNameForSeed(1))));
String writeTableName = DatabaseTestHelper.getTestTableName("UT_WRITE_PS_WITH_READ_ROWS");
DatabaseTestHelper.createTableForRowWithSchema(dataSource, writeTableName);
try {
rows.apply(
JdbcIO.<Row>write()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(
"org.apache.derby.jdbc.ClientDriver",
"jdbc:derby://localhost:" + port + "/target/beam"))
.withBatchSize(10L)
.withTable(writeTableName));
pipeline.run();
} finally {
DatabaseTestHelper.deleteTable(dataSource, writeTableName);
}
}
@Override
public PCollectionSingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn) {
actual.apply(
"PAssert$" + (assertCount++),
new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy, paneExtractor, site));
return this;
}
private GroupThenAssert(
SerializableFunction<Iterable<T>, Void> checkerFn,
AssertionWindows rewindowingStrategy,
SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor,
PAssertionSite site) {
this.checkerFn = checkerFn;
this.rewindowingStrategy = rewindowingStrategy;
this.paneExtractor = paneExtractor;
this.site = site;
}
@Test
public void onTimePaneOnlyEarlyAndLate() {
SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor =
PaneExtractors.onTimePane();
Iterable<ValueInSingleWindow<Integer>> mixedPanes =
ImmutableList.of(
ValueInSingleWindow.of(
8,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(false, false, Timing.LATE, 2L, 1L)),
ValueInSingleWindow.of(
4,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)),
ValueInSingleWindow.of(
2,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)),
ValueInSingleWindow.of(
1,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(true, false, Timing.EARLY)));
assertThat(extractor.apply(mixedPanes), containsInAnyOrder(2, 4));
}
private OneSideInputAssert(
PTransform<PBegin, PCollectionView<ActualT>> createActual,
PTransform<PCollection<Integer>, PCollection<Integer>> windowToken,
SerializableFunction<ActualT, Void> checkerFn,
PAssertionSite site) {
this.createActual = createActual;
this.windowToken = windowToken;
this.checkerFn = checkerFn;
this.site = site;
}
@Test
public void testOneOfProtoToRow() throws InvalidProtocolBufferException {
ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(OneOf.getDescriptor());
SerializableFunction<DynamicMessage, Row> toRow = schemaProvider.getToRowFunction();
// Equality is not defined between a DynamicMessage and a generated message,
// so we compare string representations instead.
assertEquals(ONEOF_ROW_INT32.toString(), toRow.apply(toDynamic(ONEOF_PROTO_INT32)).toString());
assertEquals(ONEOF_ROW_BOOL.toString(), toRow.apply(toDynamic(ONEOF_PROTO_BOOL)).toString());
assertEquals(
ONEOF_ROW_STRING.toString(), toRow.apply(toDynamic(ONEOF_PROTO_STRING)).toString());
assertEquals(
ONEOF_ROW_PRIMITIVE.toString(), toRow.apply(toDynamic(ONEOF_PROTO_PRIMITIVE)).toString());
}