下面列出了java.util.concurrent.ConcurrentLinkedQueue#addAll ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public ConcurrentLinkedQueue<Runnable> snapshotQueuedAndExecutingTasks() {
ConcurrentLinkedQueue<Runnable> snapshot = new ConcurrentLinkedQueue<>();
activeSnapshots.put( Thread.currentThread(), snapshot );
snapshot.addAll(queuedAndExecutingTasks);
// There is inconsistency between queuedAndExecutingTasks and snapshot
// taken here. If there are a large number of tasks, by the time we
// iterate queuedAndExecutingTasks and get all the tasks into the snapshot
// queue, some tasks complete and are removed from
// queuedAndExecutingTasks. So, iterate over the snapshot again and remove
// those completed tasks so that they both are consistent.
for (Runnable task : snapshot) {
if ( !(queuedAndExecutingTasks.contains(task)) )
snapshot.remove(task);
}
return snapshot;
}
@Override
public void run() {
while(true) {
final ConcurrentLinkedQueue<String> stringsToLog = new ConcurrentLinkedQueue<>();
synchronized (logsToFile) {
if (logsToFile.isEmpty()) {
try {
logsToFile.wait();
} catch (InterruptedException e) {
return;
}
if (logsToFile.isEmpty()) {
continue;
}
}
stringsToLog.addAll(logsToFile);
logsToFile.clear();
}
stringsToLog.stream().forEach(logginFile::println);
logginFile.flush();
}
}
/**
* addAll of a collection with null elements throws NPE
*/
public void testAddAll2() {
ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
try {
q.addAll(Arrays.asList(new Integer[SIZE]));
shouldThrow();
} catch (NullPointerException success) {}
}
/**
* addAll of a collection with any null elements throws NPE after
* possibly adding some elements
*/
public void testAddAll3() {
ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
Integer[] ints = new Integer[SIZE];
for (int i = 0; i < SIZE - 1; ++i)
ints[i] = new Integer(i);
try {
q.addAll(Arrays.asList(ints));
shouldThrow();
} catch (NullPointerException success) {}
}
/**
* addAll(this) throws IAE
*/
public void testAddAllSelf() {
ConcurrentLinkedQueue q = populatedQueue(SIZE);
try {
q.addAll(q);
shouldThrow();
} catch (IllegalArgumentException success) {}
}
/**
* addAll of a collection with null elements throws NPE
*/
public void testAddAll2() {
ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
try {
q.addAll(Arrays.asList(new Integer[SIZE]));
shouldThrow();
} catch (NullPointerException success) {}
}
/**
* addAll of a collection with any null elements throws NPE after
* possibly adding some elements
*/
public void testAddAll3() {
ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
Integer[] ints = new Integer[SIZE];
for (int i = 0; i < SIZE - 1; ++i)
ints[i] = new Integer(i);
try {
q.addAll(Arrays.asList(ints));
shouldThrow();
} catch (NullPointerException success) {}
}
/**
* addAll(this) throws IAE
*/
public void testAddAllSelf() {
ConcurrentLinkedQueue q = populatedQueue(SIZE);
try {
q.addAll(q);
shouldThrow();
} catch (IllegalArgumentException success) {}
}
@Test
public void testProcessingTimeSessionWindows() throws Throwable {
closeCalled.set(0);
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
.session(Duration.ofSeconds(3))
.withProcessingTime()
.aggregateAndBuild(getTimeWindowAggFunction(), equaliser, accTypes, aggResultTypes, windowTypes);
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(
outputType.getFieldTypes(), new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// timestamp is ignored in processing time
testHarness.setProcessingTime(3);
testHarness.processElement(insertRecord("key2", 1, 1L));
testHarness.setProcessingTime(1000);
testHarness.processElement(insertRecord("key2", 1, 1002L));
testHarness.setProcessingTime(5000);
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 2L, 2L, 3L, 4000L, 3999L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processElement(insertRecord("key2", 1, 5000L));
testHarness.processElement(insertRecord("key2", 1, 5000L));
testHarness.processElement(insertRecord("key1", 1, 5000L));
testHarness.processElement(insertRecord("key1", 1, 5000L));
testHarness.processElement(insertRecord("key1", 1, 5000L));
testHarness.setProcessingTime(10000);
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 2L, 2L, 5000L, 8000L, 7999L)));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 3L, 3L, 5000L, 8000L, 7999L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.close();
}
private void testAsyncTimeout(
LazyAsyncFunction lazyAsyncFunction,
Optional<Class<? extends Throwable>> expectedException,
StreamRecord<Integer>... expectedRecords) throws Exception {
final long timeout = 10L;
final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
createTestHarness(lazyAsyncFunction, timeout, 2, AsyncDataStream.OutputMode.ORDERED);
final MockEnvironment mockEnvironment = testHarness.getEnvironment();
mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
final long initialTime = 0L;
final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
testHarness.setProcessingTime(initialTime);
synchronized (testHarness.getCheckpointLock()) {
testHarness.processElement(new StreamRecord<>(1, initialTime));
testHarness.setProcessingTime(initialTime + 5L);
testHarness.processElement(new StreamRecord<>(2, initialTime + 5L));
}
// trigger the timeout of the first stream record
testHarness.setProcessingTime(initialTime + timeout + 1L);
// allow the second async stream record to be processed
lazyAsyncFunction.countDown();
// wait until all async collectors in the buffer have been emitted out.
synchronized (testHarness.getCheckpointLock()) {
testHarness.endInput();
testHarness.close();
}
expectedOutput.addAll(Arrays.asList(expectedRecords));
TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, testHarness.getOutput());
if (expectedException.isPresent()) {
assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent());
assertTrue(ExceptionUtils.findThrowable(
mockEnvironment.getActualExternalFailureCause().get(),
expectedException.get()).isPresent());
}
}
@Test
public void testEventTimeSlidingWindows() throws Exception {
closeCalled.set(0);
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
.sliding(Duration.ofSeconds(3), Duration.ofSeconds(1))
.withEventTime(2)
.aggregateAndBuild(getTimeWindowAggFunction(), equaliser, accTypes, aggResultTypes, windowTypes);
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
testHarness.open();
// process elements
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
// add elements out-of-order
testHarness.processElement(insertRecord("key2", 1, 3999L));
testHarness.processElement(insertRecord("key2", 1, 3000L));
testHarness.processElement(insertRecord("key1", 1, 20L));
testHarness.processElement(insertRecord("key1", 1, 0L));
testHarness.processElement(insertRecord("key1", 1, 999L));
testHarness.processElement(insertRecord("key2", 1, 1998L));
testHarness.processElement(insertRecord("key2", 1, 1999L));
testHarness.processElement(insertRecord("key2", 1, 1000L));
testHarness.processWatermark(new Watermark(999));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 3L, 3L, -2000L, 1000L, 999L)));
expectedOutput.add(new Watermark(999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(1999));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 3L, 3L, -1000L, 2000L, 1999L)));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 3L, 3L, -1000L, 2000L, 1999L)));
expectedOutput.add(new Watermark(1999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(2999));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 3L, 3L, 0L, 3000L, 2999L)));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 3L, 3L, 0L, 3000L, 2999L)));
expectedOutput.add(new Watermark(2999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
// do a snapshot, close and restore again
OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
testHarness.close();
expectedOutput.clear();
testHarness = createTestHarness(operator);
testHarness.setup();
testHarness.initializeState(snapshot);
testHarness.open();
testHarness.processWatermark(new Watermark(3999));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 5L, 5L, 1000L, 4000L, 3999L)));
expectedOutput.add(new Watermark(3999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(4999));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 2L, 2L, 2000L, 5000L, 4999L)));
expectedOutput.add(new Watermark(4999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(5999));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 2L, 2L, 3000L, 6000L, 5999L)));
expectedOutput.add(new Watermark(5999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
// those don't have any effect...
testHarness.processWatermark(new Watermark(6999));
testHarness.processWatermark(new Watermark(7999));
expectedOutput.add(new Watermark(6999));
expectedOutput.add(new Watermark(7999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.close();
// we close once in the rest...
assertEquals("Close was not called.", 2, closeCalled.get());
}
@Test
@SuppressWarnings("unchecked")
public void testEventTimeTumblingWindows() throws Exception {
closeCalled.set(0);
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
.tumble(Duration.ofSeconds(3))
.withEventTime(2)
.aggregateAndBuild(getTimeWindowAggFunction(), equaliser, accTypes, aggResultTypes, windowTypes);
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// add elements out-of-order
testHarness.processElement(insertRecord("key2", 1, 3999L));
testHarness.processElement(insertRecord("key2", 1, 3000L));
testHarness.processElement(insertRecord("key1", 1, 20L));
testHarness.processElement(insertRecord("key1", 1, 0L));
testHarness.processElement(insertRecord("key1", 1, 999L));
testHarness.processElement(insertRecord("key2", 1, 1998L));
testHarness.processElement(insertRecord("key2", 1, 1999L));
testHarness.processElement(insertRecord("key2", 1, 1000L));
testHarness.processWatermark(new Watermark(999));
expectedOutput.add(new Watermark(999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(1999));
expectedOutput.add(new Watermark(1999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
// do a snapshot, close and restore again
OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
testHarness.close();
expectedOutput.clear();
testHarness = createTestHarness(operator);
testHarness.setup();
testHarness.initializeState(snapshot);
testHarness.open();
testHarness.processWatermark(new Watermark(2999));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 3L, 3L, 0L, 3000L, 2999L)));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 3L, 3L, 0L, 3000L, 2999L)));
expectedOutput.add(new Watermark(2999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(3999));
expectedOutput.add(new Watermark(3999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(4999));
expectedOutput.add(new Watermark(4999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(5999));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 2L, 2L, 3000L, 6000L, 5999L)));
expectedOutput.add(new Watermark(5999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
// those don't have any effect...
testHarness.processWatermark(new Watermark(6999));
testHarness.processWatermark(new Watermark(7999));
expectedOutput.add(new Watermark(6999));
expectedOutput.add(new Watermark(7999));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.close();
// we close once in the rest...
assertEquals("Close was not called.", 2, closeCalled.get());
}
@Test
@SuppressWarnings("unchecked")
public void testProcessingTimeTumblingWindows() throws Exception {
closeCalled.set(0);
WindowOperator operator = WindowOperatorBuilder.builder()
.withInputFields(inputFieldTypes)
.tumble(Duration.ofSeconds(3))
.withProcessingTime()
.aggregateAndBuild(getTimeWindowAggFunction(), equaliser, accTypes, aggResultTypes, windowTypes);
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
testHarness.setProcessingTime(3);
// timestamp is ignored in processing time
testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
testHarness.processElement(insertRecord("key2", 1, 7000L));
testHarness.processElement(insertRecord("key2", 1, 7000L));
testHarness.processElement(insertRecord("key1", 1, 7000L));
testHarness.processElement(insertRecord("key1", 1, 7000L));
testHarness.setProcessingTime(5000);
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 3L, 3L, 0L, 3000L, 2999L)));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 2L, 2L, 0L, 3000L, 2999L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processElement(insertRecord("key1", 1, 7000L));
testHarness.processElement(insertRecord("key1", 1, 7000L));
testHarness.processElement(insertRecord("key1", 1, 7000L));
testHarness.setProcessingTime(7000);
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 3L, 3L, 3000L, 6000L, 5999L)));
assertEquals(0L, operator.getWatermarkLatency().getValue());
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.close();
}
/**
* This tests a custom Session window assigner that assigns some elements to "point windows",
* windows that have the same timestamp for start and end.
*
* <p>In this test, elements that have 33 as the second tuple field will be put into a point
* window.
*/
@Test
@SuppressWarnings("unchecked")
public void testPointSessions() throws Exception {
closeCalled.set(0);
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
.assigner(new PointSessionWindowAssigner(3000))
.withEventTime(2)
.aggregateAndBuild(getTimeWindowAggFunction(), equaliser, accTypes, aggResultTypes, windowTypes);
OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(operator);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// add elements out-of-order
testHarness.processElement(record("key2", 1, 0L));
testHarness.processElement(record("key2", 33, 1000L));
// do a snapshot, close and restore again
OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
testHarness.close();
testHarness = createTestHarness(operator);
testHarness.setup();
testHarness.initializeState(snapshot);
testHarness.open();
testHarness.processElement(record("key2", 33, 2500L));
testHarness.processElement(record("key1", 1, 10L));
testHarness.processElement(record("key1", 2, 1000L));
testHarness.processElement(record("key1", 33, 2500L));
testHarness.processWatermark(new Watermark(12000));
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key1", 36L, 3L, 10L, 4000L, 3999L)));
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key2", 67L, 3L, 0L, 3000L, 2999L)));
expectedOutput.add(new Watermark(12000));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.close();
// we close once in the rest...
assertEquals("Close was not called.", 2, closeCalled.get());
}
@Test
public void testTumblingCountWindow() throws Exception {
closeCalled.set(0);
final int windowSize = 3;
LogicalType[] windowTypes = new LogicalType[] { new BigIntType() };
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
.countWindow(windowSize)
.aggregateAndBuild(getCountWindowAggFunction(), equaliser, accTypes, aggResultTypes, windowTypes);
OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(operator);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
testHarness.processElement(record("key2", 1, 0L));
testHarness.processElement(record("key2", 2, 1000L));
testHarness.processElement(record("key2", 3, 2500L));
testHarness.processElement(record("key1", 1, 10L));
testHarness.processElement(record("key1", 2, 1000L));
testHarness.processWatermark(new Watermark(12000));
testHarness.setProcessingTime(12000L);
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key2", 6L, 3L, 0L)));
expectedOutput.add(new Watermark(12000));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
// do a snapshot, close and restore again
OperatorSubtaskState snapshotV2 = testHarness.snapshot(0L, 0);
testHarness.close();
expectedOutput.clear();
testHarness = createTestHarness(operator);
testHarness.setup();
testHarness.initializeState(snapshotV2);
testHarness.open();
testHarness.processElement(record("key1", 2, 2500L));
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key1", 5L, 3L, 0L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processElement(record("key2", 4, 5501L));
testHarness.processElement(record("key2", 5, 6000L));
testHarness.processElement(record("key2", 5, 6000L));
testHarness.processElement(record("key2", 6, 6050L));
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key2", 14L, 3L, 1L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processElement(record("key1", 3, 4000L));
testHarness.processElement(record("key2", 10, 15000L));
testHarness.processElement(record("key2", 20, 15000L));
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key2", 36L, 3L, 2L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processElement(record("key1", 2, 2500L));
testHarness.processElement(record("key1", 2, 2500L));
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key1", 7L, 3L, 1L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.close();
// we close once in the rest...
assertEquals("Close was not called.", 2, closeCalled.get());
}
@Test
public void testSlidingCountWindow() throws Exception {
closeCalled.set(0);
final int windowSize = 5;
final int windowSlide = 3;
LogicalType[] windowTypes = new LogicalType[] { new BigIntType() };
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
.countWindow(windowSize, windowSlide)
.aggregateAndBuild(getCountWindowAggFunction(), equaliser, accTypes, aggResultTypes, windowTypes);
OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(operator);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
testHarness.processElement(record("key2", 1, 0L));
testHarness.processElement(record("key2", 2, 1000L));
testHarness.processElement(record("key2", 3, 2500L));
testHarness.processElement(record("key2", 4, 2500L));
testHarness.processElement(record("key2", 5, 2500L));
testHarness.processElement(record("key1", 1, 10L));
testHarness.processElement(record("key1", 2, 1000L));
testHarness.processWatermark(new Watermark(12000));
testHarness.setProcessingTime(12000L);
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key2", 15L, 5L, 0L)));
expectedOutput.add(new Watermark(12000));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
// do a snapshot, close and restore again
OperatorSubtaskState snapshotV2 = testHarness.snapshot(0L, 0);
testHarness.close();
expectedOutput.clear();
testHarness = createTestHarness(operator);
testHarness.setup();
testHarness.initializeState(snapshotV2);
testHarness.open();
testHarness.processElement(record("key1", 3, 2500L));
testHarness.processElement(record("key1", 4, 2500L));
testHarness.processElement(record("key1", 5, 2500L));
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key1", 15L, 5L, 0L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processElement(record("key2", 6, 6000L));
testHarness.processElement(record("key2", 7, 6000L));
testHarness.processElement(record("key2", 8, 6050L));
testHarness.processElement(record("key2", 9, 6050L));
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key2", 30L, 5L, 1L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processElement(record("key1", 6, 4000L));
testHarness.processElement(record("key1", 7, 4000L));
testHarness.processElement(record("key1", 8, 4000L));
testHarness.processElement(record("key2", 10, 15000L));
testHarness.processElement(record("key2", 11, 15000L));
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key1", 30L, 5L, 1L)));
expectedOutput.addAll(doubleRecord(isTableAggregate, record("key2", 45L, 5L, 2L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.close();
// we close once in the rest...
assertEquals("Close was not called.", 2, closeCalled.get());
}
@Override
// TODO: [BEAM-4563] Pass Future back to consumer to check for async errors
@SuppressWarnings("FutureReturnValueIgnored")
public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry) {
int numTargetSplits = Math.max(3, targetParallelism);
ImmutableMap.Builder<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>>
pendingRootBundles = ImmutableMap.builder();
for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms()) {
ConcurrentLinkedQueue<CommittedBundle<?>> pending = new ConcurrentLinkedQueue<>();
try {
Collection<CommittedBundle<?>> initialInputs =
rootProviderRegistry.getInitialInputs(root, numTargetSplits);
pending.addAll(initialInputs);
} catch (Exception e) {
throw UserCodeException.wrap(e);
}
pendingRootBundles.put(root, pending);
}
evaluationContext.initialize(pendingRootBundles.build());
final ExecutionDriver executionDriver =
QuiescenceDriver.create(
evaluationContext, graph, this, visibleUpdates, pendingRootBundles.build());
executorService.submit(
new Runnable() {
@Override
public void run() {
DriverState drive = executionDriver.drive();
if (drive.isTermainal()) {
State newPipelineState = State.UNKNOWN;
switch (drive) {
case FAILED:
newPipelineState = State.FAILED;
break;
case SHUTDOWN:
newPipelineState = State.DONE;
break;
case CONTINUE:
throw new IllegalStateException(
String.format("%s should not be a terminal state", DriverState.CONTINUE));
default:
throw new IllegalArgumentException(
String.format("Unknown %s %s", DriverState.class.getSimpleName(), drive));
}
shutdownIfNecessary(newPipelineState);
} else {
executorService.submit(this);
}
}
});
}
@Test
public void testSlidingCountWindow() throws Exception {
closeCalled.set(0);
final int windowSize = 5;
final int windowSlide = 3;
LogicalType[] windowTypes = new LogicalType[] { new BigIntType() };
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
.countWindow(windowSize, windowSlide)
.aggregateAndBuild(getCountWindowAggFunction(), equaliser, accTypes, aggResultTypes, windowTypes);
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
testHarness.processElement(insertRecord("key2", 1, 0L));
testHarness.processElement(insertRecord("key2", 2, 1000L));
testHarness.processElement(insertRecord("key2", 3, 2500L));
testHarness.processElement(insertRecord("key2", 4, 2500L));
testHarness.processElement(insertRecord("key2", 5, 2500L));
testHarness.processElement(insertRecord("key1", 1, 10L));
testHarness.processElement(insertRecord("key1", 2, 1000L));
testHarness.processWatermark(new Watermark(12000));
testHarness.setProcessingTime(12000L);
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 15L, 5L, 0L)));
expectedOutput.add(new Watermark(12000));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
// do a snapshot, close and restore again
OperatorSubtaskState snapshotV2 = testHarness.snapshot(0L, 0);
testHarness.close();
expectedOutput.clear();
testHarness = createTestHarness(operator);
testHarness.setup();
testHarness.initializeState(snapshotV2);
testHarness.open();
testHarness.processElement(insertRecord("key1", 3, 2500L));
testHarness.processElement(insertRecord("key1", 4, 2500L));
testHarness.processElement(insertRecord("key1", 5, 2500L));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 15L, 5L, 0L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processElement(insertRecord("key2", 6, 6000L));
testHarness.processElement(insertRecord("key2", 7, 6000L));
testHarness.processElement(insertRecord("key2", 8, 6050L));
testHarness.processElement(insertRecord("key2", 9, 6050L));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 30L, 5L, 1L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processElement(insertRecord("key1", 6, 4000L));
testHarness.processElement(insertRecord("key1", 7, 4000L));
testHarness.processElement(insertRecord("key1", 8, 4000L));
testHarness.processElement(insertRecord("key2", 10, 15000L));
testHarness.processElement(insertRecord("key2", 11, 15000L));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 30L, 5L, 1L)));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 45L, 5L, 2L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.close();
// we close once in the rest...
assertEquals("Close was not called.", 2, closeCalled.get());
}
protected void sortTransferenceQueue(ConcurrentLinkedQueue<Transference> queue) {
synchronized (_transference_queue_sort_lock) {
ArrayList<Transference> trans_list = new ArrayList(queue);
trans_list.sort((Transference o1, Transference o2) -> MiscTools.naturalCompare(o1.getFile_name(), o2.getFile_name(), true));
queue.clear();
queue.addAll(trans_list);
}
}
@Test
public void testProcessingTimeSlidingWindows() throws Throwable {
closeCalled.set(0);
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
.sliding(Duration.ofSeconds(3), Duration.ofSeconds(1))
.withProcessingTime()
.aggregateAndBuild(getTimeWindowAggFunction(), equaliser, accTypes, aggResultTypes, windowTypes);
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// timestamp is ignored in processing time
testHarness.setProcessingTime(3);
testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
testHarness.setProcessingTime(1000);
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 1L, 1L, -2000L, 1000L, 999L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
testHarness.setProcessingTime(2000);
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 3L, 3L, -1000L, 2000L, 1999L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
testHarness.setProcessingTime(3000);
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 3L, 3L, 0L, 3000L, 2999L)));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 2L, 2L, 0L, 3000L, 2999L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
testHarness.setProcessingTime(7000);
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 2L, 2L, 1000L, 4000L, 3999L)));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 5L, 5L, 1000L, 4000L, 3999L)));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 5L, 5L, 2000L, 5000L, 4999L)));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key1", 3L, 3L, 3000L, 6000L, 5999L)));
assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.close();
}