下面列出了 java.util.concurrent.ConcurrentLinkedQueue#clear() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Cancels every queued SnackBar message for the given activity and drops its queue.
 *
 * <p>Does nothing when no queue is registered for {@code activity}.
 *
 * @param activity the activity whose queued SnackBar messages should be cancelled
 */
public void cancelSnackBars(Activity activity) {
    ConcurrentLinkedQueue<SnackBarItem> list = mQueue.get(activity);
    if (list == null) {
        return;
    }
    mIsCanceling = true;
    try {
        for (SnackBarItem item : list) {
            item.cancel();
        }
    } finally {
        // Lower the flag even when a cancel() call throws; otherwise it would remain
        // true after the exception propagates.
        mIsCanceling = false;
    }
    list.clear();
    mQueue.remove(activity);
}
/**
 * Checks the watermark assigner when the source goes idle, using an idle timeout of
 * 1000 ms and an auto-watermark interval of 50.
 *
 * <p>Asserted sequence: after records 1..4, a single watermark of 3 is seen once processing
 * time reaches 51; at processing time 1001 the stream status is IDLE; further records switch
 * it back to ACTIVE, and a single watermark of 7 is seen after processing time reaches 1060.
 */
@Test
public void testWatermarkAssignerWithIdleSource() throws Exception {
    // with timeout 1000 ms
    OneInputStreamOperatorTestHarness<RowData, RowData> harness =
            createTestHarness(0, WATERMARK_GENERATOR, 1000);
    harness.getExecutionConfig().setAutoWatermarkInterval(50);
    harness.open();

    // --- active phase ---
    harness.processElement(new StreamRecord<>(GenericRowData.of(1L)));
    harness.processElement(new StreamRecord<>(GenericRowData.of(2L)));
    // this watermark should be ignored
    harness.processWatermark(new Watermark(2));
    harness.processElement(new StreamRecord<>(GenericRowData.of(3L)));
    harness.processElement(new StreamRecord<>(GenericRowData.of(4L)));

    // trigger watermark emit
    harness.setProcessingTime(51);
    ConcurrentLinkedQueue<Object> emitted = harness.getOutput();
    List<Watermark> seenWatermarks = extractWatermarks(emitted);
    assertEquals(1, seenWatermarks.size());
    assertEquals(new Watermark(3), seenWatermarks.get(0));
    assertEquals(StreamStatus.ACTIVE, harness.getStreamStatus());
    emitted.clear();

    // --- idle phase: processing time passes the 1000 ms timeout ---
    harness.setProcessingTime(1001);
    assertEquals(StreamStatus.IDLE, harness.getStreamStatus());

    // --- new records arrive and the status is expected to be ACTIVE again ---
    harness.processElement(new StreamRecord<>(GenericRowData.of(4L)));
    harness.processElement(new StreamRecord<>(GenericRowData.of(5L)));
    harness.processElement(new StreamRecord<>(GenericRowData.of(6L)));
    harness.processElement(new StreamRecord<>(GenericRowData.of(7L)));
    harness.processElement(new StreamRecord<>(GenericRowData.of(8L)));
    assertEquals(StreamStatus.ACTIVE, harness.getStreamStatus());

    harness.setProcessingTime(1060);
    emitted = harness.getOutput();
    seenWatermarks = extractWatermarks(emitted);
    assertEquals(1, seenWatermarks.size());
    assertEquals(new Watermark(7), seenWatermarks.get(0));
}
/**
 * clear() empties a populated queue, and the queue can be refilled and cleared again.
 */
public void testClear() {
    ConcurrentLinkedQueue queue = populatedQueue(SIZE);

    // first clear: nothing may remain
    queue.clear();
    assertTrue(queue.isEmpty());
    assertEquals(0, queue.size());

    // the cleared queue must still accept elements...
    queue.add(one);
    assertFalse(queue.isEmpty());

    // ...and can be emptied a second time
    queue.clear();
    assertTrue(queue.isEmpty());
}
/**
 * Feeds out-of-order records and a series of watermarks into {@code operator} and compares
 * the sorted harness output with the expected records and watermarks, taking a snapshot and
 * restoring into a fresh harness halfway through.
 *
 * @param operator the operator under test
 */
private void testSlidingEventTimeWindows(OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator) throws Exception {
    final String mismatch = "Output was not correct.";

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> harness =
            createTestHarness(operator);
    harness.setup();
    harness.open();

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

    // add elements out-of-order
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));

    // watermark 999
    harness.processWatermark(new Watermark(999));
    expected.add(new StreamRecord<>(new Tuple2<>("key1", 3), 999));
    expected.add(new Watermark(999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // watermark 1999
    harness.processWatermark(new Watermark(1999));
    expected.add(new StreamRecord<>(new Tuple2<>("key1", 3), 1999));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 3), 1999));
    expected.add(new Watermark(1999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // watermark 2999
    harness.processWatermark(new Watermark(2999));
    expected.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
    expected.add(new Watermark(2999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // do a snapshot, close and restore again
    OperatorSubtaskState savedState = harness.snapshot(0L, 0L);
    harness.close();
    // expectations restart from empty for the fresh harness
    expected.clear();

    harness = createTestHarness(operator);
    harness.setup();
    harness.initializeState(savedState);
    harness.open();

    // watermark 3999
    harness.processWatermark(new Watermark(3999));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 5), 3999));
    expected.add(new Watermark(3999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // watermark 4999
    harness.processWatermark(new Watermark(4999));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 4999));
    expected.add(new Watermark(4999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // watermark 5999
    harness.processWatermark(new Watermark(5999));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999));
    expected.add(new Watermark(5999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // those don't have any effect... (only the watermarks themselves are expected)
    harness.processWatermark(new Watermark(6999));
    harness.processWatermark(new Watermark(7999));
    expected.add(new Watermark(6999));
    expected.add(new Watermark(7999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    harness.close();
}
/**
 * Feeds out-of-order records and a series of watermarks into {@code operator} and compares
 * the sorted harness output with the expected records and watermarks, taking a snapshot and
 * restoring into a fresh harness halfway through.
 *
 * @param operator the operator under test
 */
private void testTumblingEventTimeWindows(OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator) throws Exception {
    final String mismatch = "Output was not correct.";

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> harness =
            createTestHarness(operator);
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    harness.open();

    // add elements out-of-order
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));

    // watermarks 999 and 1999: only the watermarks themselves are expected
    harness.processWatermark(new Watermark(999));
    expected.add(new Watermark(999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    harness.processWatermark(new Watermark(1999));
    expected.add(new Watermark(1999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // do a snapshot, close and restore again
    OperatorSubtaskState savedState = harness.snapshot(0L, 0L);
    // the output is re-checked after the snapshot, against the same expectations
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());
    harness.close();

    harness = createTestHarness(operator);
    // expectations restart from empty for the fresh harness
    expected.clear();
    harness.setup();
    harness.initializeState(savedState);
    harness.open();

    // watermark 2999
    harness.processWatermark(new Watermark(2999));
    expected.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
    expected.add(new Watermark(2999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // watermarks 3999 and 4999: only the watermarks themselves are expected
    harness.processWatermark(new Watermark(3999));
    expected.add(new Watermark(3999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    harness.processWatermark(new Watermark(4999));
    expected.add(new Watermark(4999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // watermark 5999
    harness.processWatermark(new Watermark(5999));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999));
    expected.add(new Watermark(5999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // those don't have any effect... (only the watermarks themselves are expected)
    harness.processWatermark(new Watermark(6999));
    harness.processWatermark(new Watermark(7999));
    expected.add(new Watermark(6999));
    expected.add(new Watermark(7999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    harness.close();
}
/**
 * Verifies that session-window merging works together with the ContinuousEventTimeTrigger,
 * including a snapshot/restore cycle between the two rounds of input.
 */
@Test
public void testSessionWindowsWithContinuousEventTimeTrigger() throws Exception {
    closeCalled.set(0);

    final int gapSeconds = 3;
    final String mismatch = "Output was not correct.";

    ListStateDescriptor<Tuple2<String, Integer>> contentsDescriptor = new ListStateDescriptor<>(
            "window-contents",
            STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> sessionOperator =
            new WindowOperator<>(
                    EventTimeSessionWindows.withGap(Time.seconds(gapSeconds)),
                    new TimeWindow.Serializer(),
                    new TupleKeySelector(),
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    contentsDescriptor,
                    new InternalIterableWindowFunction<>(new SessionWindowFunction()),
                    ContinuousEventTimeTrigger.of(Time.seconds(2)),
                    0,
                    null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> harness =
            createTestHarness(sessionOperator);
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    harness.open();

    // add elements out-of-order and first trigger time is 2000
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 1500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));

    // triggers emit and next trigger time is 4000
    harness.processWatermark(new Watermark(2500));
    expected.add(new StreamRecord<>(new Tuple3<>("key1-1", 1500L, 4500L), 4499));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
    expected.add(new Watermark(2500));

    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 4000));
    harness.processWatermark(new Watermark(3000));
    expected.add(new Watermark(3000));

    // do a snapshot, close and restore again
    OperatorSubtaskState savedState = harness.snapshot(0L, 0L);
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple3ResultSortComparator());
    harness.close();
    // expectations restart from empty for the fresh harness
    expected.clear();

    harness = createTestHarness(sessionOperator);
    harness.setup();
    harness.initializeState(savedState);
    harness.open();

    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 4000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500));

    // triggers emit and next trigger time is 6000
    harness.processWatermark(new Watermark(4000));
    expected.add(new StreamRecord<>(new Tuple3<>("key1-3", 1500L, 7000L), 6999));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-15", 0L, 7000L), 6999));
    expected.add(new Watermark(4000));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple3ResultSortComparator());

    harness.close();
}
/**
 * Runs an event-time tumbling window of 3 seconds through out-of-order records and a series
 * of watermarks, with a snapshot/restore cycle in the middle, and finally checks that
 * {@code closeCalled} reached 2.
 */
@Test
@SuppressWarnings("unchecked")
public void testEventTimeTumblingWindows() throws Exception {
    closeCalled.set(0);
    final String mismatch = "Output was not correct.";

    WindowOperator tumblingOperator = WindowOperatorBuilder
            .builder()
            .withInputFields(inputFieldTypes)
            .tumble(Duration.ofSeconds(3))
            .withEventTime(2)
            .aggregateAndBuild(getTimeWindowAggFunction(), equaliser, accTypes, aggResultTypes, windowTypes);

    OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(tumblingOperator);
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    harness.open();

    // add elements out-of-order
    harness.processElement(insertRecord("key2", 1, 3999L));
    harness.processElement(insertRecord("key2", 1, 3000L));
    harness.processElement(insertRecord("key1", 1, 20L));
    harness.processElement(insertRecord("key1", 1, 0L));
    harness.processElement(insertRecord("key1", 1, 999L));
    harness.processElement(insertRecord("key2", 1, 1998L));
    harness.processElement(insertRecord("key2", 1, 1999L));
    harness.processElement(insertRecord("key2", 1, 1000L));

    // watermarks 999 and 1999: only the watermarks themselves are expected
    harness.processWatermark(new Watermark(999));
    expected.add(new Watermark(999));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    harness.processWatermark(new Watermark(1999));
    expected.add(new Watermark(1999));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    // do a snapshot, close and restore again
    OperatorSubtaskState savedState = harness.snapshot(0L, 0);
    harness.close();
    // expectations restart from empty for the fresh harness
    expected.clear();

    harness = createTestHarness(tumblingOperator);
    harness.setup();
    harness.initializeState(savedState);
    harness.open();

    // watermark 2999
    harness.processWatermark(new Watermark(2999));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 3L, 3L, 0L, 3000L, 2999L)));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 3L, 3L, 0L, 3000L, 2999L)));
    expected.add(new Watermark(2999));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    // watermarks 3999 and 4999: only the watermarks themselves are expected
    harness.processWatermark(new Watermark(3999));
    expected.add(new Watermark(3999));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    harness.processWatermark(new Watermark(4999));
    expected.add(new Watermark(4999));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    // watermark 5999
    harness.processWatermark(new Watermark(5999));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 2L, 2L, 3000L, 6000L, 5999L)));
    expected.add(new Watermark(5999));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    // those don't have any effect... (only the watermarks themselves are expected)
    harness.processWatermark(new Watermark(6999));
    harness.processWatermark(new Watermark(7999));
    expected.add(new Watermark(6999));
    expected.add(new Watermark(7999));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    harness.close();

    // two harnesses were closed above; closeCalled is presumably incremented once per
    // close elsewhere in the test class -- TODO confirm
    assertEquals("Close was not called.", 2, closeCalled.get());
}
/**
 * Checks the mini-batched watermark assigner when the source goes idle.
 *
 * <p>Asserted sequence: a record with value 50 yields a single watermark of 49 while the
 * stream is ACTIVE; at processing time 1001 the status is IDLE; a record with value 51 makes
 * it ACTIVE again; advancing processing time to 1060 yields no watermark; a record with
 * value 100 yields a single watermark of 99.
 */
@Test
public void testMiniBatchedWatermarkAssignerWithIdleSource() throws Exception {
    // with timeout 1000 ms
    final MiniBatchedWatermarkAssignerOperator assigner = new MiniBatchedWatermarkAssignerOperator(
            0, 1, 0, 1000, 50);
    OneInputStreamOperatorTestHarness<BaseRow, BaseRow> harness =
            new OneInputStreamOperatorTestHarness<>(assigner);
    harness.open();

    // --- active phase ---
    harness.processElement(new StreamRecord<>(GenericRow.of(1L)));
    harness.processElement(new StreamRecord<>(GenericRow.of(2L)));
    // this watermark should be ignored
    harness.processWatermark(new Watermark(2));
    harness.processElement(new StreamRecord<>(GenericRow.of(3L)));
    harness.processElement(new StreamRecord<>(GenericRow.of(4L)));

    // this record exceeds the expected watermark, so a watermark of 49 should be emitted
    harness.processElement(new StreamRecord<>(GenericRow.of(50L)));
    ConcurrentLinkedQueue<Object> emitted = harness.getOutput();
    List<Watermark> seenWatermarks = extractWatermarks(emitted);
    assertEquals(1, seenWatermarks.size());
    assertEquals(new Watermark(49), seenWatermarks.get(0));
    assertEquals(StreamStatus.ACTIVE, harness.getStreamStatus());
    emitted.clear();

    // --- idle phase: processing time passes the 1000 ms timeout ---
    harness.setProcessingTime(1001);
    assertEquals(StreamStatus.IDLE, harness.getStreamStatus());

    // --- a new record arrives and the status is expected to be ACTIVE again ---
    harness.processElement(new StreamRecord<>(GenericRow.of(51L)));
    assertEquals(StreamStatus.ACTIVE, harness.getStreamStatus());

    // processing time alone will not trigger a watermark emit
    harness.setProcessingTime(1060);
    emitted = harness.getOutput();
    seenWatermarks = extractWatermarks(emitted);
    assertTrue(seenWatermarks.isEmpty());
    emitted.clear();

    // the next record is expected to produce a watermark of 99
    harness.processElement(new StreamRecord<>(GenericRow.of(100L)));
    emitted = harness.getOutput();
    seenWatermarks = extractWatermarks(emitted);
    assertEquals(1, seenWatermarks.size());
    assertEquals(new Watermark(99), seenWatermarks.get(0));
}
/**
 * Removes every channel context queued for the given backend server.
 *
 * <p>Does nothing when no queue is registered for {@code backendServer}.
 *
 * @param backendServer the backend server whose queue should be emptied
 */
public static void clear(BackendServerConf backendServer)
{
    ConcurrentLinkedQueue<ChannelContext> queue = queueMap.get(backendServer);
    // A backend without a registered queue has nothing to clear; guard against the
    // null lookup result instead of failing with a NullPointerException.
    if (queue != null) {
        queue.clear();
    }
}
/**
 * Runs an event-time sliding window (size 3 seconds, slide 1 second) through out-of-order
 * records and a series of watermarks, with a snapshot/restore cycle in the middle, and
 * finally checks that {@code closeCalled} reached 2.
 */
@Test
public void testEventTimeSlidingWindows() throws Exception {
    closeCalled.set(0);
    final String mismatch = "Output was not correct.";

    WindowOperator slidingOperator = WindowOperatorBuilder
            .builder()
            .withInputFields(inputFieldTypes)
            .sliding(Duration.ofSeconds(3), Duration.ofSeconds(1))
            .withEventTime(2)
            .aggregateAndBuild(getTimeWindowAggFunction(), equaliser, accTypes, aggResultTypes, windowTypes);

    OneInputStreamOperatorTestHarness<BaseRow, BaseRow> harness = createTestHarness(slidingOperator);
    harness.open();

    // process elements
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

    // add elements out-of-order
    harness.processElement(record("key2", 1, 3999L));
    harness.processElement(record("key2", 1, 3000L));
    harness.processElement(record("key1", 1, 20L));
    harness.processElement(record("key1", 1, 0L));
    harness.processElement(record("key1", 1, 999L));
    harness.processElement(record("key2", 1, 1998L));
    harness.processElement(record("key2", 1, 1999L));
    harness.processElement(record("key2", 1, 1000L));

    // watermark 999
    harness.processWatermark(new Watermark(999));
    expected.addAll(doubleRecord(isTableAggregate, record("key1", 3L, 3L, -2000L, 1000L, 999L)));
    expected.add(new Watermark(999));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    // watermark 1999
    harness.processWatermark(new Watermark(1999));
    expected.addAll(doubleRecord(isTableAggregate, record("key1", 3L, 3L, -1000L, 2000L, 1999L)));
    expected.addAll(doubleRecord(isTableAggregate, record("key2", 3L, 3L, -1000L, 2000L, 1999L)));
    expected.add(new Watermark(1999));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    // watermark 2999
    harness.processWatermark(new Watermark(2999));
    expected.addAll(doubleRecord(isTableAggregate, record("key1", 3L, 3L, 0L, 3000L, 2999L)));
    expected.addAll(doubleRecord(isTableAggregate, record("key2", 3L, 3L, 0L, 3000L, 2999L)));
    expected.add(new Watermark(2999));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    // do a snapshot, close and restore again
    OperatorSubtaskState savedState = harness.snapshot(0L, 0);
    harness.close();
    // expectations restart from empty for the fresh harness
    expected.clear();

    harness = createTestHarness(slidingOperator);
    harness.setup();
    harness.initializeState(savedState);
    harness.open();

    // watermark 3999
    harness.processWatermark(new Watermark(3999));
    expected.addAll(doubleRecord(isTableAggregate, record("key2", 5L, 5L, 1000L, 4000L, 3999L)));
    expected.add(new Watermark(3999));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    // watermark 4999
    harness.processWatermark(new Watermark(4999));
    expected.addAll(doubleRecord(isTableAggregate, record("key2", 2L, 2L, 2000L, 5000L, 4999L)));
    expected.add(new Watermark(4999));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    // watermark 5999
    harness.processWatermark(new Watermark(5999));
    expected.addAll(doubleRecord(isTableAggregate, record("key2", 2L, 2L, 3000L, 6000L, 5999L)));
    expected.add(new Watermark(5999));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    // those don't have any effect... (only the watermarks themselves are expected)
    harness.processWatermark(new Watermark(6999));
    harness.processWatermark(new Watermark(7999));
    expected.add(new Watermark(6999));
    expected.add(new Watermark(7999));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    harness.close();

    // two harnesses were closed above; closeCalled is presumably incremented once per
    // close elsewhere in the test class -- TODO confirm
    assertEquals("Close was not called.", 2, closeCalled.get());
}
/**
 * Refills {@code queue} with slice sizes that add up to {@code max_bytes}: as many entries
 * of {@code _slice_size} as fit, followed by one entry holding the remainder when the
 * division is not exact.
 *
 * <p>When {@code max_bytes} is not positive the queue is returned untouched.
 *
 * @param queue the queue to reset; it is modified in place
 * @param max_bytes the total number of bytes to split into slices
 * @return the same {@code queue} instance that was passed in
 * @throws ArithmeticException if {@code max_bytes} is positive and {@code _slice_size} is
 *     zero; the queue is left unmodified in that case
 */
private ConcurrentLinkedQueue<Integer> _resetSliceQueue(ConcurrentLinkedQueue<Integer> queue, int max_bytes) {
    if (max_bytes <= 0) {
        return queue;
    }

    // Plain integer arithmetic, computed before the queue is touched. The previous
    // floating-point division turned a zero slice size into Integer.MAX_VALUE loop
    // iterations; integer division fails fast instead.
    int fullSlices = max_bytes / _slice_size;
    int remainder = max_bytes % _slice_size;

    queue.clear();
    for (int i = 0; i < fullSlices; i++) {
        queue.add(_slice_size);
    }
    if (remainder != 0) {
        queue.add(remainder);
    }
    return queue;
}
/**
 * Runs a count window of size 3 through records for two keys, with a snapshot/restore cycle
 * in the middle, and finally checks that {@code closeCalled} reached 2.
 */
@Test
public void testTumblingCountWindow() throws Exception {
    closeCalled.set(0);
    final int countPerWindow = 3;
    final String mismatch = "Output was not correct.";
    LogicalType[] countWindowTypes = new LogicalType[] { new BigIntType() };

    WindowOperator countOperator = WindowOperatorBuilder
            .builder()
            .withInputFields(inputFieldTypes)
            .countWindow(countPerWindow)
            .aggregateAndBuild(getCountWindowAggFunction(), equaliser, accTypes, aggResultTypes, countWindowTypes);

    OneInputStreamOperatorTestHarness<BaseRow, BaseRow> harness = createTestHarness(countOperator);
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    harness.open();

    // three key2 records and two key1 records
    harness.processElement(record("key2", 1, 0L));
    harness.processElement(record("key2", 2, 1000L));
    harness.processElement(record("key2", 3, 2500L));
    harness.processElement(record("key1", 1, 10L));
    harness.processElement(record("key1", 2, 1000L));

    harness.processWatermark(new Watermark(12000));
    harness.setProcessingTime(12000L);
    expected.addAll(doubleRecord(isTableAggregate, record("key2", 6L, 3L, 0L)));
    expected.add(new Watermark(12000));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    // do a snapshot, close and restore again
    OperatorSubtaskState savedState = harness.snapshot(0L, 0);
    harness.close();
    // expectations restart from empty for the fresh harness
    expected.clear();

    harness = createTestHarness(countOperator);
    harness.setup();
    harness.initializeState(savedState);
    harness.open();

    // one more key1 record after the restore
    harness.processElement(record("key1", 2, 2500L));
    expected.addAll(doubleRecord(isTableAggregate, record("key1", 5L, 3L, 0L)));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    harness.processElement(record("key2", 4, 5501L));
    harness.processElement(record("key2", 5, 6000L));
    harness.processElement(record("key2", 5, 6000L));
    harness.processElement(record("key2", 6, 6050L));
    expected.addAll(doubleRecord(isTableAggregate, record("key2", 14L, 3L, 1L)));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    harness.processElement(record("key1", 3, 4000L));
    harness.processElement(record("key2", 10, 15000L));
    harness.processElement(record("key2", 20, 15000L));
    expected.addAll(doubleRecord(isTableAggregate, record("key2", 36L, 3L, 2L)));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    harness.processElement(record("key1", 2, 2500L));
    harness.processElement(record("key1", 2, 2500L));
    expected.addAll(doubleRecord(isTableAggregate, record("key1", 7L, 3L, 1L)));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    harness.close();

    // two harnesses were closed above; closeCalled is presumably incremented once per
    // close elsewhere in the test class -- TODO confirm
    assertEquals("Close was not called.", 2, closeCalled.get());
}
/**
 * Runs a sliding count window (size 5, slide 3) through records for two keys, with a
 * snapshot/restore cycle in the middle, and finally checks that {@code closeCalled}
 * reached 2.
 */
@Test
public void testSlidingCountWindow() throws Exception {
    closeCalled.set(0);
    final int countPerWindow = 5;
    final int countPerSlide = 3;
    final String mismatch = "Output was not correct.";
    LogicalType[] countWindowTypes = new LogicalType[] { new BigIntType() };

    WindowOperator countOperator = WindowOperatorBuilder
            .builder()
            .withInputFields(inputFieldTypes)
            .countWindow(countPerWindow, countPerSlide)
            .aggregateAndBuild(getCountWindowAggFunction(), equaliser, accTypes, aggResultTypes, countWindowTypes);

    OneInputStreamOperatorTestHarness<BaseRow, BaseRow> harness = createTestHarness(countOperator);
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    harness.open();

    // five key2 records and two key1 records
    harness.processElement(record("key2", 1, 0L));
    harness.processElement(record("key2", 2, 1000L));
    harness.processElement(record("key2", 3, 2500L));
    harness.processElement(record("key2", 4, 2500L));
    harness.processElement(record("key2", 5, 2500L));
    harness.processElement(record("key1", 1, 10L));
    harness.processElement(record("key1", 2, 1000L));

    harness.processWatermark(new Watermark(12000));
    harness.setProcessingTime(12000L);
    expected.addAll(doubleRecord(isTableAggregate, record("key2", 15L, 5L, 0L)));
    expected.add(new Watermark(12000));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    // do a snapshot, close and restore again
    OperatorSubtaskState savedState = harness.snapshot(0L, 0);
    harness.close();
    // expectations restart from empty for the fresh harness
    expected.clear();

    harness = createTestHarness(countOperator);
    harness.setup();
    harness.initializeState(savedState);
    harness.open();

    // three more key1 records after the restore
    harness.processElement(record("key1", 3, 2500L));
    harness.processElement(record("key1", 4, 2500L));
    harness.processElement(record("key1", 5, 2500L));
    expected.addAll(doubleRecord(isTableAggregate, record("key1", 15L, 5L, 0L)));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    harness.processElement(record("key2", 6, 6000L));
    harness.processElement(record("key2", 7, 6000L));
    harness.processElement(record("key2", 8, 6050L));
    harness.processElement(record("key2", 9, 6050L));
    expected.addAll(doubleRecord(isTableAggregate, record("key2", 30L, 5L, 1L)));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    harness.processElement(record("key1", 6, 4000L));
    harness.processElement(record("key1", 7, 4000L));
    harness.processElement(record("key1", 8, 4000L));
    harness.processElement(record("key2", 10, 15000L));
    harness.processElement(record("key2", 11, 15000L));
    expected.addAll(doubleRecord(isTableAggregate, record("key1", 30L, 5L, 1L)));
    expected.addAll(doubleRecord(isTableAggregate, record("key2", 45L, 5L, 2L)));
    assertor.assertOutputEqualsSorted(mismatch, expected, harness.getOutput());

    harness.close();

    // two harnesses were closed above; closeCalled is presumably incremented once per
    // close elsewhere in the test class -- TODO confirm
    assertEquals("Close was not called.", 2, closeCalled.get());
}
/**
 * Verifies that session-window merging works together with the ContinuousEventTimeTrigger,
 * including a snapshot/restore cycle between the two rounds of input.
 */
@Test
public void testSessionWindowsWithContinuousEventTimeTrigger() throws Exception {
    closeCalled.set(0);

    final int gapSeconds = 3;
    final String mismatch = "Output was not correct.";

    ListStateDescriptor<Tuple2<String, Integer>> contentsDescriptor = new ListStateDescriptor<>(
            "window-contents",
            STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> sessionOperator =
            new WindowOperator<>(
                    EventTimeSessionWindows.withGap(Time.seconds(gapSeconds)),
                    new TimeWindow.Serializer(),
                    new TupleKeySelector(),
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    contentsDescriptor,
                    new InternalIterableWindowFunction<>(new SessionWindowFunction()),
                    ContinuousEventTimeTrigger.of(Time.seconds(2)),
                    0,
                    null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> harness =
            createTestHarness(sessionOperator);
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    harness.open();

    // add elements out-of-order and first trigger time is 2000
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 1500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));

    // triggers emit and next trigger time is 4000
    harness.processWatermark(new Watermark(2500));
    expected.add(new StreamRecord<>(new Tuple3<>("key1-1", 1500L, 4500L), 4499));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
    expected.add(new Watermark(2500));

    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 4000));
    harness.processWatermark(new Watermark(3000));
    expected.add(new Watermark(3000));

    // do a snapshot, close and restore again
    OperatorSubtaskState savedState = harness.snapshot(0L, 0L);
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple3ResultSortComparator());
    harness.close();
    // expectations restart from empty for the fresh harness
    expected.clear();

    harness = createTestHarness(sessionOperator);
    harness.setup();
    harness.initializeState(savedState);
    harness.open();

    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 4000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500));

    // triggers emit and next trigger time is 6000
    harness.processWatermark(new Watermark(4000));
    expected.add(new StreamRecord<>(new Tuple3<>("key1-3", 1500L, 7000L), 6999));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-15", 0L, 7000L), 6999));
    expected.add(new Watermark(4000));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple3ResultSortComparator());

    harness.close();
}
/**
 * Feeds out-of-order records and a series of watermarks into {@code operator} and compares
 * the sorted harness output with the expected records and watermarks, taking a snapshot and
 * restoring into a fresh harness halfway through.
 *
 * @param operator the operator under test
 */
private void testSlidingEventTimeWindows(OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator) throws Exception {
    final String mismatch = "Output was not correct.";

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> harness =
            createTestHarness(operator);
    harness.setup();
    harness.open();

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

    // add elements out-of-order
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));

    // watermark 999
    harness.processWatermark(new Watermark(999));
    expected.add(new StreamRecord<>(new Tuple2<>("key1", 3), 999));
    expected.add(new Watermark(999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // watermark 1999
    harness.processWatermark(new Watermark(1999));
    expected.add(new StreamRecord<>(new Tuple2<>("key1", 3), 1999));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 3), 1999));
    expected.add(new Watermark(1999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // watermark 2999
    harness.processWatermark(new Watermark(2999));
    expected.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
    expected.add(new Watermark(2999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // do a snapshot, close and restore again
    OperatorSubtaskState savedState = harness.snapshot(0L, 0L);
    harness.close();
    // expectations restart from empty for the fresh harness
    expected.clear();

    harness = createTestHarness(operator);
    harness.setup();
    harness.initializeState(savedState);
    harness.open();

    // watermark 3999
    harness.processWatermark(new Watermark(3999));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 5), 3999));
    expected.add(new Watermark(3999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // watermark 4999
    harness.processWatermark(new Watermark(4999));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 4999));
    expected.add(new Watermark(4999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // watermark 5999
    harness.processWatermark(new Watermark(5999));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999));
    expected.add(new Watermark(5999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    // those don't have any effect... (only the watermarks themselves are expected)
    harness.processWatermark(new Watermark(6999));
    harness.processWatermark(new Watermark(7999));
    expected.add(new Watermark(6999));
    expected.add(new Watermark(7999));
    TestHarnessUtil.assertOutputEqualsSorted(
            mismatch, expected, harness.getOutput(), new Tuple2ResultSortComparator());

    harness.close();
}
/**
 * Verifies that merging of session windows works together with a purging
 * {@code CountTrigger}, including across a snapshot/restore cycle.
 */
@Test
public void testSessionWindowsWithCountTrigger() throws Exception {
    closeCalled.set(0);

    final int gapSeconds = 3;
    final String failureMessage = "Output was not correct.";

    ListStateDescriptor<Tuple2<String, Integer>> contentsDescriptor = new ListStateDescriptor<>(
        "window-contents",
        STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> windowOperator =
        new WindowOperator<>(
            EventTimeSessionWindows.withGap(Time.seconds(gapSeconds)),
            new TimeWindow.Serializer(),
            new TupleKeySelector(),
            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
            contentsDescriptor,
            new InternalIterableWindowFunction<>(new SessionWindowFunction()),
            PurgingTrigger.of(CountTrigger.of(4)),
            0,
            null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> harness =
        createTestHarness(windowOperator);
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    harness.open();

    // Four elements for "key2" and two for "key1".
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));

    // Snapshot the state and shut the first harness down.
    OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
    harness.close();

    // Only the "key2" session, which received four elements, is expected in the output so far.
    expected.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), 6499));
    TestHarnessUtil.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput(), new Tuple3ResultSortComparator());
    expected.clear();

    // Continue on a fresh harness restored from the snapshot.
    harness = createTestHarness(windowOperator);
    harness.setup();
    harness.initializeState(checkpoint);
    harness.open();

    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 6000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000));

    // Nothing is expected to have been emitted by the restored operator yet.
    TestHarnessUtil.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput(), new Tuple3ResultSortComparator());

    // This element bridges the two "key1" sessions; the merged session is expected to fire.
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), 4500));
    expected.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L), 9999L));
    TestHarnessUtil.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput(), new Tuple3ResultSortComparator());

    harness.close();
}
/**
 * Verifies that merging of session windows works together with the
 * {@code ContinuousEventTimeTrigger}, including across a snapshot/restore cycle.
 */
@Test
public void testSessionWindowsWithContinuousEventTimeTrigger() throws Exception {
    closeCalled.set(0);

    final int gapSeconds = 3;
    final String failureMessage = "Output was not correct.";

    ListStateDescriptor<Tuple2<String, Integer>> contentsDescriptor = new ListStateDescriptor<>(
        "window-contents",
        STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> windowOperator =
        new WindowOperator<>(
            EventTimeSessionWindows.withGap(Time.seconds(gapSeconds)),
            new TimeWindow.Serializer(),
            new TupleKeySelector(),
            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
            contentsDescriptor,
            new InternalIterableWindowFunction<>(new SessionWindowFunction()),
            ContinuousEventTimeTrigger.of(Time.seconds(2)),
            0,
            null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> harness =
        createTestHarness(windowOperator);
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    harness.open();

    // Elements arrive out of order; the first trigger time is 2000.
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 1500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));

    // This watermark causes an emit; the next trigger time becomes 4000.
    harness.processWatermark(new Watermark(2500));
    expected.add(new StreamRecord<>(new Tuple3<>("key1-1", 1500L, 4500L), 4499));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
    expected.add(new Watermark(2500));

    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 4000));
    harness.processWatermark(new Watermark(3000));
    expected.add(new Watermark(3000));

    // Snapshot first, then verify what the original harness produced before closing it.
    OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
    TestHarnessUtil.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput(), new Tuple3ResultSortComparator());
    harness.close();
    expected.clear();

    // Continue on a fresh harness restored from the snapshot.
    harness = createTestHarness(windowOperator);
    harness.setup();
    harness.initializeState(checkpoint);
    harness.open();

    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 4000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500));

    // This watermark causes an emit; the next trigger time becomes 6000.
    harness.processWatermark(new Watermark(4000));
    expected.add(new StreamRecord<>(new Tuple3<>("key1-3", 1500L, 7000L), 6999));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-15", 0L, 7000L), 6999));
    expected.add(new Watermark(4000));
    TestHarnessUtil.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput(), new Tuple3ResultSortComparator());

    harness.close();
}
/**
 * Exercises a sliding count window (size 5, slide 3) for two keys, with a snapshot/restore
 * cycle between the first and the remaining batches of input.
 */
@Test
public void testSlidingCountWindow() throws Exception {
    closeCalled.set(0);

    final int size = 5;
    final int slide = 3;
    final String failureMessage = "Output was not correct.";

    LogicalType[] countWindowTypes = new LogicalType[] { new BigIntType() };
    WindowOperator windowOperator = WindowOperatorBuilder
        .builder()
        .withInputFields(inputFieldTypes)
        .countWindow(size, slide)
        .aggregateAndBuild(getCountWindowAggFunction(), equaliser, accTypes, aggResultTypes, countWindowTypes);

    OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(windowOperator);
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    harness.open();

    // Five records for "key2" and two for "key1".
    harness.processElement(insertRecord("key2", 1, 0L));
    harness.processElement(insertRecord("key2", 2, 1000L));
    harness.processElement(insertRecord("key2", 3, 2500L));
    harness.processElement(insertRecord("key2", 4, 2500L));
    harness.processElement(insertRecord("key2", 5, 2500L));
    harness.processElement(insertRecord("key1", 1, 10L));
    harness.processElement(insertRecord("key1", 2, 1000L));

    harness.processWatermark(new Watermark(12000));
    harness.setProcessingTime(12000L);

    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 15L, 5L, 0L)));
    expected.add(new Watermark(12000));
    assertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

    // Snapshot the state, shut the harness down and continue on a restored one.
    OperatorSubtaskState checkpoint = harness.snapshot(0L, 0);
    harness.close();
    expected.clear();

    harness = createTestHarness(windowOperator);
    harness.setup();
    harness.initializeState(checkpoint);
    harness.open();

    // Three more "key1" records on top of the two delivered before the snapshot.
    harness.processElement(insertRecord("key1", 3, 2500L));
    harness.processElement(insertRecord("key1", 4, 2500L));
    harness.processElement(insertRecord("key1", 5, 2500L));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 15L, 5L, 0L)));
    assertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

    harness.processElement(insertRecord("key2", 6, 6000L));
    harness.processElement(insertRecord("key2", 7, 6000L));
    harness.processElement(insertRecord("key2", 8, 6050L));
    harness.processElement(insertRecord("key2", 9, 6050L));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 30L, 5L, 1L)));
    assertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

    harness.processElement(insertRecord("key1", 6, 4000L));
    harness.processElement(insertRecord("key1", 7, 4000L));
    harness.processElement(insertRecord("key1", 8, 4000L));
    harness.processElement(insertRecord("key2", 10, 15000L));
    harness.processElement(insertRecord("key2", 11, 15000L));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 30L, 5L, 1L)));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 45L, 5L, 2L)));
    assertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

    harness.close();

    // Two harnesses were closed above; presumably each close bumps the counter once.
    assertEquals("Close was not called.", 2, closeCalled.get());
}
/**
 * Checks that connections requested concurrently from several threads are all served by the
 * pool: with the maximum number of connections set to one, every created
 * {@code JmsPoolConnection} must wrap the same underlying connection as the first one.
 *
 * <p>The worker pool is shut down in {@code finally} so that its threads are not leaked when
 * an assertion fails, and creation failures on worker threads are recorded and reported
 * instead of being silently dropped.
 *
 * @throws Exception if creating a connection on the test thread or waiting fails
 */
@Test(timeout = 60000)
public void testConnectionsArePooledAsyncCreate() throws Exception {
    final JmsPoolConnectionFactory cf = createPooledConnectionFactory();
    cf.setMaxConnections(1);
    final ConcurrentLinkedQueue<JmsPoolConnection> connections = new ConcurrentLinkedQueue<JmsPoolConnection>();
    // Exceptions thrown on worker threads cannot reach the test thread, so collect them here.
    final ConcurrentLinkedQueue<JMSException> failures = new ConcurrentLinkedQueue<JMSException>();
    final ExecutorService executor = Executors.newFixedThreadPool(10);
    try {
        final JmsPoolConnection primary = (JmsPoolConnection) cf.createConnection();
        final int numConnections = 100;
        for (int i = 0; i < numConnections; ++i) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        connections.add((JmsPoolConnection) cf.createConnection());
                    } catch (JMSException e) {
                        failures.add(e);
                    }
                }
            });
        }
        // Stop waiting as soon as every task has finished, whether it succeeded or failed.
        assertTrue("All connections should have been created.", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisfied() throws Exception {
                return connections.size() + failures.size() == numConnections;
            }
        }, TimeUnit.SECONDS.toMillis(10), TimeUnit.MILLISECONDS.toMillis(50)));
        // With no failures, the condition above implies connections.size() == numConnections.
        assertTrue("Connection creation failed: " + failures, failures.isEmpty());
        executor.shutdown();
        assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
        for (JmsPoolConnection connection : connections) {
            assertSame(primary.getConnection(), connection.getConnection());
        }
        connections.clear();
    } finally {
        // A no-op after the orderly shutdown above; on a failure path it stops the workers.
        executor.shutdownNow();
        cf.stop();
    }
}
/**
 * Exercises event-time sliding windows (size 3 seconds, slide 1 second) with out-of-order
 * input for two keys, a sequence of watermarks, and a snapshot/restore cycle in the middle.
 */
@Test
public void testEventTimeSlidingWindows() throws Exception {
    closeCalled.set(0);

    final String failureMessage = "Output was not correct.";

    WindowOperator windowOperator = WindowOperatorBuilder
        .builder()
        .withInputFields(inputFieldTypes)
        .sliding(Duration.ofSeconds(3), Duration.ofSeconds(1))
        .withEventTime(2)
        .aggregateAndBuild(getTimeWindowAggFunction(), equaliser, accTypes, aggResultTypes, windowTypes);

    OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(windowOperator);
    harness.open();

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

    // Feed the records with timestamps deliberately out of order.
    harness.processElement(insertRecord("key2", 1, 3999L));
    harness.processElement(insertRecord("key2", 1, 3000L));
    harness.processElement(insertRecord("key1", 1, 20L));
    harness.processElement(insertRecord("key1", 1, 0L));
    harness.processElement(insertRecord("key1", 1, 999L));
    harness.processElement(insertRecord("key2", 1, 1998L));
    harness.processElement(insertRecord("key2", 1, 1999L));
    harness.processElement(insertRecord("key2", 1, 1000L));

    harness.processWatermark(new Watermark(999));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 3L, 3L, -2000L, 1000L, 999L)));
    expected.add(new Watermark(999));
    assertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

    harness.processWatermark(new Watermark(1999));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 3L, 3L, -1000L, 2000L, 1999L)));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 3L, 3L, -1000L, 2000L, 1999L)));
    expected.add(new Watermark(1999));
    assertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

    harness.processWatermark(new Watermark(2999));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 3L, 3L, 0L, 3000L, 2999L)));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 3L, 3L, 0L, 3000L, 2999L)));
    expected.add(new Watermark(2999));
    assertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

    // Snapshot the state, shut the harness down and continue on a restored one.
    // The expectations are reset because they are compared against the new harness' output.
    OperatorSubtaskState checkpoint = harness.snapshot(0L, 0);
    harness.close();
    expected.clear();

    harness = createTestHarness(windowOperator);
    harness.setup();
    harness.initializeState(checkpoint);
    harness.open();

    harness.processWatermark(new Watermark(3999));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 5L, 5L, 1000L, 4000L, 3999L)));
    expected.add(new Watermark(3999));
    assertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

    harness.processWatermark(new Watermark(4999));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 2L, 2L, 2000L, 5000L, 4999L)));
    expected.add(new Watermark(4999));
    assertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

    harness.processWatermark(new Watermark(5999));
    expected.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 2L, 2L, 3000L, 6000L, 5999L)));
    expected.add(new Watermark(5999));
    assertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

    // The remaining watermarks are expected to be forwarded without producing any records.
    harness.processWatermark(new Watermark(6999));
    harness.processWatermark(new Watermark(7999));
    expected.add(new Watermark(6999));
    expected.add(new Watermark(7999));
    assertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

    harness.close();

    // Two harnesses were closed above; presumably each close bumps the counter once.
    assertEquals("Close was not called.", 2, closeCalled.get());
}