org.joda.time.Instant#plus ( )源码实例Demo

下面列出了org.joda.time.Instant#plus ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: beam   文件: WinningBids.java
/**
 * Return a bid window for {@code bid}. It should later be merged into the corresponding auction
 * window. However, it is possible this bid is for an already expired auction, or for an auction
 * which the system has not yet seen. So we give the bid a bit of wiggle room in its interval.
 */
public static AuctionOrBidWindow forBid(
    long expectedAuctionDurationMs, Instant timestamp, Bid bid) {
  // At this point we don't know which auctions are still valid, and the bid may
  // be for an auction which won't start until some unknown time in the future
  // (due to Generator.AUCTION_ID_LEAD in Generator.nextBid).
  // A real system would atomically reconcile bids and auctions by a separate mechanism.
  // If we give bids an unbounded window it is possible a bid for an auction which
  // has already expired would cause the system watermark to stall, since that window
  // would never be retired.
  // Instead, we will just give the bid a finite window which expires at
  // the upper bound of auctions assuming the auction starts at the same time as the bid,
  // and assuming the system is running at its lowest event rate (as per interEventDelayUs).
  return new AuctionOrBidWindow(
      timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false);
}
 
源代码2 项目: beam   文件: FixedWindows.java
@Override
public IntervalWindow assignWindow(Instant timestamp) {
  Instant start =
      new Instant(
          timestamp.getMillis()
              - timestamp.plus(size).minus(offset).getMillis() % size.getMillis());

  // The global window is inclusive of max timestamp, while interval window excludes its
  // upper bound
  Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp().plus(1);

  // The end of the window is either start + size if that is within the allowable range, otherwise
  // the end of the global window. Truncating the window drives many other
  // areas of this system in the appropriate way automatically.
  //
  // Though it is curious that the very last representable fixed window is shorter than the rest,
  // when we are processing data in the year 294247, we'll probably have technology that can
  // account for this.
  Instant end =
      start.isAfter(endOfGlobalWindow.minus(size)) ? endOfGlobalWindow : start.plus(size);

  return new IntervalWindow(start, end);
}
 
源代码3 项目: beam   文件: DoFnTesterTest.java
@Test
public void testSupportsWindowParameter() throws Exception {
  Instant now = Instant.now();
  try (DoFnTester<Integer, KV<Integer, BoundedWindow>> tester =
      DoFnTester.of(new DoFnWithWindowParameter())) {
    BoundedWindow firstWindow = new IntervalWindow(now, now.plus(Duration.standardMinutes(1)));
    tester.processWindowedElement(1, now, firstWindow);
    tester.processWindowedElement(2, now, firstWindow);
    BoundedWindow secondWindow = new IntervalWindow(now, now.plus(Duration.standardMinutes(4)));
    tester.processWindowedElement(3, now, secondWindow);
    tester.finishBundle();

    assertThat(
        tester.peekOutputElementsInWindow(firstWindow),
        containsInAnyOrder(
            TimestampedValue.of(KV.of(1, firstWindow), now),
            TimestampedValue.of(KV.of(2, firstWindow), now)));
    assertThat(
        tester.peekOutputElementsInWindow(secondWindow),
        containsInAnyOrder(TimestampedValue.of(KV.of(3, secondWindow), now)));
  }
}
 
源代码4 项目: beam   文件: UnboundedSourceSystemTest.java
@Test
public void testAdvanceWatermark() throws IOException, InterruptedException {
  final Instant now = Instant.now();
  final Instant nowPlusOne = now.plus(1L);
  final TestUnboundedSource<String> source =
      TestUnboundedSource.<String>createBuilder()
          .setTimestamp(now)
          .addElements("first")
          .setTimestamp(nowPlusOne)
          .addElements("second")
          .advanceWatermarkTo(now)
          .build();

  final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
      createConsumer(source);

  consumer.register(DEFAULT_SSP, NULL_STRING);
  consumer.start();
  assertEquals(
      Arrays.asList(
          createElementMessage(DEFAULT_SSP, offset(0), "first", now),
          createElementMessage(DEFAULT_SSP, offset(1), "second", nowPlusOne),
          createWatermarkMessage(DEFAULT_SSP, now)),
      consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_WATERMARK_TIMEOUT_MILLIS));
  consumer.stop();
}
 
源代码5 项目: beam   文件: WindowedValueTest.java
@Test
public void testExplodeWindowsManyWindowsMultipleWindowedValues() {
  Instant now = Instant.now();
  BoundedWindow centerWindow = new IntervalWindow(now.minus(1000L), now.plus(1000L));
  BoundedWindow pastWindow = new IntervalWindow(now.minus(1500L), now.plus(500L));
  BoundedWindow futureWindow = new IntervalWindow(now.minus(500L), now.plus(1500L));
  BoundedWindow futureFutureWindow = new IntervalWindow(now, now.plus(2000L));
  PaneInfo pane = PaneInfo.createPane(false, false, Timing.ON_TIME, 3L, 0L);
  WindowedValue<String> value =
      WindowedValue.of(
          "foo",
          now,
          ImmutableList.of(pastWindow, centerWindow, futureWindow, futureFutureWindow),
          pane);

  assertThat(
      value.explodeWindows(),
      containsInAnyOrder(
          WindowedValue.of("foo", now, futureFutureWindow, pane),
          WindowedValue.of("foo", now, futureWindow, pane),
          WindowedValue.of("foo", now, centerWindow, pane),
          WindowedValue.of("foo", now, pastWindow, pane)));

  assertThat(value.isSingleWindowedValue(), equalTo(false));
}
 
源代码6 项目: beam   文件: StatefulDoFnRunner.java
/**
 * Setup timer for flush time @{code flush}. The time is adjusted to respect allowed lateness and
 * window garbage collection time. Setup watermark hold for the flush time.
 *
 * <p>Note that this is equivalent to {@link org.apache.beam.sdk.state.Timer#withOutputTimestamp}
 * and should be reworked to use that feature once that is stable.
 */
private void setupFlushTimerAndWatermarkHold(
    StateNamespace namespace, BoundedWindow window, Instant flush) {
  Instant flushWithLateness = flush.plus(windowingStrategy.getAllowedLateness());
  Instant windowGcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
  if (flushWithLateness.isAfter(windowGcTime)) {
    flushWithLateness = windowGcTime;
  }
  WatermarkHoldState watermark = stepContext.stateInternals().state(namespace, watermarkHold);
  stepContext
      .timerInternals()
      .setTimer(
          namespace,
          SORT_FLUSH_TIMER,
          SORT_FLUSH_TIMER,
          flushWithLateness,
          flush,
          TimeDomain.EVENT_TIME);
  watermark.clear();
  watermark.add(flush);
}
 
源代码7 项目: beam   文件: SnippetsTest.java
@Test
@Category({NeedsRunner.class, UsesStatefulParDo.class})
public void testSlowlyUpdatingSideInputsWindowed() {
  Instant startAt = Instant.now().minus(Duration.standardMinutes(3));
  Duration duration = Duration.standardSeconds(10);
  Instant stopAt = startAt.plus(duration);
  Duration interval1 = Duration.standardSeconds(1);
  Duration interval2 = Duration.standardSeconds(1);

  File f = null;
  try {
    f = File.createTempFile("testSlowlyUpdatingSIWindowed", "txt");
    try (BufferedWriter fw = Files.newWriter(f, Charset.forName("UTF-8"))) {
      fw.append("testdata");
    }
  } catch (IOException e) {
    Assert.fail("failed to create temp file: " + e.toString());
    throw new RuntimeException("Should never reach here");
  }

  PCollection<Long> result =
      Snippets.PeriodicallyUpdatingSideInputs.main(
          p, startAt, stopAt, interval1, interval2, f.getPath());

  ArrayList<Long> expectedResults = new ArrayList<Long>();
  expectedResults.add(0L);
  for (Long i = startAt.getMillis(); i < stopAt.getMillis(); i = i + interval2.getMillis()) {
    expectedResults.add(1L);
  }

  PAssert.that(result).containsInAnyOrder(expectedResults);

  p.run().waitUntilFinish();
  f.deleteOnExit();
}
 
源代码8 项目: beam   文件: PeriodicSequenceTest.java
@Test
@Category({
  NeedsRunner.class,
  UsesImpulse.class,
  UsesStatefulParDo.class,
})
public void testOutputsProperElements() {
  Instant instant = Instant.now();

  Instant startTime = instant.minus(Duration.standardHours(100));
  long duration = 500;
  Duration interval = Duration.millis(250);
  long intervalMillis = interval.getMillis();
  Instant stopTime = startTime.plus(duration);

  PCollection<KV<Instant, Instant>> result =
      p.apply(
              Create.<PeriodicSequence.SequenceDefinition>of(
                  new PeriodicSequence.SequenceDefinition(startTime, stopTime, interval)))
          .apply(PeriodicSequence.create())
          .apply(ParDo.of(new ExtractTsDoFn<>())); // used to validate timestamp

  ArrayList<KV<Instant, Instant>> expectedResults =
      new ArrayList<>((int) (duration / intervalMillis + 1));
  for (long i = 0; i <= duration; i += intervalMillis) {
    Instant el = startTime.plus(i);
    expectedResults.add(KV.of(el, el));
  }

  PAssert.that(result).containsInAnyOrder(expectedResults);

  p.run().waitUntilFinish();
}
 
源代码9 项目: beam   文件: UnboundedSourceSystemTest.java
@Test
public void testMultipleAdvanceWatermark() throws IOException, InterruptedException {
  final Instant now = Instant.now();
  final Instant nowPlusOne = now.plus(1L);
  final Instant nowPlusTwo = now.plus(2L);
  final TestUnboundedSource<String> source =
      TestUnboundedSource.<String>createBuilder()
          .setTimestamp(now)
          .addElements("first")
          .advanceWatermarkTo(now)
          .noElements() // will output the first watermark
          .setTimestamp(nowPlusOne)
          .addElements("second")
          .setTimestamp(nowPlusTwo)
          .addElements("third")
          .advanceWatermarkTo(nowPlusOne)
          .build();

  final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
      createConsumer(source);

  consumer.register(DEFAULT_SSP, NULL_STRING);
  consumer.start();
  // consume to the first watermark
  assertEquals(
      Arrays.asList(
          createElementMessage(DEFAULT_SSP, offset(0), "first", now),
          createWatermarkMessage(DEFAULT_SSP, now)),
      consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_WATERMARK_TIMEOUT_MILLIS));

  // consume to the second watermark
  assertEquals(
      Arrays.asList(
          createElementMessage(DEFAULT_SSP, offset(1), "second", nowPlusOne),
          createElementMessage(DEFAULT_SSP, offset(2), "third", nowPlusTwo),
          createWatermarkMessage(DEFAULT_SSP, nowPlusOne)),
      consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_WATERMARK_TIMEOUT_MILLIS));

  consumer.stop();
}
 
源代码10 项目: beam   文件: UnboundedSourceSystemTest.java
@Test
public void testTimeout() throws Exception {
  final CountDownLatch advanceLatch = new CountDownLatch(1);
  final Instant now = Instant.now();
  final Instant nowPlusOne = now.plus(1);

  final TestUnboundedSource<String> source =
      TestUnboundedSource.<String>createBuilder()
          .setTimestamp(now)
          .addElements("before")
          .addLatch(advanceLatch)
          .setTimestamp(nowPlusOne)
          .addElements("after")
          .advanceWatermarkTo(nowPlusOne)
          .build();

  final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
      createConsumer(source);

  consumer.register(DEFAULT_SSP, NULL_STRING);
  consumer.start();
  assertEquals(
      Collections.singletonList(createElementMessage(DEFAULT_SSP, offset(0), "before", now)),
      consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));

  advanceLatch.countDown();

  assertEquals(
      Arrays.asList(
          createElementMessage(DEFAULT_SSP, offset(1), "after", nowPlusOne),
          createWatermarkMessage(DEFAULT_SSP, nowPlusOne)),
      consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
  consumer.stop();
}
 
源代码11 项目: beam   文件: WindowedValueTest.java
@Test
public void testSingleWindowedValueInFixedWindow() {
  Instant now = Instant.now();
  BoundedWindow w = new IntervalWindow(now, now.plus(1));
  WindowedValue<Integer> value = WindowedValue.of(1, now, w, PaneInfo.NO_FIRING);
  assertThat(value.isSingleWindowedValue(), equalTo(true));
  assertThat(((WindowedValue.SingleWindowedValue) value).getWindow(), equalTo(w));
}
 
源代码12 项目: beam   文件: StatefulDoFnRunner.java
@Override
public void setForWindow(InputT input, BoundedWindow window) {
  Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
  // make sure this fires after any window.maxTimestamp() timers
  gcTime = gcTime.plus(GC_DELAY_MS);
  timerInternals.setTimer(
      StateNamespaces.window(windowCoder, window),
      GC_TIMER_ID,
      "",
      gcTime,
      window.maxTimestamp(),
      TimeDomain.EVENT_TIME);
}
 
源代码13 项目: beam   文件: StreamingModeExecutionContextTest.java
@Test
public void testTimerInternalsProcessingTimeSkew() {
  Windmill.WorkItemCommitRequest.Builder outputBuilder =
      Windmill.WorkItemCommitRequest.newBuilder();

  NameContext nameContext = NameContextsForTests.nameContextForTest();
  DataflowOperationContext operationContext =
      executionContext.createOperationContext(nameContext);
  StreamingModeExecutionContext.StepContext stepContext =
      executionContext.getStepContext(operationContext);
  Windmill.WorkItem.Builder workItemBuilder =
      Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(17L);
  Windmill.Timer.Builder timerBuilder = workItemBuilder.getTimersBuilder().addTimersBuilder();

  // Trigger a realtime timer that with clock skew but ensure that it would
  // still fire.
  Instant now = Instant.now();
  long offsetMillis = 60 * 1000;
  Instant timerTimestamp = now.plus(offsetMillis);
  timerBuilder
      .setTag(ByteString.copyFromUtf8("a"))
      .setTimestamp(timerTimestamp.getMillis() * 1000)
      .setType(Windmill.Timer.Type.REALTIME);

  executionContext.start(
      "key",
      workItemBuilder.build(),
      new Instant(1000), // input watermark
      null, // output watermark
      null, // synchronized processing time
      stateReader,
      stateFetcher,
      outputBuilder);
  TimerInternals timerInternals = stepContext.timerInternals();
  assertTrue(timerTimestamp.isBefore(timerInternals.currentProcessingTime()));
}
 
源代码14 项目: beam   文件: SplittableParDoProcessFnTest.java
@Test
public void testTrivialProcessFnPropagatesOutputWindowAndTimestamp() throws Exception {
  // Tests that ProcessFn correctly propagates the window and timestamp of the element
  // inside the KeyedWorkItem.
  // The underlying DoFn is actually monolithic, so this doesn't test splitting.
  DoFn<Integer, String> fn = new ToStringFn();

  Instant base = Instant.now();

  IntervalWindow w =
      new IntervalWindow(
          base.minus(Duration.standardMinutes(1)), base.plus(Duration.standardMinutes(1)));

  ProcessFnTester<Integer, String, SomeRestriction, Void, Void> tester =
      new ProcessFnTester<>(
          base,
          fn,
          BigEndianIntegerCoder.of(),
          SerializableCoder.of(SomeRestriction.class),
          VoidCoder.of(),
          MAX_OUTPUTS_PER_BUNDLE,
          MAX_BUNDLE_DURATION);
  tester.startElement(
      WindowedValue.of(
          KV.of(42, new SomeRestriction()),
          base,
          Collections.singletonList(w),
          PaneInfo.ON_TIME_AND_ONLY_FIRING));

  assertEquals(
      Arrays.asList(
          TimestampedValue.of("42a", base),
          TimestampedValue.of("42b", base),
          TimestampedValue.of("42c", base)),
      tester.peekOutputElementsInWindow(w));
}
 
源代码15 项目: beam   文件: IntervalWindow.java
public IntervalWindow(Instant start, ReadableDuration size) {
  this.start = start;
  this.end = start.plus(size);
}
 
源代码16 项目: beam   文件: StatefulDoFnRunnerTest.java
private void testOutput(
    boolean ordered,
    BiFunction<MyDoFn, OutputManager, DoFnRunner<KV<String, Integer>, Integer>> runnerFactory)
    throws Exception {

  timerInternals.advanceInputWatermark(new Instant(1L));

  MyDoFn fn = MyDoFn.create(ordered);
  StateTag<ValueState<Integer>> stateTag = StateTags.tagForSpec(MyDoFn.STATE_ID, fn.intState());

  List<KV<TupleTag<?>, WindowedValue<?>>> outputs = new ArrayList<>();
  OutputManager output = asOutputManager(outputs);
  DoFnRunner<KV<String, Integer>, Integer> runner = runnerFactory.apply(fn, output);

  Instant elementTime = new Instant(5);

  // write two elements, with descending timestamps
  runner.processElement(
      WindowedValue.of(KV.of("hello", 1), elementTime, WINDOW_1, PaneInfo.NO_FIRING));
  runner.processElement(
      WindowedValue.of(KV.of("hello", 2), elementTime.minus(1), WINDOW_1, PaneInfo.NO_FIRING));

  if (ordered) {
    // move forward in time so that the input might get flushed
    advanceInputWatermark(timerInternals, elementTime.plus(ALLOWED_LATENESS + 1), runner);
  }

  assertEquals(3, (int) stateInternals.state(windowNamespace(WINDOW_1), stateTag).read());
  assertEquals(2, outputs.size());
  if (ordered) {
    assertEquals(
        Arrays.asList(
            KV.of(
                outputTag,
                WindowedValue.of(2, elementTime.minus(1), WINDOW_1, PaneInfo.NO_FIRING)),
            KV.of(outputTag, WindowedValue.of(3, elementTime, WINDOW_1, PaneInfo.NO_FIRING))),
        outputs);
  } else {
    assertEquals(
        Arrays.asList(
            KV.of(outputTag, WindowedValue.of(1, elementTime, WINDOW_1, PaneInfo.NO_FIRING)),
            KV.of(
                outputTag,
                WindowedValue.of(3, elementTime.minus(1), WINDOW_1, PaneInfo.NO_FIRING))),
        outputs);
  }
  outputs.clear();

  // another window
  elementTime = elementTime.plus(WINDOW_SIZE);
  runner.processElement(
      WindowedValue.of(KV.of("hello", 1), elementTime, WINDOW_2, PaneInfo.NO_FIRING));

  runner.processElement(
      WindowedValue.of(KV.of("hello", 2), elementTime.minus(1), WINDOW_2, PaneInfo.NO_FIRING));

  runner.processElement(
      WindowedValue.of(KV.of("hello", 3), elementTime.minus(2), WINDOW_2, PaneInfo.NO_FIRING));

  if (ordered) {
    // move forward in time so that the input might get flushed
    advanceInputWatermark(timerInternals, elementTime.plus(ALLOWED_LATENESS + 1), runner);
  }

  assertEquals(6, (int) stateInternals.state(windowNamespace(WINDOW_2), stateTag).read());
  assertEquals(3, outputs.size());
  if (ordered) {
    assertEquals(
        Arrays.asList(
            KV.of(
                outputTag,
                WindowedValue.of(3, elementTime.minus(2), WINDOW_2, PaneInfo.NO_FIRING)),
            KV.of(
                outputTag,
                WindowedValue.of(5, elementTime.minus(1), WINDOW_2, PaneInfo.NO_FIRING)),
            KV.of(outputTag, WindowedValue.of(6, elementTime, WINDOW_2, PaneInfo.NO_FIRING))),
        outputs);
  } else {
    assertEquals(
        Arrays.asList(
            KV.of(outputTag, WindowedValue.of(1, elementTime, WINDOW_2, PaneInfo.NO_FIRING)),
            KV.of(
                outputTag,
                WindowedValue.of(3, elementTime.minus(1), WINDOW_2, PaneInfo.NO_FIRING)),
            KV.of(
                outputTag,
                WindowedValue.of(6, elementTime.minus(2), WINDOW_2, PaneInfo.NO_FIRING))),
        outputs);
  }
}
 
源代码17 项目: beam   文件: CreateStreamTest.java
@Test
public void testLateDataAccumulating() throws IOException {
  Instant instant = new Instant(0);
  CreateStream<Integer> source =
      CreateStream.of(VarIntCoder.of(), batchDuration())
          .emptyBatch()
          .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(6)))
          .nextBatch(
              TimestampedValue.of(1, instant),
              TimestampedValue.of(2, instant),
              TimestampedValue.of(3, instant))
          .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
          // These elements are late but within the allowed lateness
          .nextBatch(TimestampedValue.of(4, instant), TimestampedValue.of(5, instant))
          // These elements are droppably late
          .advanceNextBatchWatermarkToInfinity()
          .nextBatch(
              TimestampedValue.of(-1, instant),
              TimestampedValue.of(-2, instant),
              TimestampedValue.of(-3, instant));

  PCollection<Integer> windowed =
      p.apply(source)
          .apply(
              Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
                  .triggering(
                      AfterWatermark.pastEndOfWindow()
                          .withEarlyFirings(
                              AfterProcessingTime.pastFirstElementInPane()
                                  .plusDelayOf(Duration.standardMinutes(2)))
                          .withLateFirings(AfterPane.elementCountAtLeast(1)))
                  .accumulatingFiredPanes()
                  .withAllowedLateness(
                      Duration.standardMinutes(5), Window.ClosingBehavior.FIRE_ALWAYS));
  PCollection<Integer> triggered =
      windowed
          .apply(WithKeys.of(1))
          .apply(GroupByKey.create())
          .apply(Values.create())
          .apply(Flatten.iterables());
  PCollection<Long> count =
      windowed.apply(Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
  PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults());

  IntervalWindow window = new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));
  PAssert.that(triggered).inFinalPane(window).containsInAnyOrder(1, 2, 3, 4, 5);
  PAssert.that(triggered).inOnTimePane(window).containsInAnyOrder(1, 2, 3);
  PAssert.that(count)
      .inWindow(window)
      .satisfies(
          input -> {
            for (Long count1 : input) {
              assertThat(count1, allOf(greaterThanOrEqualTo(3L), lessThanOrEqualTo(5L)));
            }
            return null;
          });
  PAssert.that(sum)
      .inWindow(window)
      .satisfies(
          input -> {
            for (Integer sum1 : input) {
              assertThat(sum1, allOf(greaterThanOrEqualTo(6), lessThanOrEqualTo(15)));
            }
            return null;
          });

  p.run();
}
 
源代码18 项目: beam   文件: WaitTest.java
/**
 * Generates a {@link TestStream} of the given duration containing the values [0, numElements) and
 * the same number of random but monotonic watermark updates, with each element within
 * allowedLateness of the respective watermark update.
 *
 * <p>TODO: Consider moving this into TestStream if it's useful enough.
 */
private PCollection<Long> generateStreamWithBoundedDisorder(
    String name,
    Instant base,
    Duration totalDuration,
    int numElements,
    Duration allowedLateness) {
  TestStream.Builder<Long> stream = TestStream.create(VarLongCoder.of());

  // Generate numElements random watermark updates. After each one also generate an element within
  // allowedLateness of it.
  List<Instant> watermarks = Lists.newArrayList();
  for (int i = 0; i < numElements; ++i) {
    watermarks.add(base.plus(new Duration((long) (totalDuration.getMillis() * Math.random()))));
  }
  Collections.sort(watermarks);

  List<Event<Long>> events = Lists.newArrayList();
  for (int i = 0; i < numElements; ++i) {
    Instant processingTimestamp =
        base.plus((long) (1.0 * i * totalDuration.getMillis() / (numElements + 1)));
    Instant watermark = watermarks.get(i);
    Instant elementTimestamp =
        watermark.minus((long) (Math.random() * allowedLateness.getMillis()));
    events.add(new Event<>(processingTimestamp, watermark));
    events.add(new Event<>(processingTimestamp, TimestampedValue.of((long) i, elementTimestamp)));
  }

  Instant lastProcessingTime = base;
  for (Event<Long> event : events) {
    Duration processingTimeDelta = new Duration(lastProcessingTime, event.processingTime);
    if (processingTimeDelta.getMillis() > 0) {
      stream = stream.advanceProcessingTime(processingTimeDelta);
    }
    lastProcessingTime = event.processingTime;

    if (event.element != null) {
      stream = stream.addElements(event.element);
    } else {
      stream = stream.advanceWatermarkTo(event.watermarkUpdate);
    }
  }
  return p.apply(name, stream.advanceWatermarkToInfinity());
}
 
源代码19 项目: beam   文件: TestStreamTest.java
@Test
@Category({NeedsRunner.class, UsesTestStream.class})
public void testLateDataAccumulating() {
  Instant instant = new Instant(0);
  TestStream<Integer> source =
      TestStream.create(VarIntCoder.of())
          .addElements(
              TimestampedValue.of(1, instant),
              TimestampedValue.of(2, instant),
              TimestampedValue.of(3, instant))
          .advanceWatermarkTo(instant.plus(Duration.standardMinutes(6)))
          // These elements are late but within the allowed lateness
          .addElements(TimestampedValue.of(4, instant), TimestampedValue.of(5, instant))
          .advanceWatermarkTo(instant.plus(Duration.standardMinutes(20)))
          // These elements are droppably late
          .addElements(
              TimestampedValue.of(-1, instant),
              TimestampedValue.of(-2, instant),
              TimestampedValue.of(-3, instant))
          .advanceWatermarkToInfinity();

  PCollection<Integer> windowed =
      p.apply(source)
          .apply(
              Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
                  .triggering(
                      AfterWatermark.pastEndOfWindow()
                          .withEarlyFirings(
                              AfterProcessingTime.pastFirstElementInPane()
                                  .plusDelayOf(Duration.standardMinutes(2)))
                          .withLateFirings(AfterPane.elementCountAtLeast(1)))
                  .accumulatingFiredPanes()
                  .withAllowedLateness(Duration.standardMinutes(5), ClosingBehavior.FIRE_ALWAYS));
  PCollection<Integer> triggered =
      windowed
          .apply(WithKeys.of(1))
          .apply(GroupByKey.create())
          .apply(Values.create())
          .apply(Flatten.iterables());
  PCollection<Long> count =
      windowed.apply(Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
  PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults());

  IntervalWindow window = new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));
  PAssert.that(triggered).inFinalPane(window).containsInAnyOrder(1, 2, 3, 4, 5);
  PAssert.that(triggered).inOnTimePane(window).containsInAnyOrder(1, 2, 3);
  PAssert.that(count)
      .inWindow(window)
      .satisfies(
          input -> {
            for (Long count1 : input) {
              assertThat(count1, allOf(greaterThanOrEqualTo(3L), lessThanOrEqualTo(5L)));
            }
            return null;
          });
  PAssert.that(sum)
      .inWindow(window)
      .satisfies(
          input -> {
            for (Integer sum1 : input) {
              assertThat(sum1, allOf(greaterThanOrEqualTo(6), lessThanOrEqualTo(15)));
            }
            return null;
          });

  p.run();
}
 
源代码20 项目: data-timeseries-java   文件: GenerateSampleData.java
/**
 * Generate 5 timeseries values across a 10 min window TS1, TS2, TS3, TS4, TS5. TS1 / TS2 Will
 * contain a single value per 1 min and will increase by a value of 1 till min 5 and then decrease
 * by a value of 1 until 10 mins TS3 Will have missing values at 2, 3, 7 and 8 mins and will
 * decrease by a value of 1 till min 5 and then increase by a value of 1 until 10 mins TS4 Will
 * have missing values at 2, 3, 7 and 8 mins and will decrease by a value of 1 till min 5 and then
 * increase by a value of 1 until 10 mins TS5 Will have random values assigned to it throughout
 * the process this is the control time series
 */
public static List<KV<String, TSProto>> getTestData() {

  List<KV<String, TSProto>> ts = new ArrayList<KV<String, TSProto>>();

  String dateTime = "01/01/2016 00:00:00";
  DateTimeFormatter dtf = DateTimeFormat.forPattern("dd/MM/yyyy HH:mm:ss");
  Instant time = Instant.parse(dateTime, dtf);

  // KEY 001 & KEY 002
  String[] list = {TS1, TS2};

  for (String s : list) {
    GenerateSampleData.generateSequentialList(ts, time, s, 1d, 1d);
  }

  // KEY003
  List<KV<String, TSProto>> key003List = new ArrayList<KV<String, TSProto>>();

  GenerateSampleData.generateSequentialList(key003List, time, TS3, 10d, -1d);

  // Remove values
  key003List.remove(2); // Remove time value 02
  key003List.remove(2); // Remove time value 03
  key003List.remove(5); // Remove time value 07
  key003List.remove(5); // Remove time value 08

  ts.addAll(key003List);

  // KEY004
  List<KV<String, TSProto>> key004List = new ArrayList<KV<String, TSProto>>();

  GenerateSampleData.generateSequentialList(key004List, time, TS4, 10d, -1d);

  // Remove values
  key004List.remove(2); // Remove time value 02
  key004List.remove(2); // Remove time value 03
  key004List.remove(5); // Remove time value 07
  key004List.remove(5); // Remove time value 08


  ts.addAll(key004List);

  // KEY005
  Instant tsTime = new Instant(time);
  for (int i = 0; i < 10; i++) {

    ts.add(KV.of(TS5, TSProto.newBuilder().setAskPrice(Math.random()).setBidPrice(Math.random())
        .setKey(TS5).setIsLive(true).setTime(tsTime.getMillis()).build()));
    tsTime = tsTime.plus(Duration.standardMinutes(1));

  }

  return ts;

}