下面列出了org.joda.time.Instant#plus ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Builds the window for {@code bid}, which is expected to be merged later into the window of the
 * auction the bid refers to.
 *
 * <p>The bid may target an auction that has already expired, or one the system has not seen yet,
 * so the window is given some slack rather than an exact interval.
 *
 * @param expectedAuctionDurationMs expected auction duration in milliseconds; the window end is
 *     placed at twice this value after {@code timestamp}
 * @param timestamp timestamp of the bid, passed through as the first constructor argument
 * @param bid the bid; only its {@code auction} field is read
 * @return a new {@link AuctionOrBidWindow} for the bid
 */
public static AuctionOrBidWindow forBid(
    long expectedAuctionDurationMs, Instant timestamp, Bid bid) {
  // Which auctions are still valid is unknown here, and the bid may even refer to an auction
  // that only starts at some unknown future time (because of Generator.AUCTION_ID_LEAD in
  // Generator.nextBid). A real system would reconcile bids and auctions atomically through a
  // separate mechanism.
  //
  // An unbounded bid window is not an option: a bid for an already-expired auction would leave
  // a window that is never retired, which could stall the system watermark.
  //
  // So the window is finite and ends at the upper bound for auctions, assuming the auction
  // starts together with the bid and the system runs at its lowest event rate
  // (as per interEventDelayUs).
  Instant windowEnd = timestamp.plus(expectedAuctionDurationMs * 2);
  // NOTE(review): the trailing `false` presumably marks this as a bid (not auction) window;
  // confirm against the AuctionOrBidWindow constructor.
  return new AuctionOrBidWindow(timestamp, windowEnd, bid.auction, false);
}
/**
 * Assigns {@code timestamp} to the window of length {@code size} that contains it, with window
 * boundaries shifted by {@code offset}.
 *
 * @param timestamp the element timestamp
 * @return the window starting at the aligned boundary at or before {@code timestamp}; its end is
 *     truncated to the end of the global window when {@code start + size} would exceed it
 */
@Override
public IntervalWindow assignWindow(Instant timestamp) {
  // How far the timestamp lies past the previous window boundary:
  // (timestamp + size - offset) mod size.
  long timestampMillis = timestamp.getMillis();
  long shiftedMillis = timestamp.plus(size).minus(offset).getMillis();
  Instant start = new Instant(timestampMillis - shiftedMillis % size.getMillis());

  // The global window includes its max timestamp whereas an interval window excludes its upper
  // bound, hence the extra millisecond.
  Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp().plus(1);

  // Use start + size while that stays within the allowable range; otherwise clip to the end of
  // the global window. Truncating here makes many other parts of the system behave correctly
  // without further work. The very last representable fixed window is therefore shorter than
  // the others, which only matters for data in the year 294247.
  Instant end;
  if (start.isAfter(endOfGlobalWindow.minus(size))) {
    end = endOfGlobalWindow;
  } else {
    end = start.plus(size);
  }
  return new IntervalWindow(start, end);
}
/**
 * Processes elements in two different windows through {@code DoFnWithWindowParameter} and checks
 * that each output is reported in, and paired with, the window it was processed in.
 */
@Test
public void testSupportsWindowParameter() throws Exception {
  Instant timestamp = Instant.now();
  try (DoFnTester<Integer, KV<Integer, BoundedWindow>> fnTester =
      DoFnTester.of(new DoFnWithWindowParameter())) {
    // Two windows that begin at the same instant but have different lengths.
    BoundedWindow oneMinuteWindow =
        new IntervalWindow(timestamp, timestamp.plus(Duration.standardMinutes(1)));
    BoundedWindow fourMinuteWindow =
        new IntervalWindow(timestamp, timestamp.plus(Duration.standardMinutes(4)));

    fnTester.processWindowedElement(1, timestamp, oneMinuteWindow);
    fnTester.processWindowedElement(2, timestamp, oneMinuteWindow);
    fnTester.processWindowedElement(3, timestamp, fourMinuteWindow);
    fnTester.finishBundle();

    // Elements 1 and 2 went into the one-minute window.
    assertThat(
        fnTester.peekOutputElementsInWindow(oneMinuteWindow),
        containsInAnyOrder(
            TimestampedValue.of(KV.of(1, oneMinuteWindow), timestamp),
            TimestampedValue.of(KV.of(2, oneMinuteWindow), timestamp)));
    // Element 3 went into the four-minute window.
    assertThat(
        fnTester.peekOutputElementsInWindow(fourMinuteWindow),
        containsInAnyOrder(TimestampedValue.of(KV.of(3, fourMinuteWindow), timestamp)));
  }
}
/**
 * Feeds two timestamped elements followed by a watermark update through the consumer and checks
 * that both element messages and then the watermark message are delivered, in that order.
 */
@Test
public void testAdvanceWatermark() throws IOException, InterruptedException {
  final Instant firstTimestamp = Instant.now();
  final Instant secondTimestamp = firstTimestamp.plus(1L);

  // "first" at the base time, "second" one millisecond later, then the watermark advances to
  // the base time.
  final TestUnboundedSource<String> unboundedSource =
      TestUnboundedSource.<String>createBuilder()
          .setTimestamp(firstTimestamp)
          .addElements("first")
          .setTimestamp(secondTimestamp)
          .addElements("second")
          .advanceWatermarkTo(firstTimestamp)
          .build();

  final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> systemConsumer =
      createConsumer(unboundedSource);
  systemConsumer.register(DEFAULT_SSP, NULL_STRING);
  systemConsumer.start();

  assertEquals(
      Arrays.asList(
          createElementMessage(DEFAULT_SSP, offset(0), "first", firstTimestamp),
          createElementMessage(DEFAULT_SSP, offset(1), "second", secondTimestamp),
          createWatermarkMessage(DEFAULT_SSP, firstTimestamp)),
      consumeUntilTimeoutOrWatermark(
          systemConsumer, DEFAULT_SSP, DEFAULT_WATERMARK_TIMEOUT_MILLIS));

  systemConsumer.stop();
}
/**
 * Builds a value that belongs to four overlapping windows and checks that {@code explodeWindows}
 * yields one single-window value per window, each keeping the original payload, timestamp and
 * pane.
 */
@Test
public void testExplodeWindowsManyWindowsMultipleWindowedValues() {
  Instant now = Instant.now();

  // Four overlapping two-second windows, each shifted relative to "now".
  BoundedWindow centerWindow = new IntervalWindow(now.minus(1000L), now.plus(1000L));
  BoundedWindow pastWindow = new IntervalWindow(now.minus(1500L), now.plus(500L));
  BoundedWindow futureWindow = new IntervalWindow(now.minus(500L), now.plus(1500L));
  BoundedWindow futureFutureWindow = new IntervalWindow(now, now.plus(2000L));

  PaneInfo paneInfo = PaneInfo.createPane(false, false, Timing.ON_TIME, 3L, 0L);
  WindowedValue<String> multiWindowValue =
      WindowedValue.of(
          "foo",
          now,
          ImmutableList.of(pastWindow, centerWindow, futureWindow, futureFutureWindow),
          paneInfo);

  // Order is irrelevant: one exploded value is expected for every window.
  assertThat(
      multiWindowValue.explodeWindows(),
      containsInAnyOrder(
          WindowedValue.of("foo", now, pastWindow, paneInfo),
          WindowedValue.of("foo", now, centerWindow, paneInfo),
          WindowedValue.of("foo", now, futureWindow, paneInfo),
          WindowedValue.of("foo", now, futureFutureWindow, paneInfo)));
  assertThat(multiWindowValue.isSingleWindowedValue(), equalTo(false));
}
/**
 * Sets the sort-flush timer for flush time {@code flush} and replaces the watermark hold with
 * {@code flush}.
 *
 * <p>The firing time is {@code flush} plus the allowed lateness, capped at the window's max
 * timestamp plus the allowed lateness (the window garbage collection time).
 *
 * <p>Note that this is equivalent to {@link org.apache.beam.sdk.state.Timer#withOutputTimestamp}
 * and should be reworked to use that feature once that is stable.
 *
 * @param namespace state namespace used for both the timer and the watermark hold state
 * @param window window whose max timestamp bounds the timer's firing time
 * @param flush requested flush time; also the value added to the watermark hold
 */
private void setupFlushTimerAndWatermarkHold(
    StateNamespace namespace, BoundedWindow window, Instant flush) {
  Instant latenessAdjustedFlush = flush.plus(windowingStrategy.getAllowedLateness());
  Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
  // Never schedule the timer later than the window's garbage collection time.
  Instant fireAt = latenessAdjustedFlush.isAfter(gcTime) ? gcTime : latenessAdjustedFlush;

  WatermarkHoldState hold = stepContext.stateInternals().state(namespace, watermarkHold);
  // NOTE(review): the fifth argument (flush) is presumably the timer's output timestamp;
  // confirm against the TimerInternals#setTimer signature in use.
  stepContext
      .timerInternals()
      .setTimer(
          namespace, SORT_FLUSH_TIMER, SORT_FLUSH_TIMER, fireAt, flush, TimeDomain.EVENT_TIME);

  // Replace any previous hold with the new flush time.
  hold.clear();
  hold.add(flush);
}
/**
 * Runs {@code Snippets.PeriodicallyUpdatingSideInputs.main} over a ten-second range that started
 * three minutes ago, using a temporary file as the side-input source, and checks the emitted
 * values.
 *
 * <p>The expected output is a single {@code 0L} followed by one {@code 1L} for every
 * {@code interval2} step in {@code [startAt, stopAt)}.
 */
@Test
@Category({NeedsRunner.class, UsesStatefulParDo.class})
public void testSlowlyUpdatingSideInputsWindowed() {
  Instant startAt = Instant.now().minus(Duration.standardMinutes(3));
  Duration duration = Duration.standardSeconds(10);
  Instant stopAt = startAt.plus(duration);
  Duration interval1 = Duration.standardSeconds(1);
  Duration interval2 = Duration.standardSeconds(1);

  File f = null;
  try {
    f = File.createTempFile("testSlowlyUpdatingSIWindowed", "txt");
    // Register the cleanup straight away: doing it only at the end of the test would leak the
    // file whenever the pipeline or an assertion below fails.
    f.deleteOnExit();
    try (BufferedWriter fw = Files.newWriter(f, Charset.forName("UTF-8"))) {
      fw.append("testdata");
    }
  } catch (IOException e) {
    Assert.fail("failed to create temp file: " + e.toString());
    // Assert.fail always throws; this statement only tells the compiler the path ends here.
    throw new RuntimeException("Should never reach here", e);
  }

  PCollection<Long> result =
      Snippets.PeriodicallyUpdatingSideInputs.main(
          p, startAt, stopAt, interval1, interval2, f.getPath());

  ArrayList<Long> expectedResults = new ArrayList<>();
  expectedResults.add(0L);
  // A primitive counter avoids boxing a new Long on every iteration.
  long stopMillis = stopAt.getMillis();
  long stepMillis = interval2.getMillis();
  for (long i = startAt.getMillis(); i < stopMillis; i += stepMillis) {
    expectedResults.add(1L);
  }

  PAssert.that(result).containsInAnyOrder(expectedResults);
  p.run().waitUntilFinish();
}
/**
 * Applies {@code PeriodicSequence} to a 500 ms range with a 250 ms interval and checks that one
 * element is produced per interval step from start to stop inclusive, each carrying a timestamp
 * equal to its value.
 */
@Test
@Category({
  NeedsRunner.class,
  UsesImpulse.class,
  UsesStatefulParDo.class,
})
public void testOutputsProperElements() {
  // Start well in the past (100 hours ago).
  Instant startTime = Instant.now().minus(Duration.standardHours(100));
  long totalMillis = 500;
  Duration interval = Duration.millis(250);
  long stepMillis = interval.getMillis();
  Instant stopTime = startTime.plus(totalMillis);

  PCollection<KV<Instant, Instant>> result =
      p.apply(
              Create.<PeriodicSequence.SequenceDefinition>of(
                  new PeriodicSequence.SequenceDefinition(startTime, stopTime, interval)))
          .apply(PeriodicSequence.create())
          .apply(ParDo.of(new ExtractTsDoFn<>())); // used to validate timestamp

  // One expected entry per step, end point included; key and value are the same instant.
  ArrayList<KV<Instant, Instant>> expected =
      new ArrayList<>((int) (totalMillis / stepMillis + 1));
  for (long elapsed = 0; elapsed <= totalMillis; elapsed += stepMillis) {
    Instant tick = startTime.plus(elapsed);
    expected.add(KV.of(tick, tick));
  }

  PAssert.that(result).containsInAnyOrder(expected);
  p.run().waitUntilFinish();
}
/**
 * Checks that the consumer delivers messages up to each successive watermark: first one element
 * plus a watermark, then two further elements plus a second watermark.
 */
@Test
public void testMultipleAdvanceWatermark() throws IOException, InterruptedException {
  final Instant t0 = Instant.now();
  final Instant t1 = t0.plus(1L);
  final Instant t2 = t0.plus(2L);

  final TestUnboundedSource<String> unboundedSource =
      TestUnboundedSource.<String>createBuilder()
          .setTimestamp(t0)
          .addElements("first")
          .advanceWatermarkTo(t0)
          .noElements() // will output the first watermark
          .setTimestamp(t1)
          .addElements("second")
          .setTimestamp(t2)
          .addElements("third")
          .advanceWatermarkTo(t1)
          .build();

  final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> systemConsumer =
      createConsumer(unboundedSource);
  systemConsumer.register(DEFAULT_SSP, NULL_STRING);
  systemConsumer.start();

  // First round: everything up to and including the first watermark.
  assertEquals(
      Arrays.asList(
          createElementMessage(DEFAULT_SSP, offset(0), "first", t0),
          createWatermarkMessage(DEFAULT_SSP, t0)),
      consumeUntilTimeoutOrWatermark(
          systemConsumer, DEFAULT_SSP, DEFAULT_WATERMARK_TIMEOUT_MILLIS));

  // Second round: the remaining elements followed by the second watermark.
  assertEquals(
      Arrays.asList(
          createElementMessage(DEFAULT_SSP, offset(1), "second", t1),
          createElementMessage(DEFAULT_SSP, offset(2), "third", t2),
          createWatermarkMessage(DEFAULT_SSP, t1)),
      consumeUntilTimeoutOrWatermark(
          systemConsumer, DEFAULT_SSP, DEFAULT_WATERMARK_TIMEOUT_MILLIS));

  systemConsumer.stop();
}
/**
 * Checks that consuming returns only what is available when the source is held back by a latch,
 * and that the remaining element and watermark are delivered once the latch is released.
 */
@Test
public void testTimeout() throws Exception {
  final CountDownLatch releaseLatch = new CountDownLatch(1);
  final Instant t0 = Instant.now();
  final Instant t1 = t0.plus(1);

  // The latch sits between "before" and "after" in the source's sequence.
  final TestUnboundedSource<String> unboundedSource =
      TestUnboundedSource.<String>createBuilder()
          .setTimestamp(t0)
          .addElements("before")
          .addLatch(releaseLatch)
          .setTimestamp(t1)
          .addElements("after")
          .advanceWatermarkTo(t1)
          .build();

  final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> systemConsumer =
      createConsumer(unboundedSource);
  systemConsumer.register(DEFAULT_SSP, NULL_STRING);
  systemConsumer.start();

  // While the latch is closed only "before" is expected.
  assertEquals(
      Collections.singletonList(createElementMessage(DEFAULT_SSP, offset(0), "before", t0)),
      consumeUntilTimeoutOrWatermark(systemConsumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));

  releaseLatch.countDown();

  // After the release, "after" and the watermark are expected.
  assertEquals(
      Arrays.asList(
          createElementMessage(DEFAULT_SSP, offset(1), "after", t1),
          createWatermarkMessage(DEFAULT_SSP, t1)),
      consumeUntilTimeoutOrWatermark(systemConsumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));

  systemConsumer.stop();
}
/**
 * Checks that a value created in exactly one interval window reports itself as single-windowed
 * and exposes that window.
 */
@Test
public void testSingleWindowedValueInFixedWindow() {
  Instant timestamp = Instant.now();
  BoundedWindow window = new IntervalWindow(timestamp, timestamp.plus(1));
  WindowedValue<Integer> singleWindowValue =
      WindowedValue.of(1, timestamp, window, PaneInfo.NO_FIRING);

  assertThat(singleWindowValue.isSingleWindowedValue(), equalTo(true));

  WindowedValue.SingleWindowedValue asSingle =
      (WindowedValue.SingleWindowedValue) singleWindowValue;
  assertThat(asSingle.getWindow(), equalTo(window));
}
/**
 * Sets the event-time garbage collection timer for {@code window}.
 *
 * @param input not used by this implementation
 * @param window window whose garbage collection time, plus {@code GC_DELAY_MS}, becomes the
 *     timer's firing time
 */
@Override
public void setForWindow(InputT input, BoundedWindow window) {
  // Push the firing time past the garbage collection time by GC_DELAY_MS so that this timer
  // fires after any timers set for window.maxTimestamp().
  Instant fireTime =
      LateDataUtils.garbageCollectionTime(window, windowingStrategy).plus(GC_DELAY_MS);
  timerInternals.setTimer(
      StateNamespaces.window(windowCoder, window),
      GC_TIMER_ID,
      "",
      fireTime,
      window.maxTimestamp(),
      TimeDomain.EVENT_TIME);
}
/**
 * Starts the execution context with a work item carrying a realtime timer stamped 60 seconds
 * ahead of the local clock, and checks that the processing time reported by the step's timer
 * internals is later than that timer's timestamp.
 */
@Test
public void testTimerInternalsProcessingTimeSkew() {
  Windmill.WorkItemCommitRequest.Builder commitBuilder =
      Windmill.WorkItemCommitRequest.newBuilder();

  DataflowOperationContext operationContext =
      executionContext.createOperationContext(NameContextsForTests.nameContextForTest());
  StreamingModeExecutionContext.StepContext stepContext =
      executionContext.getStepContext(operationContext);

  Windmill.WorkItem.Builder workItem =
      Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(17L);

  // Trigger a realtime timer that has clock skew, but ensure that it would still fire.
  long skewMillis = 60 * 1000;
  Instant skewedTimerTime = Instant.now().plus(skewMillis);
  workItem
      .getTimersBuilder()
      .addTimersBuilder()
      .setTag(ByteString.copyFromUtf8("a"))
      // NOTE(review): the * 1000 presumably converts milliseconds to microseconds; confirm
      // against the Windmill.Timer timestamp unit.
      .setTimestamp(skewedTimerTime.getMillis() * 1000)
      .setType(Windmill.Timer.Type.REALTIME);

  executionContext.start(
      "key",
      workItem.build(),
      new Instant(1000), // input watermark
      null, // output watermark
      null, // synchronized processing time
      stateReader,
      stateFetcher,
      commitBuilder);

  TimerInternals stepTimers = stepContext.timerInternals();
  assertTrue(skewedTimerTime.isBefore(stepTimers.currentProcessingTime()));
}
/**
 * Verifies that ProcessFn carries the window and timestamp of the element inside the
 * KeyedWorkItem over to its outputs.
 *
 * <p>The wrapped DoFn is effectively monolithic, so splitting is not exercised here.
 */
@Test
public void testTrivialProcessFnPropagatesOutputWindowAndTimestamp() throws Exception {
  DoFn<Integer, String> toStringFn = new ToStringFn();

  // A two-minute window centered on the element timestamp.
  Instant baseTime = Instant.now();
  IntervalWindow window =
      new IntervalWindow(
          baseTime.minus(Duration.standardMinutes(1)),
          baseTime.plus(Duration.standardMinutes(1)));

  ProcessFnTester<Integer, String, SomeRestriction, Void, Void> fnTester =
      new ProcessFnTester<>(
          baseTime,
          toStringFn,
          BigEndianIntegerCoder.of(),
          SerializableCoder.of(SomeRestriction.class),
          VoidCoder.of(),
          MAX_OUTPUTS_PER_BUNDLE,
          MAX_BUNDLE_DURATION);

  fnTester.startElement(
      WindowedValue.of(
          KV.of(42, new SomeRestriction()),
          baseTime,
          Collections.singletonList(window),
          PaneInfo.ON_TIME_AND_ONLY_FIRING));

  // All three outputs are expected in the input's window with the input's timestamp.
  assertEquals(
      Arrays.asList(
          TimestampedValue.of("42a", baseTime),
          TimestampedValue.of("42b", baseTime),
          TimestampedValue.of("42c", baseTime)),
      fnTester.peekOutputElementsInWindow(window));
}
/**
 * Creates a window that begins at {@code start} and ends at {@code start.plus(size)}.
 *
 * @param start the start of the window
 * @param size the length added to {@code start} to obtain the end of the window
 */
public IntervalWindow(Instant start, ReadableDuration size) {
  Instant computedEnd = start.plus(size);
  this.start = start;
  this.end = computedEnd;
}
/**
 * Shared test body: feeds out-of-order elements for two windows through a runner built by
 * {@code runnerFactory} and checks both the final per-window state and the emitted outputs.
 *
 * <p>Judging by the assertions, each output is the running sum of the values processed so far in
 * the window. When {@code ordered} is true the outputs are expected in ascending timestamp order;
 * otherwise they are expected in arrival order.
 *
 * @param ordered whether the DoFn is created in ordered mode; also selects which expectations
 *     are asserted and whether the input watermark is advanced before asserting
 * @param runnerFactory builds the runner under test from the DoFn and the output manager
 * @throws Exception if the runner or the watermark advancement fails
 */
private void testOutput(
boolean ordered,
BiFunction<MyDoFn, OutputManager, DoFnRunner<KV<String, Integer>, Integer>> runnerFactory)
throws Exception {
// Start with the input watermark at 1 ms, before any element timestamp used below.
timerInternals.advanceInputWatermark(new Instant(1L));
MyDoFn fn = MyDoFn.create(ordered);
StateTag<ValueState<Integer>> stateTag = StateTags.tagForSpec(MyDoFn.STATE_ID, fn.intState());
// Collects every (tag, value) pair emitted by the runner, in emission order.
List<KV<TupleTag<?>, WindowedValue<?>>> outputs = new ArrayList<>();
OutputManager output = asOutputManager(outputs);
DoFnRunner<KV<String, Integer>, Integer> runner = runnerFactory.apply(fn, output);
Instant elementTime = new Instant(5);
// write two elements, with descending timestamps
runner.processElement(
WindowedValue.of(KV.of("hello", 1), elementTime, WINDOW_1, PaneInfo.NO_FIRING));
runner.processElement(
WindowedValue.of(KV.of("hello", 2), elementTime.minus(1), WINDOW_1, PaneInfo.NO_FIRING));
if (ordered) {
// move forward in time so that the input might get flushed
advanceInputWatermark(timerInternals, elementTime.plus(ALLOWED_LATENESS + 1), runner);
}
// Both modes end with the same state for WINDOW_1: 1 + 2 = 3.
assertEquals(3, (int) stateInternals.state(windowNamespace(WINDOW_1), stateTag).read());
assertEquals(2, outputs.size());
if (ordered) {
// Ordered: the earlier element (value 2) comes first, so the expected outputs are 2 then 3.
assertEquals(
Arrays.asList(
KV.of(
outputTag,
WindowedValue.of(2, elementTime.minus(1), WINDOW_1, PaneInfo.NO_FIRING)),
KV.of(outputTag, WindowedValue.of(3, elementTime, WINDOW_1, PaneInfo.NO_FIRING))),
outputs);
} else {
// Unordered: arrival order is kept, so the expected outputs are 1 then 3.
assertEquals(
Arrays.asList(
KV.of(outputTag, WindowedValue.of(1, elementTime, WINDOW_1, PaneInfo.NO_FIRING)),
KV.of(
outputTag,
WindowedValue.of(3, elementTime.minus(1), WINDOW_1, PaneInfo.NO_FIRING))),
outputs);
}
// Reset the collected outputs before exercising the second window.
outputs.clear();
// another window
elementTime = elementTime.plus(WINDOW_SIZE);
// Three elements this time, again written with descending timestamps.
runner.processElement(
WindowedValue.of(KV.of("hello", 1), elementTime, WINDOW_2, PaneInfo.NO_FIRING));
runner.processElement(
WindowedValue.of(KV.of("hello", 2), elementTime.minus(1), WINDOW_2, PaneInfo.NO_FIRING));
runner.processElement(
WindowedValue.of(KV.of("hello", 3), elementTime.minus(2), WINDOW_2, PaneInfo.NO_FIRING));
if (ordered) {
// move forward in time so that the input might get flushed
advanceInputWatermark(timerInternals, elementTime.plus(ALLOWED_LATENESS + 1), runner);
}
// Both modes end with the same state for WINDOW_2: 1 + 2 + 3 = 6.
assertEquals(6, (int) stateInternals.state(windowNamespace(WINDOW_2), stateTag).read());
assertEquals(3, outputs.size());
if (ordered) {
// Ordered: values are consumed as 3, 2, 1, so the expected outputs are 3, 5, 6.
assertEquals(
Arrays.asList(
KV.of(
outputTag,
WindowedValue.of(3, elementTime.minus(2), WINDOW_2, PaneInfo.NO_FIRING)),
KV.of(
outputTag,
WindowedValue.of(5, elementTime.minus(1), WINDOW_2, PaneInfo.NO_FIRING)),
KV.of(outputTag, WindowedValue.of(6, elementTime, WINDOW_2, PaneInfo.NO_FIRING))),
outputs);
} else {
// Unordered: values are consumed as 1, 2, 3, so the expected outputs are 1, 3, 6.
assertEquals(
Arrays.asList(
KV.of(outputTag, WindowedValue.of(1, elementTime, WINDOW_2, PaneInfo.NO_FIRING)),
KV.of(
outputTag,
WindowedValue.of(3, elementTime.minus(1), WINDOW_2, PaneInfo.NO_FIRING)),
KV.of(
outputTag,
WindowedValue.of(6, elementTime.minus(2), WINDOW_2, PaneInfo.NO_FIRING))),
outputs);
}
}
/**
 * Runs a {@code CreateStream} of on-time, late and droppably-late elements through five-minute
 * fixed windows with accumulating panes and five minutes of allowed lateness, then checks the
 * pane contents as well as the global count and sum for the first window.
 */
@Test
public void testLateDataAccumulating() throws IOException {
  Instant epoch = new Instant(0);
  Duration windowSize = Duration.standardMinutes(5);
  Duration allowedLateness = Duration.standardMinutes(5);

  CreateStream<Integer> batches =
      CreateStream.of(VarIntCoder.of(), batchDuration())
          .emptyBatch()
          .advanceWatermarkForNextBatch(epoch.plus(Duration.standardMinutes(6)))
          .nextBatch(
              TimestampedValue.of(1, epoch),
              TimestampedValue.of(2, epoch),
              TimestampedValue.of(3, epoch))
          .advanceWatermarkForNextBatch(epoch.plus(Duration.standardMinutes(20)))
          // These elements are late but within the allowed lateness
          .nextBatch(TimestampedValue.of(4, epoch), TimestampedValue.of(5, epoch))
          // These elements are droppably late
          .advanceNextBatchWatermarkToInfinity()
          .nextBatch(
              TimestampedValue.of(-1, epoch),
              TimestampedValue.of(-2, epoch),
              TimestampedValue.of(-3, epoch));

  PCollection<Integer> windowed =
      p.apply(batches)
          .apply(
              Window.<Integer>into(FixedWindows.of(windowSize))
                  .triggering(
                      AfterWatermark.pastEndOfWindow()
                          .withEarlyFirings(
                              AfterProcessingTime.pastFirstElementInPane()
                                  .plusDelayOf(Duration.standardMinutes(2)))
                          .withLateFirings(AfterPane.elementCountAtLeast(1)))
                  .accumulatingFiredPanes()
                  .withAllowedLateness(allowedLateness, Window.ClosingBehavior.FIRE_ALWAYS));

  // Group everything under a single key and flatten again to observe the panes as they fire.
  PCollection<Integer> triggered =
      windowed
          .apply(WithKeys.of(1))
          .apply(GroupByKey.create())
          .apply(Values.create())
          .apply(Flatten.iterables());
  PCollection<Long> count =
      windowed.apply(Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
  PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults());

  IntervalWindow firstWindow = new IntervalWindow(epoch, epoch.plus(Duration.standardMinutes(5L)));
  PAssert.that(triggered).inFinalPane(firstWindow).containsInAnyOrder(1, 2, 3, 4, 5);
  PAssert.that(triggered).inOnTimePane(firstWindow).containsInAnyOrder(1, 2, 3);

  // Every fired count must lie between the on-time total (3) and the final total (5).
  PAssert.that(count)
      .inWindow(firstWindow)
      .satisfies(
          firedCounts -> {
            for (Long firedCount : firedCounts) {
              assertThat(firedCount, allOf(greaterThanOrEqualTo(3L), lessThanOrEqualTo(5L)));
            }
            return null;
          });
  // Every fired sum must lie between 1+2+3 = 6 and 1+2+3+4+5 = 15.
  PAssert.that(sum)
      .inWindow(firstWindow)
      .satisfies(
          firedSums -> {
            for (Integer firedSum : firedSums) {
              assertThat(firedSum, allOf(greaterThanOrEqualTo(6), lessThanOrEqualTo(15)));
            }
            return null;
          });

  p.run();
}
/**
 * Builds a {@link TestStream} spanning {@code totalDuration} that emits the values
 * {@code [0, numElements)} interleaved with the same number of random but monotonically
 * increasing watermark updates. Each element's timestamp lies within {@code allowedLateness}
 * before its matching watermark update.
 *
 * <p>TODO: Consider moving this into TestStream if it's useful enough.
 *
 * @param name name under which the stream is applied to the pipeline
 * @param base instant from which all processing times and watermarks are offset
 * @param totalDuration range over which watermarks and processing times are spread
 * @param numElements number of elements, and of watermark updates, to generate
 * @param allowedLateness upper bound on how far an element trails its watermark update
 * @return the collection produced by applying the finished stream to the pipeline
 */
private PCollection<Long> generateStreamWithBoundedDisorder(
    String name,
    Instant base,
    Duration totalDuration,
    int numElements,
    Duration allowedLateness) {
  TestStream.Builder<Long> builder = TestStream.create(VarLongCoder.of());

  // Draw numElements random watermark positions within the total duration; sorting afterwards
  // makes them monotonic.
  List<Instant> sortedWatermarks = Lists.newArrayList();
  for (int n = 0; n < numElements; ++n) {
    sortedWatermarks.add(
        base.plus(new Duration((long) (totalDuration.getMillis() * Math.random()))));
  }
  Collections.sort(sortedWatermarks);

  // For every index, schedule a watermark update immediately followed by an element whose
  // timestamp trails that watermark by a random amount below allowedLateness.
  List<Event<Long>> schedule = Lists.newArrayList();
  for (int n = 0; n < numElements; ++n) {
    Instant processingTime =
        base.plus((long) (1.0 * n * totalDuration.getMillis() / (numElements + 1)));
    Instant watermark = sortedWatermarks.get(n);
    Instant elementTime = watermark.minus((long) (Math.random() * allowedLateness.getMillis()));
    schedule.add(new Event<>(processingTime, watermark));
    schedule.add(new Event<>(processingTime, TimestampedValue.of((long) n, elementTime)));
  }

  // Replay the schedule, advancing processing time only when it actually moves forward.
  Instant previousProcessingTime = base;
  for (Event<Long> step : schedule) {
    Duration elapsed = new Duration(previousProcessingTime, step.processingTime);
    if (elapsed.getMillis() > 0) {
      builder = builder.advanceProcessingTime(elapsed);
    }
    previousProcessingTime = step.processingTime;
    if (step.element == null) {
      builder = builder.advanceWatermarkTo(step.watermarkUpdate);
    } else {
      builder = builder.addElements(step.element);
    }
  }
  return p.apply(name, builder.advanceWatermarkToInfinity());
}
/**
 * Runs a {@code TestStream} of on-time, late and droppably-late elements through five-minute
 * fixed windows with accumulating panes and five minutes of allowed lateness, then checks the
 * pane contents as well as the global count and sum for the first window.
 */
@Test
@Category({NeedsRunner.class, UsesTestStream.class})
public void testLateDataAccumulating() {
  Instant epoch = new Instant(0);
  Duration windowSize = Duration.standardMinutes(5);
  Duration allowedLateness = Duration.standardMinutes(5);

  TestStream<Integer> stream =
      TestStream.create(VarIntCoder.of())
          .addElements(
              TimestampedValue.of(1, epoch),
              TimestampedValue.of(2, epoch),
              TimestampedValue.of(3, epoch))
          .advanceWatermarkTo(epoch.plus(Duration.standardMinutes(6)))
          // These elements are late but within the allowed lateness
          .addElements(TimestampedValue.of(4, epoch), TimestampedValue.of(5, epoch))
          .advanceWatermarkTo(epoch.plus(Duration.standardMinutes(20)))
          // These elements are droppably late
          .addElements(
              TimestampedValue.of(-1, epoch),
              TimestampedValue.of(-2, epoch),
              TimestampedValue.of(-3, epoch))
          .advanceWatermarkToInfinity();

  PCollection<Integer> windowed =
      p.apply(stream)
          .apply(
              Window.<Integer>into(FixedWindows.of(windowSize))
                  .triggering(
                      AfterWatermark.pastEndOfWindow()
                          .withEarlyFirings(
                              AfterProcessingTime.pastFirstElementInPane()
                                  .plusDelayOf(Duration.standardMinutes(2)))
                          .withLateFirings(AfterPane.elementCountAtLeast(1)))
                  .accumulatingFiredPanes()
                  .withAllowedLateness(allowedLateness, ClosingBehavior.FIRE_ALWAYS));

  // Group everything under a single key and flatten again to observe the panes as they fire.
  PCollection<Integer> triggered =
      windowed
          .apply(WithKeys.of(1))
          .apply(GroupByKey.create())
          .apply(Values.create())
          .apply(Flatten.iterables());
  PCollection<Long> count =
      windowed.apply(Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
  PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults());

  IntervalWindow firstWindow = new IntervalWindow(epoch, epoch.plus(Duration.standardMinutes(5L)));
  PAssert.that(triggered).inFinalPane(firstWindow).containsInAnyOrder(1, 2, 3, 4, 5);
  PAssert.that(triggered).inOnTimePane(firstWindow).containsInAnyOrder(1, 2, 3);

  // Every fired count must lie between the on-time total (3) and the final total (5).
  PAssert.that(count)
      .inWindow(firstWindow)
      .satisfies(
          firedCounts -> {
            for (Long firedCount : firedCounts) {
              assertThat(firedCount, allOf(greaterThanOrEqualTo(3L), lessThanOrEqualTo(5L)));
            }
            return null;
          });
  // Every fired sum must lie between 1+2+3 = 6 and 1+2+3+4+5 = 15.
  PAssert.that(sum)
      .inWindow(firstWindow)
      .satisfies(
          firedSums -> {
            for (Integer firedSum : firedSums) {
              assertThat(firedSum, allOf(greaterThanOrEqualTo(6), lessThanOrEqualTo(15)));
            }
            return null;
          });

  p.run();
}
/**
 * Generates sample data for five time series (TS1 to TS5), all starting at
 * 01/01/2016 00:00:00.
 *
 * <ul>
 *   <li>TS1 and TS2: produced by {@code GenerateSampleData.generateSequentialList} with
 *       arguments {@code 1d, 1d}. Intended to hold one value per minute that increases by 1 until
 *       minute 5 and then decreases by 1 until minute 10.
 *   <li>TS3 and TS4: produced by the same generator with arguments {@code 10d, -1d}, after which
 *       the entries at positions 2, 3, 7 and 8 are removed. Intended to decrease by 1 until
 *       minute 5 and then increase by 1 until minute 10, with those minutes missing.
 *   <li>TS5: the control series; ten entries one minute apart with random ask and bid prices.
 * </ul>
 *
 * <p>NOTE(review): the rise/fall shapes above come from the original description; they are
 * determined by {@code GenerateSampleData}, which is not shown here, so confirm them there.
 *
 * @return the generated entries, keyed by time series name, in the order TS1 to TS5
 */
public static List<KV<String, TSProto>> getTestData() {
  List<KV<String, TSProto>> result = new ArrayList<KV<String, TSProto>>();

  DateTimeFormatter formatter = DateTimeFormat.forPattern("dd/MM/yyyy HH:mm:ss");
  Instant start = Instant.parse("01/01/2016 00:00:00", formatter);

  // KEY 001 & KEY 002: complete series, appended straight to the result.
  for (String key : new String[] {TS1, TS2}) {
    GenerateSampleData.generateSequentialList(result, start, key, 1d, 1d);
  }

  // KEY003 & KEY004: generated into a scratch list so that gaps can be punched in first.
  for (String key : new String[] {TS3, TS4}) {
    List<KV<String, TSProto>> sparse = new ArrayList<KV<String, TSProto>>();
    GenerateSampleData.generateSequentialList(sparse, start, key, 10d, -1d);
    // Indices shift left after every removal, so each index is used twice in a row.
    sparse.remove(2); // Remove time value 02
    sparse.remove(2); // Remove time value 03
    sparse.remove(5); // Remove time value 07
    sparse.remove(5); // Remove time value 08
    result.addAll(sparse);
  }

  // KEY005: ten points, one per minute from the start, with random prices.
  for (int minute = 0; minute < 10; minute++) {
    Instant pointTime = start.plus(Duration.standardMinutes(minute));
    result.add(
        KV.of(
            TS5,
            TSProto.newBuilder()
                .setAskPrice(Math.random())
                .setBidPrice(Math.random())
                .setKey(TS5)
                .setIsLive(true)
                .setTime(pointTime.getMillis())
                .build()));
  }
  return result;
}