下面列出了org.mockito.verification.VerificationMode#org.apache.flink.streaming.api.windowing.windows.TimeWindow 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, String>> input, Collector<String> out) throws Exception {
//获取分组字段
String usergender = tuple.getField(0).toString();
Iterator<Tuple2<Long, String>> it = input.iterator();
//计数器
Long count = 0L;
//bigcount = 0L;
while (it.hasNext()) {
Tuple2<Long, String> next = it.next();
count++;
}
System.err.println(Thread.currentThread().getId()+"【男女比例】window触发了=数据条数:"+count);
//组装结果
String res = "{\"usergender\":\""+usergender+"\",\"count\":\""+count+"\"}";
System.err.println("【男女比例】窗口数据 : "+ res);
out.collect(res);
}
@Override
public TriggerResult onElement(WordEvent element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
log.info("======onElement====window start = {}, window end = {}", window.getStart(), window.getEnd());
return TriggerResult.CONTINUE;
// ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
//
// timestamp = ctx.getCurrentProcessingTime();
//
// if (fireTimestamp.get() == null) {
// long start = timestamp - (timestamp % interval);
// long nextFireTimestamp = start + interval;
//
// ctx.registerProcessingTimeTimer(nextFireTimestamp);
//
// fireTimestamp.add(nextFireTimestamp);
// return TriggerResult.CONTINUE;
// }
// return TriggerResult.CONTINUE;
}
@Test
public void testMergeConsecutiveWindows() {
MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
ProcessingTimeSessionWindows assigner = ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000));
assigner.mergeWindows(
Lists.newArrayList(
new TimeWindow(0, 1),
new TimeWindow(1, 2),
new TimeWindow(2, 3),
new TimeWindow(4, 5),
new TimeWindow(5, 6)),
callback);
verify(callback, times(1)).merge(
(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(0, 1), new TimeWindow(1, 2), new TimeWindow(2, 3))),
eq(new TimeWindow(0, 3)));
verify(callback, times(1)).merge(
(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(4, 5), new TimeWindow(5, 6))),
eq(new TimeWindow(4, 6)));
verify(callback, times(2)).merge(anyCollection(), Matchers.anyObject());
}
@Override
public void apply(Tuple key, TimeWindow window,
Iterable<Tuple3<Long, Long, Boolean>> values,
Collector<Tuple2<Integer, Long>> out) throws Exception {
int candidates = 0;
int edges = 0;
for (Tuple3<Long, Long, Boolean> t: values) {
if (t.f2) { // candidate
candidates++;
}
else {
edges++;
}
}
if (edges > 0) {
out.collect(new Tuple2<Integer, Long>(candidates, window.maxTimestamp()));
}
}
@Test
public void testPersistOnlyIfHaveUpdates() throws Exception {
@SuppressWarnings("unchecked")
ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
when(mockState.get()).thenReturn(Lists.newArrayList(
new Tuple2<>(new TimeWindow(17, 42), new TimeWindow(42, 17)),
new Tuple2<>(new TimeWindow(1, 2), new TimeWindow(3, 4))
));
MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
assertEquals(new TimeWindow(42, 17), windowSet.getStateWindow(new TimeWindow(17, 42)));
assertEquals(new TimeWindow(3, 4), windowSet.getStateWindow(new TimeWindow(1, 2)));
windowSet.persist();
verify(mockState, times(0)).add(Matchers.<Tuple2<TimeWindow, TimeWindow>>anyObject());
}
/**
* Test merging of a large new window that covers one existing windows.
*/
@Test
public void testMergeLargeWindowCoveringSingleWindow() throws Exception {
@SuppressWarnings("unchecked")
ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
TestingMergeFunction mergeFunction = new TestingMergeFunction();
// add an initial small window
mergeFunction.reset();
assertEquals(new TimeWindow(1, 2), windowSet.addWindow(new TimeWindow(1, 2), mergeFunction));
assertFalse(mergeFunction.hasMerged());
assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new TimeWindow(1, 2)));
// add a new window that completely covers the existing window
mergeFunction.reset();
assertEquals(new TimeWindow(0, 3), windowSet.addWindow(new TimeWindow(0, 3), mergeFunction));
assertTrue(mergeFunction.hasMerged());
assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new TimeWindow(0, 3)));
}
@Test
public void testAssignerWithMultipleWindows() throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
.thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
shouldFireOnElement(mockTrigger);
testHarness.processElement(new StreamRecord<>(0, 0L));
verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
verify(mockWindowFunction, times(1)).process(eq(0), eq((new TimeWindow(0, 2))), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
}
@Test
public void testMergeCoveringWindow() {
MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
ProcessingTimeSessionWindows assigner = ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000));
assigner.mergeWindows(
Lists.newArrayList(
new TimeWindow(1, 1),
new TimeWindow(0, 2),
new TimeWindow(4, 7),
new TimeWindow(5, 6)),
callback);
verify(callback, times(1)).merge(
(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(1, 1), new TimeWindow(0, 2))),
eq(new TimeWindow(0, 2)));
verify(callback, times(1)).merge(
(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(5, 6), new TimeWindow(4, 7))),
eq(new TimeWindow(4, 7)));
verify(callback, times(2)).merge(anyCollection(), Matchers.anyObject());
}
static WindowedStream<Event, Integer, TimeWindow> applyTumblingWindows(
KeyedStream<Event, Integer> keyedStream, ParameterTool pt) {
long eventTimeProgressPerEvent = pt.getLong(
SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.key(),
SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.defaultValue());
return keyedStream.timeWindow(
Time.milliseconds(
pt.getLong(
TUMBLING_WINDOW_OPERATOR_NUM_EVENTS.key(),
TUMBLING_WINDOW_OPERATOR_NUM_EVENTS.defaultValue()
) * eventTimeProgressPerEvent
)
);
}
@Test
public void testRestoreFromState() throws Exception {
@SuppressWarnings("unchecked")
ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
when(mockState.get()).thenReturn(Lists.newArrayList(
new Tuple2<>(new TimeWindow(17, 42), new TimeWindow(42, 17)),
new Tuple2<>(new TimeWindow(1, 2), new TimeWindow(3, 4))
));
MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
assertEquals(new TimeWindow(42, 17), windowSet.getStateWindow(new TimeWindow(17, 42)));
assertEquals(new TimeWindow(3, 4), windowSet.getStateWindow(new TimeWindow(1, 2)));
}
@Override
@SuppressWarnings("unchecked")
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext ctx) {
if (element instanceof Tuple2) {
Tuple2<String, Integer> t2 = (Tuple2<String, Integer>) element;
if (t2.f1 == 33) {
return Collections.singletonList(new TimeWindow(timestamp, timestamp));
}
}
return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
}
@Test
public void testOnElementPurgeDoesNotCleanupMergingSet() throws Exception {
MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
.thenReturn(Arrays.asList(new TimeWindow(0, 2)));
assertEquals(0, testHarness.getOutput().size());
assertEquals(0, testHarness.numKeyedStateEntries());
doAnswer(new Answer<TriggerResult>() {
@Override
public TriggerResult answer(InvocationOnMock invocation) throws Exception {
return TriggerResult.PURGE;
}
}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
testHarness.processElement(new StreamRecord<>(0, 0L));
assertEquals(1, testHarness.numKeyedStateEntries()); // the merging window set
assertEquals(1, testHarness.numEventTimeTimers()); // one cleanup timer
assertEquals(0, testHarness.getOutput().size());
}
@Test
public void testProperties() {
TumblingEventTimeWindows assigner = TumblingEventTimeWindows.of(Time.seconds(5), Time.milliseconds(100));
assertTrue(assigner.isEventTime());
assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(EventTimeTrigger.class));
}
private void testMergingWindowSetClearedAtGarbageCollection(TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
testHarness.open();
when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
.thenReturn(Arrays.asList(new TimeWindow(0, 20)));
assertEquals(0, testHarness.getOutput().size());
assertEquals(0, testHarness.numKeyedStateEntries());
testHarness.processElement(new StreamRecord<>(0, 0L));
assertEquals(2, testHarness.numKeyedStateEntries()); // window contents plus merging window set
assertEquals(1, timeAdaptor.numTimers(testHarness)); // gc timers
timeAdaptor.advanceTime(testHarness, 19 + 20); // 19 is maxTime of the window
assertEquals(0, testHarness.numKeyedStateEntries());
assertEquals(0, timeAdaptor.numTimers(testHarness));
}
@Override
public void apply(String key,
TimeWindow window,
Iterable<Integer> values,
Collector<String> out) throws Exception {
for (Integer in : values) {
out.collect(in.toString());
}
}
@Test
@SuppressWarnings("rawtypes")
public void testApplyWithEvictor() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(1))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
out.collect(in);
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof EvictingWindowOperator);
EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
@Test
@SuppressWarnings("rawtypes")
public void testReduceWithProcessWindowFunctionEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DummyReducer reducer = new DummyReducer();
DataStream<Tuple3<String, String, Integer>> window = source
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.reduce(reducer, new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(
Context ctx,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple3<String, String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
@Test
public void testMergeSinglePointWindow() {
MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
when(extractor.extract(any())).thenReturn(5000L);
DynamicEventTimeSessionWindows assigner = DynamicEventTimeSessionWindows.withDynamicGap(extractor);
assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 0)), callback);
verify(callback, never()).merge(anyCollection(), Matchers.anyObject());
}
@Override
public void apply(String key,
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple3<String, Long, Long>> out) throws Exception {
for (Tuple2<String, Integer> val: values) {
out.collect(new Tuple3<>(key + "-" + val.f1, window.getStart(), window.getEnd()));
}
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
@Test
@SuppressWarnings("rawtypes")
public void testApplyEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
out.collect(in);
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
/**
* Verifies that calls to timeWindow() instantiate a regular
* windowOperator instead of an aligned one.
*/
@Test
public void testAlignedWindowDeprecation() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
.reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof WindowOperator);
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(Tuple tuple,
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
Assert.assertTrue(operator2 instanceof WindowOperator);
}
@Test
@SuppressWarnings("rawtypes")
public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple3<String, String, Integer>> window = source
.keyBy(new TupleKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.reduce(new DummyReducer(), new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(String tuple,
Context ctx,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple3<String, String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
private static <T> void shouldRegisterProcessingTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception {
doAnswer(new Answer<TriggerResult>() {
@Override
public TriggerResult answer(InvocationOnMock invocation) throws Exception {
@SuppressWarnings("unchecked")
Trigger.TriggerContext context =
(Trigger.TriggerContext) invocation.getArguments()[3];
context.registerProcessingTimeTimer(timestamp);
return TriggerResult.CONTINUE;
}
})
.when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
}
/**
* Merging a late window should not register a timer, otherwise we would get two firings:
* one from onElement() on the merged window and one from the timer.
*/
@Test
public void testMergingLateWindows() throws Exception {
TriggerTestHarness<Object, TimeWindow> testHarness =
new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
assertTrue(EventTimeTrigger.create().canMerge());
assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
assertEquals(0, testHarness.numStateEntries());
assertEquals(0, testHarness.numProcessingTimeTimers());
assertEquals(2, testHarness.numEventTimeTimers());
assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
testHarness.advanceWatermark(10);
assertEquals(0, testHarness.numStateEntries());
assertEquals(0, testHarness.numProcessingTimeTimers());
assertEquals(0, testHarness.numEventTimeTimers());
assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
assertEquals(0, testHarness.numStateEntries());
assertEquals(0, testHarness.numProcessingTimeTimers());
assertEquals(0, testHarness.numEventTimeTimers());
assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 4)));
}
@Test
@SuppressWarnings("rawtypes")
public void testApplyWithEvictor() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(1))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
out.collect(in);
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof EvictingWindowOperator);
EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
@Test
public void testNoEventTimeGarbageCollectionTimerForLongMax() throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
testHarness.open();
when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
.thenReturn(Arrays.asList(new TimeWindow(0, Long.MAX_VALUE - 10)));
assertEquals(0, testHarness.getOutput().size());
assertEquals(0, testHarness.numKeyedStateEntries());
testHarness.processElement(new StreamRecord<>(0, 0L));
// just the window contents
assertEquals(1, testHarness.numKeyedStateEntries());
// no GC timer
assertEquals(0, testHarness.numEventTimeTimers());
assertEquals(0, testHarness.numProcessingTimeTimers());
}
@Test
public void testMergingWindows() throws Exception {
TriggerTestHarness<Object, TimeWindow> testHarness =
new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
assertTrue(EventTimeTrigger.create().canMerge());
assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
assertEquals(0, testHarness.numStateEntries());
assertEquals(0, testHarness.numProcessingTimeTimers());
assertEquals(2, testHarness.numEventTimeTimers());
assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
assertEquals(0, testHarness.numStateEntries());
assertEquals(0, testHarness.numProcessingTimeTimers());
assertEquals(1, testHarness.numEventTimeTimers());
assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 4)));
assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(4, new TimeWindow(0, 4)));
assertEquals(0, testHarness.numStateEntries());
assertEquals(0, testHarness.numProcessingTimeTimers());
assertEquals(0, testHarness.numEventTimeTimers());
}
@Test
@SuppressWarnings("rawtypes")
public void testProcessWithEvictor() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(new TupleKeySelector())
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(1))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(String key,
Context ctx,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
out.collect(in);
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof EvictingWindowOperator);
EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
@Test
public void testProperties() {
SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
when(extractor.extract(any())).thenReturn(5000L);
DynamicProcessingTimeSessionWindows assigner = DynamicProcessingTimeSessionWindows.withDynamicGap(extractor);
assertFalse(assigner.isEventTime());
assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(ProcessingTimeTrigger.class));
}