Class org.apache.beam.sdk.transforms.Combine: source code examples

The examples below show how to use the org.apache.beam.sdk.transforms.Combine API; follow each example's link to view the full source on GitHub.
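As a quick orientation before the excerpts, here is a minimal, hypothetical sketch of the two most common entry points, Combine.globally and Combine.perKey. The pipeline, transform names, and input values are illustrative only and do not come from any of the projects below.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class CombineQuickstart {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Combine.globally reduces the whole PCollection to a single value.
    PCollection<Integer> total =
        p.apply("CreateInts", Create.of(1, 2, 3, 4))
            .apply("SumAll", Combine.globally(Sum.ofIntegers())); // -> 10

    // Combine.perKey reduces the values of each key independently.
    PCollection<KV<String, Integer>> perKeyTotals =
        p.apply("CreateKVs", Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 5)))
            .apply("SumPerKey", Combine.perKey(Sum.ofIntegers())); // -> ("a", 3), ("b", 5)

    p.run().waitUntilFinish();
  }
}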

Example 1 (project: beam, file: AggregatorCombiner.java)
public AggregatorCombiner(
    Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
    WindowingStrategy<?, ?> windowingStrategy,
    Coder<AccumT> accumulatorCoder,
    Coder<OutputT> outputCoder) {
  this.combineFn = combineFn;
  this.windowingStrategy = (WindowingStrategy<InputT, W>) windowingStrategy;
  this.timestampCombiner = windowingStrategy.getTimestampCombiner();
  this.accumulatorCoder =
      IterableCoder.of(
          WindowedValue.FullWindowedValueCoder.of(
              accumulatorCoder, windowingStrategy.getWindowFn().windowCoder()));
  this.outputCoder =
      IterableCoder.of(
          WindowedValue.FullWindowedValueCoder.of(
              outputCoder, windowingStrategy.getWindowFn().windowCoder()));
}
 
Example 2 (project: beam, file: GroupByKeyTranslator.java)
@SuppressWarnings("unchecked")
private static <K, InputT, OutputT>
    SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> getSystemReduceFn(
        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
        Pipeline pipeline,
        KvCoder<K, InputT> kvInputCoder) {
  if (transform instanceof GroupByKey) {
    return (SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow>)
        SystemReduceFn.buffering(kvInputCoder.getValueCoder());
  } else if (transform instanceof Combine.PerKey) {
    final CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> combineFn =
        ((Combine.PerKey) transform).getFn();
    return SystemReduceFn.combining(
        kvInputCoder.getKeyCoder(),
        AppliedCombineFn.withInputCoder(combineFn, pipeline.getCoderRegistry(), kvInputCoder));
  } else {
    throw new RuntimeException("Transform " + transform + " cannot be translated as GroupByKey.");
  }
}
 
Example 3 (project: beam, file: CombineTest.java)
@Test
public void testBinaryCombineWithSlidingWindows() {
  PCollection<Integer> input =
      pipeline
          .apply(
              Create.timestamped(
                  TimestampedValue.of(1, new Instant(1)),
                  TimestampedValue.of(3, new Instant(2)),
                  TimestampedValue.of(5, new Instant(3))))
          .apply(Window.into(SlidingWindows.of(Duration.millis(3)).every(Duration.millis(1))))
          .apply(
              Combine.globally(
                      Combine.BinaryCombineFn.of(
                          (SerializableBiFunction<Integer, Integer, Integer>)
                              (integer1, integer2) -> integer1 > integer2 ? integer1 : integer2))
                  .withoutDefaults());
  PAssert.that(input).containsInAnyOrder(1, 3, 5, 5, 5);
  pipeline.run();
}
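For reference: with SlidingWindows of size 3 ms sliding every 1 ms, the elements at timestamps 1, 2, and 3 fall into five overlapping windows containing {1}, {1, 3}, {1, 3, 5}, {3, 5}, and {5}, so the per-window maxima are exactly the asserted 1, 3, 5, 5, 5.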
 
Example 4 (project: beam, file: SparkCombineFnTest.java)
private static Combine.CombineFn<Integer, Long, Long> getSumFn() {
  return new Combine.CombineFn<Integer, Long, Long>() {

    @Override
    public Long createAccumulator() {
      return 0L;
    }

    @Override
    public Long addInput(Long mutableAccumulator, Integer input) {
      return mutableAccumulator + input;
    }

    @Override
    public Long mergeAccumulators(Iterable<Long> accumulators) {
      return StreamSupport.stream(accumulators.spliterator(), false).mapToLong(e -> e).sum();
    }

    @Override
    public Long extractOutput(Long accumulator) {
      return accumulator;
    }
  };
}
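A CombineFn like the one above is applied through Combine.globally or Combine.perKey. A minimal, hypothetical usage sketch (not part of SparkCombineFnTest; the pipeline and input values are illustrative only):

// Hypothetical usage of the sum CombineFn defined above.
PCollection<Long> total =
    pipeline
        .apply(Create.of(1, 2, 3))
        .apply(Combine.globally(getSumFn())); // 0L + 1 + 2 + 3 -> 6L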
 
Example 5 (project: beam, file: CombineRunnersTest.java)
@Before
public void createPipeline() throws Exception {
  // Create pipeline with an input pCollection, combine, and output pCollection.
  TestCombineFn combineFn = new TestCombineFn();
  Combine.PerKey<String, String, Integer> combine = Combine.perKey(combineFn);

  Pipeline p = Pipeline.create();
  PCollection<KV<String, String>> inputPCollection = p.apply(Create.of(KV.of("unused", "0")));
  inputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
  PCollection<KV<String, Integer>> outputPCollection =
      inputPCollection.apply(TEST_COMBINE_ID, combine);
  outputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));

  // Create FnApi protos needed for the runner.
  SdkComponents sdkComponents = SdkComponents.create(p.getOptions());
  pProto = PipelineTranslation.toProto(p, sdkComponents);
  inputPCollectionId = sdkComponents.registerPCollection(inputPCollection);
  outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
  pTransform = pProto.getComponents().getTransformsOrThrow(TEST_COMBINE_ID);
}
 
Example 6 (project: beam, file: CombineTest.java)
@Test
public void testCombineGloballyPreservesWindowing() {
  PCollection<Integer> input =
      pipeline
          .apply(
              Create.timestamped(
                  TimestampedValue.of(1, new Instant(1)),
                  TimestampedValue.of(2, new Instant(2)),
                  TimestampedValue.of(3, new Instant(11)),
                  TimestampedValue.of(4, new Instant(3)),
                  TimestampedValue.of(5, new Instant(11)),
                  TimestampedValue.of(6, new Instant(12))))
          .apply(Window.into(FixedWindows.of(Duration.millis(10))))
          .apply(Combine.globally(Sum.ofIntegers()).withoutDefaults());
  PAssert.that(input).containsInAnyOrder(7, 14);
  pipeline.run();
}
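With FixedWindows of 10 ms, the elements at timestamps 1, 2, and 3 (values 1, 2, 4) land in window [0, 10) and those at timestamps 11, 11, and 12 (values 3, 5, 6) land in [10, 20), giving the asserted per-window sums 7 and 14.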
 
Example 7 (project: beam, file: WindowTest.java)
@Test
@Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
public void testMergingCustomWindows() {
  Instant startInstant = new Instant(0L);
  PCollection<String> inputCollection =
      pipeline.apply(
          Create.timestamped(
              TimestampedValue.of("big", startInstant.plus(Duration.standardSeconds(10))),
              TimestampedValue.of("small1", startInstant.plus(Duration.standardSeconds(20))),
              // This one falls outside bigWindow and thus is not merged
              TimestampedValue.of("small2", startInstant.plus(Duration.standardSeconds(39)))));
  PCollection<String> windowedCollection =
      inputCollection.apply(Window.into(new CustomWindowFn<>()));
  PCollection<Long> count =
      windowedCollection.apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());
  // "small1" and "big" elements merged into bigWindow "small2" not merged
  // because timestamp is not in bigWindow
  PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L);
  pipeline.run();
}
 
Example 8 (project: beam, file: WindowTest.java)
@Test
@Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
public void testMergingCustomWindowsKeyedCollection() {
  Instant startInstant = new Instant(0L);
  PCollection<KV<Integer, String>> inputCollection =
      pipeline.apply(
          Create.timestamped(
              TimestampedValue.of(
                  KV.of(0, "big"), startInstant.plus(Duration.standardSeconds(10))),
              TimestampedValue.of(
                  KV.of(1, "small1"), startInstant.plus(Duration.standardSeconds(20))),
              // This element is not contained within bigWindow and is therefore not merged
              TimestampedValue.of(
                  KV.of(2, "small2"), startInstant.plus(Duration.standardSeconds(39)))));
  PCollection<KV<Integer, String>> windowedCollection =
      inputCollection.apply(Window.into(new CustomWindowFn<>()));
  PCollection<Long> count =
      windowedCollection.apply(
          Combine.globally(Count.<KV<Integer, String>>combineFn()).withoutDefaults());
  // "small1" and "big" elements merged into bigWindow "small2" not merged
  // because it is not contained in bigWindow
  PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L);
  pipeline.run();
}
 
Example 9 (project: beam, file: HBaseIOIT.java)
/** Read the test dataset from HBase and validate its contents. */
private void runRead() {
  PCollection<Result> tableRows =
      pipelineRead.apply(HBaseIO.read().withConfiguration(conf).withTableId(TABLE_NAME));

  PAssert.thatSingleton(tableRows.apply("Count All", Count.<Result>globally()))
      .isEqualTo((long) numberOfRows);

  PCollection<String> consolidatedHashcode =
      tableRows
          .apply(ParDo.of(new SelectNameFn()))
          .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());

  PAssert.that(consolidatedHashcode)
      .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));

  pipelineRead.run().waitUntilFinish();
}
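This count-plus-checksum pattern recurs in several of the IO tests below: Count.globally() checks the number of rows, while Combine.globally(new HashingFn()).withoutDefaults() folds the row contents into a single content hash that is compared against a precomputed expected value.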
 
Example 10 (project: beam, file: FlinkStreamingTransformTranslators.java)
@Override
boolean canTranslate(
    PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
    FlinkStreamingTranslationContext context) {
  // if we have a merging window strategy and side inputs we cannot
  // translate as a proper combine. We have to group and then run the combine
  // over the final grouped values.
  PCollection<KV<K, InputT>> input = context.getInput(transform);

  @SuppressWarnings("unchecked")
  WindowingStrategy<?, BoundedWindow> windowingStrategy =
      (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();

  return windowingStrategy.getWindowFn().isNonMerging()
      || ((Combine.PerKey) transform).getSideInputs().isEmpty();
}
 
Example 11 (project: beam, file: CassandraIOIT.java)
private void runRead() {
  PCollection<Scientist> output =
      pipelineRead.apply(
          CassandraIO.<Scientist>read()
              .withHosts(options.getCassandraHost())
              .withPort(options.getCassandraPort())
              .withMinNumberOfSplits(20)
              .withKeyspace(KEYSPACE)
              .withTable(TABLE)
              .withEntity(Scientist.class)
              .withCoder(SerializableCoder.of(Scientist.class)));

  PCollection<String> consolidatedHashcode =
      output
          .apply(ParDo.of(new SelectNameFn()))
          .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());

  PAssert.thatSingleton(consolidatedHashcode)
      .isEqualTo(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));

  pipelineRead.run().waitUntilFinish();
}
 
Example 12 (project: beam, file: HadoopFormatIOElasticTest.java)
/**
 * Test that reads data from an embedded Elasticsearch instance and verifies that the data is
 * read successfully.
 */
@Test
public void testHifIOWithElastic() {
  // The expected hash was computed once at insertion time and is hardcoded here.
  String expectedHashCode = "a62a85f5f081e3840baf1028d4d6c6bc";
  Configuration conf = getConfiguration();
  PCollection<KV<Text, LinkedMapWritable>> esData =
      pipeline.apply(HadoopFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf));
  PCollection<Long> count = esData.apply(Count.globally());
  // Verify that the count of objects fetched using HIFInputFormat IO is correct.
  PAssert.thatSingleton(count).isEqualTo((long) TEST_DATA_ROW_COUNT);
  PCollection<LinkedMapWritable> values = esData.apply(Values.create());
  PCollection<String> textValues = values.apply(transformFunc);
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
  pipeline.run().waitUntilFinish();
}
 
Example 13 (project: beam, file: HadoopFormatIOCassandraIT.java)
/** This test reads data from the Cassandra instance and verifies that the data is read successfully. */
@Test
public void testHIFReadForCassandra() {
  // The expected hash was computed once at insertion time and is hardcoded here.
  String expectedHashCode = "1a30ad400afe4ebf5fde75f5d2d95408";
  Long expectedRecordsCount = 1000L;
  Configuration conf = getConfiguration(options);
  PCollection<KV<Long, String>> cassandraData =
      pipeline.apply(
          HadoopFormatIO.<Long, String>read()
              .withConfiguration(conf)
              .withValueTranslation(myValueTranslate));
  PAssert.thatSingleton(cassandraData.apply("Count", Count.globally()))
      .isEqualTo(expectedRecordsCount);
  PCollection<String> textValues = cassandraData.apply(Values.create());
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
  pipeline.run().waitUntilFinish();
}
 
Example 14 (project: beam, file: HadoopFormatIOCassandraTest.java)
/**
 * Test that reads data from an embedded Cassandra instance and verifies that the data is read
 * successfully.
 */
@Test
public void testHIFReadForCassandra() {
  // The expected hash was computed once at insertion time and is hardcoded here.
  String expectedHashCode = "1b9780833cce000138b9afa25ba63486";
  Configuration conf = getConfiguration();
  PCollection<KV<Long, String>> cassandraData =
      p.apply(
          HadoopFormatIO.<Long, String>read()
              .withConfiguration(conf)
              .withValueTranslation(myValueTranslate));
  // Verify the count of data retrieved from Cassandra matches expected count.
  PAssert.thatSingleton(cassandraData.apply("Count", Count.globally()))
      .isEqualTo(TEST_DATA_ROW_COUNT);
  PCollection<String> textValues = cassandraData.apply(Values.create());
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
  p.run().waitUntilFinish();
}
 
Example 15 (project: beam, file: CombineTranslation.java)
@Override
public FunctionSpec translate(
    AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>> transform, SdkComponents components)
    throws IOException {
  if (transform.getTransform().getSideInputs().isEmpty()) {
    GlobalCombineFn<?, ?, ?> combineFn = transform.getTransform().getFn();
    Coder<?> accumulatorCoder =
        extractAccumulatorCoder(combineFn, (AppliedPTransform) transform);
    return FunctionSpec.newBuilder()
        .setUrn(getUrn(transform.getTransform()))
        .setPayload(combinePayload(combineFn, accumulatorCoder, components).toByteString())
        .build();
  } else {
    // Combines with side inputs are translated as generic composites, which have a blank
    // FunctionSpec.
    return null;
  }
}
 
Example 16 (project: beam, file: CombineTranslation.java)
@Override
public FunctionSpec translate(
    AppliedPTransform<?, ?, Combine.Globally<?, ?>> transform, SdkComponents components)
    throws IOException {
  if (transform.getTransform().getSideInputs().isEmpty()) {
    return FunctionSpec.newBuilder()
        .setUrn(getUrn(transform.getTransform()))
        .setPayload(
            payloadForCombineGlobally((AppliedPTransform) transform, components).toByteString())
        .build();
  } else {
    // Combines with side inputs are translated as generic composites, which have a blank
    // FunctionSpec.
    return null;
  }
}
 
Example 17 (project: beam, file: CombineTranslation.java)
private static <K, InputT, AccumT> Coder<AccumT> extractAccumulatorCoder(
    GlobalCombineFn<InputT, AccumT, ?> combineFn,
    AppliedPTransform<
            PCollection<KV<K, Iterable<InputT>>>, ?, Combine.GroupedValues<K, InputT, ?>>
        transform)
    throws IOException {
  try {
    @SuppressWarnings("unchecked")
    PCollection<KV<K, Iterable<InputT>>> mainInput =
        (PCollection<KV<K, Iterable<InputT>>>)
            Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(transform));
    KvCoder<K, Iterable<InputT>> kvCoder = (KvCoder<K, Iterable<InputT>>) mainInput.getCoder();
    IterableCoder<InputT> iterCoder = (IterableCoder<InputT>) kvCoder.getValueCoder();
    return combineFn.getAccumulatorCoder(
        transform.getPipeline().getCoderRegistry(), iterCoder.getElemCoder());
  } catch (CannotProvideCoderException e) {
    throw new IOException("Could not obtain a Coder for the accumulator", e);
  }
}
 
Example 18 (project: beam, file: CombineValuesFnFactoryTest.java)
@Test
public void testCombineValuesFnExtract() throws Exception {
  TestReceiver receiver = new TestReceiver();
  MeanInts mean = new MeanInts();

  Combine.CombineFn<Integer, CountSum, String> combiner = mean;

  ParDoFn combineParDoFn =
      createCombineValuesFn(
          CombinePhase.EXTRACT,
          combiner,
          StringUtf8Coder.of(),
          BigEndianIntegerCoder.of(),
          new CountSumCoder(),
          WindowingStrategy.globalDefault());

  combineParDoFn.startBundle(receiver);
  combineParDoFn.processElement(
      WindowedValue.valueInGlobalWindow(KV.of("a", new CountSum(6, 27))));
  combineParDoFn.processElement(
      WindowedValue.valueInGlobalWindow(KV.of("b", new CountSum(3, 21))));
  combineParDoFn.finishBundle();

  assertArrayEquals(
      new Object[] {
        WindowedValue.valueInGlobalWindow(KV.of("a", String.format("%.1f", 4.5))),
        WindowedValue.valueInGlobalWindow(KV.of("b", String.format("%.1f", 7.0)))
      },
      receiver.receivedElems.toArray());
}
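MeanInts (defined elsewhere in the test) accumulates a (count, sum) pair, and the EXTRACT phase divides sum by count: CountSum(6, 27) yields 27 / 6 = 4.5 and CountSum(3, 21) yields 21 / 3 = 7.0, each formatted to one decimal place.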
 
Example 19 (project: DataflowTemplates, file: ExportTransformTest.java)
@Test
public void buildDatabaseManifestFile() throws InvalidProtocolBufferException {
  Map<String, String> tablesAndManifests =
      ImmutableMap.of("table1", "table1 manifest", "table2", "table2 manifest");

  Export.Builder builder = Export.newBuilder();
  builder.addTablesBuilder().setName("table1").setManifestFile("table1-manifest.json");
  builder.addTablesBuilder().setName("table2").setManifestFile("table2-manifest.json");
  String expectedManifest = JsonFormat.printer().print(builder.build());

  PCollection<String> databaseManifest =
      pipeline
          .apply(Create.of(tablesAndManifests))
          .apply(Combine.globally(new CreateDatabaseManifest()));

  // The output JSON may contain the tables in any order, so a string comparison is not
  // sufficient. Have to convert the manifest string to a protobuf. Also for the checker function
  // to be serializable, it has to be written as a lambda.
  PAssert.thatSingleton(databaseManifest)
      .satisfies( // Checker function.
          (SerializableFunction<String, Void>)
              input -> {
                Builder builder1 = Export.newBuilder();
                try {
                  JsonFormat.parser().merge(input, builder1);
                } catch (InvalidProtocolBufferException e) {
                  throw new RuntimeException(e);
                }
                Export manifestProto = builder1.build();
                assertThat(manifestProto.getTablesCount(), is(2));
                String table1Name = manifestProto.getTables(0).getName();
                assertThat(table1Name, startsWith("table"));
                assertThat(
                    manifestProto.getTables(0).getManifestFile(),
                    is(table1Name + "-manifest.json"));
                return null;
              });

  pipeline.run();
}
 
Example 20 (project: beam, file: DataflowPTransformMatchersTest.java)
/** Creates a simple pipeline with a {@link Combine.GroupedValues}. */
private static TestPipeline createCombineGroupedValuesPipeline() {
  TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
  PCollection<KV<String, Integer>> input =
      pipeline
          .apply(Create.of(KV.of("key", 1)))
          .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
  input.apply(GroupByKey.create()).apply(Combine.groupedValues(new SumCombineFn()));

  return pipeline;
}
 
Example 21 (project: beam, file: TrafficMaxLaneFlow.java)
@Override
public PCollection<TableRow> expand(PCollection<KV<String, LaneInfo>> flowInfo) {
  // stationId, LaneInfo => stationId + max lane flow info
  PCollection<KV<String, LaneInfo>> flowMaxes = flowInfo.apply(Combine.perKey(new MaxFlow()));

  // <stationId, max lane flow info>... => row...
  PCollection<TableRow> results = flowMaxes.apply(ParDo.of(new FormatMaxesFn()));

  return results;
}
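MaxFlow itself is defined elsewhere in TrafficMaxLaneFlow.java. A plausible sketch, assuming LaneInfo exposes its flow count through a hypothetical getLaneFlow() accessor, is a Combine.BinaryCombineFn that keeps the element with the larger flow:

// Hypothetical sketch; the real MaxFlow lives in TrafficMaxLaneFlow.java.
static class MaxFlow extends Combine.BinaryCombineFn<LaneInfo> {
  @Override
  public LaneInfo apply(LaneInfo left, LaneInfo right) {
    // getLaneFlow() is an assumed accessor for the lane's flow count.
    return left.getLaneFlow() >= right.getLaneFlow() ? left : right;
  }
}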
 
Example 22 (project: beam, file: SamzaStoreStateInternals.java)
protected SamzaAccumulatorCombiningState(
    StateNamespace namespace,
    StateTag<? extends State> address,
    Coder<AccumT> coder,
    Combine.CombineFn<InT, AccumT, OutT> combineFn) {
  super(namespace, address, coder);

  this.combineFn = combineFn;
}
 
Example 23 (project: beam, file: SamzaPipelineTranslator.java)
private static boolean canTranslate(String urn, PTransform<?, ?> transform) {
  if (!TRANSLATORS.containsKey(urn)) {
    return false;
  } else if (urn.equals(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN)) {
    // According to BEAM, Combines with side inputs are translated as generic composites
    return ((Combine.PerKey) transform).getSideInputs().isEmpty();
  } else {
    return true;
  }
}
 
Example 24 (project: beam, file: Task.java)
static PCollection<Long> applyTransform(PCollection<String> events) {
  return events
      .apply(
          Window.<String>into(FixedWindows.of(Duration.standardDays(1)))
              .triggering(
                  AfterWatermark.pastEndOfWindow()
                      .withEarlyFirings(
                          AfterProcessingTime.pastFirstElementInPane()))
              .withAllowedLateness(Duration.ZERO)
              .accumulatingFiredPanes())

      .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());
}
 
Example 25 (project: beam, file: Task.java)
static PCollection<Long> applyTransform(PCollection<String> events) {
  return events
      .apply(
          Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
              .triggering(AfterWatermark.pastEndOfWindow())
              .withAllowedLateness(Duration.ZERO)
              .discardingFiredPanes())

      .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());
}
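The difference between these two Task.java snippets is pane accumulation: with accumulatingFiredPanes() each early firing reports the running count of everything seen in the window so far, while discardingFiredPanes() resets the state after every firing, so each pane counts only the elements that arrived since the previous pane.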
 
Example 26 (project: beam, file: FlinkBroadcastStateInternals.java)
FlinkKeyedCombiningState(
    OperatorStateBackend flinkStateBackend,
    StateTag<CombiningState<InputT, AccumT, OutputT>> address,
    Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
    StateNamespace namespace,
    Coder<AccumT> accumCoder,
    FlinkBroadcastStateInternals<K2> flinkStateInternals) {
  super(flinkStateBackend, address.getId(), namespace, accumCoder);

  this.namespace = namespace;
  this.address = address;
  this.combineFn = combineFn;
  this.flinkStateInternals = flinkStateInternals;
}
 
Example 27 (project: beam, file: SketchFrequencies.java)
@Override
public PCollection<Sketch<InputT>> expand(PCollection<InputT> input) {
  return input.apply(
      "Compute Count-Min Sketch",
      Combine.globally(
          CountMinSketchFn.create(input.getCoder())
              .withAccuracy(relativeError(), confidence())));
}
 
Example 28 (project: beam, file: SketchFrequencies.java)
@Override
public PCollection<KV<K, Sketch<V>>> expand(PCollection<KV<K, V>> input) {
  KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
  return input.apply(
      "Compute Count-Min Sketch perKey",
      Combine.perKey(
          CountMinSketchFn.create(inputCoder.getValueCoder())
              .withAccuracy(relativeError(), confidence())));
}
 
Example 29 (project: beam, file: ApproximateDistinct.java)
@Override
public PCollection<Long> expand(PCollection<InputT> input) {
  return input
      .apply(
          "Compute HyperLogLog Structure",
          Combine.globally(
              ApproximateDistinctFn.create(input.getCoder())
                  .withPrecision(this.precision())
                  .withSparseRepresentation(this.sparsePrecision())))
      .apply("Retrieve Cardinality", ParDo.of(RetrieveCardinality.globally()));
}
 
Example 30 (project: beam, file: BatchViewOverrides.java)
@Override
public PCollection<?> expand(PCollection<T> input) {
  input = input.apply(Combine.globally(combineFn).withoutDefaults().withFanout(fanout));
  @SuppressWarnings("unchecked")
  Coder<BoundedWindow> windowCoder =
      (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();

  return BatchViewAsSingleton.applyForSingleton(
      runner,
      input,
      new IsmRecordForSingularValuePerWindowDoFn<>(windowCoder),
      input.getCoder(),
      view);
}
 