java.util.concurrent.ConcurrentLinkedQueue#add ( )源码实例Demo

下面列出了java.util.concurrent.ConcurrentLinkedQueue#add ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: openjdk-jdk8u-backup   文件: RemoveLeak.java
public static void main(String[] args) {
    int i = 0;
    // Without bug fix, OutOfMemoryError was observed at iteration 65120
    int iterations = 10 * 65120;
    try {
        ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<>();
        queue.add(0L);
        while (i++ < iterations) {
            queue.add(1L);
            queue.remove(1L);
        }
    } catch (Error t) {
        System.err.printf("failed at iteration %d/%d%n", i, iterations);
        throw t;
    }
}
 
private void processRecords(OneInputStreamTaskTestHarness<String, String> testHarness) throws Exception {
	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.processElement(new StreamRecord<>("10"), 0, 0);
	testHarness.processElement(new StreamRecord<>("20"), 0, 0);
	testHarness.processElement(new StreamRecord<>("30"), 0, 0);

	testHarness.waitForInputProcessing();

	expectedOutput.add(new StreamRecord<>("10"));
	expectedOutput.add(new StreamRecord<>("20"));
	expectedOutput.add(new StreamRecord<>("30"));
	TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
}
 
源代码3 项目: flink   文件: KeyedCoProcessOperatorTest.java
@Test
public void testEventTimeTimers() throws Exception {

	KeyedCoProcessOperator<String, Integer, String, String> operator =
			new KeyedCoProcessOperator<>(new EventTimeTriggeringProcessFunction());

	TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
			new KeyedTwoInputStreamOperatorTestHarness<>(
					operator,
					new IntToStringKeySelector<>(),
					new IdentityKeySelector<String>(),
					BasicTypeInfo.STRING_TYPE_INFO);

	testHarness.setup();
	testHarness.open();

	testHarness.processElement1(new StreamRecord<>(17, 42L));
	testHarness.processElement2(new StreamRecord<>("18", 42L));

	testHarness.processWatermark1(new Watermark(5));
	testHarness.processWatermark2(new Watermark(5));

	testHarness.processWatermark1(new Watermark(6));
	testHarness.processWatermark2(new Watermark(6));

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
	expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
	expectedOutput.add(new StreamRecord<>("17:1777", 5L));
	expectedOutput.add(new Watermark(5L));
	expectedOutput.add(new StreamRecord<>("18:1777", 6L));
	expectedOutput.add(new Watermark(6L));

	TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

	testHarness.close();
}
 
源代码4 项目: flink   文件: KeyedProcessOperatorTest.java
/**
 * This also verifies that the timestamps ouf side-emitted records is correct.
 */
@Test
public void testSideOutput() throws Exception {
	KeyedProcessOperator<Integer, Integer, String> operator = new KeyedProcessOperator<>(new SideOutputProcessFunction());

	OneInputStreamOperatorTestHarness<Integer, String> testHarness =
		new KeyedOneInputStreamOperatorTestHarness<>(
			operator, new IdentityKeySelector<>(), BasicTypeInfo.INT_TYPE_INFO);

	testHarness.setup();
	testHarness.open();

	testHarness.processElement(new StreamRecord<>(42, 17L /* timestamp */));

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	expectedOutput.add(new StreamRecord<>("IN:42", 17L /* timestamp */));

	TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

	ConcurrentLinkedQueue<StreamRecord<Integer>> expectedIntSideOutput = new ConcurrentLinkedQueue<>();
	expectedIntSideOutput.add(new StreamRecord<>(42, 17L /* timestamp */));
	ConcurrentLinkedQueue<StreamRecord<Integer>> intSideOutput =
		testHarness.getSideOutput(SideOutputProcessFunction.INTEGER_OUTPUT_TAG);
	TestHarnessUtil.assertOutputEquals(
		"Side output was not correct.",
		expectedIntSideOutput,
		intSideOutput);

	ConcurrentLinkedQueue<StreamRecord<Long>> expectedLongSideOutput = new ConcurrentLinkedQueue<>();
	expectedLongSideOutput.add(new StreamRecord<>(42L, 17L /* timestamp */));
	ConcurrentLinkedQueue<StreamRecord<Long>> longSideOutput =
		testHarness.getSideOutput(SideOutputProcessFunction.LONG_OUTPUT_TAG);
	TestHarnessUtil.assertOutputEquals(
		"Side output was not correct.",
		expectedLongSideOutput,
		longSideOutput);

	testHarness.close();
}
 
源代码5 项目: Flink-CEPplus   文件: StreamFilterTest.java
@Test
@SuppressWarnings("unchecked")
public void testFilter() throws Exception {
	StreamFilter<Integer> operator = new StreamFilter<Integer>(new MyFilter());

	OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);

	long initialTime = 0L;
	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

	testHarness.open();

	testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
	testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
	testHarness.processWatermark(new Watermark(initialTime + 2));
	testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
	testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
	testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
	testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
	testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));

	expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
	expectedOutput.add(new Watermark(initialTime + 2));
	expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
	expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));

	TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
}
 
源代码6 项目: openjdk-jdk9   文件: ConcurrentLinkedQueueTest.java
/**
 * containsAll(c) is true when c contains a subset of elements
 */
public void testContainsAll() {
    ConcurrentLinkedQueue q = populatedQueue(SIZE);
    ConcurrentLinkedQueue p = new ConcurrentLinkedQueue();
    for (int i = 0; i < SIZE; ++i) {
        assertTrue(q.containsAll(p));
        assertFalse(p.containsAll(q));
        p.add(new Integer(i));
    }
    assertTrue(p.containsAll(q));
}
 
源代码7 项目: flink   文件: LegacyKeyedCoProcessOperatorTest.java
@Test
public void testTimestampAndWatermarkQuerying() throws Exception {

	LegacyKeyedCoProcessOperator<String, Integer, String, String> operator =
			new LegacyKeyedCoProcessOperator<>(new WatermarkQueryingProcessFunction());

	TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
			new KeyedTwoInputStreamOperatorTestHarness<>(
					operator,
					new IntToStringKeySelector<>(),
					new IdentityKeySelector<String>(),
					BasicTypeInfo.STRING_TYPE_INFO);

	testHarness.setup();
	testHarness.open();

	testHarness.processWatermark1(new Watermark(17));
	testHarness.processWatermark2(new Watermark(17));
	testHarness.processElement1(new StreamRecord<>(5, 12L));

	testHarness.processWatermark1(new Watermark(42));
	testHarness.processWatermark2(new Watermark(42));
	testHarness.processElement2(new StreamRecord<>("6", 13L));

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	expectedOutput.add(new Watermark(17L));
	expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L));
	expectedOutput.add(new Watermark(42L));
	expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L));

	TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

	testHarness.close();
}
 
源代码8 项目: flink   文件: WindowOperatorTest.java
@Test
public void testCleanupTimeOverflow() throws Exception {
	long windowSize = 1000;
	long lateness = 2000;
	WindowOperator operator = WindowOperatorBuilder
			.builder()
			.withInputFields(inputFieldTypes)
			.tumble(Duration.ofMillis(windowSize))
			.withEventTime(2)
			.withAllowedLateness(Duration.ofMillis(lateness))
			.withSendRetraction()
			.aggregateAndBuild(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes);

	OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness =
			new KeyedOneInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow>(
					operator, keySelector, keyType);

	testHarness.open();

	ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

	WindowAssigner<TimeWindow> windowAssigner = TumblingWindowAssigner.of(Duration.ofMillis(windowSize));
	long timestamp = Long.MAX_VALUE - 1750;
	Collection<TimeWindow> windows = windowAssigner.assignWindows(GenericRow.of(fromString("key2"), 1), timestamp);
	TimeWindow window = windows.iterator().next();

	testHarness.processElement(record("key2", 1, timestamp));

	// the garbage collection timer would wrap-around
	assertTrue(window.maxTimestamp() + lateness < window.maxTimestamp());

	// and it would prematurely fire with watermark (Long.MAX_VALUE - 1500)
	assertTrue(window.maxTimestamp() + lateness < Long.MAX_VALUE - 1500);

	// if we don't correctly prevent wrap-around in the garbage collection
	// timers this watermark will clean our window state for the just-added
	// element/window
	testHarness.processWatermark(new Watermark(Long.MAX_VALUE - 1500));

	// this watermark is before the end timestamp of our only window
	assertTrue(Long.MAX_VALUE - 1500 < window.maxTimestamp());
	assertTrue(window.maxTimestamp() < Long.MAX_VALUE);

	// push in a watermark that will trigger computation of our window
	testHarness.processWatermark(new Watermark(window.maxTimestamp()));

	expected.add(new Watermark(Long.MAX_VALUE - 1500));
	expected.add(record("key2", 1L, 1L, window.getStart(), window.getEnd(), window.maxTimestamp()));
	expected.add(new Watermark(window.maxTimestamp()));

	assertor.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput());
	testHarness.close();
}
 
源代码9 项目: flink   文件: WindowOperatorMigrationTest.java
/**
 * Manually run this to write binary snapshot data.
 */
@Ignore
@Test
public void writeReducingProcessingTimeWindowsSnapshot() throws Exception {
	final int windowSize = 3;

	ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
			new SumReducer<>(),
			STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
			TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
			new TimeWindow.Serializer(),
			new TupleKeySelector<>(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			stateDesc,
			new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
			ProcessingTimeTrigger.create(),
			0,
			null /* late data output tag */);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector<>(), BasicTypeInfo.STRING_TYPE_INFO);

	testHarness.setup();
	testHarness.open();

	testHarness.setProcessingTime(10);
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));

	testHarness.setProcessingTime(3010);
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key3", 1)));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 1), 2999));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 2999));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator<>());

	// do snapshot and save to file
	OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
	OperatorSnapshotUtil.writeStateHandle(
		snapshot,
		"src/test/resources/win-op-migration-test-reduce-processing-time-flink" + flinkGenerateSavepointVersion + "-snapshot");

	testHarness.close();

}
 
源代码10 项目: flink   文件: LegacyKeyedProcessOperatorTest.java
@Test
public void testProcessingTimeTimers() throws Exception {

	LegacyKeyedProcessOperator<Integer, Integer, Integer> operator =
			new LegacyKeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));

	OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
			new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);

	testHarness.setup();
	testHarness.open();

	testHarness.processElement(new StreamRecord<>(17));

	testHarness.setProcessingTime(5);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	expectedOutput.add(new StreamRecord<>(17));
	expectedOutput.add(new StreamRecord<>(1777));

	TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

	testHarness.close();
}
 
源代码11 项目: flink   文件: WindowOperatorTest.java
@Test
public void testNotSideOutputDueToLatenessSessionWithLateness() throws Exception {
	// same as testSideOutputDueToLatenessSessionWithLateness() but with an accumulating trigger, i.e.
	// one that does not return FIRE_AND_PURGE when firing but just FIRE. The expected
	// results are therefore slightly different.

	final int gapSize = 3;
	final long lateness = 10;

	ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
		new SumReducer(),
		STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator =
		new WindowOperator<>(
			EventTimeSessionWindows.withGap(Time.seconds(gapSize)),
			new TimeWindow.Serializer(),
			new TupleKeySelector(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			stateDesc,
			new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
			EventTimeTrigger.create(),
			lateness,
			lateOutputTag /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
		createTestHarness(operator);

	testHarness.open();

	ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
	testHarness.processWatermark(new Watermark(1999));

	expected.add(new Watermark(1999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
	testHarness.processWatermark(new Watermark(4998));

	expected.add(new Watermark(4998));

	// this will not be sideoutput because the session we're adding two has maxTimestamp
	// after the current watermark
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));

	// new session
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 8500));
	testHarness.processWatermark(new Watermark(7400));

	expected.add(new Watermark(7400));

	// this will merge the two sessions into one
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
	testHarness.processWatermark(new Watermark(11501));

	expected.add(new StreamRecord<>(new Tuple3<>("key2-5", 1000L, 11500L), 11499));
	expected.add(new Watermark(11501));

	// new session
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 11600));
	testHarness.processWatermark(new Watermark(14600));

	expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
	expected.add(new Watermark(14600));

	// because of the small allowed lateness and because the trigger is accumulating
	// this will be merged into the session (11600-14600) and therefore will not
	// be sideoutput as late
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));

	// adding ("key2", 1) extended the session to (10000-146000) for which
	// maxTimestamp <= currentWatermark. Therefore, we immediately get a firing
	// with the current version of EventTimeTrigger/EventTimeTriggerAccum
	expected.add(new StreamRecord<>(new Tuple3<>("key2-2", 10000L, 14600L), 14599));

	ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
	ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(
			lateOutputTag);

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
	assertEquals(null, sideActual);

	testHarness.processWatermark(new Watermark(20000));

	expected.add(new StreamRecord<>(new Tuple3<>("key2-3", 10000L, 17500L), 17499));
	expected.add(new Watermark(20000));

	testHarness.processWatermark(new Watermark(100000));

	expected.add(new Watermark(100000));

	actual = testHarness.getOutput();
	sideActual = testHarness.getSideOutput(lateOutputTag);
	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
	assertEquals(null, sideActual);

	testHarness.close();
}
 
源代码12 项目: flink   文件: WindowOperatorTest.java
@Test
@SuppressWarnings("unchecked")
public void testEventTimeTumblingWindows() throws Exception {
	closeCalled.set(0);

	WindowOperator operator = WindowOperatorBuilder
			.builder()
			.withInputFields(inputFieldTypes)
			.tumble(Duration.ofSeconds(3))
			.withEventTime(2)
			.aggregateAndBuild(getTimeWindowAggFunction(), equaliser, accTypes, aggResultTypes, windowTypes);

	OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(operator);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	// add elements out-of-order
	testHarness.processElement(record("key2", 1, 3999L));
	testHarness.processElement(record("key2", 1, 3000L));

	testHarness.processElement(record("key1", 1, 20L));
	testHarness.processElement(record("key1", 1, 0L));
	testHarness.processElement(record("key1", 1, 999L));

	testHarness.processElement(record("key2", 1, 1998L));
	testHarness.processElement(record("key2", 1, 1999L));
	testHarness.processElement(record("key2", 1, 1000L));

	testHarness.processWatermark(new Watermark(999));
	expectedOutput.add(new Watermark(999));
	assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

	testHarness.processWatermark(new Watermark(1999));
	expectedOutput.add(new Watermark(1999));
	assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

	// do a snapshot, close and restore again
	OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
	testHarness.close();
	expectedOutput.clear();

	testHarness = createTestHarness(operator);
	testHarness.setup();
	testHarness.initializeState(snapshot);
	testHarness.open();

	testHarness.processWatermark(new Watermark(2999));
	expectedOutput.addAll(doubleRecord(isTableAggregate, record("key1", 3L, 3L, 0L, 3000L, 2999L)));
	expectedOutput.addAll(doubleRecord(isTableAggregate, record("key2", 3L, 3L, 0L, 3000L, 2999L)));
	expectedOutput.add(new Watermark(2999));
	assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

	testHarness.processWatermark(new Watermark(3999));
	expectedOutput.add(new Watermark(3999));
	assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

	testHarness.processWatermark(new Watermark(4999));
	expectedOutput.add(new Watermark(4999));
	assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

	testHarness.processWatermark(new Watermark(5999));
	expectedOutput.addAll(doubleRecord(isTableAggregate, record("key2", 2L, 2L, 3000L, 6000L, 5999L)));
	expectedOutput.add(new Watermark(5999));
	assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

	// those don't have any effect...
	testHarness.processWatermark(new Watermark(6999));
	testHarness.processWatermark(new Watermark(7999));
	expectedOutput.add(new Watermark(6999));
	expectedOutput.add(new Watermark(7999));

	assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

	testHarness.close();

	// we close once in the rest...
	assertEquals("Close was not called.", 2, closeCalled.get());
}
 
源代码13 项目: flink   文件: AsyncWaitOperatorTest.java
/**
 * Delay a while before async invocation to check whether end input waits for all elements finished or not.
 */
@Test
public void testEndInput() throws Exception {
	final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
		new DelayedAsyncFunction(10),
		-1,
		2,
		AsyncDataStream.OutputMode.ORDERED);

	final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
		new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE);

	final long initialTime = 0L;
	final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
	expectedOutput.add(new StreamRecord<>(2, initialTime + 1));
	expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
	expectedOutput.add(new Watermark(initialTime + 2));
	expectedOutput.add(new StreamRecord<>(6, initialTime + 3));

	testHarness.open();

	try {
		synchronized (testHarness.getCheckpointLock()) {
			testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
			testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
			testHarness.processWatermark(new Watermark(initialTime + 2));
			testHarness.processElement(new StreamRecord<>(3, initialTime + 3));
		}

		// wait until all async collectors in the buffer have been emitted out.
		synchronized (testHarness.getCheckpointLock()) {
			testHarness.endInput();
		}

		TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, testHarness.getOutput());
	} finally {
		synchronized (testHarness.getCheckpointLock()) {
			testHarness.close();
		}
	}
}
 
源代码14 项目: Flink-CEPplus   文件: WindowOperatorTest.java
@Test
public void testProcessingTimeTumblingWindows() throws Throwable {
	final int windowSize = 3;

	ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
			new SumReducer(),
			STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
			TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
			new TimeWindow.Serializer(),
			new TupleKeySelector(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			stateDesc,
			new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
			ProcessingTimeTrigger.create(),
			0,
			null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
			createTestHarness(operator);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	testHarness.setProcessingTime(3);

	// timestamp is ignored in processing time
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));

	testHarness.setProcessingTime(5000);

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));

	testHarness.setProcessingTime(7000);

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());

	testHarness.close();
}
 
源代码15 项目: flink   文件: WindowOperatorMigrationTest.java
@Test
public void testRestoreSessionWindowsWithCountTrigger() throws Exception {

	final int sessionSize = 3;

	ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
			STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
			EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
			new TimeWindow.Serializer(),
			new TupleKeySelector<String>(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			stateDesc,
			new InternalIterableWindowFunction<>(new SessionWindowFunction()),
			PurgingTrigger.of(CountTrigger.of(4)),
			0,
			null /* late data output tag */);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector<>(), BasicTypeInfo.STRING_TYPE_INFO);

	testHarness.setup();

	testHarness.initializeState(
		OperatorSnapshotUtil.getResourceFilename(
			"win-op-migration-test-session-with-stateful-trigger-flink" + testMigrateVersion + "-snapshot"));

	testHarness.open();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 6000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());

	// add an element that merges the two "key1" sessions, they should now have count 6, and therefore fire
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), 4500));

	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L), 9999L));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());

	testHarness.close();
}
 
源代码16 项目: flink   文件: WindowOperatorTest.java
@Test
@SuppressWarnings("unchecked")
public void testCountTrigger() throws Exception {
	closeCalled.set(0);

	final int windowSize = 4;

	ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
			new SumReducer(),
			STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
			GlobalWindows.create(),
			new GlobalWindow.Serializer(),
			new TupleKeySelector(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			stateDesc,
			new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
			PurgingTrigger.of(CountTrigger.of(windowSize)),
			0,
			null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
			createTestHarness(operator);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	// The global window actually ignores these timestamps...

	// add elements out-of-order
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));

	// do a snapshot, close and restore again
	OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);

	testHarness.close();

	ConcurrentLinkedQueue<Object> outputBeforeClose = testHarness.getOutput();

	stateDesc = new ReducingStateDescriptor<>("window-contents",
			new SumReducer(),
			STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	operator = new WindowOperator<>(
			GlobalWindows.create(),
			new GlobalWindow.Serializer(),
			new TupleKeySelector(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			stateDesc,
			new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
			PurgingTrigger.of(CountTrigger.of(windowSize)),
			0,
			null /* late data output tag */);

	testHarness = createTestHarness(operator);

	testHarness.setup();
	testHarness.initializeState(snapshot);
	testHarness.open();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, Iterables.concat(outputBeforeClose, testHarness.getOutput()), new Tuple2ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, Iterables.concat(outputBeforeClose, testHarness.getOutput()), new Tuple2ResultSortComparator());

	testHarness.close();
}
 
源代码17 项目: Flink-CEPplus   文件: EvictingWindowOperatorTest.java
/**
 * Tests DeltaEvictor, evictAfter behavior.
 */
@Test
public void testDeltaEvictorEvictAfter() throws Exception {
	AtomicInteger closeCalled = new AtomicInteger(0);
	final int triggerCount = 2;
	final boolean evictAfter = true;
	final int threshold = 2;

	@SuppressWarnings({"unchecked", "rawtypes"})
	TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
		(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
		new ListStateDescriptor<>("window-contents", streamRecordSerializer);

	EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
		GlobalWindows.create(),
		new GlobalWindow.Serializer(),
		new TupleKeySelector(),
		BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
		stateDesc,
		new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
		CountTrigger.of(triggerCount),
		DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() {
			@Override
			public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
				return newDataPoint.f1 - oldDataPoint.f1;
			}
		}, evictAfter),
		0,
		null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
		new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

	long initialTime = 0L;
	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 5), initialTime + 999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 1998));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 1999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 15), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 9), initialTime + 10999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 16), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 22), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.close();

	Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
 
源代码18 项目: Flink-CEPplus   文件: WindowOperatorTest.java
@Test
@SuppressWarnings("unchecked")
public void testReduceSessionWindows() throws Exception {
	closeCalled.set(0);

	final int sessionSize = 3;

	ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>(
			"window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
			EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
			new TimeWindow.Serializer(),
			new TupleKeySelector(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			stateDesc,
			new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
			EventTimeTrigger.create(),
			0,
			null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
			createTestHarness(operator);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	// add elements out-of-order
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));

	// do a snapshot, close and restore again
	OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
	testHarness.close();

	testHarness = createTestHarness(operator);
	testHarness.setup();
	testHarness.initializeState(snapshot);
	testHarness.open();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));

	testHarness.processWatermark(new Watermark(12000));

	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
	expectedOutput.add(new Watermark(12000));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));

	testHarness.processWatermark(new Watermark(17999));

	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
	expectedOutput.add(new Watermark(17999));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());

	testHarness.close();
}
 
源代码19 项目: flink   文件: WindowOperatorTest.java
@Test
@SuppressWarnings("unchecked")
public void testReduceSessionWindowsWithProcessFunction() throws Exception {
	closeCalled.set(0);

	final int sessionSize = 3;

	ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>(
			"window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
			EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
			new TimeWindow.Serializer(),
			new TupleKeySelector(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			stateDesc,
			new InternalSingleValueProcessWindowFunction<>(new ReducedProcessSessionWindowFunction()),
			EventTimeTrigger.create(),
			0,
			null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
			createTestHarness(operator);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	// add elements out-of-order
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));

	// do a snapshot, close and restore again
	OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
	testHarness.close();

	testHarness = createTestHarness(operator);
	testHarness.setup();
	testHarness.initializeState(snapshot);
	testHarness.open();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));

	testHarness.processWatermark(new Watermark(12000));

	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
	expectedOutput.add(new Watermark(12000));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));

	testHarness.processWatermark(new Watermark(17999));

	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
	expectedOutput.add(new Watermark(17999));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());

	testHarness.close();
}
 
源代码20 项目: Flink-CEPplus   文件: KeyedProcessOperatorTest.java
@Test
public void testEventTimeTimers() throws Exception {

	final int expectedKey = 17;

	KeyedProcessOperator<Integer, Integer, Integer> operator =
			new KeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME, expectedKey));

	OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
			new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);

	testHarness.setup();
	testHarness.open();

	testHarness.processWatermark(new Watermark(0));

	testHarness.processElement(new StreamRecord<>(expectedKey, 42L));

	testHarness.processWatermark(new Watermark(5));

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	expectedOutput.add(new Watermark(0L));
	expectedOutput.add(new StreamRecord<>(expectedKey, 42L));
	expectedOutput.add(new StreamRecord<>(1777, 5L));
	expectedOutput.add(new Watermark(5L));

	TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

	testHarness.close();
}