org.joda.time.format.ISOPeriodFormat#org.joda.time.Instant源码实例Demo

下面列出了 org.joda.time.format.ISOPeriodFormat#org.joda.time.Instant 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: beam   文件: FlinkStateInternals.java
/**
 * Creates a watermark hold state whose holds are kept in a single Flink map state.
 *
 * <p>The map state is obtained under {@code VoidNamespace}; the Beam namespace and the state id
 * are instead folded into {@code namespaceString}.
 *
 * @param flinkStateBackend keyed state backend from which the partitioned state is obtained
 * @param watermarkHoldStateDescriptor descriptor of the map state holding the watermark holds
 * @param stateId identifier of this state, appended to the namespace's string key
 * @param namespace Beam state namespace whose string key prefixes {@code namespaceString}
 * @param timestampCombiner timestamp combiner stored on this instance
 * @throws RuntimeException if the partitioned state cannot be obtained; the original failure is
 *     attached as the cause
 */
public FlinkWatermarkHoldState(
    KeyedStateBackend<ByteBuffer> flinkStateBackend,
    MapStateDescriptor<String, Instant> watermarkHoldStateDescriptor,
    String stateId,
    StateNamespace namespace,
    TimestampCombiner timestampCombiner) {
  this.timestampCombiner = timestampCombiner;
  // Combines StateNamespace and stateId to generate a unique namespace for
  // watermarkHoldsState. We do not want to use Flink's namespacing to be
  // able to recover watermark holds efficiently during recovery.
  this.namespaceString = namespace.stringKey() + stateId;
  try {
    this.watermarkHoldsState =
        flinkStateBackend.getPartitionedState(
            VoidNamespace.INSTANCE,
            VoidNamespaceSerializer.INSTANCE,
            watermarkHoldStateDescriptor);
  } catch (Exception e) {
    // Chain the original exception so the underlying backend failure is not lost.
    throw new RuntimeException("Could not access state for watermark partition view", e);
  }
}
 
源代码2 项目: incubator-pinot   文件: ModelRetuneFlow.java
/**
 * Re-tunes the detection config when one of its model evaluators reports a bad model status.
 *
 * <p>Only the first evaluator reporting {@code ModelStatus.BAD} triggers tuning; the tuning
 * window spans {@code DEFAULT_TUNING_WINDOW_DAYS} days ending at {@code timestamp}.
 *
 * @param config detection config to maintain; its components must be non-null and non-empty
 * @param timestamp time at which the models are evaluated and at which the tuning window ends
 * @return the re-tuned config if tuning was triggered, otherwise the config that was passed in
 */
public DetectionConfigDTO maintain(DetectionConfigDTO config, Instant timestamp) {
  Preconditions.checkArgument(!Objects.isNull(config.getComponents()) && !config.getComponents().isEmpty(), "Components not initialized");
  if (!isTunable(config)) {
    // Nothing to evaluate for pipelines that cannot be tuned.
    return config;
  }
  for (ModelEvaluator<? extends AbstractSpec> evaluator : getModelEvaluators(config)) {
    boolean badModel = evaluator.evaluateModel(timestamp).getStatus().equals(ModelStatus.BAD);
    if (!badModel) {
      continue;
    }
    // A bad model status triggers exactly one round of tuning.
    LOG.info("Status for detection pipeline {} is {}, re-tuning", config.getId(), ModelStatus.BAD.toString());
    detectionRetuneCounter.inc();
    DetectionConfigTuner tuner = new DetectionConfigTuner(config, provider);
    long windowStart = timestamp.toDateTime().minusDays(DEFAULT_TUNING_WINDOW_DAYS).getMillis();
    long windowEnd = timestamp.getMillis();
    DetectionConfigDTO tuned = tuner.tune(windowStart, windowEnd);
    tuned.setLastTuningTimestamp(timestamp.getMillis());
    return tuned;
  }
  return config;
}
 
源代码3 项目: flink-dataflow   文件: GroupAlsoByWindowTest.java
/**
 * Checks the records and watermarks emitted by the test operator built for
 * {@code fixedWindowWithAfterWatermarkTriggerStrategy} against the expected sequence.
 */
@Test
public void testAfterWatermarkProgram() throws Exception {
	final long initialTime = 0L;
	WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy;
	OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
			createTestingOperatorAndState(strategy, initialTime);

	// Timestamps carried by the three expected records.
	final long firstTimestamp = initialTime + 1;
	final long secondTimestamp = initialTime + 10000;
	final long thirdTimestamp = initialTime + 19500;

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
			new Instant(firstTimestamp), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), firstTimestamp));
	expectedOutput.add(new Watermark(initialTime + 10000));

	expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
			new Instant(secondTimestamp), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), secondTimestamp));
	expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
			new Instant(thirdTimestamp), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), thirdTimestamp));
	expectedOutput.add(new Watermark(initialTime + 20000));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
	testHarness.close();
}
 
源代码4 项目: beam   文件: StatefulParDoP.java
/**
 * Advances the input watermark to {@code watermark} when that is later than the current input
 * watermark, fires the eligible timers, and then flushes the output manager.
 *
 * <p>When the new watermark equals {@code BoundedWindow.TIMESTAMP_MAX_VALUE}, processing time and
 * synchronized processing time are advanced to it as well.
 *
 * @param watermark new input watermark in milliseconds
 * @return the result of {@code outputManager.tryFlush()}
 */
private boolean flushTimers(long watermark) {
  if (!timerInternals.currentInputWatermarkTime().isBefore(watermark)) {
    // The watermark did not move forward; only flush.
    return outputManager.tryFlush();
  }
  try {
    final Instant advancedTo = new Instant(watermark);
    timerInternals.advanceInputWatermark(advancedTo);
    final boolean reachedMax = advancedTo.equals(BoundedWindow.TIMESTAMP_MAX_VALUE);
    if (reachedMax) {
      timerInternals.advanceProcessingTime(advancedTo);
      timerInternals.advanceSynchronizedProcessingTime(advancedTo);
    }
    fireEligibleTimers(timerInternals);
  } catch (Exception e) {
    throw new RuntimeException("Failed advancing processing time", e);
  }
  return outputManager.tryFlush();
}
 
/**
 * With the side input reported ready for every window, an element in three windows must produce
 * no pushback and must reach the underlying runner once per exploded window.
 */
@Test
public void processElementSideInputReadyAllWindows() {
  when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
      .thenReturn(true);

  ImmutableList<PCollectionView<?>> views = ImmutableList.of(singletonView);
  SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);

  // The three windows the single input element belongs to.
  IntervalWindow negativeWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
  IntervalWindow fromMinWindow =
      new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
  WindowedValue<Integer> multiWindow =
      WindowedValue.of(
          2,
          new Instant(-2),
          ImmutableList.of(negativeWindow, fromMinWindow, GlobalWindow.INSTANCE),
          PaneInfo.ON_TIME_AND_ONLY_FIRING);

  Iterable<WindowedValue<Integer>> pushedBack = runner.processElementInReadyWindows(multiWindow);

  assertThat(pushedBack, emptyIterable());
  assertThat(
      underlying.inputElems,
      containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
}
 
源代码6 项目: beam   文件: FixedWindowsTest.java
/**
 * The default window mapping of 20-minute fixed windows must map a window whose max timestamp is
 * 100 ms to the interval starting at 0, and must report a zero maximum lookback.
 */
@Test
public void testDefaultWindowMappingFn() {
  PartitioningWindowFn<?, ?> windowFn = FixedWindows.of(Duration.standardMinutes(20L));
  WindowMappingFn<?> mapping = windowFn.getDefaultWindowMappingFn();

  // Main-input window that only defines its maximum timestamp.
  BoundedWindow mainWindow =
      new BoundedWindow() {
        @Override
        public Instant maxTimestamp() {
          return new Instant(100L);
        }
      };
  IntervalWindow expectedSideInputWindow =
      new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(20L)));

  assertThat(mapping.getSideInputWindow(mainWindow), equalTo(expectedSideInputWindow));
  assertThat(mapping.maximumLookback(), equalTo(Duration.ZERO));
}
 
源代码7 项目: beam   文件: PaneExtractorsTest.java
/**
 * {@code PaneExtractors.finalPane()} applied to a late last pane, an on-time pane and an early
 * pane must yield only the value of the pane flagged as last.
 */
@Test
public void finalPane() {
  SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor =
      PaneExtractors.finalPane();

  ValueInSingleWindow<Integer> lateLastPane =
      ValueInSingleWindow.of(
          8,
          new Instant(0L),
          GlobalWindow.INSTANCE,
          PaneInfo.createPane(false, true, Timing.LATE, 2L, 1L));
  ValueInSingleWindow<Integer> onTimePane =
      ValueInSingleWindow.of(
          4,
          new Instant(0L),
          GlobalWindow.INSTANCE,
          PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L));
  ValueInSingleWindow<Integer> earlyFirstPane =
      ValueInSingleWindow.of(
          1,
          new Instant(0L),
          GlobalWindow.INSTANCE,
          PaneInfo.createPane(true, false, Timing.EARLY));
  Iterable<ValueInSingleWindow<Integer>> panes =
      ImmutableList.of(lateLastPane, onTimePane, earlyFirstPane);

  assertThat(extractor.apply(panes), containsInAnyOrder(8));
}
 
源代码8 项目: beam   文件: DoFnTesterTest.java
/**
 * After processing two elements, peeking the global window must return both timestamped outputs,
 * while peeking an unrelated interval window must return nothing.
 */
@Test
public void peekValuesInWindow() throws Exception {
  try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
    tester.startBundle();
    tester.processElement(1L);
    tester.processElement(2L);
    tester.finishBundle();

    TimestampedValue<String> firstOutput = TimestampedValue.of("1", new Instant(1000L));
    TimestampedValue<String> secondOutput = TimestampedValue.of("2", new Instant(2000L));
    assertThat(
        tester.peekOutputElementsInWindow(GlobalWindow.INSTANCE),
        containsInAnyOrder(firstOutput, secondOutput));

    IntervalWindow unrelatedWindow = new IntervalWindow(new Instant(0L), new Instant(10L));
    assertThat(tester.peekOutputElementsInWindow(unrelatedWindow), Matchers.emptyIterable());
  }
}
 
源代码9 项目: beam   文件: InstantCoder.java
/**
 * Reads one big-endian {@code long} from the stream and converts it back to an {@link Instant}.
 *
 * @param inStream stream positioned at the encoded value
 * @return the decoded instant
 * @throws CoderException if the stream ends early or the data is malformed
 * @throws IOException on any other read failure
 */
@Override
public Instant decode(InputStream inStream) throws CoderException, IOException {
  final DataInputStream dataIn = new DataInputStream(inStream);
  final long encoded;
  try {
    encoded = dataIn.readLong();
  } catch (EOFException | UTFDataFormatException exn) {
    // Both exception types indicate a decoding problem, so rebrand them as CoderException.
    throw new CoderException(exn);
  }

  // The encoded form is millis-since-epoch shifted so that the byte representation of negative
  // values sorts lexicographically before that of positive values. Adding Long.MIN_VALUE undoes
  // the shift and deliberately relies on the well-defined wrap-around of {@code long} addition.
  // See http://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.18.2
  final long millis = encoded + Long.MIN_VALUE;
  return new Instant(millis);
}
 
源代码10 项目: beam   文件: WatermarkManager.java
/**
 * Extracts the timers of one processing-time domain that have fired by {@code firingTime} and
 * records them as pending.
 *
 * <p>For {@code SYNCHRONIZED_PROCESSING_TIME} the firing time is capped at the earliest hold.
 *
 * @param domain either {@code PROCESSING_TIME} or {@code SYNCHRONIZED_PROCESSING_TIME}
 * @param firingTime time up to which timers are considered fired
 * @return the fired timers grouped by key
 * @throws IllegalArgumentException if {@code domain} is any other time domain
 */
private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredDomainTimers(
    TimeDomain domain, Instant firingTime) {
  final Map<StructuralKey<?>, List<TimerData>> fired;
  switch (domain) {
    case PROCESSING_TIME:
      fired = extractFiredTimers(firingTime, processingTimers);
      break;
    case SYNCHRONIZED_PROCESSING_TIME:
      fired =
          extractFiredTimers(
              INSTANT_ORDERING.min(firingTime, earliestHold.get()),
              synchronizedProcessingTimers);
      break;
    default:
      throw new IllegalArgumentException(
          "Called getFiredTimers on a Synchronized Processing Time watermark"
              + " and gave a non-processing time domain "
              + domain);
  }
  // Every fired timer stays pending until it is handled elsewhere.
  for (List<TimerData> timersForKey : fired.values()) {
    pendingTimers.addAll(timersForKey);
  }
  return fired;
}
 
源代码11 项目: beam   文件: PaneExtractorsTest.java
/**
 * {@code PaneExtractors.nonLatePanes()} applied to two early first panes must keep both values.
 */
@Test
public void nonLatePanesSingleEarly() {
  SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor =
      PaneExtractors.nonLatePanes();

  ValueInSingleWindow<Integer> earlyEight =
      ValueInSingleWindow.of(
          8,
          new Instant(0L),
          GlobalWindow.INSTANCE,
          PaneInfo.createPane(true, false, Timing.EARLY));
  ValueInSingleWindow<Integer> earlyFour =
      ValueInSingleWindow.of(
          4,
          new Instant(0L),
          GlobalWindow.INSTANCE,
          PaneInfo.createPane(true, false, Timing.EARLY));
  Iterable<ValueInSingleWindow<Integer>> panes = ImmutableList.of(earlyEight, earlyFour);

  assertThat(extractor.apply(panes), containsInAnyOrder(4, 8));
}
 
/**
 * Forwards the output downstream and, for ON_TIME panes, first moves the key's watermark hold and
 * output watermark to one millisecond past the output timestamp.
 *
 * @param output grouped output to emit
 */
@Override
public void emit(final WindowedValue<KV<K, Iterable<InputT>>> output) {

  // The watermark advances only in ON_TIME
  final boolean onTime = output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME);
  if (onTime) {
    final K key = output.getValue().getKey();
    final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
      inMemoryTimerInternalsFactory.timerInternalsForKey(key);
    // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
    final long holdMillis = output.getTimestamp().getMillis() + 1;
    // adds the output timestamp to the watermark hold of each key
    keyAndWatermarkHoldMap.put(key, new Watermark(holdMillis));
    timerInternals.advanceOutputWatermark(new Instant(holdMillis));
  }
  outputCollector.emit(output);
}
 
/**
 * Converts a {@code WebresourceSocialCount} element into a {@code TableRow} and outputs it.
 *
 * <p>The count time is written as the string form of an {@code Instant}; every other field is
 * copied as-is.
 */
@ProcessElement
public void processElement(ProcessContext c) {
	WebresourceSocialCount socialCount = c.element();

	Instant countTime = new Instant(socialCount.countTime);

	TableRow row = new TableRow();
	row.set("WebResourceHash", socialCount.webResourceHash)
		.set("WrPublicationDateId", socialCount.wrPublicationDateId)
		.set("CountTime", countTime.toString())
		.set("DocumentCollectionId", socialCount.documentCollectionId)
		.set("CollectionItemId", socialCount.collectionItemId)
		.set("FbCount", socialCount.fbCount)
		.set("TwCount", socialCount.twCount);

	c.output(row);
}
 
源代码14 项目: beam   文件: FnApiDoFnRunner.java
/**
 * Sets this timer to fire at the given absolute time.
 *
 * <p>Only timers in the event-time domain are accepted, and the target time must not be after
 * the garbage-collection time of the current window.
 *
 * @param absoluteTime time at which the timer should fire
 * @throws IllegalArgumentException if this timer is not in the event-time domain; a target time
 *     after the window expiry is rejected through {@code checkArgument}
 */
@Override
public void set(Instant absoluteTime) {
  // Verifies that the time domain of this timer is acceptable for absolute timers.
  if (!TimeDomain.EVENT_TIME.equals(timeDomain)) {
    throw new IllegalArgumentException(
        "Can only set relative timers in processing time domain. Use #setRelative()");
  }

  // Past the guard above the domain is always EVENT_TIME, so the expiry check needs no further
  // condition: the target time must be no later than the window GC time.
  Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
  checkArgument(
      !absoluteTime.isAfter(windowExpiry),
      "Attempted to set event time timer for %s but that is after"
          + " the expiration of window %s",
      absoluteTime,
      windowExpiry);

  output(absoluteTime);
}
 
/**
 * Computes the record's timestamp with {@code timestampFunction} and remembers it as the maximum
 * event timestamp when it is later than the one seen so far.
 *
 * @param ctx partition context (not used here)
 * @param record record whose timestamp is computed
 * @return the timestamp produced by {@code timestampFunction}
 */
@Override
public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
  final Instant recordTimestamp = timestampFunction.apply(record);
  final boolean isNewMaximum = recordTimestamp.isAfter(maxEventTimestamp);
  if (isNewMaximum) {
    maxEventTimestamp = recordTimestamp;
  }
  return recordTimestamp;
}
 
源代码16 项目: beam   文件: TimestampedValueTest.java
/** A {@code TimestampedValue} must return exactly the timestamp and value it was built from. */
@Test
public void testValues() {
  final Instant timestamp = Instant.now();
  final TimestampedValue<String> timestamped = TimestampedValue.of("foobar", timestamp);

  assertEquals(timestamp, timestamped.getTimestamp());
  assertEquals("foobar", timestamped.getValue());
}
 
源代码17 项目: beam   文件: AutoValueSchemaTest.java
/**
 * Static factory that passes every argument, in declaration order, to the generated AutoValue
 * constructor.
 */
@SchemaCreate
static SimpleAutoValueWithStaticFactory create(
    String str, byte aByte, short aShort, int anInt, long aLong, boolean aBoolean,
    DateTime dateTime, byte[] bytes, ByteBuffer byteBuffer, Instant instant,
    BigDecimal bigDecimal, StringBuilder stringBuilder) {
  return new AutoValue_AutoValueSchemaTest_SimpleAutoValueWithStaticFactory(
      str, aByte, aShort, anInt, aLong, aBoolean,
      dateTime, bytes, byteBuffer, instant,
      bigDecimal, stringBuilder);
}
 
/**
 * Creates a generator with every setting zeroed: both instants are at epoch millisecond 0 and all
 * numeric settings are 0.
 */
public TimeStampGenerator() {
    // Instants start at epoch millisecond 0.
    this.lastGap = new Instant(0);
    this.eventTime = new Instant(0);

    // Gap bounds and session period.
    this.maxGap = 0;
    this.minGap = 0;
    this.sessionPeriod = 0;

    // Lateness bounds and out-of-order probability.
    this.maxLateness = 0;
    this.minLateness = 0;
    this.outOfOrderProbability = 0;
}
 
/**
 * Records the target wake-up time and sets a timer for it, unless a wake-up time has already
 * been stored in the delayed-until state.
 *
 * @param c context giving access to the state and to timer registration
 */
@Override
public void onElement(OnElementContext c) throws Exception {
  GroupingState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);

  // Since processing time can only advance, a later target wake-up time would be ignored
  // anyhow, so nothing is done once a target is already stored.
  final boolean alreadyScheduled = delayUntilState.read() != null;
  if (!alreadyScheduled) {
    Instant wakeUpTime = getTargetTimestamp(c);
    delayUntilState.add(wakeUpTime);
    c.setTimer(wakeUpTime, timeDomain);
  }
}
 
源代码20 项目: beam   文件: SideInputHandlerTest.java
/**
 * A side input must become ready for a window only after a value was added for that window;
 * other windows of the same view must stay not ready.
 */
@Test
public void testIsReady() {
  SideInputHandler sideInputHandler =
      new SideInputHandler(
          ImmutableList.of(view1, view2), InMemoryStateInternals.<Void>forKey(null));

  final Instant epoch = new Instant(0);
  IntervalWindow firstWindow = new IntervalWindow(epoch, new Instant(WINDOW_MSECS_1));
  IntervalWindow secondWindow = new IntervalWindow(epoch, new Instant(WINDOW_MSECS_2));

  // nothing has been added yet, so the side input is not ready
  assertFalse(sideInputHandler.isReady(view1, firstWindow));

  // add a value for view1 in the first window only
  sideInputHandler.addSideInputValue(
      view1,
      valuesInWindow(
          materializeValuesFor(view1.getPipeline().getOptions(), View.asIterable(), "Hello"),
          new Instant(0),
          firstWindow));

  // the first window is ready now
  assertTrue(sideInputHandler.isReady(view1, firstWindow));

  // the second window has still received nothing
  assertFalse(sideInputHandler.isReady(view1, secondWindow));
}
 
源代码21 项目: beam   文件: FlattenEvaluatorFactoryTest.java
/**
 * Flattening an empty {@code PCollectionList} must yield an evaluator whose single output bundle
 * is empty and whose result refers to the flatten transform.
 */
@Test
public void testFlattenInMemoryEvaluatorWithEmptyPCollectionList() throws Exception {
  PCollectionList<Integer> emptyList = PCollectionList.empty(p);

  PCollection<Integer> flattened = emptyList.apply(Flatten.pCollections());
  flattened.setCoder(VarIntCoder.of());

  EvaluationContext evaluationContext = mock(EvaluationContext.class);
  when(evaluationContext.createBundle(flattened))
      .thenReturn(bundleFactory.createBundle(flattened));

  FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(evaluationContext);
  AppliedPTransform<?, ?, ?> flattenedProducer = DirectGraphs.getProducer(flattened);
  TransformEvaluator<Integer> evaluator =
      factory.forApplication(
          flattenedProducer,
          bundleFactory.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));

  TransformResult<Integer> result = evaluator.finishBundle();

  CommittedBundle<?> committedOutput =
      Iterables.getOnlyElement(result.getOutputBundles()).commit(Instant.now());
  assertThat(committedOutput.getElements(), emptyIterable());
  assertThat(
      result.getTransform(),
      Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattenedProducer));
}
 
源代码22 项目: beam   文件: DisplayDataTest.java
/**
 * Registers one display item of each supported value type and checks that every item can be
 * found again in the resulting {@code DisplayData}.
 */
@Test
public void testStringFormatting() throws IOException {
  final Instant now = Instant.now();
  final Duration oneHour = Duration.standardHours(1);

  HasDisplayData subject =
      new HasDisplayData() {
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
          builder
              .add(DisplayData.item("string", "foobar"))
              .add(DisplayData.item("integer", 123))
              .add(DisplayData.item("float", 2.34))
              .add(DisplayData.item("boolean", true))
              .add(DisplayData.item("java_class", DisplayDataTest.class))
              .add(DisplayData.item("timestamp", now))
              .add(DisplayData.item("duration", oneHour));
        }
      };
  DisplayData displayData = DisplayData.from(subject);

  // One assertion per registered item, in registration order.
  assertThat(displayData, hasDisplayItem("string", "foobar"));
  assertThat(displayData, hasDisplayItem("integer", 123));
  assertThat(displayData, hasDisplayItem("float", 2.34));
  assertThat(displayData, hasDisplayItem("boolean", true));
  assertThat(displayData, hasDisplayItem("java_class", DisplayDataTest.class));
  assertThat(displayData, hasDisplayItem("timestamp", now));
  assertThat(displayData, hasDisplayItem("duration", oneHour));
}
 
源代码23 项目: beam   文件: FixedWindowsTest.java
/**
 * Fixed windows of size 10 with offset 5 must assign the sample timestamps to the windows
 * [-5, 5), [5, 15) and [95, 105).
 */
@Test
public void testFixedOffsetWindow() throws Exception {
  FixedWindows windowFn = FixedWindows.of(new Duration(10)).withOffset(new Duration(5));
  List<Long> timestamps = Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L, 100L);

  Map<IntervalWindow, Set<String>> expected = new HashMap<>();
  expected.put(new IntervalWindow(new Instant(-5), new Instant(5)), set(1, 2));
  expected.put(new IntervalWindow(new Instant(5), new Instant(15)), set(5, 9, 10, 11));
  expected.put(new IntervalWindow(new Instant(95), new Instant(105)), set(100));

  assertEquals(expected, runWindowFn(windowFn, timestamps));
}
 
源代码24 项目: beam   文件: DoFnInvokersTest.java
/**
 * Invokes the {@code @OnTimer} method of a local {@code DoFn} through a {@code DoFnInvoker} and
 * checks that the window supplied by the mocked argument provider reaches the method.
 */
@Test
public void testOnTimerWithWindow() throws Exception {
  final String timerId = "my-timer-id";
  final IntervalWindow testWindow = new IntervalWindow(new Instant(0), new Instant(15));
  // The argument provider hands out testWindow whenever the invoker asks for the window.
  when(mockArgumentProvider.window()).thenReturn(testWindow);

  class SimpleTimerDoFn extends DoFn<String, String> {

    // Captures the window passed to the timer callback so the test can inspect it.
    public IntervalWindow window = null;

    @TimerId(timerId)
    private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    // Intentionally empty: only the timer callback is exercised by this test.
    @ProcessElement
    public void process(ProcessContext c) {}

    @OnTimer(timerId)
    public void onMyTimer(IntervalWindow w) {
      window = w;
    }
  }

  SimpleTimerDoFn fn = new SimpleTimerDoFn();

  DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
  // The timer is addressed by its declaration id: the declaration prefix plus the timer id.
  invoker.invokeOnTimer(TimerDeclaration.PREFIX + timerId, "", mockArgumentProvider);
  assertThat(fn.window, equalTo(testWindow));
}
 
源代码25 项目: beam   文件: AfterProcessingTimeStateMachineTest.java
/**
 * Tests that clearing the trigger state of a window, after elements have been injected into it,
 * leaves the trigger reported as cleared for that window.
 */
@Test
public void testClear() throws Exception {
  // Trigger under test: 5 ms after the first element in the pane, over 10 ms fixed windows.
  SimpleTriggerStateMachineTester<IntervalWindow> tester =
      TriggerStateMachineTester.forTrigger(
          AfterProcessingTimeStateMachine.pastFirstElementInPane()
              .plusDelayOf(Duration.millis(5)),
          FixedWindows.of(Duration.millis(10)));

  tester.injectElements(1, 2, 3);
  // The injected values 1, 2 and 3 lie within [0, 10), the window cleared below.
  IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
  tester.clearState(window);
  tester.assertCleared(window);
}
 
源代码26 项目: beam   文件: WatermarkManager.java
/**
 * Classifies the change between two {@link Instant timestamps} as a {@link WatermarkUpdate}.
 *
 * @param oldTime the former timestamp
 * @param currentTime the current timestamp
 * @return {@code ADVANCED} if {@code currentTime} is strictly after {@code oldTime}, otherwise
 *     {@code NO_CHANGE}
 */
public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTime) {
  return currentTime.isAfter(oldTime) ? ADVANCED : NO_CHANGE;
}
 
源代码27 项目: beam   文件: ReduceFnRunnerTest.java
/**
 * Tests that when a processing time timer comes in after a window is expired but in the same
 * bundle it does not cause a spurious output.
 */
@Test
public void testCombiningAccumulatingProcessingTime() throws Exception {
  // 100 ms fixed windows, zero allowed lateness, accumulating panes, and a trigger that
  // repeatedly fires 10 ms of processing time after the first element in a pane.
  WindowingStrategy<?, IntervalWindow> strategy =
      WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
          .withAllowedLateness(Duration.ZERO)
          .withTrigger(
              Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));

  ReduceFnTester<Integer, Integer, IntervalWindow> tester =
      ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());

  tester.advanceProcessingTime(new Instant(5000));
  injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
  injectElement(tester, 5);

  // Move both clocks forward without firing timers, so that the two timers can be delivered
  // together in the fireTimers call below.
  tester.advanceInputWatermarkNoTimers(new Instant(100));
  tester.advanceProcessingTimeNoTimers(new Instant(5010));

  // Fires the GC/EOW timer at the same time as the processing time timer.
  tester.fireTimers(
      new IntervalWindow(new Instant(0), new Instant(100)),
      TimestampedValue.of(TimeDomain.EVENT_TIME, new Instant(100)),
      TimestampedValue.of(TimeDomain.PROCESSING_TIME, new Instant(5010)));

  // Exactly one output is expected: the sum 2 + 5 = 7 in a single ON_TIME pane.
  assertThat(
      tester.extractOutput(),
      contains(
          isSingleWindowedValue(
              equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))));
}
 
源代码28 项目: beam   文件: GroupAlsoByWindowEvaluatorFactory.java
/**
 * Tagged outputs are not supported here; this method always throws.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
public <AdditionalOutputT> void outputWindowedValue(
    TupleTag<AdditionalOutputT> tag,
    AdditionalOutputT output,
    Instant timestamp,
    Collection<? extends BoundedWindow> windows,
    PaneInfo pane) {
  final String message =
      String.format(
          "%s should not use tagged outputs", DirectGroupAlsoByWindow.class.getSimpleName());
  throw new UnsupportedOperationException(message);
}
 
源代码29 项目: beam   文件: LatestTest.java
/**
 * {@code Latest.globally()} over three timestamped strings must output the one carrying the
 * latest timestamp.
 */
@Test
@Category(NeedsRunner.class)
public void testGloballyEventTimestamp() {
  TimestampedValue<String> foo = TimestampedValue.of("foo", new Instant(100));
  TimestampedValue<String> bar = TimestampedValue.of("bar", new Instant(300));
  TimestampedValue<String> baz = TimestampedValue.of("baz", new Instant(200));

  PCollection<String> input = p.apply(Create.timestamped(foo, bar, baz));
  PCollection<String> output = input.apply(Latest.globally());

  // "bar" has the largest timestamp (300).
  PAssert.that(output).containsInAnyOrder("bar");
  p.run();
}
 
源代码30 项目: beam   文件: SessionsTest.java
/**
 * Session windows with a gap of 10 must put the timestamps 1, 2, 5, 9 into the window [1, 19)
 * and the timestamps 100, 101 into the window [100, 111).
 */
@Test
public void testConsecutive() throws Exception {
  Sessions windowFn = Sessions.withGapDuration(new Duration(10));
  List<Long> timestamps = Arrays.asList(1L, 2L, 5L, 9L, 100L, 101L);

  Map<IntervalWindow, Set<String>> expected = new HashMap<>();
  expected.put(new IntervalWindow(new Instant(1), new Instant(19)), set(1, 2, 5, 9));
  expected.put(new IntervalWindow(new Instant(100), new Instant(111)), set(100, 101));

  assertEquals(expected, runWindowFn(windowFn, timestamps));
}