The following are code examples of org.apache.flink.api.common.JobID, collected from the Apache Flink sources.
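Before the individual examples, a minimal sketch of the JobID basics they all rely on. JobID extends Flink's AbstractID, so a fresh random ID and a round-trip through the hex string work as follows (variable names are illustrative):

import org.apache.flink.api.common.JobID;

// A fresh, random job ID.
JobID jid = new JobID();

// The string form is the 32-character lower-case hex representation...
String hex = jid.toString();

// ...and can be parsed back into an equal JobID.
JobID parsed = JobID.fromHexString(hex);
assert jid.equals(parsed);
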
@Override
public CompletableFuture<KvStateLocation> requestKvStateLocation(final JobID jobId, final String registrationName) {
    // sanity check for the correct JobID
    if (jobGraph.getJobID().equals(jobId)) {
        if (log.isDebugEnabled()) {
            log.debug("Lookup key-value state for job {} with registration " +
                "name {}.", jobGraph.getJobID(), registrationName);
        }

        final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
        final KvStateLocation location = registry.getKvStateLocation(registrationName);
        if (location != null) {
            return CompletableFuture.completedFuture(location);
        } else {
            return FutureUtils.completedExceptionally(new UnknownKvStateLocation(registrationName));
        }
    } else {
        if (log.isDebugEnabled()) {
            log.debug("Request of key-value state location for unknown job {} received.", jobId);
        }
        return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
    }
}

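For context, a caller-side sketch of how the two exceptional completions above surface to a client; the gateway handle, job ID, and registration name here are assumptions, not part of the snippet:

// Hypothetical caller of the method above.
gateway.requestKvStateLocation(jobId, "average-per-key")
    .whenComplete((location, failure) -> {
        if (failure != null) {
            // completed exceptionally: UnknownKvStateLocation or FlinkJobNotFoundException
            log.warn("KvState lookup failed.", failure);
        } else {
            log.info("KvState is located at {}.", location);
        }
    });
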
@Test
public void testGenerateScopeDefault() throws Exception {
    TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
    TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
    TaskMetricGroup taskGroup = new TaskMetricGroup(
        registry, jmGroup, new JobVertexID(), new AbstractID(), "aTaskName", 11, 0);
    OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, new OperatorID(), "myOpName");

    assertArrayEquals(
        new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", "myOpName", "11" },
        opGroup.getScopeComponents());

    assertEquals(
        "theHostName.taskmanager.test-tm-id.myJobName.myOpName.11.name",
        opGroup.getMetricIdentifier("name"));
}

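The expected components above follow the default operator scope format. A sketch of overriding that format via configuration, in the spirit of the wildcard test further below (the format string itself is illustrative):

Configuration cfg = new Configuration();
// Illustrative custom format; placeholders like <host> and <operator_name>
// are substituted per metric group.
cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "<host>.op.<operator_name>.<subtask_index>");
MetricRegistryImpl customRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
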
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup,
        @Nonnull Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) {
    return new MockKeyedStateBackendBuilder<>(
        new KvStateRegistry().createTaskRegistry(jobID, new JobVertexID()),
        keySerializer,
        env.getUserClassLoader(),
        numberOfKeyGroups,
        keyGroupRange,
        env.getExecutionConfig(),
        ttlTimeProvider,
        stateHandles,
        AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
        cancelStreamRegistry).build();
}

/**
 * Tests the completion of pending task manager slots by registering a TaskExecutor.
 */
@Test
public void testPendingTaskManagerSlotCompletion() throws Exception {
    final int numberSlots = 3;
    final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
        .setAllocateResourceFunction(convert(value -> numberSlots))
        .build();

    try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) {
        final JobID jobId = new JobID();

        assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
        assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
        assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1));
        assertThat(slotManager.getNumberRegisteredSlots(), is(0));

        final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
        final SlotReport slotReport = createSlotReport(taskExecutorConnection.getResourceID(), numberSlots - 1);

        slotManager.registerTaskManager(taskExecutorConnection, slotReport);

        assertThat(slotManager.getNumberRegisteredSlots(), is(numberSlots - 1));
        assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(1));
    }
}

@Override
public Collection<JobID> getJobIds() throws Exception {
    Collection<String> paths;

    LOG.debug("Retrieving all stored job ids from ZooKeeper under {}.", zooKeeperFullBasePath);

    try {
        paths = jobGraphsInZooKeeper.getAllPaths();
    } catch (Exception e) {
        throw new Exception("Failed to retrieve entry paths from ZooKeeperStateHandleStore.", e);
    }

    List<JobID> jobIds = new ArrayList<>(paths.size());

    for (String path : paths) {
        try {
            jobIds.add(jobIdFromPath(path));
        } catch (Exception exception) {
            LOG.warn("Could not parse job id from {}. This indicates a malformed path.", path, exception);
        }
    }

    return jobIds;
}

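The helper jobIdFromPath is not shown in the snippet. A plausible sketch, assuming each ZooKeeper child path is the job's hex string, possibly with a leading slash:

// Hypothetical helper: strip a leading '/' and parse the hex job ID.
// JobID.fromHexString throws IllegalArgumentException on malformed input,
// which the loop above catches and logs.
private JobID jobIdFromPath(String path) {
    final String hex = path.startsWith("/") ? path.substring(1) : path;
    return JobID.fromHexString(hex);
}
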
@Test
public void testSavepointCompletedSuccessfully() throws Exception {
    final TestingRestfulGateway testingRestfulGateway = new TestingRestfulGateway.Builder()
        .setTriggerSavepointFunction((JobID jobId, String targetDirectory) ->
            CompletableFuture.completedFuture(COMPLETED_SAVEPOINT_EXTERNAL_POINTER))
        .build();

    final TriggerId triggerId = savepointTriggerHandler.handleRequest(
        triggerSavepointRequest(),
        testingRestfulGateway).get().getTriggerId();

    AsynchronousOperationResult<SavepointInfo> savepointResponseBody;
    savepointResponseBody = savepointStatusHandler.handleRequest(
        savepointStatusRequest(triggerId),
        testingRestfulGateway).get();

    assertThat(
        savepointResponseBody.queueStatus().getId(),
        equalTo(QueueStatus.Id.COMPLETED));
    assertThat(savepointResponseBody.resource(), notNullValue());
    assertThat(
        savepointResponseBody.resource().getLocation(),
        equalTo(COMPLETED_SAVEPOINT_EXTERNAL_POINTER));
}

/**
 * This method tests that hooks with the same identifier are not registered
 * multiple times.
 */
@Test
public void testDeduplicateOnRegister() {
    final CheckpointCoordinator cc = instantiateCheckpointCoordinator(new JobID());

    MasterTriggerRestoreHook<?> hook1 = mock(MasterTriggerRestoreHook.class);
    when(hook1.getIdentifier()).thenReturn("test id");

    MasterTriggerRestoreHook<?> hook2 = mock(MasterTriggerRestoreHook.class);
    when(hook2.getIdentifier()).thenReturn("test id");

    MasterTriggerRestoreHook<?> hook3 = mock(MasterTriggerRestoreHook.class);
    when(hook3.getIdentifier()).thenReturn("anotherId");

    assertTrue(cc.addMasterHook(hook1));
    assertFalse(cc.addMasterHook(hook2));
    assertTrue(cc.addMasterHook(hook3));
}

/**
 * Tests concurrent calls to {@link PermanentBlobCache#getStorageLocation(JobID, BlobKey)}.
 */
@Test
public void testPermanentBlobCacheGetStorageLocationConcurrentForJob() throws Exception {
    final JobID jobId = new JobID();

    final Configuration config = new Configuration();
    config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());

    try (BlobServer server = new BlobServer(config, new VoidBlobStore());
            final PermanentBlobCache cache = new PermanentBlobCache(
                config, new VoidBlobStore(), new InetSocketAddress("localhost", server.getPort()))) {

        server.start();

        BlobKey key = new PermanentBlobKey();
        CheckedThread[] threads = new CheckedThread[] {
            new PermanentBlobCacheGetStorageLocation(cache, jobId, key),
            new PermanentBlobCacheGetStorageLocation(cache, jobId, key),
            new PermanentBlobCacheGetStorageLocation(cache, jobId, key)
        };
        checkedThreadSimpleTest(threads);
    }
}

@Test
public void testSavepointCompletedWithException() throws Exception {
    TestingRestfulGateway testingRestfulGateway = new TestingRestfulGateway.Builder()
        .setTriggerSavepointFunction((JobID jobId, String directory) ->
            FutureUtils.completedExceptionally(new RuntimeException("expected")))
        .build();

    final TriggerId triggerId = savepointTriggerHandler.handleRequest(
        triggerSavepointRequest(),
        testingRestfulGateway).get().getTriggerId();

    final AsynchronousOperationResult<SavepointInfo> savepointResponseBody = savepointStatusHandler.handleRequest(
        savepointStatusRequest(triggerId),
        testingRestfulGateway).get();

    assertThat(savepointResponseBody.queueStatus().getId(), equalTo(QueueStatus.Id.COMPLETED));
    assertThat(savepointResponseBody.resource(), notNullValue());
    assertThat(savepointResponseBody.resource().getFailureCause(), notNullValue());

    final Throwable savepointError = savepointResponseBody.resource()
        .getFailureCause()
        .deserializeError(ClassLoader.getSystemClassLoader());
    assertThat(savepointError.getMessage(), equalTo("expected"));
    assertThat(savepointError, instanceOf(RuntimeException.class));
}

@Test
public void testGenerateScopeCustomWildcard() throws Exception {
    Configuration cfg = new Configuration();
    cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter");
    cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>");
    MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));

    JobID jid = new JobID();

    JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName");
    JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");

    assertArrayEquals(
        new String[] { "peter", "some-constant", jid.toString() },
        jmGroup.getScopeComponents());

    assertEquals(
        "peter.some-constant." + jid + ".name",
        jmGroup.getMetricIdentifier("name"));

    registry.shutdown().get();
}

private void disassociateFromJobManager(JobTable.Connection jobManagerConnection, Exception cause) throws IOException {
    checkNotNull(jobManagerConnection);

    final JobID jobId = jobManagerConnection.getJobId();

    // cleanup remaining partitions once all tasks for this job have completed
    scheduleResultPartitionCleanup(jobId);

    final KvStateRegistry kvStateRegistry = kvStateService.getKvStateRegistry();

    if (kvStateRegistry != null) {
        kvStateRegistry.unregisterListener(jobId);
    }

    final KvStateClientProxy kvStateClientProxy = kvStateService.getKvStateClientProxy();

    if (kvStateClientProxy != null) {
        kvStateClientProxy.updateKvStateLocationOracle(jobManagerConnection.getJobId(), null);
    }

    JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
    jobManagerGateway.disconnectTaskManager(getResourceID(), cause);
}

protected AbstractCheckpointMessage(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
    if (job == null || taskExecutionId == null) {
        throw new NullPointerException();
    }

    this.job = job;
    this.taskExecutionId = taskExecutionId;
    this.checkpointId = checkpointId;
}

@Override
public void executionStatusChanged(
        JobID jobID, JobVertexID vertexID,
        String taskName, int taskParallelism, int subtaskIndex,
        ExecutionAttemptID executionID, ExecutionState newExecutionState,
        long timestamp, String optionalMessage) {

    ExecutionGraphMessages.ExecutionStateChanged message =
        new ExecutionGraphMessages.ExecutionStateChanged(
            jobID, vertexID, taskName, taskParallelism, subtaskIndex,
            executionID, newExecutionState, timestamp, optionalMessage);

    target.tell(message);
}

@Override
public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
    if (recoveryFailure != null) {
        throw recoveryFailure;
    } else {
        return super.recoverJobGraph(jobId);
    }
}

private ArchivedExecutionGraph loadExecutionGraph(JobID jobId) throws IOException, ClassNotFoundException {
    final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);

    if (archivedExecutionGraphFile.exists()) {
        try (FileInputStream fileInputStream = new FileInputStream(archivedExecutionGraphFile)) {
            return InstantiationUtil.deserializeObject(fileInputStream, getClass().getClassLoader());
        }
    } else {
        throw new FileNotFoundException("Could not find file for archived execution graph " + jobId +
            ". This indicates that the file either has been deleted or never written.");
    }
}

@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
    synchronized (lock) {
        checkNotShutdown();

        return new StandaloneLeaderElectionService();
    }
}

/**
 * Creates a {@link ZooKeeperCheckpointIDCounter} instance.
 *
 * @param client        The {@link CuratorFramework} ZooKeeper client to use
 * @param configuration {@link Configuration} object
 * @param jobId         ID of job to create the instance for
 * @return {@link ZooKeeperCheckpointIDCounter} instance
 */
public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter(
        CuratorFramework client,
        Configuration configuration,
        JobID jobId) {

    String checkpointIdCounterPath = configuration.getString(
        HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);

    checkpointIdCounterPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);

    return new ZooKeeperCheckpointIDCounter(client, checkpointIdCounterPath);
}

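A brief usage sketch: the counter node ends up under the configured base path plus a per-job suffix. This assumes getPathForJob returns "/<job-id-hex>" and that a Curator client is already available; startCuratorClient is a placeholder:

CuratorFramework client = startCuratorClient(); // assumed helper, not part of the snippet
Configuration configuration = new Configuration();
JobID jobId = new JobID();

ZooKeeperCheckpointIDCounter counter =
    createCheckpointIDCounter(client, configuration, jobId);
// With the defaults, the counter path then looks like
// "<checkpoint-counter-base-path>/<job-id-hex>".
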
@Test
public void testSnapshotUtilsLifecycle() throws Exception {
    StreamOperator<Void> operator = new LifecycleOperator();
    CheckpointStorageWorkerView storage = new MockStateBackend().createCheckpointStorage(new JobID());

    Path path = new Path(folder.newFolder().getAbsolutePath());
    SnapshotUtils.snapshot(operator, 0, 0L, true, false, storage, path);

    Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, ACTUAL_ORDER_TRACKING);
}

@Override
public void declineCheckpoint(
        JobID jobID,
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        Throwable reason) {

    DeclineCheckpoint decline = new DeclineCheckpoint(
        jobID,
        executionAttemptID,
        checkpointId,
        reason);
    actorGateway.tell(decline);
}

/**
 * Creates a new checkpoint storage.
 *
 * @param jobId                     The ID of the job that writes the checkpoints.
 * @param defaultSavepointDirectory The default location for savepoints, or null, if none is set.
 */
protected AbstractFsCheckpointStorage(
        JobID jobId,
        @Nullable Path defaultSavepointDirectory) {

    this.jobId = checkNotNull(jobId);
    this.defaultSavepointDirectory = defaultSavepointDirectory;
}

@Test
public void testStopWithMaxWMAndExplicitSavepointDir() throws Exception {
    JobID jid = new JobID();

    String[] parameters = { "-d", "-p", "test-target-dir", jid.toString() };
    final ClusterClient<String> clusterClient = createClusterClient(null);
    MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
    testFrontend.stop(parameters);
    Mockito.verify(clusterClient, times(1))
        .stopWithSavepoint(eq(jid), eq(true), eq("test-target-dir"));
}

/**
 * Helper to choose the right {@link BlobServer#putPermanent} or
 * {@link TransientBlobService#putTransient} method.
 *
 * @param service  the BLOB service (server or cache) to upload to
 * @param jobId    the ID of the job the BLOB belongs to, or null for job-unrelated transient BLOBs
 * @param data     the input stream to read the data from
 * @param blobType whether the BLOB should become permanent or transient
 *
 * @return blob key for the uploaded data
 */
static BlobKey put(BlobService service, @Nullable JobID jobId, InputStream data, BlobKey.BlobType blobType)
        throws IOException {
    if (blobType == PERMANENT_BLOB) {
        if (service instanceof BlobServer) {
            return ((BlobServer) service).putPermanent(jobId, data);
        } else {
            throw new UnsupportedOperationException("uploading streams is only possible at the BlobServer");
        }
    } else if (jobId == null) {
        return service.getTransientBlobService().putTransient(data);
    } else {
        return service.getTransientBlobService().putTransient(jobId, data);
    }
}

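A hedged usage sketch of this helper, uploading a job-scoped transient BLOB; blobServer, jobId, and the file name are assumptions for illustration:

try (InputStream in = new FileInputStream("job-artifact.bin")) {
    // jobId != null and a transient blob type take the last branch above.
    BlobKey key = put(blobServer, jobId, in, BlobKey.BlobType.TRANSIENT_BLOB);
    // 'key' now identifies the uploaded data on the transient BLOB service.
}
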
public void updateAllocation(AllocationID allocationId, JobID jobId) {
    Preconditions.checkState(state == State.FREE, "The slot has to be free in order to set an allocation id.");

    state = State.ALLOCATED;
    this.allocationId = Preconditions.checkNotNull(allocationId);
    this.jobId = Preconditions.checkNotNull(jobId);
}

public JobConfigInfo(
        JobID jobId,
        String jobName,
        @Nullable ExecutionConfigInfo executionConfigInfo) {
    this.jobId = Preconditions.checkNotNull(jobId);
    this.jobName = Preconditions.checkNotNull(jobName);
    this.executionConfigInfo = executionConfigInfo;
}

private static ClusterClient<String> createClusterClient() throws Exception {
    final ClusterClient<String> clusterClient = mock(ClusterClient.class);

    when(clusterClient.listJobs()).thenReturn(CompletableFuture.completedFuture(Arrays.asList(
        new JobStatusMessage(new JobID(), "job1", JobStatus.RUNNING, 1L),
        new JobStatusMessage(new JobID(), "job2", JobStatus.CREATED, 1L),
        new JobStatusMessage(new JobID(), "job3", JobStatus.FINISHED, 3L)
    )));

    return clusterClient;
}

/**
 * If the file does not exist locally, it is retrieved from the blob-service.
 *
 * @param name        The name under which the file is cached.
 * @param entry       The cache entry descriptor (path, executable flag)
 * @param jobID       The ID of the job for which the file is copied.
 * @param executionId The ID of the execution attempt holding a reference to the file.
 * @return The handle to the task that copies the file.
 */
public Future<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID, ExecutionAttemptID executionId) throws Exception {
    synchronized (lock) {
        Map<String, Future<Path>> jobEntries = entries.computeIfAbsent(jobID, k -> new HashMap<>());

        // register reference holder
        final Set<ExecutionAttemptID> refHolders = jobRefHolders.computeIfAbsent(jobID, id -> new HashSet<>());
        refHolders.add(executionId);

        Future<Path> fileEntry = jobEntries.get(name);
        if (fileEntry != null) {
            // file is already in the cache. return a future that
            // immediately returns the file
            return fileEntry;
        } else {
            // need to copy the file

            // create the target path
            File tempDirToUse = new File(storageDirectories[nextDirectory++], jobID.toString());
            if (nextDirectory >= storageDirectories.length) {
                nextDirectory = 0;
            }

            // kick off the copying
            Callable<Path> cp;
            if (entry.blobKey != null) {
                cp = new CopyFromBlobProcess(entry, jobID, blobService, new Path(tempDirToUse.getAbsolutePath()));
            } else {
                cp = new CopyFromDFSProcess(entry, new Path(tempDirToUse.getAbsolutePath()));
            }
            FutureTask<Path> copyTask = new FutureTask<>(cp);
            executorService.submit(copyTask);

            // store our entry
            jobEntries.put(name, copyTask);

            return copyTask;
        }
    }
}

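From the caller's perspective, the returned future blocks until the copy process has finished; the cache instance, entry, and file name below are assumptions:

// Hypothetical caller: resolve the cached file for this execution attempt.
Future<Path> pathFuture = fileCache.createTmpFile("words.txt", entry, jobID, executionId);
Path localPath = pathFuture.get(); // waits for CopyFromBlobProcess / CopyFromDFSProcess
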
@Nullable
@Override
public JobDetails getAvailableJobDetails(JobID jobId) {
    final ArchivedExecutionGraph archivedExecutionGraph = serializableExecutionGraphs.get(jobId);

    if (archivedExecutionGraph != null) {
        return WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);
    } else {
        return null;
    }
}

@Test
public void testGenerateScopeDefault() {
    TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
    JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");

    assertArrayEquals(
        new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName" },
        jmGroup.getScopeComponents());

    assertEquals(
        "theHostName.taskmanager.test-tm-id.myJobName.name",
        jmGroup.getMetricIdentifier("name"));
}

public static ExecutionGraph createExecutionGraph(
        JobID jid,
        SlotProvider slotProvider,
        RestartStrategy restartStrategy,
        ScheduledExecutorService executor,
        Time timeout,
        JobVertex... vertices) throws Exception {

    checkNotNull(jid);
    checkNotNull(restartStrategy);
    checkNotNull(vertices);
    checkNotNull(timeout);

    return ExecutionGraphBuilder.buildGraph(
        null,
        new JobGraph(jid, "test job", vertices),
        new Configuration(),
        executor,
        executor,
        slotProvider,
        ExecutionGraphTestUtils.class.getClassLoader(),
        new StandaloneCheckpointRecoveryFactory(),
        timeout,
        restartStrategy,
        new UnregisteredMetricsGroup(),
        1,
        VoidBlobWriter.getInstance(),
        timeout,
        TEST_LOGGER);
}

@Override
public void setJobRunning(JobID jobID) {
    checkNotNull(jobID);
    synchronized (jobStatus) {
        jobStatus.put(jobID, JobSchedulingStatus.RUNNING);
    }
}