下面列出了org.mockito.verification.VerificationMode#org.apache.flink.runtime.state.memory.MemoryStateBackend 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> getTestHarness(
int maxParallelism,
int taskParallelism,
int subtaskIdx) throws Exception {
KeySelector<Event, Integer> keySelector = new TestKeySelector();
KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
new KeyedOneInputStreamOperatorTestHarness<>(
getKeyedCepOpearator(
false,
new NFAFactory()),
keySelector,
BasicTypeInfo.INT_TYPE_INFO,
maxParallelism,
taskParallelism,
subtaskIdx);
harness.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
return harness;
}
private static CheckpointCoordinator instantiateCheckpointCoordinator(JobID jid, ExecutionVertex... ackVertices) {
CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
10000000L,
600000L,
0L,
1,
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true,
false,
0);
return new CheckpointCoordinator(
jid,
chkConfig,
new ExecutionVertex[0],
ackVertices,
new ExecutionVertex[0],
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(10),
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY,
new CheckpointFailureManager(
0,
NoOpFailJobCall.INSTANCE));
}
/**
* Tests that the settings are actually serializable.
*/
@Test
public void testIsJavaSerializable() throws Exception {
JobCheckpointingSettings settings = new JobCheckpointingSettings(
Arrays.asList(new JobVertexID(), new JobVertexID()),
Arrays.asList(new JobVertexID(), new JobVertexID()),
Arrays.asList(new JobVertexID(), new JobVertexID()),
new CheckpointCoordinatorConfiguration(
1231231,
1231,
112,
12,
CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
false,
false,
0),
new SerializedValue<>(new MemoryStateBackend()));
JobCheckpointingSettings copy = CommonTestUtils.createCopySerializable(settings);
assertEquals(settings.getVerticesToAcknowledge(), copy.getVerticesToAcknowledge());
assertEquals(settings.getVerticesToConfirm(), copy.getVerticesToConfirm());
assertEquals(settings.getVerticesToTrigger(), copy.getVerticesToTrigger());
assertEquals(settings.getCheckpointCoordinatorConfiguration(), copy.getCheckpointCoordinatorConfiguration());
assertNotNull(copy.getDefaultStateBackend());
assertTrue(copy.getDefaultStateBackend().deserializeValue(this.getClass().getClassLoader()).getClass() == MemoryStateBackend.class);
}
@Test(expected = IllegalArgumentException.class)
public void testExistingSavepointEnforceUniqueUIDs() throws IOException {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
DataSource<Integer> input = env.fromElements(0);
BootstrapTransformation<Integer> transformation = OperatorTransformation
.bootstrapWith(input)
.transform(new ExampleStateBootstrapFunction());
Collection<OperatorState> operatorStates = Collections.singletonList(new OperatorState(
OperatorIDGenerator.fromUid(UID), 1, 4));
SavepointMetadata metadata = new SavepointMetadata(4, Collections.emptyList(), operatorStates);
new ExistingSavepoint(env, metadata, new MemoryStateBackend())
.withOperator(UID, transformation)
.withOperator(UID, transformation);
}
@Test(expected = IllegalArgumentException.class)
public void testNewSavepointEnforceUniqueUIDs() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
DataSource<Integer> input = env.fromElements(0);
BootstrapTransformation<Integer> transformation = OperatorTransformation
.bootstrapWith(input)
.transform(new ExampleStateBootstrapFunction());
SavepointMetadata metadata = new SavepointMetadata(1, Collections.emptyList(), Collections.emptyList());
new NewSavepoint(metadata, new MemoryStateBackend())
.withOperator(UID, transformation)
.withOperator(UID, transformation);
}
@Test(expected = IllegalArgumentException.class)
public void testExistingSavepointEnforceUniqueUIDs() throws IOException {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
DataSource<Integer> input = env.fromElements(0);
BootstrapTransformation<Integer> transformation = OperatorTransformation
.bootstrapWith(input)
.transform(new ExampleStateBootstrapFunction());
Collection<OperatorState> operatorStates = Collections.singletonList(new OperatorState(
OperatorIDGenerator.fromUid(UID), 1, 4));
SavepointMetadata metadata = new SavepointMetadata(4, Collections.emptyList(), operatorStates);
new ExistingSavepoint(env, metadata, new MemoryStateBackend())
.withOperator(UID, transformation)
.withOperator(UID, transformation);
}
@Test
public void testSnapshotEmpty() throws Exception {
final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
final OperatorStateBackend operatorStateBackend =
abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "testOperator", emptyStateHandles, cancelStreamRegistry);
CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
assertNull(stateHandle);
}
private static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
SourceReader<T, MockSourceSplit> reader,
WatermarkStrategy<T> watermarkStrategy,
ProcessingTimeService timeService) throws Exception {
final OperatorStateStore operatorStateStore =
new MemoryStateBackend().createOperatorStateBackend(
new MockEnvironmentBuilder().build(),
"test-operator",
Collections.emptyList(),
new CloseableRegistry());
final StateInitializationContext stateContext = new StateInitializationContextImpl(
false, operatorStateStore, null, null, null);
final SourceOperator<T, MockSourceSplit> sourceOperator =
new TestingSourceOperator<>(reader, watermarkStrategy, timeService);
sourceOperator.initializeState(stateContext);
sourceOperator.open();
return sourceOperator;
}
/**
* Validates taking the application-defined memory state backend and adding additional
* parameters from the cluster configuration, but giving precedence to application-defined
* parameters over configuration-defined parameters.
*/
@Test
public void testConfigureMemoryStateBackendMixed() throws Exception {
final String appCheckpointDir = new Path(tmp.newFolder().toURI()).toString();
final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
final Path expectedCheckpointPath = new Path(appCheckpointDir);
final Path expectedSavepointPath = new Path(savepointDir);
final MemoryStateBackend backend = new MemoryStateBackend(appCheckpointDir, null);
final Configuration config = new Configuration();
config.setString(backendKey, "filesystem"); // check that this is not accidentally picked up
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this parameter should not be picked up
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
StateBackend loadedBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);
assertTrue(loadedBackend instanceof MemoryStateBackend);
final MemoryStateBackend memBackend = (MemoryStateBackend) loadedBackend;
assertEquals(expectedCheckpointPath, memBackend.getCheckpointPath());
assertEquals(expectedSavepointPath, memBackend.getSavepointPath());
}
private JobGraph createJobGraph(ExecutionMode mode) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(RestartStrategies.noRestart());
env.setStateBackend((StateBackend) new MemoryStateBackend());
switch (mode) {
case MIGRATE:
createMigrationJob(env);
break;
case RESTORE:
createRestoredJob(env);
break;
}
return StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
}
@Test
public void testSnapshotEmpty() throws Exception {
final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
final OperatorStateBackend operatorStateBackend =
abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "testOperator", emptyStateHandles, cancelStreamRegistry);
CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
assertNull(stateHandle);
}
private static <T> void setupSourceOperator(
StreamSource<T, ?> operator,
ExecutionConfig executionConfig,
Environment env,
ProcessingTimeService timeProvider) {
StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setStateBackend(new MemoryStateBackend());
cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
cfg.setOperatorID(new OperatorID());
try {
MockStreamTask mockTask = new MockStreamTaskBuilder(env)
.setConfig(cfg)
.setExecutionConfig(executionConfig)
.setProcessingTimeService(timeProvider)
.build();
operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testDefaultParallelismRespectedWhenLessThanMaxParallelism() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
DataSource<Integer> input = env.fromElements(0);
BootstrapTransformation<Integer> transformation = OperatorTransformation
.bootstrapWith(input)
.transform(new ExampleStateBootstrapFunction());
int maxParallelism = transformation.getMaxParallelism(10);
DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
OperatorIDGenerator.fromUid("uid"),
new MemoryStateBackend(),
new Path(),
maxParallelism
);
Assert.assertEquals(
"The parallelism of a data set should not change when less than the max parallelism of the savepoint",
ExecutionConfig.PARALLELISM_DEFAULT,
getParallelism(result));
}
private JobGraph createJobGraph(ExecutionMode mode) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(RestartStrategies.noRestart());
env.setStateBackend((StateBackend) new MemoryStateBackend());
switch (mode) {
case MIGRATE:
createMigrationJob(env);
break;
case RESTORE:
createRestoredJob(env);
break;
}
return StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
}
@Test(expected = IOException.class)
public void testInvalidProcessReaderFunctionFails() throws Exception {
OperatorID operatorID = OperatorIDGenerator.fromUid("uid");
OperatorSubtaskState state = createOperatorSubtaskState(new StreamFlatMap<>(new StatefulFunction()));
OperatorState operatorState = new OperatorState(operatorID, 1, 128);
operatorState.putState(0, state);
KeyedStateInputFormat<?, ?> format = new KeyedStateInputFormat<>(operatorState, new MemoryStateBackend(), Types.INT, new ReaderFunction());
KeyGroupRangeInputSplit split = format.createInputSplits(1)[0];
KeyedStateReaderFunction<Integer, Integer> userFunction = new InvalidReaderFunction();
readInputSplit(split, userFunction);
Assert.fail("KeyedStateReaderFunction did not fail on invalid RuntimeContext use");
}
@Test
public void testReadState() throws Exception {
OperatorID operatorID = OperatorIDGenerator.fromUid("uid");
OperatorSubtaskState state = createOperatorSubtaskState(new StreamFlatMap<>(new StatefulFunction()));
OperatorState operatorState = new OperatorState(operatorID, 1, 128);
operatorState.putState(0, state);
KeyedStateInputFormat<?, ?> format = new KeyedStateInputFormat<>(operatorState, new MemoryStateBackend(), Types.INT, new ReaderFunction());
KeyGroupRangeInputSplit split = format.createInputSplits(1)[0];
KeyedStateReaderFunction<Integer, Integer> userFunction = new ReaderFunction();
List<Integer> data = readInputSplit(split, userFunction);
Assert.assertEquals("Incorrect data read from input split", Arrays.asList(1, 2, 3), data);
}
@Test
public void testOperatorSpecificMaxParallelismRespected() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
DataSource<Integer> input = env.fromElements(0);
BootstrapTransformation<Integer> transformation = OperatorTransformation
.bootstrapWith(input)
.setMaxParallelism(1)
.transform(new ExampleStateBootstrapFunction());
int maxParallelism = transformation.getMaxParallelism(4);
DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
OperatorIDGenerator.fromUid("uid"),
new MemoryStateBackend(),
new Path(),
maxParallelism
);
Assert.assertEquals("The parallelism of a data set should be constrained my the savepoint max parallelism", 1, getParallelism(result));
}
private KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> getTestHarness(
int maxParallelism,
int taskParallelism,
int subtaskIdx) throws Exception {
KeySelector<Event, Integer> keySelector = new TestKeySelector();
KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
new KeyedOneInputStreamOperatorTestHarness<>(
getKeyedCepOpearator(
false,
new NFAFactory()),
keySelector,
BasicTypeInfo.INT_TYPE_INFO,
maxParallelism,
taskParallelism,
subtaskIdx);
harness.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
return harness;
}
@Test
public void testCreatePartitionedInputSplits() throws Exception {
OperatorID operatorID = OperatorIDGenerator.fromUid("uid");
OperatorSubtaskState state = createOperatorSubtaskState(new StreamFlatMap<>(new StatefulFunction()));
OperatorState operatorState = new OperatorState(operatorID, 1, 128);
operatorState.putState(0, state);
KeyedStateInputFormat<?, ?> format = new KeyedStateInputFormat<>(operatorState, new MemoryStateBackend(), Types.INT, new ReaderFunction());
KeyGroupRangeInputSplit[] splits = format.createInputSplits(4);
Assert.assertEquals("Failed to properly partition operator state into input splits", 4, splits.length);
}
@SuppressWarnings("unchecked")
private static <T> MockStreamTask setupSourceOperator(
StreamSource<T, ?> operator,
TimeCharacteristic timeChar,
long watermarkInterval,
final TimerService timeProvider) throws Exception {
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setAutoWatermarkInterval(watermarkInterval);
StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setStateBackend(new MemoryStateBackend());
cfg.setTimeCharacteristic(timeChar);
cfg.setOperatorID(new OperatorID());
Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
StreamStatusMaintainer streamStatusMaintainer = mock(StreamStatusMaintainer.class);
when(streamStatusMaintainer.getStreamStatus()).thenReturn(StreamStatus.ACTIVE);
MockStreamTask mockTask = new MockStreamTaskBuilder(env)
.setConfig(cfg)
.setExecutionConfig(executionConfig)
.setStreamStatusMaintainer(streamStatusMaintainer)
.setTimerService(timeProvider)
.build();
operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
return mockTask;
}
@Test
public void testWithRocksDbBackendIncremental() throws Exception {
RocksDBStateBackend incRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), true);
incRocksDbBackend.setDbStoragePath(tmpFolder.newFolder().getAbsolutePath());
testProgramWithBackend(incRocksDbBackend);
}
@Override
protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) {
return new StateBackendTestContext(timeProvider) {
@Override
protected StateBackend createStateBackend() {
return new MemoryStateBackend(false);
}
};
}
/**
* Tests that the settings are actually serializable.
*/
@Test
public void testIsJavaSerializable() throws Exception {
JobCheckpointingSettings settings = new JobCheckpointingSettings(
Arrays.asList(new JobVertexID(), new JobVertexID()),
Arrays.asList(new JobVertexID(), new JobVertexID()),
Arrays.asList(new JobVertexID(), new JobVertexID()),
new CheckpointCoordinatorConfiguration(
1231231,
1231,
112,
12,
CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
false,
false,
false,
0),
new SerializedValue<>(new MemoryStateBackend()));
JobCheckpointingSettings copy = CommonTestUtils.createCopySerializable(settings);
assertEquals(settings.getVerticesToAcknowledge(), copy.getVerticesToAcknowledge());
assertEquals(settings.getVerticesToConfirm(), copy.getVerticesToConfirm());
assertEquals(settings.getVerticesToTrigger(), copy.getVerticesToTrigger());
assertEquals(settings.getCheckpointCoordinatorConfiguration(), copy.getCheckpointCoordinatorConfiguration());
assertNotNull(copy.getDefaultStateBackend());
assertTrue(copy.getDefaultStateBackend().deserializeValue(this.getClass().getClassLoader()).getClass() == MemoryStateBackend.class);
}
@Test
public void testWithRocksDbBackendFull() throws Exception {
RocksDBStateBackend fullRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), false);
fullRocksDbBackend.setDbStoragePath(tmpFolder.newFolder().getAbsolutePath());
testProgramWithBackend(fullRocksDbBackend);
}
@Override
protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) {
return new StateBackendTestContext(timeProvider) {
@Override
protected StateBackend createStateBackend() {
return new MemoryStateBackend(false);
}
};
}
/**
* Tests that no minimum delay between savepoints is enforced.
*/
@Test
public void testMinDelayBetweenSavepoints() throws Exception {
JobID jobId = new JobID();
final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
CheckpointCoordinator coord = new CheckpointCoordinator(
jobId,
100000,
200000,
100000000L, // very long min delay => should not affect savepoints
1,
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
String savepointDir = tmpFolder.newFolder().getAbsolutePath();
CompletableFuture<CompletedCheckpoint> savepoint0 = coord.triggerSavepoint(0, savepointDir);
assertFalse("Did not trigger savepoint", savepoint0.isDone());
CompletableFuture<CompletedCheckpoint> savepoint1 = coord.triggerSavepoint(1, savepointDir);
assertFalse("Did not trigger savepoint", savepoint1.isDone());
}
/**
* Tests that the pending checkpoint stats callbacks are created.
*/
@Test
public void testCheckpointStatsTrackerPendingCheckpointCallback() {
final long timestamp = System.currentTimeMillis();
ExecutionVertex vertex1 = mockExecutionVertex(new ExecutionAttemptID());
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
new JobID(),
600000,
600000,
0,
Integer.MAX_VALUE,
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[]{vertex1},
new ExecutionVertex[]{vertex1},
new ExecutionVertex[]{vertex1},
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
coord.setCheckpointStatsTracker(tracker);
when(tracker.reportPendingCheckpoint(anyLong(), anyLong(), any(CheckpointProperties.class)))
.thenReturn(mock(PendingCheckpointStats.class));
// Trigger a checkpoint and verify callback
assertTrue(coord.triggerCheckpoint(timestamp, false));
verify(tracker, times(1))
.reportPendingCheckpoint(eq(1L), eq(timestamp), eq(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)));
}
@Test
public void testInstantiateMemoryBackendByDefault() throws Exception {
StateBackend backend =
StateBackendLoader.fromApplicationOrConfigOrDefault(null, new Configuration(), cl, null);
assertTrue(backend instanceof MemoryStateBackend);
}
@Test
public void testStreamConfig() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> input = env.fromElements("");
BootstrapTransformation<String> transformation = OperatorTransformation
.bootstrapWith(input)
.keyBy(new CustomKeySelector())
.transform(new ExampleKeyedStateBootstrapFunction());
StreamConfig config = transformation.getConfig(OperatorIDGenerator.fromUid("uid"), new MemoryStateBackend(), null);
KeySelector selector = config.getStatePartitioner(0, Thread.currentThread().getContextClassLoader());
Assert.assertEquals("Incorrect key selector forwarded to stream operator", CustomKeySelector.class, selector.getClass());
}
private ExecutionGraph createExecutionGraphAndEnableCheckpointing(
CheckpointIDCounter counter,
CompletedCheckpointStore store) throws Exception {
final Time timeout = Time.days(1L);
JobVertex jobVertex = new JobVertex("MockVertex");
jobVertex.setInvokableClass(AbstractInvokable.class);
final ExecutionGraph executionGraph = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobVertex)
.setRpcTimeout(timeout)
.setAllocationTimeout(timeout)
.allowQueuedScheduling()
.build();
executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
100,
100,
100,
1,
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true,
false,
0);
executionGraph.enableCheckpointing(
chkConfig,
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
counter,
store,
new MemoryStateBackend(),
CheckpointStatsTrackerTest.createTestTracker());
return executionGraph;
}