下面列出了 com.google.api.client.util.NanoClock API 类的实例代码及用法，也可以点击链接到 GitHub 查看源代码。
/**
 * Builds the backoff to use for the next polling round.
 *
 * <p>When {@code duration} is not longer than zero it is handed to {@code getMessagesBackoff}
 * unchanged. Otherwise the time elapsed since {@code startNanos}, rounded up to whole
 * milliseconds, is subtracted from {@code duration}: a positive remainder becomes the budget of
 * the new backoff, while an exhausted budget yields {@link BackOff#STOP_BACKOFF}.
 *
 * @param duration total time allowed for the wait.
 * @param nanoClock clock read to obtain the current time in nanoseconds.
 * @param startNanos value of the clock against which the elapsed time is measured.
 * @return the backoff to continue with.
 */
private static BackOff resetBackoff(Duration duration, NanoClock nanoClock, long startNanos) {
  // No positive limit configured: only the retry count is reset.
  if (!duration.isLongerThan(Duration.ZERO)) {
    return getMessagesBackoff(duration);
  }

  long elapsedNanos = nanoClock.nanoTime() - startNanos;
  // Adding 999999 before dividing rounds a partially elapsed millisecond up.
  long elapsedMillis = (elapsedNanos + 999999) / 1000000;
  Duration budgetLeft = duration.minus(Duration.millis(elapsedMillis));

  return budgetLeft.isLongerThan(Duration.ZERO)
      ? getMessagesBackoff(budgetLeft)
      : BackOff.STOP_BACKOFF;
}
/**
 * Creates an initializer with explicitly supplied timing and sleeping collaborators. Visible for
 * testing.
 *
 * @param nanoClock used as a timing source for knowing how much time has elapsed.
 * @param sleeper used to sleep between retries.
 * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged;
 *     they are added to this instance's ignored response codes.
 * @param responseInterceptor interceptor stored on this instance.
 */
RetryHttpRequestInitializer(
    NanoClock nanoClock,
    Sleeper sleeper,
    Collection<Integer> additionalIgnoredResponseCodes,
    HttpResponseInterceptor responseInterceptor) {
  // Collaborators supplied by the caller.
  this.nanoClock = nanoClock;
  this.sleeper = sleeper;
  this.responseInterceptor = responseInterceptor;

  // The write timeout starts at zero.
  writeTimeout = 0;
  ignoredResponseCodes.addAll(additionalIgnoredResponseCodes);
}
/**
 * Waits until the pipeline finishes and returns the final status.
 *
 * <p>While waiting, a JVM shutdown hook is registered that logs the gcloud command for cancelling
 * the job; the hook is removed again before this method returns or throws.
 *
 * @param duration The time to wait for the job to finish. Provide a value less than 1 ms for an
 *     infinite wait.
 * @param messageHandler If non null this handler will be invoked for each batch of messages
 *     received.
 * @return The final state of the job, or null if the wait ended without a terminal state (for
 *     example on timeout).
 * @throws IOException If there is a persistent problem getting job information.
 * @throws InterruptedException if the thread is interrupted.
 */
@Nullable
@VisibleForTesting
public State waitUntilFinish(Duration duration, MonitoringUtil.JobMessagesHandler messageHandler)
    throws IOException, InterruptedException {
  // There is a known, deliberately ignored race: Ctrl-C may arrive after the job was submitted
  // but before the hook below is registered. Anything smarter (e.g. a SettableFuture) would not
  // help, because the run method that produces the job could itself fail or be interrupted
  // before returning one. Printing the cancel command is best-effort only -- RPCs may fail --
  // so users who need certainty should check the job status themselves.
  Runnable printCancelHint =
      () -> {
        LOG.warn(
            "Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n"
                + "To cancel the job in the cloud, run:\n> {}",
            MonitoringUtil.getGcloudCancelCommand(dataflowOptions, getJobId()));
      };
  Thread cancelHintHook = new Thread(printCancelHint);
  Runtime runtime = Runtime.getRuntime();

  try {
    runtime.addShutdownHook(cancelHintHook);
    MonitoringUtil monitor = new MonitoringUtil(dataflowClient);
    return waitUntilFinish(duration, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM, monitor);
  } finally {
    runtime.removeShutdownHook(cancelHintHook);
  }
}
/**
 * Variant of {@code waitUntilFinish} that lets the caller supply the sleeper and the clock. A new
 * {@link MonitoringUtil} built from this job's {@code dataflowClient} is used for monitoring.
 *
 * @param duration The time to wait for the job to finish.
 * @param messageHandler If non null, passed on to the five-argument overload.
 * @param sleeper A sleeper to use to sleep between attempts.
 * @param nanoClock A nanoClock used to time the total time taken.
 * @return whatever the five-argument overload returns.
 * @throws IOException propagated from the five-argument overload.
 * @throws InterruptedException propagated from the five-argument overload.
 */
@Nullable
@VisibleForTesting
State waitUntilFinish(
    Duration duration,
    @Nullable MonitoringUtil.JobMessagesHandler messageHandler,
    Sleeper sleeper,
    NanoClock nanoClock)
    throws IOException, InterruptedException {
  MonitoringUtil monitor = new MonitoringUtil(dataflowClient);
  return waitUntilFinish(duration, messageHandler, sleeper, nanoClock, monitor);
}
/**
* Unsupported variant of {@code waitUntilFinish}: this implementation never waits and never
* returns a state.
*
* @param duration ignored.
* @param messageHandler ignored.
* @param sleeper ignored.
* @param nanoClock ignored.
* @return never returns normally.
* @throws UnsupportedOperationException always; the exception is constructed from {@code ERROR}.
*/
@Nullable
@VisibleForTesting
State waitUntilFinish(
Duration duration,
MonitoringUtil.JobMessagesHandler messageHandler,
Sleeper sleeper,
NanoClock nanoClock) {
throw new UnsupportedOperationException(ERROR);
}
/**
 * Tests that a {@link DataflowPipelineJob} does not duplicate messages: a second wait must ask
 * the monitor only for messages from the timestamp of the last message already seen.
 */
@Test
public void testWaitUntilFinishNoRepeatedLogs() throws Exception {
  DataflowPipelineJob pipelineJob =
      new DataflowPipelineJob(mockDataflowClient, JOB_ID, options, null);
  Sleeper zeroSleeper = new ZeroSleeper();
  NanoClock clock = mock(NanoClock.class);
  Duration oneSecond = Duration.standardSeconds(1);

  // The monitor always hands back the same single message.
  Instant lastMessageTime = new Instant(42L);
  JobMessage onlyMessage = infoMessage(lastMessageTime, "nothing");
  MonitoringUtil monitor = mock(MonitoringUtil.class);
  when(monitor.getJobMessages(anyString(), anyLong())).thenReturn(ImmutableList.of(onlyMessage));

  // The job reports "running" on every call, so each wait can only end by timing out.
  Job runningJob = new Job();
  runningJob.setCurrentState("JOB_STATE_RUNNING");
  when(mockDataflowClient.getJob(anyString())).thenReturn(runningJob);

  // First wait: the clock jumps from 0s to 2s, past the 1s limit. Afterwards the job should
  // have recorded the timestamp of the latest message.
  when(clock.nanoTime()).thenReturn(0L, 2_000_000_000L);
  pipelineJob.waitUntilFinish(oneSecond, mockHandler, zeroSleeper, clock, monitor);
  verify(mockHandler).process(ImmutableList.of(onlyMessage));

  // Second wait: messages must be requested starting at `lastMessageTime`, so that the monitor
  // only returns new ones.
  when(clock.nanoTime()).thenReturn(3_000_000_000L, 6_000_000_000L);
  pipelineJob.waitUntilFinish(oneSecond, mockHandler, zeroSleeper, clock, monitor);
  verify(monitor).getJobMessages(anyString(), eq(lastMessageTime.getMillis()));
}
/**
* Creates an initializer that uses {@link NanoClock#SYSTEM} as its timing source and {@link
* Sleeper#DEFAULT} for sleeping, delegating to the constructor that accepts both explicitly.
*
* @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
* @param responseInterceptor HttpResponseInterceptor to be applied on all requests. May be null.
*/
public RetryHttpRequestInitializer(
Collection<Integer> additionalIgnoredResponseCodes,
@Nullable HttpResponseInterceptor responseInterceptor) {
this(NanoClock.SYSTEM, Sleeper.DEFAULT, additionalIgnoredResponseCodes, responseInterceptor);
}
/**
* Waits until the pipeline finishes and returns the final status.
*
* <p>Each round first fetches the job state and then processes job messages through {@code
* monitor}. A failure in either step skips the rest of the round and goes straight to the backoff
* check, so the backoff is only reset after a fully successful round.
*
* @param duration The time to wait for the job to finish. Provide a value less than 1 ms for an
* infinite wait.
* @param messageHandler If non null this handler will be invoked for each batch of messages
* received.
* @param sleeper A sleeper to use to sleep between attempts.
* @param nanoClock A nanoClock used to time the total time taken.
* @param monitor The monitoring utility handed to {@code processJobMessages} together with
* {@code messageHandler}.
* @return The final state of the job or null on timeout.
* @throws IOException If there is a persistent problem getting job information.
* @throws InterruptedException if the thread is interrupted.
*/
@Nullable
@VisibleForTesting
State waitUntilFinish(
Duration duration,
@Nullable MonitoringUtil.JobMessagesHandler messageHandler,
Sleeper sleeper,
NanoClock nanoClock,
MonitoringUtil monitor)
throws IOException, InterruptedException {
BackOff backoff = getMessagesBackoff(duration);
// This function tracks the cumulative time from the *first request* to enforce the wall-clock
// limit. Any backoff instance could, at best, track the time since the first attempt at a
// given request. Thus, we need to track the cumulative time ourselves.
long startNanos = nanoClock.nanoTime();
State state = State.UNKNOWN;
// Holds the failure of the most recent round, or null if that round succeeded; it decides which
// message is logged once the loop gives up.
Exception exception;
do {
exception = null;
try {
// Get the state of the job before listing messages. This ensures we always fetch job
// messages after the job finishes to ensure we have all them.
state =
getStateWithRetries(
BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()),
sleeper);
} catch (IOException e) {
exception = e;
LOG.warn("Failed to get job state: {}", e.getMessage());
LOG.debug("Failed to get job state: {}", e);
// In a do-while, continue jumps to the loop condition: the current backoff is consulted
// without being reset.
continue;
}
exception = processJobMessages(messageHandler, monitor);
if (exception != null) {
// Same as above: skip the terminal-state check and the backoff reset for this round.
continue;
}
// We can stop if the job is done.
if (state.isTerminal()) {
logTerminalState(state);
return state;
}
// Reset attempts count and update cumulative wait time.
backoff = resetBackoff(duration, nanoClock, startNanos);
} while (BackOffUtils.next(sleeper, backoff));
// At this point Backoff decided that we retried enough times.
// This can be either due to exceeding allowed timeout for job to complete, or receiving
// error multiple times in a row.
if (exception == null) {
LOG.warn("No terminal state was returned within allotted timeout. State value {}", state);
} else {
LOG.error("Failed to fetch job metadata with error: {}", exception);
}
return null;
}
/**
* Creates an instance whose {@code fastNanoTime} starts at the current reading of {@link
* NanoClock#SYSTEM}.
*/
public FastNanoClockAndFuzzySleeper() {
fastNanoTime = NanoClock.SYSTEM.nanoTime();
}
/**
 * Replaces the clock used for determining when max total time has elapsed doing retries.
 *
 * @param clock the clock to use from now on; a null value is rejected by the precondition check.
 */
@VisibleForTesting
void setNanoClock(NanoClock clock) {
  boolean clockProvided = clock != null;
  Preconditions.checkArgument(clockProvided, "clock must not be null!");
  this.clock = clock;
}
/**
 * Verifies that when reads keep failing and the injected clock reports that the maximum backoff
 * elapsed time has passed, the read surfaces the second read exception after exactly one
 * metadata request and two media requests.
 */
@Test
public void testOpenExceptionsDuringReadTotalElapsedTimeTooGreat() throws Exception {
  IOException firstReadFailure = new IOException("read IOException #1");
  IOException secondReadFailure = new IOException("read IOException #2");

  // Scripted clock: three readings 1 ms apart, then a reading that is
  // DEFAULT_BACKOFF_MAX_ELAPSED_TIME_MILLIS later than the third one.
  NanoClock scriptedClock = spy(NanoClock.class);
  when(scriptedClock.nanoTime())
      .thenReturn(
          Duration.ofMillis(1).toNanos(),
          Duration.ofMillis(2).toNanos(),
          Duration.ofMillis(3).toNanos(),
          Duration.ofMillis(3).plusMillis(DEFAULT_BACKOFF_MAX_ELAPSED_TIME_MILLIS).toNanos());

  // Transport script: object metadata first, then two content streams that each throw on read.
  StorageObject object = newStorageObject(BUCKET_NAME, OBJECT_NAME);
  MockHttpTransport failingTransport =
      mockTransport(
          jsonDataResponse(object),
          inputStreamResponse(CONTENT_LENGTH, 1, new ThrowingInputStream(firstReadFailure)),
          inputStreamResponse(CONTENT_LENGTH, 1, new ThrowingInputStream(secondReadFailure)));

  GoogleCloudStorage storage = mockedGcs(failingTransport);
  GoogleCloudStorageReadChannel channel =
      (GoogleCloudStorageReadChannel)
          storage.open(new StorageResourceId(BUCKET_NAME, OBJECT_NAME));
  channel.setNanoClock(scriptedClock);

  assertThat(channel.isOpen()).isTrue();
  assertThat(channel.position()).isEqualTo(0);

  // The read must give up and surface the second failure.
  IOException surfaced =
      assertThrows(IOException.class, () -> channel.read(ByteBuffer.allocate(1)));
  assertThat(surfaced).hasMessageThat().isEqualTo("read IOException #2");

  assertThat(trackingHttpRequestInitializer.getAllRequestStrings())
      .containsExactly(
          getRequestString(BUCKET_NAME, OBJECT_NAME),
          getMediaRequestString(BUCKET_NAME, OBJECT_NAME, object.getGeneration()),
          getMediaRequestString(BUCKET_NAME, OBJECT_NAME, object.getGeneration()))
      .inOrder();
}
/**
 * Returns the nano clock, as reported by the underlying {@code exponentialBackOffBuilder}.
 *
 * @since 1.14
 */
public final NanoClock getNanoClock() {
  NanoClock configuredClock = exponentialBackOffBuilder.getNanoClock();
  return configuredClock;
}
/**
* Sets the nano clock ({@link NanoClock#SYSTEM} by default). The value is forwarded to the
* underlying {@code exponentialBackOffBuilder}.
*
* <p>Overriding is only supported for the purpose of calling the super implementation and
* changing the return type, but nothing else.
*
* @param nanoClock the nano clock to pass to the underlying builder.
* @return this builder, to allow call chaining.
* @since 1.14
*/
public Builder setNanoClock(NanoClock nanoClock) {
exponentialBackOffBuilder.setNanoClock(nanoClock);
return this;
}