类org.apache.beam.sdk.transforms.windowing.FixedWindows源码实例Demo

下面列出了怎么用org.apache.beam.sdk.transforms.windowing.FixedWindows的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: beam   文件: FlattenTest.java
@Test
public void testIncompatibleWindowFnPropagationFailure() {
  // The pipeline is never run in this test, so abandoned-node enforcement is disabled.
  p.enableAbandonedNodeEnforcement(false);

  // Two inputs whose fixed windows differ in size (1 vs 2 minutes) cannot be flattened together.
  PCollection<String> first =
      p.apply("CreateInput1", Create.of("Input1"))
          .apply("Window1", Window.into(FixedWindows.of(Duration.standardMinutes(1))));
  PCollection<String> second =
      p.apply("CreateInput2", Create.of("Input2"))
          .apply("Window2", Window.into(FixedWindows.of(Duration.standardMinutes(2))));
  PCollectionList<String> both = PCollectionList.of(first).and(second);

  try {
    both.apply(Flatten.pCollections());
  } catch (IllegalStateException expected) {
    String message = expected.getMessage();
    Assert.assertTrue(message.startsWith("Inputs to Flatten had incompatible window windowFns"));
    return;
  }
  Assert.fail("Exception should have been thrown");
}
 
源代码2 项目: beam   文件: WriteFilesTest.java
/**
 * Writes a handful of strings through {@code WriteFiles} after assigning them to 2 ms fixed
 * windows via {@code WindowAndReshuffle}.
 */
@Test
@Category(NeedsRunner.class)
public void testWriteWindowed() throws IOException {
  List<String> birds =
      Arrays.asList(
          "Critical canary",
          "Apprehensive eagle",
          "Intimidating pigeon",
          "Pedantic gull",
          "Frisky finch");

  // WindowAndReshuffle presumably windows and then reshuffles the elements before the write.
  runWrite(
      birds,
      new WindowAndReshuffle<>(Window.into(FixedWindows.of(Duration.millis(2)))),
      getBaseOutputFilename(),
      WriteFiles.to(makeSimpleSink()));
}
 
源代码3 项目: beam   文件: AfterFirstStateMachineTest.java
@Test
public void testOnlyT1ShouldFireFixedWindows() throws Exception {
  // AfterFirst over two mocked sub-triggers, with 10 ms fixed windows.
  tester =
      TriggerStateMachineTester.forTrigger(
          AfterFirstStateMachine.of(mockTrigger1, mockTrigger2),
          FixedWindows.of(Duration.millis(10)));
  tester.injectElements(1);

  // NOTE(review): under FixedWindows(10) the element at t=1 presumably lands in [0, 10), not in
  // this [1, 11) window; the mocked shouldFire results make firing independent of window contents.
  IntervalWindow targetWindow = new IntervalWindow(new Instant(1), new Instant(11));

  // Only the first sub-trigger is ready.
  when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
  when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);

  // One ready sub-trigger is enough for AfterFirst to fire, and firing finishes it.
  assertTrue(tester.shouldFire(targetWindow));
  tester.fireIfShouldFire(targetWindow);
  assertTrue(tester.isMarkedFinished(targetWindow));
}
 
源代码4 项目: beam-starter   文件: StreamWordCount.java
public static void main(String[] args) throws Exception {
  // Build validated options from the command line and force the Flink runner.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  options.setRunner(FlinkRunner.class);

  Pipeline pipeline = Pipeline.create(options);

  // Kafka source on the "beam" topic, with auto.offset.reset=earliest and values decoded as UTF-8
  // strings.
  // NOTE(review): the bootstrap server address is hard-coded; consider moving it into Options.
  KafkaIO.Read<byte[], String> kafkaRead =
      KafkaIO.read()
          .withBootstrapServers("192.168.99.100:32771")
          .withTopics(Arrays.asList("beam".split(",")))
          .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object) "earliest"))
          .withValueCoder(StringUtf8Coder.of());

  // Drop Kafka metadata, keep the values, count words per fixed window, format and write.
  pipeline
      .apply(kafkaRead.withoutMetadata())
      .apply(Values.<String>create())
      .apply(
          Window.<String>into(
              FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
      .apply(new CountWords())
      .apply(MapElements.via(new FormatAsTextFn()))
      .apply("WriteCounts", TextIO.Write.to(options.getOutput()));

  pipeline.run();
}
 
源代码5 项目: beam   文件: LeaderBoard.java
@Override
public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> infos) {
  // Team windows of teamWindowDuration. Early (speculative) panes fire FIVE_MINUTES of processing
  // time after the first element of a pane. There is an on-time pane at the end of the window,
  // and late panes fire TEN_MINUTES after the first late element, up to allowedLateness. Panes
  // accumulate, so each firing reports the cumulative result for its window.
  Window<GameActionInfo> teamWindowing =
      Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
          .triggering(
              AfterWatermark.pastEndOfWindow()
                  .withEarlyFirings(
                      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES))
                  .withLateFirings(
                      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
          .withAllowedLateness(allowedLateness)
          .accumulatingFiredPanes();

  return infos
      .apply("LeaderboardTeamFixedWindows", teamWindowing)
      // Per window: extract team/score pairs and sum them.
      .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
}
 
源代码6 项目: beam   文件: GroupByKeyTest.java
/**
 * Groups two values that share a key and fall into one 10-minute fixed window configured with
 * {@link TimestampCombiner#EARLIEST}. Asserts that the grouped output carries the earlier of the
 * two input timestamps (0).
 */
@Test
@Category(ValidatesRunner.class)
public void testTimestampCombinerEarliest() {
  PCollection<KV<Integer, String>> timestamped =
      p.apply(
          Create.timestamped(
              TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
              TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))));

  timestamped
      .apply(
          Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
              .withTimestampCombiner(TimestampCombiner.EARLIEST))
      .apply(GroupByKey.create())
      .apply(ParDo.of(new AssertTimestamp(new Instant(0))));

  p.run();
}
 
源代码7 项目: beam   文件: PortablePipelineDotRendererTest.java
@Test
public void testCompositePipeline() {
  // Create -> Window.into(10 ms fixed windows) -> Sum per key.
  p.apply(Create.timestamped(TimestampedValue.of(KV.of(1, 1), new Instant(1))))
      .apply(Window.into(FixedWindows.of(Duration.millis(10))))
      .apply(Sum.integersPerKey());

  String expectedDot =
      "digraph {"
          + "    rankdir=LR"
          + "    0 [label=\"Create.TimestampedValues\\n\"]"
          + "    1 [label=\"Window.Into()\\n\"]"
          + "    0 -> 1 [style=solid label=\"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps).output\"]"
          + "    2 [label=\"Combine.perKey(SumInteger)\\nbeam:transform:combine_per_key:v1\"]"
          + "    1 -> 2 [style=solid label=\"Window.Into()/Window.Assign.out\"]"
          + "}";

  // Strip platform line separators (literal replacement) so the rendered graph can be compared
  // against the single-line expectation.
  String actualDot =
      PortablePipelineDotRenderer.toDotString(PipelineTranslation.toProto(p))
          .replace(System.lineSeparator(), "");

  assertEquals(expectedDot, actualDot);
}
 
源代码8 项目: beam   文件: DocumentationExamplesTest.java
@Test
public void windowingSection() {
  // Documentation example covering CountByKey's windowing options: 1-second fixed windows, the
  // default trigger, discarding panes, 5 s allowed lateness, FIRE_IF_NON_EMPTY on-time behavior
  // and the EARLIEST timestamp combiner.
  PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4));
  input.setTypeDescriptor(TypeDescriptors.integers());

  Duration windowSize = Duration.standardSeconds(1);
  Duration allowedLateness = Duration.standardSeconds(5);

  // Kept as a named local so the documentation snippet shows the result type.
  PCollection<KV<Integer, Long>> countedElements =
      CountByKey.of(input)
          .keyBy(e -> e)
          .windowBy(FixedWindows.of(windowSize))
          .triggeredBy(DefaultTrigger.of())
          .discardingFiredPanes()
          .withAllowedLateness(allowedLateness)
          .withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY)
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .output();

  pipeline.run();
}
 
源代码9 项目: beam   文件: ReduceByKeyTest.java
@Test
public void testBuild_Windowing() {
  // One hour window size, shared by the builder call and the assertion.
  final org.joda.time.Duration oneHour = org.joda.time.Duration.standardHours(1);

  final PCollection<String> dataset = TestUtils.createMockDataset(TypeDescriptors.strings());
  final PCollection<KV<String, Long>> reduced =
      ReduceByKey.of(dataset)
          .keyBy(s -> s)
          .valueBy(s -> 1L)
          .combineBy(Sums.ofLongs())
          .windowBy(FixedWindows.of(oneHour))
          .triggeredBy(DefaultTrigger.of())
          .accumulationMode(AccumulationMode.DISCARDING_FIRED_PANES)
          .output();

  // The built operator must carry the full window description and no value comparator.
  final ReduceByKey reduce = (ReduceByKey) TestUtils.getProducer(reduced);
  assertTrue(reduce.getWindow().isPresent());

  @SuppressWarnings("unchecked")
  final Window<? extends BoundedWindow> window = (Window) reduce.getWindow().get();
  final WindowDesc<?> desc = WindowDesc.of(window);

  assertEquals(FixedWindows.of(oneHour), window.getWindowFn());
  assertEquals(DefaultTrigger.of(), desc.getTrigger());
  assertSame(AccumulationMode.DISCARDING_FIRED_PANES, desc.getAccumulationMode());
  assertFalse(reduce.getValueComparator().isPresent());
}
 
源代码10 项目: beam   文件: ReduceByKeyTest.java
@Test
public void testBuild_sortedValues() {
  final PCollection<String> words = TestUtils.createMockDataset(TypeDescriptors.strings());

  // Collect each key's values into a list and ask the operator to sort them first.
  final PCollection<KV<String, List<Long>>> sortedPerKey =
      ReduceByKey.of(words)
          .keyBy(s -> s)
          .valueBy(s -> 1L)
          .reduceBy(s -> s.collect(Collectors.toList()))
          .withSortedValues(Long::compare)
          .windowBy(FixedWindows.of(Duration.standardHours(1)))
          .triggeredBy(DefaultTrigger.of())
          .accumulationMode(AccumulationMode.DISCARDING_FIRED_PANES)
          .output();

  // withSortedValues must surface as a value comparator on the built operator.
  final ReduceByKey producer = (ReduceByKey) TestUtils.getProducer(sortedPerKey);
  assertTrue(producer.getValueComparator().isPresent());
}
 
源代码11 项目: beam   文件: SumByKeyTest.java
@Test
public void testBuild_Windowing() {
  final org.joda.time.Duration oneHour = org.joda.time.Duration.standardHours(1);
  final org.joda.time.Duration lateness = org.joda.time.Duration.millis(1000);

  final PCollection<String> dataset = TestUtils.createMockDataset(TypeDescriptors.strings());
  final PCollection<KV<String, Long>> counted =
      SumByKey.of(dataset)
          .keyBy(s -> s)
          .valueBy(s -> 1L)
          .windowBy(FixedWindows.of(oneHour))
          .triggeredBy(DefaultTrigger.of())
          .discardingFiredPanes()
          .withAllowedLateness(lateness)
          .output();

  // Every windowing setting passed to the builder must be reflected in the operator's window.
  final SumByKey sum = (SumByKey) TestUtils.getProducer(counted);
  assertTrue(sum.getWindow().isPresent());

  @SuppressWarnings("unchecked")
  final WindowDesc<?> windowDesc = WindowDesc.of((Window) sum.getWindow().get());
  assertEquals(FixedWindows.of(oneHour), windowDesc.getWindowFn());
  assertEquals(DefaultTrigger.of(), windowDesc.getTrigger());
  assertEquals(AccumulationMode.DISCARDING_FIRED_PANES, windowDesc.getAccumulationMode());
  assertEquals(lateness, windowDesc.getAllowedLateness());
}
 
源代码12 项目: beam   文件: LateDataUtilsTest.java
@Test
public void garbageCollectionTimeAfterEndOfGlobalWindowWithLateness() {
  // With Long.MAX_VALUE ms of allowed lateness, "window end + lateness" lies beyond the end of the
  // global window. The garbage-collection time must be clamped to the global window's max
  // timestamp.
  FixedWindows fiveMinuteWindows = FixedWindows.of(Duration.standardMinutes(5));
  Duration hugeLateness = Duration.millis(Long.MAX_VALUE);
  WindowingStrategy<?, ?> strategy =
      WindowingStrategy.globalDefault()
          .withWindowFn(fiveMinuteWindows)
          .withAllowedLateness(hugeLateness);

  IntervalWindow window = fiveMinuteWindows.assignWindow(new Instant(-100));
  Instant unclampedGcTime = window.maxTimestamp().plus(hugeLateness);

  // Precondition: the unclamped value really does overshoot the global window.
  assertThat(unclampedGcTime, Matchers.greaterThan(GlobalWindow.INSTANCE.maxTimestamp()));
  assertThat(
      LateDataUtils.garbageCollectionTime(window, strategy),
      equalTo(GlobalWindow.INSTANCE.maxTimestamp()));
}
 
源代码13 项目: beam   文件: ReduceFnRunnerTest.java
/**
 * Advances the input watermark from 0 straight to 100 in one update. That single jump passes
 * both the end of the [0, 10) window and its expiration time (the {@code Duration.millis(50)}
 * argument is presumably the allowed lateness). Checks that the runner does not crash and emits
 * one final, on-time pane.
 */
@Test
public void testFixedWindowsEowAndGcTogether() throws Exception {
  ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> runner =
      ReduceFnTester.nonCombining(
          FixedWindows.of(Duration.millis(10)),
          DefaultTriggerStateMachine.of(),
          AccumulationMode.ACCUMULATING_FIRED_PANES,
          Duration.millis(50),
          ClosingBehavior.FIRE_ALWAYS);
  runner.setAutoAdvanceOutputWatermark(true);

  runner.advanceInputWatermark(new Instant(0));
  injectElement(runner, 1);
  // One jump past both end-of-window and window expiration.
  runner.advanceInputWatermark(new Instant(100));

  PaneInfo onlyPane = PaneInfo.createPane(true, true, Timing.ON_TIME);
  assertThat(
      runner.extractOutput(),
      contains(isSingleWindowedValue(contains(1), 1, 0, 10, onlyPane)));
}
 
源代码14 项目: beam   文件: ReshuffleTest.java
@Test
@Category(ValidatesRunner.class)
public void testReshuffleAfterSlidingWindowsAndGroupByKey() {
  // NOTE(review): despite the method name, the windowing below is FixedWindows (10 minutes), not
  // SlidingWindows. Confirm whether a sliding-window variant was intended.
  PCollection<KV<String, Iterable<Integer>>> grouped =
      pipeline
          .apply(
              Create.of(GBK_TESTABLE_KVS)
                  .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
          .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))))
          .apply(GroupByKey.create());

  PCollection<KV<String, Iterable<Integer>>> reshuffled = grouped.apply(Reshuffle.of());

  // Reshuffle must preserve both the grouped contents and the windowing strategy.
  PAssert.that(reshuffled).satisfies(new AssertThatHasExpectedContents());
  assertEquals(grouped.getWindowingStrategy(), reshuffled.getWindowingStrategy());

  pipeline.run();
}
 
源代码15 项目: beam   文件: WriteFilesTest.java
@Test
@Category(NeedsRunner.class)
public void testWriteNoSpilling() throws IOException {
  // 100 distinct lines written with windowed writes in 1 ms fixed windows, at most two writers
  // per bundle, and spilling disabled.
  List<String> lines = Lists.newArrayList();
  for (int n = 0; n < 100; n++) {
    lines.add("mambo_number_" + n);
  }

  runWrite(
      lines,
      Window.into(FixedWindows.of(Duration.millis(1))),
      getBaseOutputFilename(),
      WriteFiles.to(makeSimpleSink())
          .withMaxNumWritersPerBundle(2)
          .withWindowedWrites()
          .withNoSpilling());
}
 
源代码16 项目: beam   文件: BeamSqlDslJoinTest.java
@Test
public void testJoinsUnboundedWithinWindowsWithDefaultTrigger() throws Exception {
  // Self-join of the unbounded orders stream (registered under two table names) inside 50-second
  // fixed windows using the default trigger.
  String sql =
      "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id  "
          + "FROM ORDER_DETAILS1 o1"
          + " JOIN ORDER_DETAILS2 o2"
          + " on "
          + " o1.order_id=o2.site_id AND o2.price=o1.site_id";

  Window<Row> fiftySecondWindows = Window.into(FixedWindows.of(Duration.standardSeconds(50)));
  PCollection<Row> windowedOrders = ordersUnbounded().apply("window", fiftySecondWindows);
  PCollectionTuple tables =
      tuple("ORDER_DETAILS1", windowedOrders, "ORDER_DETAILS2", windowedOrders);

  PCollection<Row> joined = tables.apply("sql", SqlTransform.query(sql));
  PAssert.that(joined)
      .containsInAnyOrder(
          TestUtils.RowsBuilder.of(RESULT_ROW_TYPE)
              .addRows(1, 2, 2, 2, 2, 1, 1, 4, 3, 3, 3, 1)
              .getRows());

  pipeline.run();
}
 
源代码17 项目: beam   文件: AfterWatermarkStateMachineTest.java
@Test
public void testEarlyAndAtWatermark() throws Exception {
  tester =
      TriggerStateMachineTester.forTrigger(
          AfterWatermarkStateMachine.pastEndOfWindow().withEarlyFirings(mockEarly),
          FixedWindows.of(Duration.millis(100)));

  injectElements(1);
  IntervalWindow onlyWindow = new IntervalWindow(new Instant(0), new Instant(100));

  // Exercise the early sub-trigger before the watermark is advanced.
  testRunningAsTrigger(mockEarly, onlyWindow);

  // With the early sub-trigger silenced, reaching the end of the window must fire on its own and
  // finish the trigger.
  when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
  tester.advanceInputWatermark(new Instant(100));
  assertTrue(tester.shouldFire(onlyWindow));
  tester.fireIfShouldFire(onlyWindow);
  assertTrue(tester.isMarkedFinished(onlyWindow));
}
 
源代码18 项目: beam   文件: GroupByKeyTest.java
@Test
@Category(NeedsRunner.class)
public void testIdentityWindowFnPropagation() {
  // An empty keyed input in 1-minute fixed windows. After GroupByKey, the output's WindowFn must
  // still be compatible with an equivalent FixedWindows instance.
  Duration oneMinute = Duration.standardMinutes(1);
  List<KV<String, Integer>> noPairs = Arrays.asList();

  PCollection<KV<String, Integer>> windowed =
      p.apply(
              Create.of(noPairs)
                  .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
          .apply(Window.into(FixedWindows.of(oneMinute)));
  PCollection<KV<String, Iterable<Integer>>> grouped = windowed.apply(GroupByKey.create());

  p.run();

  boolean compatible =
      grouped.getWindowingStrategy().getWindowFn().isCompatible(FixedWindows.of(oneMinute));
  Assert.assertTrue(compatible);
}
 
源代码19 项目: beam   文件: WindowIntoTranslationTest.java
@Parameters(name = "{index}: {0}")
public static Iterable<WindowFn<?, ?>> data() {
  // One parameterized case per WindowFn: fixed, global, sessions, sliding, and a custom fn.
  return ImmutableList.<WindowFn<?, ?>>of(
      FixedWindows.of(Duration.standardMinutes(10L)),
      new GlobalWindows(),
      Sessions.withGapDuration(Duration.standardMinutes(15L)),
      SlidingWindows.of(Duration.standardMinutes(5L)).every(Duration.standardMinutes(1L)),
      new CustomWindows());
}
 
源代码20 项目: streamingbook   文件: BeamModel.java
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
    // Sum values per key within two-minute fixed windows, then format the totals as strings.
    PCollection<KV<String, Integer>> windowed =
        input.apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES)));
    PCollection<KV<String, Integer>> totals = windowed.apply(Sum.integersPerKey());
    return totals.apply(ParDo.of(new FormatAsStrings()));
}
 
源代码21 项目: beam   文件: GroupAlsoByWindowProperties.java
/**
 * Tests that the given GABW implementation correctly groups elements that fall into overlapping
 * windows that are not merged.
 *
 * <p>The three inputs are pre-assigned to {@code window(0, 5)} and {@code window(1, 5)}.
 * {@code window(a, b)} presumably builds the interval window [a, b). Each window must yield
 * exactly one grouped output, and that output must be timestamped at its own window's max
 * timestamp.
 *
 * @param gabwFactory factory for the group-also-by-window implementation under test
 * @throws Exception if running the GABW implementation fails
 */
public static void groupsIntoOverlappingNonmergingWindows(
    GroupAlsoByWindowDoFnFactory<String, String, Iterable<String>> gabwFactory) throws Exception {

  WindowingStrategy<?, IntervalWindow> windowingStrategy =
      WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));

  List<WindowedValue<KV<String, Iterable<String>>>> result =
      runGABW(
          gabwFactory,
          windowingStrategy,
          "key",
          WindowedValue.of("v1", new Instant(1), Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING),
          WindowedValue.of("v2", new Instant(4), Arrays.asList(window(1, 5)), PaneInfo.NO_FIRING),
          WindowedValue.of(
              "v3", new Instant(4), Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING));

  assertThat(result, hasSize(2));

  // Each output's timestamp is checked against its OWN window. Both windows end at 5, so the
  // max timestamps coincide numerically, but cross-referencing the windows would hide a real
  // mix-up if their ends ever differed.
  TimestampedValue<KV<String, Iterable<String>>> item0 =
      getOnlyElementInWindow(result, window(0, 5));
  assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3"));
  assertThat(item0.getTimestamp(), equalTo(window(0, 5).maxTimestamp()));

  TimestampedValue<KV<String, Iterable<String>>> item1 =
      getOnlyElementInWindow(result, window(1, 5));
  assertThat(item1.getValue().getValue(), contains("v2"));
  assertThat(item1.getTimestamp(), equalTo(window(1, 5).maxTimestamp()));
}
 
源代码22 项目: beam   文件: DoFnOperatorTest.java
@Before
public void setUp() {
  // Two iterable side-input views of the same one-element collection, differing only in the
  // fixed-window size (WINDOW_MSECS_1 vs WINDOW_MSECS_2).
  PCollection<String> source = Pipeline.create().apply(Create.of("1"));

  PCollection<String> windowedFirst =
      source.apply(Window.into(FixedWindows.of(new Duration(WINDOW_MSECS_1))));
  view1 = windowedFirst.apply(View.asIterable());

  PCollection<String> windowedSecond =
      source.apply(Window.into(FixedWindows.of(new Duration(WINDOW_MSECS_2))));
  view2 = windowedSecond.apply(View.asIterable());
}
 
源代码23 项目: streamingbook   文件: BeamModel.java
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
    // Two-minute fixed windows -> per-key integer sums -> formatted strings.
    Window<KV<String, Integer>> twoMinuteWindows =
        Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES));
    PCollection<KV<String, Integer>> perKeySums =
        input.apply(twoMinuteWindows).apply(Sum.integersPerKey());
    return perKeySums.apply(ParDo.of(new FormatAsStrings()));
}
 
源代码24 项目: beam   文件: Query7.java
@Override
public PCollection<Bid> expand(PCollection<Event> events) {
  // Keep only bids and assign them to fixed windows of windowSizeSec seconds.
  PCollection<Bid> windowedBids =
      events
          .apply(NexmarkQueryUtil.JUST_BIDS)
          .apply(
              Window.into(
                  FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))));

  // Maximum bid price per window, exposed as a singleton side input.
  // NOTE: It would be more efficient to write this query much as we did for Query5, using a
  // binary combiner to accumulate the bids with maximal price. As written this query requires an
  // additional scan per window, with the associated cost of snapshotted state and its I/O. We'll
  // keep this implementation since it illustrates the use of side inputs.
  final PCollectionView<Long> maxPriceView =
      windowedBids
          .apply("BidToPrice", NexmarkQueryUtil.BID_TO_PRICE)
          .apply(Max.longsGlobally().withFanout(configuration.fanout).asSingletonView());

  // Emit every bid whose price equals its window's maximum (ties all pass).
  DoFn<Bid, Bid> selectMaxPriceBids =
      new DoFn<Bid, Bid>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          long windowMaxPrice = c.sideInput(maxPriceView);
          Bid candidate = c.element();
          if (candidate.price == windowMaxPrice) {
            c.output(candidate);
          }
        }
      };

  return windowedBids.apply(
      name + ".Select", ParDo.of(selectMaxPriceBids).withSideInputs(maxPriceView));
}
 
源代码25 项目: beam   文件: BigtableIOTest.java
/** Tests that at least one result is emitted per element written in each window. */
@Test
public void testWritingEmitsResultsWhenDoneInFixedWindow() throws Exception {
  final String table = "table";

  service.createTable(table);

  // Pin the timestamp to UTC. Without an offset, Joda's Instant.parse interprets the string in the
  // JVM default time zone, which would make the test's event times depend on the host.
  Instant elementTimestamp = Instant.parse("2019-06-10T00:00:00Z");
  Duration windowDuration = Duration.standardMinutes(1);

  // TestStream stamps added elements with the current watermark: element 1 lands in the first
  // one-minute window and element 2 in the next one.
  TestStream<Integer> input =
      TestStream.create(VarIntCoder.of())
          .advanceWatermarkTo(elementTimestamp)
          .addElements(1)
          .advanceWatermarkTo(elementTimestamp.plus(windowDuration))
          .addElements(2)
          .advanceWatermarkToInfinity();

  BoundedWindow expectedFirstWindow = new IntervalWindow(elementTimestamp, windowDuration);
  BoundedWindow expectedSecondWindow =
      new IntervalWindow(elementTimestamp.plus(windowDuration), windowDuration);

  PCollection<BigtableWriteResult> results =
      p.apply("rows", input)
          .apply("window", Window.into(FixedWindows.of(windowDuration)))
          .apply("expand", ParDo.of(new WriteGeneratorDoFn()))
          .apply("write", defaultWrite.withTableId(table).withWriteResults());

  // Each window must report its own write result.
  PAssert.that(results)
      .inWindow(expectedFirstWindow)
      .containsInAnyOrder(BigtableWriteResult.create(1));
  PAssert.that(results)
      .inWindow(expectedSecondWindow)
      .containsInAnyOrder(BigtableWriteResult.create(2));

  p.run();
}
 
源代码26 项目: beam   文件: CountTest.java
@Test
public void testGlobalWindowErrorMessageShows() {
  // Applying Count.globally() to 1-day fixed windows must fail while the transform is applied,
  // with the combineFn's dedicated incompatible-window message.
  PCollection<String> input = p.apply(Create.of(NO_LINES).withCoder(StringUtf8Coder.of()));
  PCollection<String> dailyWindowed =
      input.apply(Window.into(FixedWindows.of(Duration.standardDays(1))));

  String expectedMessage = Count.combineFn().getIncompatibleGlobalWindowErrorMessage();
  exceptionRule.expect(IllegalStateException.class);
  exceptionRule.expectMessage(expectedMessage);

  dailyWindowed.apply(Count.globally());
}
 
源代码27 项目: beam   文件: ReduceFnRunnerTest.java
@Test
public void testOnElementCombiningAccumulating() throws Exception {
  // Drives a mocked trigger over a combining (Sum) reduce function in accumulating mode. Each
  // firing reports the running sum for the [0, 10) window, and elements that arrive after the
  // trigger finishes are dropped.
  WindowingStrategy<?, IntervalWindow> accumulatingStrategy =
      WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
          .withAllowedLateness(Duration.millis(100));

  ReduceFnTester<Integer, Integer, IntervalWindow> tester =
      ReduceFnTester.combining(
          accumulatingStrategy, mockTriggerStateMachine, Sum.ofIntegers(), VarIntCoder.of());

  // Element 1 goes in before shouldFire is stubbed to true in this test.
  injectElement(tester, 1);

  // Element 2 fires a pane holding 1 + 2 = 3.
  when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
  injectElement(tester, 2);

  // Element 3 fires and finishes, giving an accumulated pane of 1 + 2 + 3 = 6.
  when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
  triggerShouldFinish(mockTriggerStateMachine);
  injectElement(tester, 3);

  // Element 4 arrives after the trigger finished, so it must not appear in any output.
  injectElement(tester, 4);

  assertThat(
      tester.extractOutput(),
      contains(
          isSingleWindowedValue(equalTo(3), 1, 0, 10),
          isSingleWindowedValue(equalTo(6), 3, 0, 10)));
  assertTrue(tester.isMarkedFinished(firstWindow));
  tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
}
 
源代码28 项目: incubator-nemo   文件: WindowedWordCount.java
/**
 * Entry point for the windowed word-count Beam program.
 *
 * <p>Reads from the source built by {@code getSource(p, args)} and windows the keyed counts. It
 * then sums the counts per key within each window, formats each entry as {@code "key: count"},
 * and writes the result with {@code WriteOneFilePerWindow}.
 *
 * @param args arguments. {@code args[0]} is the output file path and {@code args[1]} is the
 *     window type: {@code "fixed"} selects 5-second fixed windows, and any other value selects
 *     10-second sliding windows starting every 5 seconds. The whole array is also passed to
 *     {@code getSource}.
 * @throws IllegalArgumentException if fewer than two arguments are supplied
 */
public static void main(final String[] args) {
  // Fail fast with a descriptive message instead of an ArrayIndexOutOfBoundsException.
  if (args.length < 2) {
    throw new IllegalArgumentException(
      "Expected at least 2 arguments (outputFilePath, windowType) but got " + args.length);
  }
  final String outputFilePath = args[0];
  final String windowType = args[1];

  final Window<KV<String, Long>> windowFn;
  if ("fixed".equals(windowType)) {
    windowFn = Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardSeconds(5)));
  } else {
    // Any value other than "fixed" falls back to sliding windows.
    windowFn = Window.<KV<String, Long>>into(SlidingWindows.of(Duration.standardSeconds(10))
      .every(Duration.standardSeconds(5)));
  }

  final PipelineOptions options = NemoPipelineOptionsFactory.create();
  options.setJobName("WindowedWordCount");

  final Pipeline p = Pipeline.create(options);

  getSource(p, args)
    .apply(windowFn)
    .apply(Sum.longsPerKey())
    .apply(MapElements.<KV<String, Long>, String>via(new SimpleFunction<KV<String, Long>, String>() {
      @Override
      public String apply(final KV<String, Long> kv) {
        return kv.getKey() + ": " + kv.getValue();
      }
    }))
    .apply(new WriteOneFilePerWindow(outputFilePath, 1));

  p.run().waitUntilFinish();
}
 
源代码29 项目: beam   文件: ViewTest.java
@Test
@Category(ValidatesRunner.class)
public void testWindowedSideInputFixedToGlobal() {
  // Side input: 1 + 2 + 3 summed in the global window and exposed as a singleton (6).
  final PCollectionView<Integer> view =
      pipeline
          .apply(
              "CreateSideInput",
              Create.timestamped(
                  TimestampedValue.of(1, new Instant(1)),
                  TimestampedValue.of(2, new Instant(11)),
                  TimestampedValue.of(3, new Instant(13))))
          .apply("WindowSideInput", Window.into(new GlobalWindows()))
          .apply(Sum.integersGlobally())
          .apply(View.asSingleton());

  // Appends the side-input value to each main-input element.
  DoFn<String, String> appendSideInput =
      new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(c.element() + c.sideInput(view));
        }
      };

  // The main input uses 10 ms fixed windows, but every window reads the same global-window side
  // input, so each element gets the suffix "6".
  PCollection<String> output =
      pipeline
          .apply(
              "CreateMainInput",
              Create.timestamped(
                  TimestampedValue.of("A", new Instant(4)),
                  TimestampedValue.of("B", new Instant(15)),
                  TimestampedValue.of("C", new Instant(7))))
          .apply("WindowMainInput", Window.into(FixedWindows.of(Duration.millis(10))))
          .apply("OutputMainAndSideInputs", ParDo.of(appendSideInput).withSideInputs(view));

  PAssert.that(output).containsInAnyOrder("A6", "B6", "C6");

  pipeline.run();
}
 
源代码30 项目: gcp-ingestion   文件: Write.java
@Override
public WithFailures.Result<PDone, PubsubMessage> expand(PCollection<PubsubMessage> input) {
  // Resolve the configured output prefix into a path template. Its static (non-templated) part
  // becomes the FileIO destination root.
  ValueProvider<DynamicPathTemplate> pathTemplate =
      NestedValueProvider.of(outputPrefix, DynamicPathTemplate::new);
  ValueProvider<String> staticPrefix =
      NestedValueProvider.of(pathTemplate, value -> value.staticPrefix);

  // Messages are partitioned by an ordered list of placeholder values, not by their attribute
  // map, because MapCoder isn't deterministic. withNaming() later turns that list back into the
  // concrete output location.
  FileIO.Write<List<String>, PubsubMessage> write =
      FileIO.<List<String>, PubsubMessage>writeDynamic()
          .by(message -> pathTemplate.get()
              .extractValuesFrom(DerivedAttributesMap.of(message.getAttributeMap())))
          .withDestinationCoder(ListCoder.of(StringUtf8Coder.of()))
          .withCompression(compression)
          .via(Contextful.fn(format::encodeSingleMessage), TextIO.sink())
          .to(staticPrefix)
          .withNaming(placeholderValues -> NoColonFileNaming.defaultNaming(
              pathTemplate.get().replaceDynamicPart(placeholderValues), format.suffix()));

  // Passing a ValueProvider to withNumShards disables runner-determined sharding, so the shard
  // count is set only for streaming (Pub/Sub) input, where runner-determined sharding is not an
  // option.
  if (inputType == InputType.pubsub) {
    write = write.withNumShards(numShards);
  }

  // Fixed windows, allowing lateness up to the maximum Cloud Pub/Sub retention of 7 days
  // documented in https://cloud.google.com/pubsub/docs/subscriber
  Window<PubsubMessage> windowing =
      Window.<PubsubMessage>into(FixedWindows.of(windowDuration))
          .withAllowedLateness(Duration.standardDays(7))
          .discardingFiredPanes();

  input.apply(windowing).apply(write);

  return WithFailures.Result.of(
      PDone.in(input.getPipeline()), EmptyErrors.in(input.getPipeline()));
}
 
 类方法
 同包方法