org.mockito.verification.VerificationMode#org.apache.flink.runtime.state.memory.MemoryStateBackend 源码实例 Demo

下面列出了 org.mockito.verification.VerificationMode#org.apache.flink.runtime.state.memory.MemoryStateBackend 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: flink   文件: CEPRescalingTest.java
/**
 * Builds a keyed test harness around the CEP operator.
 *
 * <p>The operator is obtained from {@code getKeyedCepOpearator(false, new NFAFactory())}, keys are
 * extracted with a {@code TestKeySelector}, and the harness' state backend is set to a
 * {@code RocksDBStateBackend} wrapping a {@code MemoryStateBackend}.
 *
 * @param maxParallelism forwarded to the harness constructor
 * @param taskParallelism forwarded to the harness constructor
 * @param subtaskIdx forwarded to the harness constructor
 * @return the harness with its state backend configured
 * @throws Exception propagated from creating the operator, the harness or the state backend
 */
private KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> getTestHarness(
		int maxParallelism,
		int taskParallelism,
		int subtaskIdx) throws Exception {

	final KeySelector<Event, Integer> eventKeySelector = new TestKeySelector();

	final KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> testHarness =
			new KeyedOneInputStreamOperatorTestHarness<>(
					getKeyedCepOpearator(false, new NFAFactory()),
					eventKeySelector,
					BasicTypeInfo.INT_TYPE_INFO,
					maxParallelism,
					taskParallelism,
					subtaskIdx);

	// RocksDB backend wrapping an in-memory backend.
	final MemoryStateBackend wrappedBackend = new MemoryStateBackend();
	testHarness.setStateBackend(new RocksDBStateBackend(wrappedBackend));

	return testHarness;
}
 
/**
 * Creates a {@code CheckpointCoordinator} for the given job, passing {@code ackVertices} as the
 * second of the three vertex arrays; the other two arrays are empty.
 *
 * <p>The coordinator uses standalone (in-process) ID counter and completed-checkpoint store, a
 * {@code MemoryStateBackend}, a direct executor and a failure manager built with
 * {@code NoOpFailJobCall.INSTANCE}.
 *
 * @param jid job ID handed to the coordinator
 * @param ackVertices vertices passed as the second vertex array of the coordinator
 * @return the newly created coordinator
 */
private static CheckpointCoordinator instantiateCheckpointCoordinator(JobID jid, ExecutionVertex... ackVertices) {
	final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
		10000000L,
		600000L,
		0L,
		1,
		CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
		true,
		false,
		0);

	final CheckpointFailureManager failureManager =
		new CheckpointFailureManager(0, NoOpFailJobCall.INSTANCE);

	return new CheckpointCoordinator(
			jid,
			coordinatorConfig,
			new ExecutionVertex[0],
			ackVertices,
			new ExecutionVertex[0],
			new StandaloneCheckpointIDCounter(),
			new StandaloneCompletedCheckpointStore(10),
			new MemoryStateBackend(),
			Executors.directExecutor(),
			SharedStateRegistry.DEFAULT_FACTORY,
			failureManager);
}
 
源代码3 项目: flink   文件: JobCheckpointingSettingsTest.java
/**
 * Tests that the settings are actually serializable.
 *
 * <p>A {@code JobCheckpointingSettings} instance is copied through
 * {@code CommonTestUtils.createCopySerializable} and the copy is compared with the original:
 * the three vertex lists, the coordinator configuration, and the class of the deserialized
 * default state backend.
 */
@Test
public void testIsJavaSerializable() throws Exception {
	JobCheckpointingSettings settings = new JobCheckpointingSettings(
		Arrays.asList(new JobVertexID(), new JobVertexID()),
		Arrays.asList(new JobVertexID(), new JobVertexID()),
		Arrays.asList(new JobVertexID(), new JobVertexID()),
		new CheckpointCoordinatorConfiguration(
			1231231,
			1231,
			112,
			12,
			CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
			false,
			false,
			0),
		new SerializedValue<>(new MemoryStateBackend()));

	JobCheckpointingSettings copy = CommonTestUtils.createCopySerializable(settings);
	assertEquals(settings.getVerticesToAcknowledge(), copy.getVerticesToAcknowledge());
	assertEquals(settings.getVerticesToConfirm(), copy.getVerticesToConfirm());
	assertEquals(settings.getVerticesToTrigger(), copy.getVerticesToTrigger());
	assertEquals(settings.getCheckpointCoordinatorConfiguration(), copy.getCheckpointCoordinatorConfiguration());
	assertNotNull(copy.getDefaultStateBackend());

	// assertEquals (instead of assertTrue on ==) reports the actual class when the check fails.
	assertEquals(
		MemoryStateBackend.class,
		copy.getDefaultStateBackend().deserializeValue(this.getClass().getClassLoader()).getClass());
}
 
源代码4 项目: flink   文件: SavepointTest.java
/**
 * Registers the same transformation twice under the same {@code UID} on an
 * {@code ExistingSavepoint}; the test passes only if an {@link IllegalArgumentException} is thrown.
 */
@Test(expected = IllegalArgumentException.class)
public void testExistingSavepointEnforceUniqueUIDs() throws IOException {
	final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
	batchEnv.setParallelism(10);

	final DataSource<Integer> source = batchEnv.fromElements(0);

	final BootstrapTransformation<Integer> bootstrap = OperatorTransformation
		.bootstrapWith(source)
		.transform(new ExampleStateBootstrapFunction());

	// Metadata that already contains one operator state registered under UID.
	final OperatorState existingState = new OperatorState(OperatorIDGenerator.fromUid(UID), 1, 4);
	final Collection<OperatorState> knownStates = Collections.singletonList(existingState);
	final SavepointMetadata savepointMetadata = new SavepointMetadata(4, Collections.emptyList(), knownStates);

	final ExistingSavepoint savepoint = new ExistingSavepoint(batchEnv, savepointMetadata, new MemoryStateBackend());
	savepoint
		.withOperator(UID, bootstrap)
		.withOperator(UID, bootstrap);
}
 
源代码5 项目: flink   文件: SavepointTest.java
/**
 * Registers the same transformation twice under the same {@code UID} on a {@code NewSavepoint};
 * the test passes only if an {@link IllegalArgumentException} is thrown.
 */
@Test(expected = IllegalArgumentException.class)
public void testNewSavepointEnforceUniqueUIDs() {
	final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
	batchEnv.setParallelism(10);

	final DataSource<Integer> source = batchEnv.fromElements(0);

	final BootstrapTransformation<Integer> bootstrap = OperatorTransformation
		.bootstrapWith(source)
		.transform(new ExampleStateBootstrapFunction());

	// Metadata without any pre-existing operator state.
	final SavepointMetadata emptyMetadata = new SavepointMetadata(1, Collections.emptyList(), Collections.emptyList());

	final NewSavepoint savepoint = new NewSavepoint(emptyMetadata, new MemoryStateBackend());
	savepoint
		.withOperator(UID, bootstrap)
		.withOperator(UID, bootstrap);
}
 
源代码6 项目: flink   文件: SavepointTest.java
/**
 * Registers the same transformation twice under the same {@code UID} on an
 * {@code ExistingSavepoint}; the test passes only if an {@link IllegalArgumentException} is thrown.
 */
@Test(expected = IllegalArgumentException.class)
public void testExistingSavepointEnforceUniqueUIDs() throws IOException {
	final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
	batchEnv.setParallelism(10);

	final DataSource<Integer> source = batchEnv.fromElements(0);

	final BootstrapTransformation<Integer> bootstrap = OperatorTransformation
		.bootstrapWith(source)
		.transform(new ExampleStateBootstrapFunction());

	// Metadata that already contains one operator state registered under UID.
	final OperatorState existingState = new OperatorState(OperatorIDGenerator.fromUid(UID), 1, 4);
	final Collection<OperatorState> knownStates = Collections.singletonList(existingState);
	final SavepointMetadata savepointMetadata = new SavepointMetadata(4, Collections.emptyList(), knownStates);

	final ExistingSavepoint savepoint = new ExistingSavepoint(batchEnv, savepointMetadata, new MemoryStateBackend());
	savepoint
		.withOperator(UID, bootstrap)
		.withOperator(UID, bootstrap);
}
 
源代码7 项目: Flink-CEPplus   文件: OperatorStateBackendTest.java
/**
 * Snapshots an operator state backend that was created from {@code emptyStateHandles} and has
 * had no state registered by this test, and asserts that the job-manager-owned snapshot
 * handle is {@code null}.
 */
@Test
public void testSnapshotEmpty() throws Exception {
	final AbstractStateBackend memoryBackend = new MemoryStateBackend(4096);
	final CloseableRegistry closeableRegistry = new CloseableRegistry();

	final OperatorStateBackend backendUnderTest = memoryBackend.createOperatorStateBackend(
			createMockEnvironment(),
			"testOperator",
			emptyStateHandles,
			closeableRegistry);

	final CheckpointStreamFactory checkpointStreams = new MemCheckpointStreamFactory(4096);

	final RunnableFuture<SnapshotResult<OperatorStateHandle>> pendingSnapshot = backendUnderTest.snapshot(
			0L,
			0L,
			checkpointStreams,
			CheckpointOptions.forCheckpointWithDefaultLocation());

	final SnapshotResult<OperatorStateHandle> result = FutureUtils.runIfNotDoneAndGet(pendingSnapshot);
	assertNull(result.getJobManagerOwnedSnapshot());
}
 
源代码8 项目: flink   文件: SourceOperatorEventTimeTest.java
/**
 * Creates a {@code TestingSourceOperator}, initializes its state and opens it.
 *
 * <p>The state store handed to the operator is an operator state backend created by a
 * {@code MemoryStateBackend} with an empty list of state handles; the remaining arguments of
 * the initialization context are {@code null}.
 *
 * @param reader source reader passed to the operator
 * @param watermarkStrategy watermark strategy passed to the operator
 * @param timeService processing time service passed to the operator
 * @return the operator after {@code initializeState} and {@code open} have been called
 * @throws Exception propagated from backend creation, state initialization or {@code open}
 */
private static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
		SourceReader<T, MockSourceSplit> reader,
		WatermarkStrategy<T> watermarkStrategy,
		ProcessingTimeService timeService) throws Exception {

	final OperatorStateStore stateStore = new MemoryStateBackend().createOperatorStateBackend(
			new MockEnvironmentBuilder().build(),
			"test-operator",
			Collections.emptyList(),
			new CloseableRegistry());

	final StateInitializationContext initContext =
			new StateInitializationContextImpl(false, stateStore, null, null, null);

	final SourceOperator<T, MockSourceSplit> operator =
			new TestingSourceOperator<>(reader, watermarkStrategy, timeService);

	operator.initializeState(initContext);
	operator.open();
	return operator;
}
 
源代码9 项目: Flink-CEPplus   文件: StateBackendLoadingTest.java
/**
 * Loads an application-defined {@code MemoryStateBackend} together with a cluster configuration
 * and checks which side wins per parameter: the checkpoint path must be the one set on the
 * application backend, while the savepoint path (not set on the application backend) must be
 * taken from the configuration.
 */
@Test
public void testConfigureMemoryStateBackendMixed() throws Exception {
	// Three distinct temp folders, created in this order.
	final String applicationCheckpointDir = new Path(tmp.newFolder().toURI()).toString();
	final String configCheckpointDir = new Path(tmp.newFolder().toURI()).toString();
	final String configSavepointDir = new Path(tmp.newFolder().toURI()).toString();

	final MemoryStateBackend applicationBackend = new MemoryStateBackend(applicationCheckpointDir, null);

	final Configuration clusterConfig = new Configuration();
	// check that this is not accidentally picked up
	clusterConfig.setString(backendKey, "filesystem");
	// this parameter should not be picked up
	clusterConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, configCheckpointDir);
	clusterConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, configSavepointDir);

	final StateBackend loaded =
		StateBackendLoader.fromApplicationOrConfigOrDefault(applicationBackend, clusterConfig, cl, null);
	assertTrue(loaded instanceof MemoryStateBackend);

	final MemoryStateBackend loadedMemoryBackend = (MemoryStateBackend) loaded;
	assertEquals(new Path(applicationCheckpointDir), loadedMemoryBackend.getCheckpointPath());
	assertEquals(new Path(configSavepointDir), loadedMemoryBackend.getSavepointPath());
}
 
源代码10 项目: flink   文件: AbstractOperatorRestoreTestBase.java
/**
 * Builds the job graph for the given execution mode.
 *
 * <p>The environment is set up with checkpointing every 500 (exactly-once mode), no restarts
 * and a {@code MemoryStateBackend}; {@code MIGRATE} and {@code RESTORE} then add their job via
 * {@code createMigrationJob} / {@code createRestoredJob}. Any other mode adds nothing.
 *
 * @param mode selects which job is created on the environment
 * @return the job graph generated from the environment's stream graph
 */
private JobGraph createJobGraph(ExecutionMode mode) {
	final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
	streamEnv.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
	streamEnv.setRestartStrategy(RestartStrategies.noRestart());

	// Declared as StateBackend so the same setStateBackend overload is chosen as with the original cast.
	final StateBackend memoryBackend = new MemoryStateBackend();
	streamEnv.setStateBackend(memoryBackend);

	switch (mode) {
		case MIGRATE:
			createMigrationJob(streamEnv);
			break;
		case RESTORE:
			createRestoredJob(streamEnv);
			break;
	}

	return StreamingJobGraphGenerator.createJobGraph(streamEnv.getStreamGraph());
}
 
源代码11 项目: flink   文件: OperatorStateBackendTest.java
/**
 * Snapshots an operator state backend that was created from {@code emptyStateHandles} and has
 * had no state registered by this test, and asserts that the job-manager-owned snapshot
 * handle is {@code null}.
 */
@Test
public void testSnapshotEmpty() throws Exception {
	final AbstractStateBackend memoryBackend = new MemoryStateBackend(4096);
	final CloseableRegistry closeableRegistry = new CloseableRegistry();

	final OperatorStateBackend backendUnderTest = memoryBackend.createOperatorStateBackend(
			createMockEnvironment(),
			"testOperator",
			emptyStateHandles,
			closeableRegistry);

	final CheckpointStreamFactory checkpointStreams = new MemCheckpointStreamFactory(4096);

	final RunnableFuture<SnapshotResult<OperatorStateHandle>> pendingSnapshot = backendUnderTest.snapshot(
			0L,
			0L,
			checkpointStreams,
			CheckpointOptions.forCheckpointWithDefaultLocation());

	final SnapshotResult<OperatorStateHandle> result = FutureUtils.runIfNotDoneAndGet(pendingSnapshot);
	assertNull(result.getJobManagerOwnedSnapshot());
}
 
/**
 * Sets up the given source operator on a mock stream task.
 *
 * <p>The stream config uses a {@code MemoryStateBackend}, the event-time characteristic and a
 * fresh {@code OperatorID}. The operator is set up with a mocked {@code Output}.
 *
 * <p>Any exception raised while building the task or setting up the operator is rethrown as an
 * {@link AssertionError} carrying the original exception as its cause, so the failing test
 * shows the full stack trace.
 *
 * @param operator the source operator to set up
 * @param executionConfig execution config given to the mock task
 * @param env environment the mock task is built on
 * @param timeProvider processing time service given to the mock task
 */
@SuppressWarnings("unchecked")
private static <T> void setupSourceOperator(
		StreamSource<T, ?> operator,
		ExecutionConfig executionConfig,
		Environment env,
		ProcessingTimeService timeProvider) {

	StreamConfig cfg = new StreamConfig(new Configuration());
	cfg.setStateBackend(new MemoryStateBackend());

	cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
	cfg.setOperatorID(new OperatorID());

	try {
		MockStreamTask mockTask = new MockStreamTaskBuilder(env)
			.setConfig(cfg)
			.setExecutionConfig(executionConfig)
			.setProcessingTimeService(timeProvider)
			.build();

		// Unchecked cast: a mock of the raw Output class stands in for Output<StreamRecord<T>>.
		operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
	} catch (Exception e) {
		// Same failure type as fail(...), but the cause is kept instead of being printed to stderr.
		throw new AssertionError(e.getMessage(), e);
	}
}
 
源代码13 项目: flink   文件: BootstrapTransformationTest.java
/**
 * With environment parallelism 4 and a max parallelism derived from
 * {@code getMaxParallelism(10)}, the data set returned by {@code writeOperatorSubtaskStates}
 * is expected to report {@code ExecutionConfig.PARALLELISM_DEFAULT}.
 */
@Test
public void testDefaultParallelismRespectedWhenLessThanMaxParallelism() {
	final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
	batchEnv.setParallelism(4);

	final DataSource<Integer> source = batchEnv.fromElements(0);

	final BootstrapTransformation<Integer> bootstrap = OperatorTransformation
		.bootstrapWith(source)
		.transform(new ExampleStateBootstrapFunction());

	final int savepointMaxParallelism = bootstrap.getMaxParallelism(10);

	final DataSet<TaggedOperatorSubtaskState> subtaskStates = bootstrap.writeOperatorSubtaskStates(
		OperatorIDGenerator.fromUid("uid"),
		new MemoryStateBackend(),
		new Path(),
		savepointMaxParallelism);

	final int actualParallelism = getParallelism(subtaskStates);
	Assert.assertEquals(
		"The parallelism of a data set should not change when less than the max parallelism of the savepoint",
		ExecutionConfig.PARALLELISM_DEFAULT,
		actualParallelism);
}
 
源代码14 项目: flink   文件: AbstractOperatorRestoreTestBase.java
/**
 * Builds the job graph for the given execution mode.
 *
 * <p>The environment is set up with checkpointing every 500 (exactly-once mode), no restarts
 * and a {@code MemoryStateBackend}; {@code MIGRATE} and {@code RESTORE} then add their job via
 * {@code createMigrationJob} / {@code createRestoredJob}. Any other mode adds nothing.
 *
 * @param mode selects which job is created on the environment
 * @return the job graph generated from the environment's stream graph
 */
private JobGraph createJobGraph(ExecutionMode mode) {
	final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
	streamEnv.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
	streamEnv.setRestartStrategy(RestartStrategies.noRestart());

	// Declared as StateBackend so the same setStateBackend overload is chosen as with the original cast.
	final StateBackend memoryBackend = new MemoryStateBackend();
	streamEnv.setStateBackend(memoryBackend);

	switch (mode) {
		case MIGRATE:
			createMigrationJob(streamEnv);
			break;
		case RESTORE:
			createRestoredJob(streamEnv);
			break;
	}

	return StreamingJobGraphGenerator.createJobGraph(streamEnv.getStreamGraph());
}
 
源代码15 项目: flink   文件: KeyedStateInputFormatTest.java
/**
 * Reads an input split with an {@code InvalidReaderFunction} and expects an
 * {@link IOException}; the trailing {@code Assert.fail} is reached only if reading returns
 * normally.
 */
@Test(expected = IOException.class)
public void testInvalidProcessReaderFunctionFails() throws Exception {
	final OperatorID uid = OperatorIDGenerator.fromUid("uid");

	final OperatorSubtaskState subtaskState =
		createOperatorSubtaskState(new StreamFlatMap<>(new StatefulFunction()));
	final OperatorState savedState = new OperatorState(uid, 1, 128);
	savedState.putState(0, subtaskState);

	final KeyedStateInputFormat<?, ?> inputFormat =
		new KeyedStateInputFormat<>(savedState, new MemoryStateBackend(), Types.INT, new ReaderFunction());
	final KeyGroupRangeInputSplit firstSplit = inputFormat.createInputSplits(1)[0];

	final KeyedStateReaderFunction<Integer, Integer> invalidFunction = new InvalidReaderFunction();

	readInputSplit(firstSplit, invalidFunction);

	Assert.fail("KeyedStateReaderFunction did not fail on invalid RuntimeContext use");
}
 
源代码16 项目: flink   文件: KeyedStateInputFormatTest.java
/**
 * Reads the single input split of a one-subtask operator state with a {@code ReaderFunction}
 * and expects the values 1, 2, 3 in that order.
 */
@Test
public void testReadState() throws Exception {
	final OperatorID uid = OperatorIDGenerator.fromUid("uid");

	final OperatorSubtaskState subtaskState =
		createOperatorSubtaskState(new StreamFlatMap<>(new StatefulFunction()));
	final OperatorState savedState = new OperatorState(uid, 1, 128);
	savedState.putState(0, subtaskState);

	final KeyedStateInputFormat<?, ?> inputFormat =
		new KeyedStateInputFormat<>(savedState, new MemoryStateBackend(), Types.INT, new ReaderFunction());
	final KeyGroupRangeInputSplit firstSplit = inputFormat.createInputSplits(1)[0];

	final KeyedStateReaderFunction<Integer, Integer> readerFunction = new ReaderFunction();

	final List<Integer> values = readInputSplit(firstSplit, readerFunction);

	Assert.assertEquals("Incorrect data read from input split", Arrays.asList(1, 2, 3), values);
}
 
源代码17 项目: flink   文件: BootstrapTransformationTest.java
/**
 * With environment parallelism 4 and an operator-specific max parallelism of 1, the data set
 * returned by {@code writeOperatorSubtaskStates} is expected to have parallelism 1.
 */
@Test
public void testOperatorSpecificMaxParallelismRespected() {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(4);

	DataSource<Integer> input = env.fromElements(0);

	BootstrapTransformation<Integer> transformation = OperatorTransformation
		.bootstrapWith(input)
		.setMaxParallelism(1)
		.transform(new ExampleStateBootstrapFunction());

	int maxParallelism = transformation.getMaxParallelism(4);
	DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
		OperatorIDGenerator.fromUid("uid"),
		new MemoryStateBackend(),
		new Path(),
		maxParallelism
	);

	// Message typo fixed ("my" -> "by") so the failure output reads correctly.
	Assert.assertEquals(
		"The parallelism of a data set should be constrained by the savepoint max parallelism",
		1,
		getParallelism(result));
}
 
源代码18 项目: flink   文件: CEPRescalingTest.java
/**
 * Builds a keyed test harness around the CEP operator.
 *
 * <p>The operator is obtained from {@code getKeyedCepOpearator(false, new NFAFactory())}, keys are
 * extracted with a {@code TestKeySelector}, and the harness' state backend is set to a
 * {@code RocksDBStateBackend} wrapping a {@code MemoryStateBackend}.
 *
 * @param maxParallelism forwarded to the harness constructor
 * @param taskParallelism forwarded to the harness constructor
 * @param subtaskIdx forwarded to the harness constructor
 * @return the harness with its state backend configured
 * @throws Exception propagated from creating the operator, the harness or the state backend
 */
private KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> getTestHarness(
		int maxParallelism,
		int taskParallelism,
		int subtaskIdx) throws Exception {

	final KeySelector<Event, Integer> eventKeySelector = new TestKeySelector();

	final KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> testHarness =
			new KeyedOneInputStreamOperatorTestHarness<>(
					getKeyedCepOpearator(false, new NFAFactory()),
					eventKeySelector,
					BasicTypeInfo.INT_TYPE_INFO,
					maxParallelism,
					taskParallelism,
					subtaskIdx);

	// RocksDB backend wrapping an in-memory backend.
	final MemoryStateBackend wrappedBackend = new MemoryStateBackend();
	testHarness.setStateBackend(new RocksDBStateBackend(wrappedBackend));

	return testHarness;
}
 
源代码19 项目: flink   文件: KeyedStateInputFormatTest.java
/**
 * Requests 4 input splits from a {@code KeyedStateInputFormat} over a one-subtask operator
 * state and expects exactly 4 splits back.
 */
@Test
public void testCreatePartitionedInputSplits() throws Exception {
	final OperatorID uid = OperatorIDGenerator.fromUid("uid");

	final OperatorSubtaskState subtaskState =
		createOperatorSubtaskState(new StreamFlatMap<>(new StatefulFunction()));
	final OperatorState savedState = new OperatorState(uid, 1, 128);
	savedState.putState(0, subtaskState);

	final KeyedStateInputFormat<?, ?> inputFormat =
		new KeyedStateInputFormat<>(savedState, new MemoryStateBackend(), Types.INT, new ReaderFunction());

	final KeyGroupRangeInputSplit[] partitions = inputFormat.createInputSplits(4);
	Assert.assertEquals(
		"Failed to properly partition operator state into input splits",
		4,
		partitions.length);
}
 
源代码20 项目: flink   文件: StreamSourceOperatorWatermarksTest.java
/**
 * Sets up the given source operator on a freshly built mock stream task and returns the task.
 *
 * <p>The stream config uses a {@code MemoryStateBackend}, the given time characteristic and a
 * fresh {@code OperatorID}; the execution config carries the given auto-watermark interval. The
 * task runs in a {@code DummyEnvironment} and uses a mocked {@code StreamStatusMaintainer} that
 * always reports {@code StreamStatus.ACTIVE}.
 *
 * @param operator the source operator to set up
 * @param timeChar time characteristic written to the stream config
 * @param watermarkInterval auto-watermark interval written to the execution config
 * @param timeProvider timer service given to the mock task
 * @return the mock task the operator was set up with
 * @throws Exception propagated from building the task or setting up the operator
 */
@SuppressWarnings("unchecked")
private static <T> MockStreamTask setupSourceOperator(
		StreamSource<T, ?> operator,
		TimeCharacteristic timeChar,
		long watermarkInterval,
		final TimerService timeProvider) throws Exception {

	final ExecutionConfig execConfig = new ExecutionConfig();
	execConfig.setAutoWatermarkInterval(watermarkInterval);

	final StreamConfig streamConfig = new StreamConfig(new Configuration());
	streamConfig.setStateBackend(new MemoryStateBackend());
	streamConfig.setTimeCharacteristic(timeChar);
	streamConfig.setOperatorID(new OperatorID());

	final Environment dummyEnv = new DummyEnvironment("MockTwoInputTask", 1, 0);

	final StreamStatusMaintainer statusMaintainer = mock(StreamStatusMaintainer.class);
	when(statusMaintainer.getStreamStatus()).thenReturn(StreamStatus.ACTIVE);

	final MockStreamTask task = new MockStreamTaskBuilder(dummyEnv)
		.setConfig(streamConfig)
		.setExecutionConfig(execConfig)
		.setStreamStatusMaintainer(statusMaintainer)
		.setTimerService(timeProvider)
		.build();

	// Unchecked cast: a mock of the raw Output class stands in for Output<StreamRecord<T>>.
	operator.setup(task, streamConfig, (Output<StreamRecord<T>>) mock(Output.class));
	return task;
}
 
/**
 * Runs {@code testProgramWithBackend} with a {@code RocksDBStateBackend} that wraps a
 * {@code MemoryStateBackend(MAX_MEM_STATE_SIZE)} and is constructed with the flag {@code true}
 * (presumably incremental checkpointing, per the test name), storing its DB files in a new
 * temporary folder.
 */
@Test
public void testWithRocksDbBackendIncremental() throws Exception {
	final MemoryStateBackend wrappedBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE);
	final RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(wrappedBackend, true);

	final String dbStorageDir = tmpFolder.newFolder().getAbsolutePath();
	rocksDbBackend.setDbStoragePath(dbStorageDir);

	testProgramWithBackend(rocksDbBackend);
}
 
源代码22 项目: flink   文件: HeapSyncSnapshotTtlStateTest.java
/**
 * Creates the test context whose state backend is a {@code MemoryStateBackend} constructed
 * with the argument {@code false}.
 * NOTE(review): the flag presumably disables asynchronous snapshots — confirm against the
 * {@code MemoryStateBackend} constructor.
 *
 * @param timeProvider time provider handed to the context
 * @return a new test context
 */
@Override
protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) {
	final StateBackendTestContext context = new StateBackendTestContext(timeProvider) {
		@Override
		protected StateBackend createStateBackend() {
			return new MemoryStateBackend(false);
		}
	};
	return context;
}
 
源代码23 项目: flink   文件: JobCheckpointingSettingsTest.java
/**
 * Tests that the settings are actually serializable.
 *
 * <p>A {@code JobCheckpointingSettings} instance is copied through
 * {@code CommonTestUtils.createCopySerializable} and the copy is compared with the original:
 * the three vertex lists, the coordinator configuration, and the class of the deserialized
 * default state backend.
 */
@Test
public void testIsJavaSerializable() throws Exception {
	JobCheckpointingSettings settings = new JobCheckpointingSettings(
		Arrays.asList(new JobVertexID(), new JobVertexID()),
		Arrays.asList(new JobVertexID(), new JobVertexID()),
		Arrays.asList(new JobVertexID(), new JobVertexID()),
		new CheckpointCoordinatorConfiguration(
			1231231,
			1231,
			112,
			12,
			CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
			false,
			false,
			false,
			0),
		new SerializedValue<>(new MemoryStateBackend()));

	JobCheckpointingSettings copy = CommonTestUtils.createCopySerializable(settings);
	assertEquals(settings.getVerticesToAcknowledge(), copy.getVerticesToAcknowledge());
	assertEquals(settings.getVerticesToConfirm(), copy.getVerticesToConfirm());
	assertEquals(settings.getVerticesToTrigger(), copy.getVerticesToTrigger());
	assertEquals(settings.getCheckpointCoordinatorConfiguration(), copy.getCheckpointCoordinatorConfiguration());
	assertNotNull(copy.getDefaultStateBackend());

	// assertEquals (instead of assertTrue on ==) reports the actual class when the check fails.
	assertEquals(
		MemoryStateBackend.class,
		copy.getDefaultStateBackend().deserializeValue(this.getClass().getClassLoader()).getClass());
}
 
源代码24 项目: flink   文件: KeyedStateCheckpointingITCase.java
/**
 * Runs {@code testProgramWithBackend} with a {@code RocksDBStateBackend} that wraps a
 * {@code MemoryStateBackend(MAX_MEM_STATE_SIZE)} and is constructed with the flag {@code false}
 * (presumably full, non-incremental checkpointing, per the test name), storing its DB files in
 * a new temporary folder.
 */
@Test
public void testWithRocksDbBackendFull() throws Exception {
	final MemoryStateBackend wrappedBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE);
	final RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(wrappedBackend, false);

	final String dbStorageDir = tmpFolder.newFolder().getAbsolutePath();
	rocksDbBackend.setDbStoragePath(dbStorageDir);

	testProgramWithBackend(rocksDbBackend);
}
 
源代码25 项目: flink   文件: HeapSyncSnapshotTtlStateTest.java
/**
 * Creates the test context whose state backend is a {@code MemoryStateBackend} constructed
 * with the argument {@code false}.
 * NOTE(review): the flag presumably disables asynchronous snapshots — confirm against the
 * {@code MemoryStateBackend} constructor.
 *
 * @param timeProvider time provider handed to the context
 * @return a new test context
 */
@Override
protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) {
	final StateBackendTestContext context = new StateBackendTestContext(timeProvider) {
		@Override
		protected StateBackend createStateBackend() {
			return new MemoryStateBackend(false);
		}
	};
	return context;
}
 
源代码26 项目: Flink-CEPplus   文件: CheckpointCoordinatorTest.java
/**
 * Checks that a very long minimum delay configured on the coordinator does not get in the way
 * of savepoints: two savepoints are triggered back to back and neither returned future may
 * already be done.
 */
@Test
public void testMinDelayBetweenSavepoints() throws Exception {
	final JobID jobId = new JobID();

	final ExecutionAttemptID attemptId = new ExecutionAttemptID();
	final ExecutionVertex vertex = mockExecutionVertex(attemptId);

	final CheckpointCoordinator coordinator = new CheckpointCoordinator(
		jobId,
		100000,
		200000,
		100000000L, // very long min delay => should not affect savepoints
		1,
		CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
		new ExecutionVertex[] { vertex },
		new ExecutionVertex[] { vertex },
		new ExecutionVertex[] { vertex },
		new StandaloneCheckpointIDCounter(),
		new StandaloneCompletedCheckpointStore(2),
		new MemoryStateBackend(),
		Executors.directExecutor(),
		SharedStateRegistry.DEFAULT_FACTORY);

	final String targetDirectory = tmpFolder.newFolder().getAbsolutePath();

	final CompletableFuture<CompletedCheckpoint> firstSavepoint = coordinator.triggerSavepoint(0, targetDirectory);
	assertFalse("Did not trigger savepoint", firstSavepoint.isDone());

	final CompletableFuture<CompletedCheckpoint> secondSavepoint = coordinator.triggerSavepoint(1, targetDirectory);
	assertFalse("Did not trigger savepoint", secondSavepoint.isDone());
}
 
源代码27 项目: Flink-CEPplus   文件: CheckpointCoordinatorTest.java
/**
 * Verifies that triggering a checkpoint reports a pending checkpoint to the stats tracker
 * exactly once, with checkpoint ID 1, the trigger timestamp and the properties for a
 * never-retained checkpoint.
 */
@Test
public void testCheckpointStatsTrackerPendingCheckpointCallback() {
	final long triggerTimestamp = System.currentTimeMillis();
	final ExecutionVertex vertex = mockExecutionVertex(new ExecutionAttemptID());

	// set up the coordinator and validate the initial state
	final CheckpointCoordinator coordinator = new CheckpointCoordinator(
		new JobID(),
		600000,
		600000,
		0,
		Integer.MAX_VALUE,
		CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
		new ExecutionVertex[]{vertex},
		new ExecutionVertex[]{vertex},
		new ExecutionVertex[]{vertex},
		new StandaloneCheckpointIDCounter(),
		new StandaloneCompletedCheckpointStore(1),
		new MemoryStateBackend(),
		Executors.directExecutor(),
		SharedStateRegistry.DEFAULT_FACTORY);

	final CheckpointStatsTracker statsTracker = mock(CheckpointStatsTracker.class);
	coordinator.setCheckpointStatsTracker(statsTracker);

	when(statsTracker.reportPendingCheckpoint(anyLong(), anyLong(), any(CheckpointProperties.class)))
		.thenReturn(mock(PendingCheckpointStats.class));

	// Trigger a checkpoint and verify callback
	assertTrue(coordinator.triggerCheckpoint(triggerTimestamp, false));

	final CheckpointProperties expectedProperties =
		CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);

	verify(statsTracker, times(1))
		.reportPendingCheckpoint(eq(1L), eq(triggerTimestamp), eq(expectedProperties));
}
 
源代码28 项目: flink   文件: StateBackendLoadingTest.java
/**
 * With no application-defined backend and an empty configuration, the loader is expected to
 * return a {@code MemoryStateBackend}.
 */
@Test
public void testInstantiateMemoryBackendByDefault() throws Exception {
	final Configuration emptyConfig = new Configuration();

	final StateBackend loaded =
			StateBackendLoader.fromApplicationOrConfigOrDefault(null, emptyConfig, cl, null);

	assertTrue(loaded instanceof MemoryStateBackend);
}
 
源代码29 项目: flink   文件: BootstrapTransformationTest.java
/**
 * Checks that the key selector given to {@code keyBy} ends up as the state partitioner (input 0)
 * of the stream config produced by the bootstrap transformation.
 */
@Test
public void testStreamConfig() {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSource<String> input = env.fromElements("");

	BootstrapTransformation<String> transformation = OperatorTransformation
		.bootstrapWith(input)
		.keyBy(new CustomKeySelector())
		.transform(new ExampleKeyedStateBootstrapFunction());

	StreamConfig config = transformation.getConfig(OperatorIDGenerator.fromUid("uid"), new MemoryStateBackend(), null);

	// Wildcard type instead of the raw KeySelector: only the runtime class is inspected below.
	KeySelector<?, ?> selector = config.getStatePartitioner(0, Thread.currentThread().getContextClassLoader());

	Assert.assertEquals("Incorrect key selector forwarded to stream operator", CustomKeySelector.class, selector.getClass());
}
 
/**
 * Builds a started execution graph consisting of a single mock vertex and enables
 * checkpointing on it with the given counter and store.
 *
 * <p>Both the RPC and the allocation timeout are one day. Checkpointing is enabled with four
 * empty lists, a {@code MemoryStateBackend} and a test stats tracker.
 *
 * @param counter checkpoint ID counter passed to {@code enableCheckpointing}
 * @param store completed checkpoint store passed to {@code enableCheckpointing}
 * @return the execution graph with checkpointing enabled
 * @throws Exception propagated from building or starting the graph
 */
private ExecutionGraph createExecutionGraphAndEnableCheckpointing(
		CheckpointIDCounter counter,
		CompletedCheckpointStore store) throws Exception {
	final Time oneDay = Time.days(1L);

	final JobVertex mockVertex = new JobVertex("MockVertex");
	mockVertex.setInvokableClass(AbstractInvokable.class);

	final ExecutionGraph graph = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(mockVertex)
		.setRpcTimeout(oneDay)
		.setAllocationTimeout(oneDay)
		.allowQueuedScheduling()
		.build();

	graph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());

	final CheckpointCoordinatorConfiguration checkpointConfig = new CheckpointCoordinatorConfiguration(
		100,
		100,
		100,
		1,
		CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
		true,
		false,
		0);

	graph.enableCheckpointing(
			checkpointConfig,
			Collections.emptyList(),
			Collections.emptyList(),
			Collections.emptyList(),
			Collections.emptyList(),
			counter,
			store,
			new MemoryStateBackend(),
			CheckpointStatsTrackerTest.createTestTracker());

	return graph;
}