类org.apache.beam.sdk.transforms.SerializableFunction源码实例Demo

下面列出了怎么用org.apache.beam.sdk.transforms.SerializableFunction的API类实例代码及写法，或者点击链接到github查看源代码。

源代码1 项目: gcp-ingestion   文件: HashClientInfoTest.java
/**
 * Verifies that {@code HashClientInfo} rewrites the client id and client ip attributes: the output
 * values must differ from the inputs and must be accepted by {@code HashClientInfo.isHashed}.
 */
@Test
public void testOutputIsHashed() {
  String clientId = "client_id";
  String clientIp = "client_ip";

  Map<String, String> attributes = ImmutableMap.<String, String>builder()
      .put(Attribute.CLIENT_ID, clientId).put(Attribute.CLIENT_IP, clientIp).build();
  PubsubMessage input = new PubsubMessage("{}".getBytes(StandardCharsets.UTF_8), attributes);

  PCollection<PubsubMessage> output = pipeline.apply(Create.of(input)).apply(HashClientInfo
      .of(pipeline.newProvider(ID_HASH_KEY_PATH), pipeline.newProvider(IP_HASH_KEY_PATH)));

  PAssert.that(output).satisfies((SerializableFunction<Iterable<PubsubMessage>, Void>) actual -> {
    for (PubsubMessage message : actual) {
      String hashedId = message.getAttribute(Attribute.CLIENT_ID);
      String hashedIp = message.getAttribute(Attribute.CLIENT_IP);
      // JUnit's signature is assertNotEquals(unexpected, actual); keeping that order makes the
      // failure message name the raw value as the one that must not appear.
      Assert.assertNotEquals(clientId, hashedId);
      Assert.assertNotEquals(clientIp, hashedIp);
      Assert.assertTrue(HashClientInfo.isHashed(hashedId));
      Assert.assertTrue(HashClientInfo.isHashed(hashedIp));
    }
    return null;
  });

  pipeline.run();
}
 
源代码2 项目: beam   文件: PaneExtractorsTest.java
/**
 * Feeds {@code PaneExtractors.allPanes()} one late, one on-time and one early pane and expects
 * the values of all three to be returned.
 */
@Test
public void allPanesMultiplePanes() {
  SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> allPanesFn =
      PaneExtractors.allPanes();

  // Every element shares the same timestamp and window; only the pane differs.
  Instant epoch = new Instant(0L);
  ValueInSingleWindow<Integer> lateValue =
      ValueInSingleWindow.of(
          8, epoch, GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.LATE, 2L, 1L));
  ValueInSingleWindow<Integer> onTimeValue =
      ValueInSingleWindow.of(
          4,
          epoch,
          GlobalWindow.INSTANCE,
          PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L));
  ValueInSingleWindow<Integer> earlyValue =
      ValueInSingleWindow.of(
          1, epoch, GlobalWindow.INSTANCE, PaneInfo.createPane(true, false, Timing.EARLY));
  Iterable<ValueInSingleWindow<Integer>> mixedPanes =
      ImmutableList.of(lateValue, onTimeValue, earlyValue);

  assertThat(allPanesFn.apply(mixedPanes), containsInAnyOrder(4, 8, 1));
}
 
源代码3 项目: beam   文件: StreamingWriteTables.java
/**
 * Creates an instance by storing every argument in the field of the same name.
 *
 * <p>No validation or defaulting is performed here.
 */
private StreamingWriteTables(
    BigQueryServices bigQueryServices,
    InsertRetryPolicy retryPolicy,
    boolean extendedErrorInfo,
    boolean skipInvalidRows,
    boolean ignoreUnknownValues,
    boolean ignoreInsertIds,
    Coder<ElementT> elementCoder,
    SerializableFunction<ElementT, TableRow> toTableRow) {
  // Element handling.
  this.elementCoder = elementCoder;
  this.toTableRow = toTableRow;

  // Service access and retry behaviour.
  this.bigQueryServices = bigQueryServices;
  this.retryPolicy = retryPolicy;

  // Boolean switches.
  this.extendedErrorInfo = extendedErrorInfo;
  this.skipInvalidRows = skipInvalidRows;
  this.ignoreUnknownValues = ignoreUnknownValues;
  this.ignoreInsertIds = ignoreInsertIds;
}
 
源代码4 项目: beam   文件: BigQueryStorageTableSource.java
/**
 * Static factory that passes all of its arguments, unchanged and in the same order, to the
 * {@code BigQueryStorageTableSource} constructor.
 */
public static <T> BigQueryStorageTableSource<T> create(
    ValueProvider<TableReference> tableRefProvider,
    @Nullable TableReadOptions readOptions,
    @Nullable ValueProvider<List<String>> selectedFields,
    @Nullable ValueProvider<String> rowRestriction,
    SerializableFunction<SchemaAndRecord, T> parseFn,
    Coder<T> outputCoder,
    BigQueryServices bqServices) {
  BigQueryStorageTableSource<T> source =
      new BigQueryStorageTableSource<>(
          tableRefProvider,
          readOptions,
          selectedFields,
          rowRestriction,
          parseFn,
          outputCoder,
          bqServices);
  return source;
}
 
源代码5 项目: beam   文件: SchemaRegistry.java
/**
 * Looks up a {@code Row}-to-object conversion function for the given type.
 *
 * <p>The lookup starts at {@code typeDescriptor} and walks up the raw type's superclass chain.
 * The first type with an entry in {@code providers} supplies the function. The walk stops, and
 * {@code null} is returned, once the next superclass is missing or is {@code Object}.
 */
@Nullable
@Override
public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> typeDescriptor) {
  TypeDescriptor<?> current = typeDescriptor;
  while (true) {
    SchemaProvider provider = providers.get(current);
    if (provider != null) {
      return (SerializableFunction<Row, T>) provider.fromRowFunction(current);
    }

    Class<?> parent = current.getRawType().getSuperclass();
    boolean hierarchyExhausted = parent == null || parent.equals(Object.class);
    if (hierarchyExhausted) {
      return null;
    }
    current = TypeDescriptor.of(parent);
  }
}
 
源代码6 项目: nomulus   文件: InvoicingUtils.java
/**
 * Builds the function that maps a {@code BillingEvent} to the filename {@code Params} of the file
 * it is written to.
 *
 * <p>The resulting base filename has the form {@code <outputBucket>/<yearMonth>/<filename>},
 * where the last segment comes from {@code billingEvent.toFilename(yearMonth)}. The files use an
 * empty shard template and a {@code .csv} suffix.
 *
 * @param outputBucket the GCS bucket we're outputting reports to
 * @param yearMonthProvider a runtime provider for the yyyy-MM we're generating the invoice for
 */
static SerializableFunction<BillingEvent, Params> makeDestinationFunction(
    String outputBucket, ValueProvider<String> yearMonthProvider) {
  return billingEvent -> {
    Params csvParams = new Params().withShardTemplate("").withSuffix(".csv");
    // The base filename is resolved lazily, once the year-month value is available.
    return csvParams.withBaseFilename(
        NestedValueProvider.of(
            yearMonthProvider,
            yearMonth -> {
              String path =
                  String.format(
                      "%s/%s/%s", outputBucket, yearMonth, billingEvent.toFilename(yearMonth));
              return FileBasedSink.convertToFileResourceIfPossible(path);
            }));
  };
}
 
源代码7 项目: beam   文件: DynamoDBIOTest.java
/**
 * Writes generated requests through {@code DynamoDBIO.write()} against the mocked client and
 * checks that counting the write output yields a single {@code 0L}.
 */
@Test
public void testWriteDataToDynamo() {
  final List<WriteRequest> requests = DynamoDBIOTestHelper.generateWriteRequests(numOfItems);
  // Every request is keyed by the same test table name.
  final SerializableFunction<WriteRequest, KV<String, WriteRequest>> keyByTable =
      request -> KV.of(tableName, request);

  final PCollection<Void> writeResult =
      pipeline
          .apply(Create.of(requests))
          .apply(
              DynamoDBIO.<WriteRequest>write()
                  .withWriteRequestMapperFn(keyByTable)
                  .withRetryConfiguration(
                      DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1)))
                  .withAwsClientsProvider(
                      AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));

  final PCollection<Long> resultCount = writeResult.apply(Count.globally());
  PAssert.that(resultCount).containsInAnyOrder(0L);

  pipeline.run().waitUntilFinish();
}
 
源代码8 项目: beam   文件: WatermarkPolicyTest.java
/**
 * Uses a custom timestamp function (arrival time plus one minute) and checks that the watermark
 * follows that function after each record is passed to {@code update}.
 */
@Test
public void shouldAdvanceWatermarkWithCustomTimePolicy() {
  SerializableFunction<KinesisRecord, Instant> arrivalPlusOneMinute =
      (kinesisRecord) ->
          kinesisRecord.getApproximateArrivalTimestamp().plus(Duration.standardMinutes(1));

  WatermarkPolicy watermarkPolicy =
      WatermarkPolicyFactory.withCustomWatermarkPolicy(
              WatermarkParameters.create().withTimestampFn(arrivalPlusOneMinute))
          .createWatermarkPolicy();

  // Two mocked records whose arrival times are 30s and 20s before NOW.
  Instant firstArrival = NOW.minus(Duration.standardSeconds(30L));
  KinesisRecord firstRecord = mock(KinesisRecord.class);
  when(firstRecord.getApproximateArrivalTimestamp()).thenReturn(firstArrival);

  Instant secondArrival = NOW.minus(Duration.standardSeconds(20L));
  KinesisRecord secondRecord = mock(KinesisRecord.class);
  when(secondRecord.getApproximateArrivalTimestamp()).thenReturn(secondArrival);

  watermarkPolicy.update(firstRecord);
  assertThat(watermarkPolicy.getWatermark())
      .isEqualTo(firstArrival.plus(Duration.standardMinutes(1)));

  watermarkPolicy.update(secondRecord);
  assertThat(watermarkPolicy.getWatermark())
      .isEqualTo(secondArrival.plus(Duration.standardMinutes(1)));
}
 
源代码9 项目: beam   文件: SchemaRegistryTest.java
/**
 * Checks that the coder returned by {@code getSchemaCoder} agrees with the schema and the
 * to/from-row functions the registry reports for a registered bean, and that requesting a coder
 * for an unregistered class ({@code Double}) throws {@link NoSuchSchemaException}.
 */
@Test
public void testGetSchemaCoder() throws NoSuchSchemaException {
  SchemaRegistry registry = SchemaRegistry.createDefault();
  registry.registerJavaBean(SimpleBean.class);

  Schema schema = registry.getSchema(SimpleBean.class);
  SerializableFunction<SimpleBean, Row> toRowFunction =
      registry.getToRowFunction(SimpleBean.class);
  SerializableFunction<Row, SimpleBean> fromRowFunction =
      registry.getFromRowFunction(SimpleBean.class);
  // Parameterized instead of the raw SchemaCoder so the accessors below stay type-checked.
  SchemaCoder<SimpleBean> schemaCoder = registry.getSchemaCoder(SimpleBean.class);

  assertTrue(schema.equivalent(schemaCoder.getSchema()));
  assertTrue(toRowFunction.equals(schemaCoder.getToRowFunction()));
  assertTrue(fromRowFunction.equals(schemaCoder.getFromRowFunction()));

  // Must be armed only after the successful lookups above; the next call is expected to throw.
  thrown.expect(NoSuchSchemaException.class);
  registry.getSchemaCoder(Double.class);
}
 
/**
 * Combines two static providers ("foo" and 1) through {@code DualInputNestedValueProvider} and
 * expects the translated value {@code "foo2"} to be immediately accessible.
 */
@Test
public void testNestedValueProviderStatic() throws Exception {
  // Appends (y + 1) to x.
  SerializableFunction<TranslatorInput<String, Integer>, String> appendIncremented =
      pair -> pair.getX() + (pair.getY() + 1);

  ValueProvider<String> text = StaticValueProvider.of("foo");
  ValueProvider<Integer> number = StaticValueProvider.of(1);
  ValueProvider<String> combined =
      DualInputNestedValueProvider.of(text, number, appendIncremented);

  assertTrue(combined.isAccessible());
  assertEquals("foo2", combined.get());
}
 
源代码11 项目: beam   文件: SchemaRegistry.java
/**
 * Looks up an object-to-{@code Row} conversion function for the given type.
 *
 * <p>The lookup starts at {@code typeDescriptor} and walks up the raw type's superclass chain.
 * The first type with an entry in {@code providers} supplies the function. The walk stops, and
 * {@code null} is returned, once the next superclass is missing or is {@code Object}.
 */
@Nullable
@Override
public <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor) {
  TypeDescriptor<?> candidate = typeDescriptor;
  while (true) {
    SchemaProvider registered = providers.get(candidate);
    if (registered != null) {
      return (SerializableFunction<T, Row>) registered.toRowFunction(candidate);
    }

    Class<?> parentClass = candidate.getRawType().getSuperclass();
    boolean reachedTop = parentClass == null || parentClass.equals(Object.class);
    if (reachedTop) {
      return null;
    }
    candidate = TypeDescriptor.of(parentClass);
  }
}
 
源代码12 项目: beam   文件: CassandraIOTest.java
/**
 * Runs a {@code CassandraIO.delete()} over a single element with a custom mapper factory and
 * expects the shared {@code counter} to read 1 afterwards.
 */
@Test
public void testCustomMapperImplDelete() {
  // Start from a clean count so the final assertion reflects this run only.
  counter.set(0);

  SerializableFunction<Session, Mapper> noopMapperFactory = new NOOPMapperFactory();

  pipeline
      .apply(Create.of(""))
      .apply(
          CassandraIO.<String>delete()
              .withHosts(Collections.singletonList(CASSANDRA_HOST))
              .withPort(cassandraPort)
              .withKeyspace(CASSANDRA_KEYSPACE)
              .withMapperFactoryFn(noopMapperFactory)
              .withEntity(String.class));
  pipeline.run();

  final int observedCalls = counter.intValue();
  assertEquals(1, observedCalls);
}
 
源代码13 项目: beam   文件: PaneExtractorsTest.java
/**
 * Applies {@code PaneExtractors.onlyPane} to an early, an on-time and a late pane and expects a
 * failure whose message mentions "trigger that fires at most once".
 */
@Test
public void onlyPaneMultiplePanesFails() {
  SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> onlyPaneFn =
      PaneExtractors.onlyPane(PAssert.PAssertionSite.capture(""));

  // Every element shares the same timestamp and window; only the pane differs.
  Instant epoch = new Instant(0L);
  ValueInSingleWindow<Integer> earlyValue =
      ValueInSingleWindow.of(
          4, epoch, GlobalWindow.INSTANCE, PaneInfo.createPane(true, false, Timing.EARLY));
  ValueInSingleWindow<Integer> onTimeValue =
      ValueInSingleWindow.of(
          2,
          epoch,
          GlobalWindow.INSTANCE,
          PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L));
  ValueInSingleWindow<Integer> lateValue =
      ValueInSingleWindow.of(
          1, epoch, GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.LATE, 2L, 1L));
  Iterable<ValueInSingleWindow<Integer>> threeFirings =
      ImmutableList.of(earlyValue, onTimeValue, lateValue);

  thrown.expectMessage("trigger that fires at most once");
  onlyPaneFn.apply(threeFirings);
}
 
源代码14 项目: beam   文件: PaneExtractorsTest.java
/**
 * Applies {@code PaneExtractors.nonLatePanes()} to two values that both carry an early pane and
 * expects both values back.
 */
@Test
public void nonLatePanesSingleEarly() {
  SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> nonLateFn =
      PaneExtractors.nonLatePanes();

  Instant epoch = new Instant(0L);
  ValueInSingleWindow<Integer> firstEarly =
      ValueInSingleWindow.of(
          8, epoch, GlobalWindow.INSTANCE, PaneInfo.createPane(true, false, Timing.EARLY));
  ValueInSingleWindow<Integer> secondEarly =
      ValueInSingleWindow.of(
          4, epoch, GlobalWindow.INSTANCE, PaneInfo.createPane(true, false, Timing.EARLY));
  Iterable<ValueInSingleWindow<Integer>> earlyOnly = ImmutableList.of(firstEarly, secondEarly);

  assertThat(nonLateFn.apply(earlyOnly), containsInAnyOrder(4, 8));
}
 
源代码15 项目: beam   文件: PaneExtractorsTest.java
/**
 * Applies {@code PaneExtractors.onTimePane()} to two values that both carry an on-time pane and
 * expects both values back.
 */
@Test
public void onTimePane() {
  SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> onTimeFn =
      PaneExtractors.onTimePane();

  Instant epoch = new Instant(0L);
  ValueInSingleWindow<Integer> firstOnTime =
      ValueInSingleWindow.of(
          4,
          epoch,
          GlobalWindow.INSTANCE,
          PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L));
  ValueInSingleWindow<Integer> secondOnTime =
      ValueInSingleWindow.of(
          2,
          epoch,
          GlobalWindow.INSTANCE,
          PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L));
  Iterable<ValueInSingleWindow<Integer>> onTimeOnly =
      ImmutableList.of(firstOnTime, secondOnTime);

  assertThat(onTimeFn.apply(onTimeOnly), containsInAnyOrder(2, 4));
}
 
源代码16 项目: beam   文件: StreamingInserts.java
/**
 * Constructor.
 *
 * <p>Delegates to another constructor of this class, passing the caller's four arguments together
 * with a new {@code BigQueryServicesImpl}, {@code InsertRetryPolicy.alwaysRetry()}, four
 * {@code false} flags and a trailing {@code null}.
 */
public StreamingInserts(
    CreateDisposition createDisposition,
    DynamicDestinations<?, DestinationT> dynamicDestinations,
    Coder<ElementT> elementCoder,
    SerializableFunction<ElementT, TableRow> toTableRow) {
  // NOTE(review): the meaning of the four boolean flags and of the final null is defined by the
  // delegated constructor, which is not visible here — confirm against its parameter list.
  this(
      createDisposition,
      dynamicDestinations,
      new BigQueryServicesImpl(),
      InsertRetryPolicy.alwaysRetry(),
      false,
      false,
      false,
      false,
      elementCoder,
      toTableRow,
      null);
}
 
源代码17 项目: beam   文件: DynamoDBIOTest.java
/**
 * Verifies that a read configured with {@code withScanRequestFn()} but without total segments is
 * rejected with an {@link IllegalArgumentException} carrying the expected message.
 */
@Test
public void testMissingTotalSegments() {
  // The ExpectedException rule is the only failure check here. The previous version also wrapped
  // pipeline.run() in try/catch with fail(); catching the exception there would have hidden it
  // from the rule, so the two mechanisms contradicted each other.
  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage("TotalSegments is required with withScanRequestFn()");
  pipeline.apply(
      DynamoDBIO.read()
          .withScanRequestFn(
              (SerializableFunction<Void, ScanRequest>) input -> new ScanRequest(tableName))
          .withAwsClientsProvider(
              AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
  pipeline.run().waitUntilFinish();
}
 
源代码18 项目: beam   文件: SnowflakeServiceConfig.java
/**
 * Creates a configuration by storing the given arguments.
 *
 * <p>{@code storageIntegration} is stored in the {@code storageIntegrationName} field; every
 * other argument is stored in the field of the same name.
 */
public SnowflakeServiceConfig(
    SerializableFunction<Void, DataSource> dataSourceProviderFn,
    String table,
    String query,
    String storageIntegration,
    String stagingBucketDir) {
  // Staging location.
  this.storageIntegrationName = storageIntegration;
  this.stagingBucketDir = stagingBucketDir;

  // Table or query settings.
  this.table = table;
  this.query = query;

  // Connection source.
  this.dataSourceProviderFn = dataSourceProviderFn;
}
 
源代码19 项目: beam   文件: AvroIO.java
/**
 * Creates a {@link DynamicAvroDestinations} backed by a {@code ConstantAvroDestination}, built
 * from the given {@link FilenamePolicy}, schema, metadata, codec and format function.
 */
public static <UserT, OutputT> DynamicAvroDestinations<UserT, Void, OutputT> constantDestinations(
    FilenamePolicy filenamePolicy,
    Schema schema,
    Map<String, Object> metadata,
    CodecFactory codec,
    SerializableFunction<UserT, OutputT> formatFunction) {
  DynamicAvroDestinations<UserT, Void, OutputT> constantDestination =
      new ConstantAvroDestination<>(filenamePolicy, schema, metadata, codec, formatFunction);
  return constantDestination;
}
 
源代码20 项目: beam   文件: ProtoMessageSchemaTest.java
/**
 * Converts each {@code OneOf} proto fixture with the to-row function obtained from
 * {@code ProtoMessageSchema} and compares the result with the matching row fixture.
 */
@Test
public void testOneOfProtoToRow() {
  ProtoMessageSchema schemaProvider = new ProtoMessageSchema();
  SerializableFunction<OneOf, Row> protoToRow =
      schemaProvider.toRowFunction(TypeDescriptor.of(OneOf.class));

  Row int32Row = protoToRow.apply(ONEOF_PROTO_INT32);
  assertEquals(ONEOF_ROW_INT32, int32Row);

  Row boolRow = protoToRow.apply(ONEOF_PROTO_BOOL);
  assertEquals(ONEOF_ROW_BOOL, boolRow);

  Row stringRow = protoToRow.apply(ONEOF_PROTO_STRING);
  assertEquals(ONEOF_ROW_STRING, stringRow);

  Row primitiveRow = protoToRow.apply(ONEOF_PROTO_PRIMITIVE);
  assertEquals(ONEOF_ROW_PRIMITIVE, primitiveRow);
}
 
源代码21 项目: beam   文件: PAssert.java
/**
 * Checks the elements of the {@code Iterable} against the given {@link SerializableMatcher}.
 *
 * <p>The matcher is wrapped in a {@code MatcherCheckerFn} and applied to {@code actual} through a
 * {@code GroupThenAssert} transform named {@code PAssert$<n>}, where {@code n} is the value of
 * {@code assertCount} before it is incremented.
 *
 * <p>Returns this {@code IterableAssert}.
 */
PCollectionContentsAssert<T> satisfies(
    final SerializableMatcher<Iterable<? extends T>> matcher) {
  // Safe covariant cast. It goes through the raw type because the bounds used across this file
  // are not flexible enough to express it directly.
  @SuppressWarnings({"rawtypes", "unchecked"})
  SerializableFunction<Iterable<T>, Void> matcherAdapter =
      (SerializableFunction) new MatcherCheckerFn<>(matcher);

  String transformName = "PAssert$" + (assertCount++);
  actual.apply(
      transformName,
      new GroupThenAssert<>(matcherAdapter, rewindowingStrategy, paneExtractor, site));
  return this;
}
 
源代码22 项目: beam   文件: SchemaRegistryTest.java
/**
 * Returns a function producing an empty row with {@code EMPTY_SCHEMA} when asked about
 * {@code TestDefaultSchemaClass}; returns {@code null} for every other type.
 */
@Override
public <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor) {
  boolean isSupportedType =
      typeDescriptor.equals(TypeDescriptor.of(TestDefaultSchemaClass.class));
  if (!isSupportedType) {
    return null;
  }
  return ignoredValue -> Row.withSchema(EMPTY_SCHEMA).build();
}
 
源代码23 项目: beam   文件: PubsubIO.java
/**
 * Returns a function that encodes its input with {@code coder} and wraps the resulting bytes in a
 * {@code PubsubMessage} with an empty attribute map.
 *
 * <p>A {@code CoderException} raised while encoding is rethrown as a {@link RuntimeException}
 * with the original exception as its cause.
 */
private static <T> SerializableFunction<T, PubsubMessage> formatPayloadUsingCoder(
    Coder<T> coder) {
  return element -> {
    try {
      byte[] payload = CoderUtils.encodeToByteArray(coder, element);
      return new PubsubMessage(payload, ImmutableMap.of());
    } catch (CoderException encodingFailure) {
      throw new RuntimeException("Could not encode Pubsub message", encodingFailure);
    }
  };
}
 
源代码24 项目: DataflowTemplates   文件: KafkaIO.java
/**
 * Sets the function used to assign a timestamp to each record by wrapping it in a
 * {@code TimestampPolicyFactory}. Default is processing timestamp.
 *
 * <p>Rejects a {@code null} function via {@code checkArgument}.
 *
 * @deprecated as of version 2.4. Use {@link
 *     #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
 */
@Deprecated
public Read<K, V> withTimestampFn2(
    SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
  checkArgument(timestampFn != null, "timestampFn can not be null");

  TimestampPolicyFactory<K, V> policyFactory =
      TimestampPolicyFactory.withTimestampFn(timestampFn);
  return toBuilder().setTimestampPolicyFactory(policyFactory).build();
}
 
源代码25 项目: beam   文件: JdbcIOTest.java
/**
 * Reads rows with {@code JdbcIO.readRows()} and writes them to a freshly created table using
 * {@code JdbcIO.write()} without an explicit prepared-statement setter. The table is deleted
 * afterwards even if the pipeline fails.
 */
@Test
public void testWriteWithoutPreparedStatementWithReadRows() throws Exception {
  SerializableFunction<Void, DataSource> providerFn = unused -> dataSource;
  String selectByName =
      String.format("select name,id from %s where name = ?", readTableName);

  PCollection<Row> readRows =
      pipeline.apply(
          JdbcIO.readRows()
              .withDataSourceProviderFn(providerFn)
              .withQuery(selectByName)
              .withStatementPreparator(
                  statement -> statement.setString(1, TestRow.getNameForSeed(1))));

  String targetTable = DatabaseTestHelper.getTestTableName("UT_WRITE_PS_WITH_READ_ROWS");
  DatabaseTestHelper.createTableForRowWithSchema(dataSource, targetTable);
  try {
    readRows.apply(
        JdbcIO.<Row>write()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                    "org.apache.derby.jdbc.ClientDriver",
                    "jdbc:derby://localhost:" + port + "/target/beam"))
            .withBatchSize(10L)
            .withTable(targetTable));
    pipeline.run();
  } finally {
    // Always drop the table created for this test.
    DatabaseTestHelper.deleteTable(dataSource, targetTable);
  }
}
 
源代码26 项目: beam   文件: PAssert.java
/**
 * Applies a {@code GroupThenAssertForSingleton} built from {@code checkerFn} to {@code actual},
 * under the name {@code PAssert$<n>}, where {@code n} is the value of {@code assertCount} before
 * it is incremented. Returns this assert.
 */
@Override
public PCollectionSingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn) {
  String transformName = "PAssert$" + (assertCount++);
  actual.apply(
      transformName,
      new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy, paneExtractor, site));
  return this;
}
 
源代码27 项目: beam   文件: PAssert.java
/** Creates an instance by storing every argument in the field of the same name. */
private GroupThenAssert(
    SerializableFunction<Iterable<T>, Void> checkerFn,
    AssertionWindows rewindowingStrategy,
    SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor,
    PAssertionSite site) {
  this.site = site;
  this.paneExtractor = paneExtractor;
  this.rewindowingStrategy = rewindowingStrategy;
  this.checkerFn = checkerFn;
}
 
源代码28 项目: beam   文件: PaneExtractorsTest.java
/**
 * Applies {@code PaneExtractors.onTimePane()} to a mix of late, on-time and early panes and
 * expects only the two on-time values (2 and 4) back.
 */
@Test
public void onTimePaneOnlyEarlyAndLate() {
  SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> onTimeFn =
      PaneExtractors.onTimePane();

  // Every element shares the same timestamp and window; only the pane differs.
  Instant epoch = new Instant(0L);
  ValueInSingleWindow<Integer> lateValue =
      ValueInSingleWindow.of(
          8, epoch, GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.LATE, 2L, 1L));
  ValueInSingleWindow<Integer> firstOnTime =
      ValueInSingleWindow.of(
          4,
          epoch,
          GlobalWindow.INSTANCE,
          PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L));
  ValueInSingleWindow<Integer> secondOnTime =
      ValueInSingleWindow.of(
          2,
          epoch,
          GlobalWindow.INSTANCE,
          PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L));
  ValueInSingleWindow<Integer> earlyValue =
      ValueInSingleWindow.of(
          1, epoch, GlobalWindow.INSTANCE, PaneInfo.createPane(true, false, Timing.EARLY));
  Iterable<ValueInSingleWindow<Integer>> mixedPanes =
      ImmutableList.of(lateValue, firstOnTime, secondOnTime, earlyValue);

  assertThat(onTimeFn.apply(mixedPanes), containsInAnyOrder(2, 4));
}
 
源代码29 项目: beam   文件: PAssert.java
/** Creates an instance by storing every argument in the field of the same name. */
private OneSideInputAssert(
    PTransform<PBegin, PCollectionView<ActualT>> createActual,
    PTransform<PCollection<Integer>, PCollection<Integer>> windowToken,
    SerializableFunction<ActualT, Void> checkerFn,
    PAssertionSite site) {
  this.site = site;
  this.checkerFn = checkerFn;
  this.windowToken = windowToken;
  this.createActual = createActual;
}
 
源代码30 项目: beam   文件: ProtoDynamicMessageSchemaTest.java
/**
 * Converts each {@code OneOf} fixture, as a dynamic message, with the to-row function of a
 * descriptor-based schema and compares the string form of the result with the string form of the
 * matching row fixture.
 */
@Test
public void testOneOfProtoToRow() throws InvalidProtocolBufferException {
  ProtoDynamicMessageSchema dynamicSchema = schemaFromDescriptor(OneOf.getDescriptor());
  SerializableFunction<DynamicMessage, Row> dynamicToRow = dynamicSchema.getToRowFunction();

  // equality doesn't work between dynamic messages and other,
  // so we compare string representation
  String int32Actual = dynamicToRow.apply(toDynamic(ONEOF_PROTO_INT32)).toString();
  assertEquals(ONEOF_ROW_INT32.toString(), int32Actual);

  String boolActual = dynamicToRow.apply(toDynamic(ONEOF_PROTO_BOOL)).toString();
  assertEquals(ONEOF_ROW_BOOL.toString(), boolActual);

  String stringActual = dynamicToRow.apply(toDynamic(ONEOF_PROTO_STRING)).toString();
  assertEquals(ONEOF_ROW_STRING.toString(), stringActual);

  String primitiveActual = dynamicToRow.apply(toDynamic(ONEOF_PROTO_PRIMITIVE)).toString();
  assertEquals(ONEOF_ROW_PRIMITIVE.toString(), primitiveActual);
}
 
 类方法
 同包方法