下面列出了java.util.concurrent.ConcurrentLinkedQueue#add ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Repeatedly appends and removes the value {@code 1L} behind an element
 * ({@code 0L}) that stays in the queue for the whole run.
 *
 * <p>The loop runs ten times the iteration count at which an OutOfMemoryError
 * was originally observed. If any {@link Error} is thrown, the iteration index
 * is printed to stderr and the error is rethrown unchanged.
 */
public static void main(String[] args) {
    // Without bug fix, OutOfMemoryError was observed at iteration 65120
    final int iterations = 10 * 65120;
    int i = 0;
    try {
        final ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<>();
        queue.add(0L);
        // i is declared outside the try so the catch block can report it.
        for (i = 1; i <= iterations; i++) {
            queue.add(1L);
            queue.remove(1L);
        }
    } catch (Error t) {
        System.err.printf("failed at iteration %d/%d%n", i, iterations);
        throw t;
    }
}
/**
 * Pushes the records "10", "20" and "30" through the given task harness, waits
 * for input processing to finish, and asserts the harness output against an
 * expected queue holding the same three records.
 */
private void processRecords(OneInputStreamTaskTestHarness<String, String> testHarness) throws Exception {
    final String[] values = {"10", "20", "30"};

    for (String value : values) {
        testHarness.processElement(new StreamRecord<>(value), 0, 0);
    }
    testHarness.waitForInputProcessing();

    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    for (String value : values) {
        expectedOutput.add(new StreamRecord<>(value));
    }

    TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
}
/**
 * Sends one element on each input of a keyed co-process operator, advances both
 * input watermarks to 5 and then to 6, and checks the emitted records and
 * watermarks.
 */
@Test
public void testEventTimeTimers() throws Exception {
    KeyedCoProcessOperator<String, Integer, String, String> coProcessOperator =
            new KeyedCoProcessOperator<>(new EventTimeTriggeringProcessFunction());

    TwoInputStreamOperatorTestHarness<Integer, String, String> harness =
            new KeyedTwoInputStreamOperatorTestHarness<>(
                    coProcessOperator,
                    new IntToStringKeySelector<>(),
                    new IdentityKeySelector<String>(),
                    BasicTypeInfo.STRING_TYPE_INFO);
    harness.setup();
    harness.open();

    harness.processElement1(new StreamRecord<>(17, 42L));
    harness.processElement2(new StreamRecord<>("18", 42L));

    // Advance both inputs in lock-step: first to watermark 5, then to 6.
    for (long watermark = 5; watermark <= 6; watermark++) {
        harness.processWatermark1(new Watermark(watermark));
        harness.processWatermark2(new Watermark(watermark));
    }

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    expected.add(new StreamRecord<>("INPUT1:17", 42L));
    expected.add(new StreamRecord<>("INPUT2:18", 42L));
    expected.add(new StreamRecord<>("17:1777", 5L));
    expected.add(new Watermark(5L));
    expected.add(new StreamRecord<>("18:1777", 6L));
    expected.add(new Watermark(6L));

    TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());

    harness.close();
}
/**
 * This also verifies that the timestamps of side-emitted records are correct.
 */
@Test
public void testSideOutput() throws Exception {
    final long timestamp = 17L;

    KeyedProcessOperator<Integer, Integer, String> processOperator =
            new KeyedProcessOperator<>(new SideOutputProcessFunction());
    OneInputStreamOperatorTestHarness<Integer, String> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(
                    processOperator, new IdentityKeySelector<>(), BasicTypeInfo.INT_TYPE_INFO);
    harness.setup();
    harness.open();

    harness.processElement(new StreamRecord<>(42, timestamp));

    // Main output.
    ConcurrentLinkedQueue<Object> expectedMain = new ConcurrentLinkedQueue<>();
    expectedMain.add(new StreamRecord<>("IN:42", timestamp));
    TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedMain, harness.getOutput());

    // Integer side output.
    ConcurrentLinkedQueue<StreamRecord<Integer>> expectedInts = new ConcurrentLinkedQueue<>();
    expectedInts.add(new StreamRecord<>(42, timestamp));
    ConcurrentLinkedQueue<StreamRecord<Integer>> actualInts =
            harness.getSideOutput(SideOutputProcessFunction.INTEGER_OUTPUT_TAG);
    TestHarnessUtil.assertOutputEquals("Side output was not correct.", expectedInts, actualInts);

    // Long side output.
    ConcurrentLinkedQueue<StreamRecord<Long>> expectedLongs = new ConcurrentLinkedQueue<>();
    expectedLongs.add(new StreamRecord<>(42L, timestamp));
    ConcurrentLinkedQueue<StreamRecord<Long>> actualLongs =
            harness.getSideOutput(SideOutputProcessFunction.LONG_OUTPUT_TAG);
    TestHarnessUtil.assertOutputEquals("Side output was not correct.", expectedLongs, actualLongs);

    harness.close();
}
/**
 * Feeds the integers 1 through 7 (with a watermark after element 2) into a
 * {@code StreamFilter} built from {@code MyFilter} and expects only the records
 * 2, 4 and 6 plus the watermark in the output.
 */
@Test
@SuppressWarnings("unchecked")
public void testFilter() throws Exception {
    final long initialTime = 0L;

    StreamFilter<Integer> filterOperator = new StreamFilter<>(new MyFilter());
    OneInputStreamOperatorTestHarness<Integer, Integer> harness =
            new OneInputStreamOperatorTestHarness<>(filterOperator);
    harness.open();

    for (int value = 1; value <= 7; value++) {
        harness.processElement(new StreamRecord<>(value, initialTime + value));
        if (value == 2) {
            // The watermark is injected right after the second element.
            harness.processWatermark(new Watermark(initialTime + 2));
        }
    }

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    expected.add(new StreamRecord<>(2, initialTime + 2));
    expected.add(new Watermark(initialTime + 2));
    expected.add(new StreamRecord<>(4, initialTime + 4));
    expected.add(new StreamRecord<>(6, initialTime + 6));

    TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
}
/**
 * containsAll(c) is true when c contains a subset of elements.
 *
 * <p>Starts with an empty queue {@code p} and adds the integers
 * {@code 0 .. SIZE-1} one at a time. Before each addition {@code q} must contain
 * all of {@code p} while {@code p} must not yet contain all of {@code q}; once
 * the loop is done {@code p} must contain all of {@code q}.
 */
public void testContainsAll() {
    // NOTE(review): presumably populatedQueue(SIZE) holds the integers 0..SIZE-1 - confirm against the helper.
    ConcurrentLinkedQueue<Integer> q = populatedQueue(SIZE);
    ConcurrentLinkedQueue<Integer> p = new ConcurrentLinkedQueue<>();
    for (int i = 0; i < SIZE; ++i) {
        assertTrue(q.containsAll(p));
        assertFalse(p.containsAll(q));
        // Integer.valueOf replaces the deprecated Integer(int) constructor;
        // containsAll compares with equals(), so boxed identity is irrelevant here.
        p.add(Integer.valueOf(i));
    }
    assertTrue(p.containsAll(q));
}
/**
 * Interleaves watermarks and elements on both inputs of a legacy keyed
 * co-process operator and checks the emitted records, whose expected payloads
 * spell out a watermark ("WM:") and a timestamp ("TS:") value.
 */
@Test
public void testTimestampAndWatermarkQuerying() throws Exception {
    LegacyKeyedCoProcessOperator<String, Integer, String, String> coProcessOperator =
            new LegacyKeyedCoProcessOperator<>(new WatermarkQueryingProcessFunction());

    TwoInputStreamOperatorTestHarness<Integer, String, String> harness =
            new KeyedTwoInputStreamOperatorTestHarness<>(
                    coProcessOperator,
                    new IntToStringKeySelector<>(),
                    new IdentityKeySelector<String>(),
                    BasicTypeInfo.STRING_TYPE_INFO);
    harness.setup();
    harness.open();

    // Watermark 17 on both inputs, then an element on input 1.
    harness.processWatermark1(new Watermark(17));
    harness.processWatermark2(new Watermark(17));
    harness.processElement1(new StreamRecord<>(5, 12L));

    // Watermark 42 on both inputs, then an element on input 2.
    harness.processWatermark1(new Watermark(42));
    harness.processWatermark2(new Watermark(42));
    harness.processElement2(new StreamRecord<>("6", 13L));

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    expected.add(new Watermark(17L));
    expected.add(new StreamRecord<>("5WM:17 TS:12", 12L));
    expected.add(new Watermark(42L));
    expected.add(new StreamRecord<>("6WM:42 TS:13", 13L));

    TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());

    harness.close();
}
/**
 * Places an element into a tumbling window so close to {@code Long.MAX_VALUE}
 * that "max timestamp + allowed lateness" wraps around, then checks that an
 * earlier watermark does not discard the window and that the window still
 * fires once the watermark reaches its max timestamp.
 */
@Test
public void testCleanupTimeOverflow() throws Exception {
    final long size = 1000;
    final long allowedLateness = 2000;
    final long elementTimestamp = Long.MAX_VALUE - 1750;
    final long earlyWatermark = Long.MAX_VALUE - 1500;

    WindowOperator windowOperator = WindowOperatorBuilder
            .builder()
            .withInputFields(inputFieldTypes)
            .tumble(Duration.ofMillis(size))
            .withEventTime(2)
            .withAllowedLateness(Duration.ofMillis(allowedLateness))
            .withSendRetraction()
            .aggregateAndBuild(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes);

    OneInputStreamOperatorTestHarness<BaseRow, BaseRow> harness =
            new KeyedOneInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow>(
                    windowOperator, keySelector, keyType);
    harness.open();

    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

    // Work out which window the element is going to land in.
    WindowAssigner<TimeWindow> assigner = TumblingWindowAssigner.of(Duration.ofMillis(size));
    Collection<TimeWindow> assigned =
            assigner.assignWindows(GenericRow.of(fromString("key2"), 1), elementTimestamp);
    TimeWindow window = assigned.iterator().next();

    harness.processElement(record("key2", 1, elementTimestamp));

    // Sanity check: the cleanup time really does wrap around ...
    assertTrue(window.maxTimestamp() + allowedLateness < window.maxTimestamp());
    // ... and a naive timer would already fire at the early watermark.
    assertTrue(window.maxTimestamp() + allowedLateness < earlyWatermark);

    // Without wrap-around protection in the garbage collection timers, this
    // watermark would wipe the state of the window that was just populated.
    harness.processWatermark(new Watermark(earlyWatermark));

    // The early watermark lies before the end of our only window.
    assertTrue(earlyWatermark < window.maxTimestamp());
    assertTrue(window.maxTimestamp() < Long.MAX_VALUE);

    // This watermark triggers computation of the window.
    harness.processWatermark(new Watermark(window.maxTimestamp()));

    expectedOutput.add(new Watermark(earlyWatermark));
    expectedOutput.add(record("key2", 1L, 1L, window.getStart(), window.getEnd(), window.maxTimestamp()));
    expectedOutput.add(new Watermark(window.maxTimestamp()));

    assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, harness.getOutput());

    harness.close();
}
/**
 * Manually run this to write binary snapshot data.
 *
 * <p>Drives a reducing, processing-time tumbling window operator (3 second
 * windows), checks the output produced so far, and then writes a snapshot of
 * the operator state to a file under {@code src/test/resources}.
 */
@Ignore
@Test
public void writeReducingProcessingTimeWindowsSnapshot() throws Exception {
    final int windowSize = 3;

    ReducingStateDescriptor<Tuple2<String, Integer>> reducingState = new ReducingStateDescriptor<>(
            "window-contents",
            new SumReducer<>(),
            STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> windowOperator =
            new WindowOperator<>(
                    TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                    new TimeWindow.Serializer(),
                    new TupleKeySelector<>(),
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    reducingState,
                    new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
                    ProcessingTimeTrigger.create(),
                    0,
                    null /* late data output tag */);

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(
                    windowOperator, new TupleKeySelector<>(), BasicTypeInfo.STRING_TYPE_INFO);
    harness.setup();
    harness.open();

    // Elements processed while the processing time is 10.
    harness.setProcessingTime(10);
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));

    // Elements processed after the processing time has moved to 3010.
    harness.setProcessingTime(3010);
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key3", 1)));

    expected.add(new StreamRecord<>(new Tuple2<>("key1", 1), 2999));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 2999));

    TestHarnessUtil.assertOutputEqualsSorted(
            "Output was not correct.", expected, harness.getOutput(), new Tuple2ResultSortComparator<>());

    // do snapshot and save to file
    OperatorSubtaskState snapshot = harness.snapshot(0, 0);
    String snapshotPath =
            "src/test/resources/win-op-migration-test-reduce-processing-time-flink" + flinkGenerateSavepointVersion + "-snapshot";
    OperatorSnapshotUtil.writeStateHandle(snapshot, snapshotPath);

    harness.close();
}
/**
 * Processes a single element, advances the processing time to 5, and expects
 * the output to hold the element (17) followed by a record with value 1777.
 */
@Test
public void testProcessingTimeTimers() throws Exception {
    LegacyKeyedProcessOperator<Integer, Integer, Integer> processOperator =
            new LegacyKeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));

    OneInputStreamOperatorTestHarness<Integer, Integer> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(
                    processOperator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    harness.setup();
    harness.open();

    harness.processElement(new StreamRecord<>(17));
    harness.setProcessingTime(5);

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    expected.add(new StreamRecord<>(17));
    expected.add(new StreamRecord<>(1777));

    TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());

    harness.close();
}
/**
 * Runs a single key through an event-time session window operator (gap of 3
 * seconds, allowed lateness of 10) and asserts, at two points in the test, that
 * the late-data side output is {@code null}, i.e. nothing was routed to it.
 */
@Test
public void testNotSideOutputDueToLatenessSessionWithLateness() throws Exception {
// same as testSideOutputDueToLatenessSessionWithLateness() but with an accumulating trigger, i.e.
// one that does not return FIRE_AND_PURGE when firing but just FIRE. The expected
// results are therefore slightly different.
final int gapSize = 3;
final long lateness = 10;
ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
new SumReducer(),
STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator =
new WindowOperator<>(
EventTimeSessionWindows.withGap(Time.seconds(gapSize)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
lateness,
lateOutputTag /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
createTestHarness(operator);
testHarness.open();
// Expected main output; built up alongside the inputs that are fed to the harness.
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
testHarness.processWatermark(new Watermark(1999));
expected.add(new Watermark(1999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
testHarness.processWatermark(new Watermark(4998));
expected.add(new Watermark(4998));
// this will not be sideoutput because the session we're adding to has maxTimestamp
// after the current watermark
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
// new session
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 8500));
testHarness.processWatermark(new Watermark(7400));
expected.add(new Watermark(7400));
// this will merge the two sessions into one
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
testHarness.processWatermark(new Watermark(11501));
expected.add(new StreamRecord<>(new Tuple3<>("key2-5", 1000L, 11500L), 11499));
expected.add(new Watermark(11501));
// new session
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 11600));
testHarness.processWatermark(new Watermark(14600));
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
expected.add(new Watermark(14600));
// because of the small allowed lateness and because the trigger is accumulating
// this will be merged into the session (11600-14600) and therefore will not
// be sideoutput as late
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
// adding ("key2", 1) extended the session to (10000-14600) for which
// maxTimestamp <= currentWatermark. Therefore, we immediately get a firing
// with the current version of EventTimeTrigger/EventTimeTriggerAccum
expected.add(new StreamRecord<>(new Tuple3<>("key2-2", 10000L, 14600L), 14599));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(
lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
// first check: the late-data side output must be null at this point
assertEquals(null, sideActual);
// advance the watermark to 20000; one more record (for 10000-17500) is expected in the main output
testHarness.processWatermark(new Watermark(20000));
expected.add(new StreamRecord<>(new Tuple3<>("key2-3", 10000L, 17500L), 17499));
expected.add(new Watermark(20000));
// a final watermark of 100000 is expected to add nothing but the watermark itself
testHarness.processWatermark(new Watermark(100000));
expected.add(new Watermark(100000));
actual = testHarness.getOutput();
sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
// second check: the late-data side output must still be null
assertEquals(null, sideActual);
testHarness.close();
}
/**
 * Exercises an event-time tumbling window operator with 3 second windows:
 * feeds out-of-order elements for two keys, snapshots and restores the
 * operator in a fresh harness midway, and compares the harness output with the
 * expected output after each watermark.
 */
@Test
@SuppressWarnings("unchecked")
public void testEventTimeTumblingWindows() throws Exception {
// reset the close counter that is asserted at the end of the test
closeCalled.set(0);
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
.tumble(Duration.ofSeconds(3))
.withEventTime(2)
.aggregateAndBuild(getTimeWindowAggFunction(), equaliser, accTypes, aggResultTypes, windowTypes);
OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(operator);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// add elements out-of-order
testHarness.processElement(record("key2", 1, 3999L));
testHarness.processElement(record("key2", 1, 3000L));
testHarness.processElement(record("key1", 1, 20L));
testHarness.processElement(record("key1", 1, 0L));
testHarness.processElement(record("key1", 1, 999L));
testHarness.processElement(record("key2", 1, 1998L));
testHarness.processElement(record("key2", 1, 1999L));
testHarness.processElement(record("key2", 1, 1000L));
// watermarks 999 and 1999 are expected to be forwarded without any window result
testHarness.processWatermark(new Watermark(999));
expectedOutput.add(new Watermark(999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(1999));
expectedOutput.add(new Watermark(1999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
// do a snapshot, close and restore again
OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
testHarness.close();
// the expected output starts from scratch for the new harness created below
expectedOutput.clear();
testHarness = createTestHarness(operator);
testHarness.setup();
testHarness.initializeState(snapshot);
testHarness.open();
// watermark 2999: results for the window [0, 3000) are expected for both keys
testHarness.processWatermark(new Watermark(2999));
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key1", 3L, 3L, 0L, 3000L, 2999L)));
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key2", 3L, 3L, 0L, 3000L, 2999L)));
expectedOutput.add(new Watermark(2999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(3999));
expectedOutput.add(new Watermark(3999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(4999));
expectedOutput.add(new Watermark(4999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
// watermark 5999: the result for the window [3000, 6000) is expected for key2
testHarness.processWatermark(new Watermark(5999));
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key2", 2L, 2L, 3000L, 6000L, 5999L)));
expectedOutput.add(new Watermark(5999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
// those don't have any effect...
testHarness.processWatermark(new Watermark(6999));
testHarness.processWatermark(new Watermark(7999));
expectedOutput.add(new Watermark(6999));
expectedOutput.add(new Watermark(7999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.close();
// the harness was closed twice in this test (once before the restore, once just above),
// hence the expected close count of 2
// NOTE(review): presumably closeCalled is incremented by the aggregate function's close() - confirm
assertEquals("Close was not called.", 2, closeCalled.get());
}
/**
 * Uses a delayed async function to check whether {@code endInput()} waits until
 * every buffered element has been emitted: the output is compared right after
 * {@code endInput()} returns.
 */
@Test
public void testEndInput() throws Exception {
    final long initialTime = 0L;

    final AsyncWaitOperator<Integer, Integer> asyncOperator = new AsyncWaitOperator<>(
            new DelayedAsyncFunction(10),
            -1,
            2,
            AsyncDataStream.OutputMode.ORDERED);

    final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
            new OneInputStreamOperatorTestHarness<>(asyncOperator, IntSerializer.INSTANCE);

    final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    expected.add(new StreamRecord<>(2, initialTime + 1));
    expected.add(new StreamRecord<>(4, initialTime + 2));
    expected.add(new Watermark(initialTime + 2));
    expected.add(new StreamRecord<>(6, initialTime + 3));

    harness.open();

    try {
        // Every interaction with the harness happens under its checkpoint lock.
        synchronized (harness.getCheckpointLock()) {
            harness.processElement(new StreamRecord<>(1, initialTime + 1));
            harness.processElement(new StreamRecord<>(2, initialTime + 2));
            harness.processWatermark(new Watermark(initialTime + 2));
            harness.processElement(new StreamRecord<>(3, initialTime + 3));
        }

        // wait until all async collectors in the buffer have been emitted out.
        synchronized (harness.getCheckpointLock()) {
            harness.endInput();
        }

        TestHarnessUtil.assertOutputEquals(
                "Output with watermark was not correct.", expected, harness.getOutput());
    } finally {
        synchronized (harness.getCheckpointLock()) {
            harness.close();
        }
    }
}
/**
 * Drives a reducing window operator with 3 second processing-time tumbling
 * windows: elements are added, the processing time is advanced, and the
 * per-key sums expected in the output are checked after each advance.
 */
@Test
public void testProcessingTimeTumblingWindows() throws Throwable {
    final int windowSize = 3;
    final long elementTimestamp = 7000;

    ReducingStateDescriptor<Tuple2<String, Integer>> reducingState = new ReducingStateDescriptor<>(
            "window-contents",
            new SumReducer(),
            STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> windowOperator =
            new WindowOperator<>(
                    TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                    new TimeWindow.Serializer(),
                    new TupleKeySelector(),
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    reducingState,
                    new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
                    ProcessingTimeTrigger.create(),
                    0,
                    null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> harness =
            createTestHarness(windowOperator);

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

    harness.open();
    harness.setProcessingTime(3);

    // timestamp is ignored in processing time
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
    for (String key : new String[] {"key2", "key2", "key1", "key1"}) {
        harness.processElement(new StreamRecord<>(new Tuple2<>(key, 1), elementTimestamp));
    }

    harness.setProcessingTime(5000);

    expected.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
    expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));

    TestHarnessUtil.assertOutputEqualsSorted(
            "Output was not correct.", expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // Three more elements for key1, processed while the processing time is 5000.
    for (int n = 0; n < 3; n++) {
        harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), elementTimestamp));
    }

    harness.setProcessingTime(7000);

    expected.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));

    TestHarnessUtil.assertOutputEqualsSorted(
            "Output was not correct.", expected, harness.getOutput(), new Tuple2ResultSortComparator());

    harness.close();
}
/**
 * Restores a session window operator (gap of 3 seconds, purging count trigger
 * of 4) from a stored snapshot resource and checks that output only appears
 * after an element that merges the two "key1" sessions has been processed.
 */
@Test
public void testRestoreSessionWindowsWithCountTrigger() throws Exception {
    final int sessionSize = 3;

    ListStateDescriptor<Tuple2<String, Integer>> listState = new ListStateDescriptor<>(
            "window-contents",
            STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> windowOperator =
            new WindowOperator<>(
                    EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
                    new TimeWindow.Serializer(),
                    new TupleKeySelector<String>(),
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    listState,
                    new InternalIterableWindowFunction<>(new SessionWindowFunction()),
                    PurgingTrigger.of(CountTrigger.of(4)),
                    0,
                    null /* late data output tag */);

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(
                    windowOperator, new TupleKeySelector<>(), BasicTypeInfo.STRING_TYPE_INFO);

    // Load the operator state from the snapshot resource before opening.
    harness.setup();
    harness.initializeState(
            OperatorSnapshotUtil.getResourceFilename(
                    "win-op-migration-test-session-with-stateful-trigger-flink" + testMigrateVersion + "-snapshot"));
    harness.open();

    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 6000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000));

    // Nothing is expected in the output yet.
    TestHarnessUtil.assertOutputEqualsSorted(
            "Output was not correct.", expected, harness.getOutput(), new Tuple3ResultSortComparator());

    // add an element that merges the two "key1" sessions, they should now have count 6, and therefore fire
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), 4500));

    expected.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L), 9999L));

    TestHarnessUtil.assertOutputEqualsSorted(
            "Output was not correct.", expected, harness.getOutput(), new Tuple3ResultSortComparator());

    harness.close();
}
/**
 * Exercises a global-window operator with a purging count trigger of 4,
 * including a snapshot, close and restore into a newly built operator, and
 * checks the combined output of the harness before and after the restore.
 */
@Test
@SuppressWarnings("unchecked")
public void testCountTrigger() throws Exception {
closeCalled.set(0);
// number of elements per key after which the count trigger is configured to fire
final int windowSize = 4;
ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
new SumReducer(),
STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
PurgingTrigger.of(CountTrigger.of(windowSize)),
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
createTestHarness(operator);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// The global window actually ignores these timestamps...
// add elements out-of-order
// (four elements for key2 and three for key1 are processed before the snapshot)
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
// do a snapshot, close and restore again
OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
testHarness.close();
// keep the first harness's output; it is concatenated with the new harness's output in the assertions below
ConcurrentLinkedQueue<Object> outputBeforeClose = testHarness.getOutput();
// build a fresh state descriptor and operator with the same configuration for the restore
stateDesc = new ReducingStateDescriptor<>("window-contents",
new SumReducer(),
STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
operator = new WindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
PurgingTrigger.of(CountTrigger.of(windowSize)),
0,
null /* late data output tag */);
testHarness = createTestHarness(operator);
testHarness.setup();
testHarness.initializeState(snapshot);
testHarness.open();
// one more key2 element after the restore; a ("key2", 4) result is expected
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, Iterables.concat(outputBeforeClose, testHarness.getOutput()), new Tuple2ResultSortComparator());
// one more key1 element and three more key2 elements; a result of 4 is expected for each key
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, Iterables.concat(outputBeforeClose, testHarness.getOutput()), new Tuple2ResultSortComparator());
testHarness.close();
}
/**
 * Tests DeltaEvictor, evictAfter behavior.
 *
 * <p>Builds an evicting window operator over global windows with a count
 * trigger of 2 and a delta evictor (threshold 2, constructed with
 * {@code evictAfter = true}), feeds elements for two keys, and compares the
 * sorted output with the expected sums. Finally asserts that the close counter
 * handed to the window function is 1 after the harness is closed.
 */
@Test
public void testDeltaEvictorEvictAfter() throws Exception {
// counter handed to RichSumReducer below; asserted to be 1 at the end of the test
AtomicInteger closeCalled = new AtomicInteger(0);
final int triggerCount = 2;
final boolean evictAfter = true;
final int threshold = 2;
// the window contents are stored as StreamRecords, so the state needs a serializer for stream elements
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(triggerCount),
DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() {
@Override
public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
// delta is the difference of the integer fields: new minus old
return newDataPoint.f1 - oldDataPoint.f1;
}
}, evictAfter),
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// first batch: five elements for key2 and three for key1
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 5), initialTime + 999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 1998));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 1999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
// expected results after the first batch; all carry Long.MAX_VALUE as timestamp
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 15), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
// second batch: one more element per key, each expected to produce one more result
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 9), initialTime + 10999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 1000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 16), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 22), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
testHarness.close();
Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
/**
 * Drives a reducing {@code WindowOperator} configured with event-time session windows
 * (3 second gap) through a snapshot/restore cycle and checks the records and watermarks
 * that the restored harness emits.
 */
@Test
@SuppressWarnings("unchecked")
public void testReduceSessionWindows() throws Exception {
    closeCalled.set(0);

    final int gapSeconds = 3;

    ReducingStateDescriptor<Tuple2<String, Integer>> windowContents =
        new ReducingStateDescriptor<>(
            "window-contents",
            new SumReducer(),
            STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> windowOperator =
        new WindowOperator<>(
            EventTimeSessionWindows.withGap(Time.seconds(gapSeconds)),
            new TimeWindow.Serializer(),
            new TupleKeySelector(),
            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
            windowContents,
            new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
            EventTimeTrigger.create(),
            0,
            null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> harness =
        createTestHarness(windowOperator);
    harness.open();

    // First batch for key2, processed before the snapshot is taken.
    final int[] firstKey2Values = {1, 2, 3};
    final long[] firstKey2Timestamps = {0L, 1000L, 2500L};
    for (int i = 0; i < firstKey2Values.length; i++) {
        harness.processElement(
            new StreamRecord<>(new Tuple2<>("key2", firstKey2Values[i]), firstKey2Timestamps[i]));
    }

    // Snapshot, close, then continue on a fresh harness initialized from that snapshot.
    OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
    harness.close();

    harness = createTestHarness(windowOperator);
    harness.setup();
    harness.initializeState(checkpoint);
    harness.open();

    // key1 elements arrive only after the restore.
    final int[] key1Values = {1, 2, 3};
    final long[] key1Timestamps = {10L, 1000L, 2500L};
    for (int i = 0; i < key1Values.length; i++) {
        harness.processElement(
            new StreamRecord<>(new Tuple2<>("key1", key1Values[i]), key1Timestamps[i]));
    }

    // Second batch for key2; timestamps start at 5501 (value 5 at 6000 is sent twice).
    final int[] secondKey2Values = {4, 5, 5, 6};
    final long[] secondKey2Timestamps = {5501L, 6000L, 6000L, 6050L};
    for (int i = 0; i < secondKey2Values.length; i++) {
        harness.processElement(
            new StreamRecord<>(new Tuple2<>("key2", secondKey2Values[i]), secondKey2Timestamps[i]));
    }

    harness.processWatermark(new Watermark(12000));

    // Two more key2 elements sharing timestamp 15000, followed by a watermark at 17999.
    final int[] lateKey2Values = {10, 20};
    for (int value : lateKey2Values) {
        harness.processElement(new StreamRecord<>(new Tuple2<>("key2", value), 15000L));
    }

    harness.processWatermark(new Watermark(17999));

    // Expected emissions: three results up to watermark 12000, one more up to watermark 17999.
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    expected.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
    expected.add(new Watermark(12000));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
    expected.add(new Watermark(17999));

    TestHarnessUtil.assertOutputEqualsSorted(
        "Output was not correct.", expected, harness.getOutput(), new Tuple3ResultSortComparator());

    harness.close();
}
/**
 * Same scenario as the reducing session-window test, but the operator wraps a
 * {@code ReducedProcessSessionWindowFunction} in an
 * {@code InternalSingleValueProcessWindowFunction}: event-time session windows
 * (3 second gap), a snapshot/restore cycle, then a check of the emitted output.
 */
@Test
@SuppressWarnings("unchecked")
public void testReduceSessionWindowsWithProcessFunction() throws Exception {
    closeCalled.set(0);

    final int gapSeconds = 3;

    ReducingStateDescriptor<Tuple2<String, Integer>> windowContents =
        new ReducingStateDescriptor<>(
            "window-contents",
            new SumReducer(),
            STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> windowOperator =
        new WindowOperator<>(
            EventTimeSessionWindows.withGap(Time.seconds(gapSeconds)),
            new TimeWindow.Serializer(),
            new TupleKeySelector(),
            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
            windowContents,
            new InternalSingleValueProcessWindowFunction<>(new ReducedProcessSessionWindowFunction()),
            EventTimeTrigger.create(),
            0,
            null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> harness =
        createTestHarness(windowOperator);
    harness.open();

    // Elements for key2 that must survive the snapshot/restore cycle below.
    final int[] preSnapshotValues = {1, 2, 3};
    final long[] preSnapshotTimestamps = {0L, 1000L, 2500L};
    for (int i = 0; i < preSnapshotValues.length; i++) {
        harness.processElement(
            new StreamRecord<>(new Tuple2<>("key2", preSnapshotValues[i]), preSnapshotTimestamps[i]));
    }

    // Take a snapshot, shut the harness down and bring up a new one from the snapshot.
    OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
    harness.close();

    harness = createTestHarness(windowOperator);
    harness.setup();
    harness.initializeState(checkpoint);
    harness.open();

    // Post-restore input: three key1 elements ...
    final int[] key1Values = {1, 2, 3};
    final long[] key1Timestamps = {10L, 1000L, 2500L};
    for (int i = 0; i < key1Values.length; i++) {
        harness.processElement(
            new StreamRecord<>(new Tuple2<>("key1", key1Values[i]), key1Timestamps[i]));
    }

    // ... then four key2 elements (value 5 at timestamp 6000 is sent twice).
    final int[] key2Values = {4, 5, 5, 6};
    final long[] key2Timestamps = {5501L, 6000L, 6000L, 6050L};
    for (int i = 0; i < key2Values.length; i++) {
        harness.processElement(
            new StreamRecord<>(new Tuple2<>("key2", key2Values[i]), key2Timestamps[i]));
    }

    harness.processWatermark(new Watermark(12000));

    // Final pair of key2 elements at timestamp 15000, then the closing watermark.
    final int[] trailingValues = {10, 20};
    for (int value : trailingValues) {
        harness.processElement(new StreamRecord<>(new Tuple2<>("key2", value), 15000L));
    }

    harness.processWatermark(new Watermark(17999));

    // Assemble the expected output in emission order; the assertion sorts before comparing.
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    expected.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
    expected.add(new Watermark(12000));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
    expected.add(new Watermark(17999));

    TestHarnessUtil.assertOutputEqualsSorted(
        "Output was not correct.", expected, harness.getOutput(), new Tuple3ResultSortComparator());

    harness.close();
}
/**
 * Sends a watermark, one element and a second watermark through a
 * {@code KeyedProcessOperator} running a {@code TriggeringFlatMapFunction} in the
 * event-time domain, then compares the harness output with the expected sequence.
 */
@Test
public void testEventTimeTimers() throws Exception {
    final int key = 17;

    // Expected sequence: both watermarks, the input element, and one extra record
    // (1777 at timestamp 5) -- presumably emitted by the function when its timer fires.
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    expected.add(new Watermark(0L));
    expected.add(new StreamRecord<>(key, 42L));
    expected.add(new StreamRecord<>(1777, 5L));
    expected.add(new Watermark(5L));

    TriggeringFlatMapFunction timerFunction =
        new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME, key);
    KeyedProcessOperator<Integer, Integer, Integer> processOperator =
        new KeyedProcessOperator<>(timerFunction);

    IdentityKeySelector<Integer> keySelector = new IdentityKeySelector<Integer>();
    OneInputStreamOperatorTestHarness<Integer, Integer> harness =
        new KeyedOneInputStreamOperatorTestHarness<>(
            processOperator, keySelector, BasicTypeInfo.INT_TYPE_INFO);

    harness.setup();
    harness.open();

    harness.processWatermark(new Watermark(0));
    harness.processElement(new StreamRecord<>(key, 42L));
    harness.processWatermark(new Watermark(5));

    TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());

    harness.close();
}