java.util.concurrent.CompletableFuture#handle() 源码实例 Demo

下面列出了 java.util.concurrent.CompletableFuture#handle() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: pravega   文件: JwtTokenProviderImpl.java
/**
 * Refreshes the delegation token, reusing a refresh that is already registered when there is one.
 *
 * <p>When {@code tokenRefreshFuture} holds no future, a new one is obtained from {@code recreateToken()} and
 * offered to {@code tokenRefreshFuture} with a compare-and-set; otherwise the registered future is used.
 * Once the chosen future completes it is removed from {@code tokenRefreshFuture}, provided it is still the
 * registered one.
 *
 * @return a future yielding the value held by {@code delegationToken} after a successful refresh, or failing
 *         with a {@link CompletionException} when the refresh failed.
 */
@VisibleForTesting
CompletableFuture<String> refreshToken() {
    final long traceId = LoggerHelpers.traceEnter(log, "refreshToken", this.scopeName, this.streamName);

    final CompletableFuture<Void> registered = tokenRefreshFuture.get();
    final CompletableFuture<Void> inFlight;
    if (registered != null) {
        log.debug("Token is already under refresh for scope {} and stream {}", this.scopeName, this.streamName);
        inFlight = registered;
    } else {
        log.debug("Initiating token refresh for scope {} and stream {}", this.scopeName, this.streamName);
        inFlight = this.recreateToken();
        // The outcome of this compare-and-set is not inspected; the locally created future is used either way.
        this.tokenRefreshFuture.compareAndSet(null, inFlight);
    }

    return inFlight.handle((ignored, failure) -> {
        // Un-register only if the slot still points at the future this call was waiting on.
        this.tokenRefreshFuture.compareAndSet(inFlight, null);
        LoggerHelpers.traceLeave(log, "refreshToken", traceId, this.scopeName, this.streamName);
        if (failure == null) {
            return delegationToken.get().getValue();
        }
        log.warn("Encountered an exception in when refreshing token for scope {} and stream {}",
                this.scopeName, this.streamName, Exceptions.unwrap(failure));
        // Avoid double wrapping: an existing CompletionException is rethrown as is.
        if (failure instanceof CompletionException) {
            throw (CompletionException) failure;
        }
        throw new CompletionException(failure);
    });
}
 
源代码2 项目: copycat   文件: OrderedCompletableFutureTest.java
/**
 * Tests ordered failure of future callbacks.
 *
 * <p>Failure-aware callbacks ({@code whenComplete}, {@code handle}, {@code exceptionally}) are expected to
 * observe the shared counter in registration order once the future is completed exceptionally, while the
 * success-only callbacks ({@code thenRun}, {@code thenAccept}) must not run.
 *
 * @throws Throwable if any verification fails
 */
public void testOrderedFailure() throws Throwable {
  CompletableFuture<String> future = new OrderedCompletableFuture<>();
  AtomicInteger order = new AtomicInteger();
  future.whenComplete((r, e) -> assertEquals(1, order.incrementAndGet()));
  future.whenComplete((r, e) -> assertEquals(2, order.incrementAndGet()));
  CompletableFuture<String> handled = future.handle((r, e) -> {
    assertEquals(3, order.incrementAndGet());
    return "bar";
  });
  future.thenRun(() -> fail());
  future.thenAccept(r -> fail());
  // Fourth failure-aware callback: ordinal 3 was already consumed by handle() above.
  CompletableFuture<String> recovered = future.exceptionally(e -> {
    assertEquals(4, order.incrementAndGet());
    return "bar";
  });
  future.completeExceptionally(new RuntimeException("foo"));

  // An assertion that fails inside a callback only fails the dependent future, so the dependents are
  // joined here to surface such failures in the test itself.
  assertEquals("bar", handled.join());
  assertEquals("bar", recovered.join());
  // Exactly the four failure-aware callbacks must have run.
  assertEquals(4, order.get());
}
 
源代码3 项目: quarkus   文件: VertxBlockingOutput.java
/**
 * Writes {@code data} to the response, ending the response when {@code last} is set.
 *
 * <p>When this is the last write and there is no data, the response is simply ended and the raw completion
 * future is returned. Otherwise the returned stage releases {@code data} (if it is still referenced) and
 * rethrows an {@link IOException} when the write fails.
 *
 * @param data the buffer to write; may be {@code null} for a final, empty write
 * @param last whether this write ends the response
 * @return a stage tracking the outcome of the write
 */
@Override
public CompletionStage<Void> writeNonBlocking(ByteBuf data, boolean last) {
    final CompletableFuture<Void> completion = new CompletableFuture<>();
    if (last && data == null) {
        // Nothing left to send: only end the response.
        request.response().end(handler(completion));
        return completion;
    }

    final Buffer payload = createBuffer(data);
    if (!last) {
        request.response().write(payload, handler(completion));
    } else {
        request.response().end(payload, handler(completion));
    }

    return completion.handle((result, failure) -> {
        if (failure == null) {
            return result;
        }
        // Release the buffer on failure, but only while it still holds a reference.
        if (data != null && data.refCnt() > 0) {
            data.release();
        }
        rethrow(new IOException("Failed to write", failure));
        return result;
    });
}
 
源代码4 项目: tutorials   文件: HttpClientUnitTest.java
/**
 * Shows that a future can be failed explicitly before its delayed stage runs, and that a {@code handle}
 * stage attached to it then receives the failure and can map it to a fallback value.
 */
@Test
public void completeExceptionallyExample() {
    final CompletableFuture<String> upperCased = CompletableFuture.completedFuture("message")
            .thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    final CompletableFuture<String> recovered =
            upperCased.handle((value, error) -> error == null ? "" : "message upon cancel");

    // Fail the future by hand while the upper-casing is still waiting on the delayed executor.
    upperCased.completeExceptionally(new RuntimeException("completed exceptionally"));
    assertTrue("Was not completed exceptionally", upperCased.isCompletedExceptionally());

    try {
        upperCased.join();
        fail("Should have thrown an exception");
    } catch (CompletionException expected) { // just for testing
        assertEquals("completed exceptionally", expected.getCause().getMessage());
    }

    assertEquals("message upon cancel", recovered.join());
}
 
源代码5 项目: onos   文件: ProtectionConfigMonitor.java
/**
 * Requests creation of a protection endpoint on the given device and logs the outcome.
 *
 * @param did   device on which the protection endpoint is created
 * @param added configuration describing the protection to add
 */
private void addProtection(DeviceId did, ProtectionConfig added) {
    final ProtectedTransportEndpointDescription endpoint = added.asDescription();
    log.info("adding protection {}-{}", did, endpoint);

    final CompletableFuture<ConnectPoint> creation =
            getBehaviour(did).createProtectionEndpoint(endpoint);

    creation.handle((port, error) -> {
        // A missing port is treated as failure, whether or not an exception was supplied.
        if (port == null) {
            log.error("Protection {} exceptionally failed.", added, error);
            return port;
        }
        log.info("Virtual Port {} created for {}", port, endpoint);
        log.debug("{}", deviceService.getPort(port));
        return port;
    });
}
 
源代码6 项目: samza   文件: AsyncSystemProducer.java
/**
 * {@inheritDoc}
 *
 * <p>Delegates to {@code sendAsync}, records how long that call took, tracks the returned future in
 * {@code pendingFutures}, and attaches a callback that records the callback latency and, on failure,
 * the error metrics and the first throwable seen.
 */
@Override
public synchronized void send(String source, OutgoingMessageEnvelope envelope) {
  checkForSendCallbackErrors("Received exception on message send");

  final String physicalName = envelope.getSystemStream().getStream();
  // Fall back to the physical name when no stream id is mapped to it.
  final String streamId = physicalToStreamIds.getOrDefault(physicalName, physicalName);

  final long sendStartedMs = System.currentTimeMillis();
  final CompletableFuture<Void> pendingSend = sendAsync(source, envelope);
  final long sendReturnedMs = System.currentTimeMillis();

  final long sendDurationMs = sendReturnedMs - sendStartedMs;
  sendLatency.get(streamId).update(sendDurationMs);
  aggSendLatency.update(sendDurationMs);

  pendingFutures.add(pendingSend);

  // Update the metrics, and remember a possible throwable, once the future completes.
  pendingSend.handle((ignored, failure) -> {
    final long sinceSendReturnedMs = System.currentTimeMillis() - sendReturnedMs;
    sendCallbackLatency.get(streamId).update(sinceSendReturnedMs);
    aggSendCallbackLatency.update(sinceSendReturnedMs);
    if (failure == null) {
      return ignored;
    }
    sendErrors.get(streamId).inc();
    aggSendErrors.inc();
    LOG.error("Send message to event hub: {} failed with exception: ", streamId, failure);
    // Only the first failure is retained.
    sendExceptionOnCallback.compareAndSet(null, failure);
    return ignored;
  });
}
 
/**
 * Lets {@code tapper} observe the outcome of {@code input} and returns a separate future that is completed
 * from that same outcome via {@code completeSomehow}.
 *
 * <p>If {@code tapper} itself throws, the returned future is completed exceptionally with the thrown
 * throwable, so that callers waiting on it are not left hanging on a future that never completes.
 *
 * @param input  the future whose outcome is observed
 * @param tapper callback receiving the result and the throwable of {@code input}
 * @param <T>    the result type
 * @return a new future completed once {@code input} has completed and {@code tapper} has been invoked
 */
public static <T> CompletableFuture<T> tap(CompletableFuture<T> input, BiConsumer<T, Throwable> tapper) {
  CompletableFuture<T> outcome = new CompletableFuture<>();
  input.handle((result, throwable) -> {
    try {
      tapper.accept(result, throwable);
    } catch (Throwable tapFailure) {
      // An exception thrown here would otherwise only fail the discarded future returned by handle(),
      // leaving outcome incomplete forever.
      outcome.completeExceptionally(tapFailure);
      return null;
    }
    completeSomehow(outcome, result, throwable);
    return null;
  });
  return outcome;
}
 
源代码8 项目: java-dataloader   文件: DataLoaderTest.java
/**
 * Verifies that clearing a key from an error callback makes the next load of that key hit the batch
 * loader again: two failing loads of key {@code 1} must produce two recorded load calls.
 */
@Test
public void should_Clear_values_from_cache_after_errors() {
    List<Collection<Integer>> loadCalls = new ArrayList<>();
    DataLoader<Integer, Integer> errorLoader = idLoaderBlowsUps(new DataLoaderOptions(), loadCalls);

    // Two identical rounds: load key 1, clear it when the load fails, dispatch, and check the failure.
    for (int round = 1; round <= 2; round++) {
        CompletableFuture<Integer> failing = errorLoader.load(1);
        failing.handle((ignored, error) -> {
            if (error != null) {
                // Real code would presumably clear the cache only when the error is known to be transient.
                errorLoader.clear(1);
            }
            return null;
        });
        errorLoader.dispatch();

        await().until(failing::isDone);
        assertThat(failing.isCompletedExceptionally(), is(true));
        assertThat(cause(failing), instanceOf(IllegalStateException.class));
    }

    assertThat(loadCalls, equalTo(asList(singletonList(1), singletonList(1))));
}
 
源代码9 项目: centraldogma   文件: CentralDogmaServiceImpl.java
/**
 * Forwards the outcome of {@code future} to {@code resultHandler}, discarding any result value.
 *
 * @param future        the future to observe
 * @param resultHandler receives {@code onComplete(null)} on success, or {@code onError} with the
 *                      converted cause on failure
 */
private static void handleAsVoidResult(CompletableFuture<?> future, AsyncMethodCallback resultHandler) {
    future.handle((ignored, failure) -> {
        if (failure == null) {
            resultHandler.onComplete(null);
            return null;
        }
        resultHandler.onError(convert(failure));
        return null;
    });
}
 
源代码10 项目: NioSmtpClient   文件: SmtpSessionTest.java
/**
 * Verifies that a callback attached to a failed send future runs on a thread of the executor handed to
 * the session, identified here by the thread name.
 */
@Test
public void itExecutesReturnedExceptionalFuturesOnTheProvidedExecutor() {
  ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("SmtpSessionTestExecutor").build());
  try {
    SmtpSession session = new SmtpSession(channel, responseHandler, CONFIG, executorService, SSL_ENGINE_SUPPLIER);

    CompletableFuture<SmtpClientResponse> future = session.send(SMTP_REQUEST);
    CompletableFuture<Boolean> assertionFuture = future.handle((r, e) -> {
      assertThat(Thread.currentThread().getName()).contains("SmtpSessionTestExecutor");
      return true;
    });

    responseFuture.completeExceptionally(new RuntimeException());
    // join() rethrows a failed assertion from the callback, so the test fails if the thread is wrong.
    assertionFuture.join();
  } finally {
    // Without this the executor's worker thread outlives the test.
    executorService.shutdown();
  }
}
 
源代码11 项目: articles   文件: CompletableFutures.java
/**
 * Combines the given futures into one future of all results that additionally fails as soon as any of the
 * given futures fails.
 *
 * @param futures the futures to combine
 * @param <T>     the element type
 * @return the combined future obtained from {@code allOf(futures)}, completed exceptionally early when
 *         one of {@code futures} fails first
 */
public static <T> CompletableFuture<List<T>> allOfOrException(Collection<CompletableFuture<T>> futures) {
    final CompletableFuture<List<T>> combined = allOf(futures);

    futures.forEach(future -> future.handle((ignored, failure) -> {
        if (failure == null) {
            return true;
        }
        // Propagate the first failure without waiting for the remaining futures.
        return combined.completeExceptionally(failure);
    }));

    return combined;
}
 
源代码12 项目: articles   文件: ParallelStreams.java
/**
 * Combines the given futures into a single future of all their results, in encounter order, that fails as
 * soon as any of the given futures fails.
 *
 * @param futures the futures to combine
 * @param <T>     the element type
 * @return a future holding every result once all futures complete, or completed exceptionally with the
 *         failure of whichever future fails first
 */
static <T> CompletableFuture<List<T>> allOfOrException(Collection<CompletableFuture<T>> futures) {
    // Snapshot the futures so the same list is used for waiting and for collecting the results.
    final List<CompletableFuture<T>> snapshot = futures.stream().collect(toList());

    final CompletableFuture<List<T>> result = CompletableFuture
      .allOf(snapshot.toArray(new CompletableFuture[0]))
      .thenApply(ignored -> snapshot.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList()));

    // Short-circuit: the first failure completes the result without waiting for the other futures.
    futures.forEach(future ->
      future.handle((ignored, failure) -> failure == null || result.completeExceptionally(failure)));

    return result;
}
 
源代码13 项目: styx   文件: HttpPipelineHandler.java
/**
 * Writes {@code response} to the channel and closes the channel once the write has finished, whether it
 * succeeded or failed. A failed write is logged before closing.
 *
 * @param ctx      channel context used to create the writer and to close the channel
 * @param response the response to send
 */
private void respondAndClose(ChannelHandlerContext ctx, LiveHttpResponse response) {
    final CompletableFuture<Void> writeOutcome = responseWriterFactory.create(ctx).write(response);

    writeOutcome.handle((ignore, reason) -> {
        final boolean writeFailed = writeOutcome.isCompletedExceptionally();
        if (writeFailed) {
            LOGGER.error(warningMessage("message='Unable to send error', response=" + reason));
        }
        ctx.close();
        return null;
    });
}
 
源代码14 项目: flink   文件: Execution.java
/**
 * Requests a slot through the given slot provider strategy and assigns it to this execution.
 *
 * <p>The slot is only requested after the state transition {@code CREATED -> SCHEDULED} succeeded. The
 * request is cancelled when {@code releaseFuture} completes before a slot has been obtained.
 *
 * @param slotProviderStrategy to obtain a new slot from
 * @param locationPreferenceConstraint constraint for the location preferences
 * @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
 *                                                 Can be empty if the allocation ids are not required for scheduling.
 * @return Future which is completed with the allocated slot once it has been assigned
 * 			or with an exception if an error occurred.
 * @throws IllegalStateException if a co-location constraint is present without a slot sharing group
 * @throws IllegalExecutionStateException if the execution could not be moved from CREATED to SCHEDULED
 */
private CompletableFuture<LogicalSlot> allocateAndAssignSlotForExecution(
		SlotProviderStrategy slotProviderStrategy,
		LocationPreferenceConstraint locationPreferenceConstraint,
		@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {

	checkNotNull(slotProviderStrategy);

	assertRunningInJobMasterMainThread();

	final SlotSharingGroup slotSharingGroup = vertex.getJobVertex().getSlotSharingGroup();
	final CoLocationConstraint coLocation = vertex.getLocationConstraint();

	// sanity check
	if (slotSharingGroup == null && coLocation != null) {
		throw new IllegalStateException(
				"Trying to schedule with co-location constraint but without slot sharing allowed.");
	}

	// this method only works if the execution is in the state 'CREATED';
	// otherwise: call race, already deployed, or already done
	if (!transitionState(CREATED, SCHEDULED)) {
		throw new IllegalExecutionStateException(this, CREATED, state);
	}

	final SlotSharingGroupId slotSharingGroupId;
	if (slotSharingGroup == null) {
		slotSharingGroupId = null;
	} else {
		slotSharingGroupId = slotSharingGroup.getSlotSharingGroupId();
	}

	final ScheduledUnit unitToSchedule = coLocation != null ?
			new ScheduledUnit(this, slotSharingGroupId, coLocation) :
			new ScheduledUnit(this, slotSharingGroupId);

	// try to extract previous allocation ids, if applicable, so that we can reschedule to the same slot
	final AllocationID latestPriorAllocation = getVertex().getLatestPriorAllocation();
	final Collection<AllocationID> priorAllocationIds;
	if (latestPriorAllocation == null) {
		priorAllocationIds = Collections.emptyList();
	} else {
		priorAllocationIds = Collections.singletonList(latestPriorAllocation);
	}

	// calculate the preferred locations
	final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture =
		calculatePreferredLocations(locationPreferenceConstraint);

	final SlotRequestId slotRequestId = new SlotRequestId();

	final CompletableFuture<LogicalSlot> slotFuture =
		preferredLocationsFuture.thenCompose(
			(Collection<TaskManagerLocation> locations) -> {
				LOG.info("Allocating slot with SlotRequestID {} for the execution attempt {}.", slotRequestId, attemptId);
				return slotProviderStrategy.allocateSlot(
					slotRequestId,
					unitToSchedule,
					SlotProfile.priorAllocation(
						vertex.getResourceProfile(),
						getPhysicalSlotResourceProfile(vertex),
						locations,
						priorAllocationIds,
						allPreviousExecutionGraphAllocationIds));
			});

	// register call back to cancel slot request in case that the execution gets canceled
	releaseFuture.whenComplete(
		(Object ignored, Throwable releaseFailure) -> {
			// cancel(false) only succeeds while the slot future is still pending
			if (slotFuture.cancel(false)) {
				slotProviderStrategy.cancelSlotRequest(
					slotRequestId,
					slotSharingGroupId,
					new FlinkException("Execution " + this + " was released."));
			}
		});

	// NOTE(review): the original comment states that this step forces calls to the slot pool back into the
	// main thread, yet plain handle() is used without an executor - confirm the threading expectation.
	return slotFuture.handle(
		(LogicalSlot slot, Throwable failure) -> {

			if (failure != null) {
				throw new CompletionException(failure);
			}

			if (!tryAssignResource(slot)) {
				// release the slot
				slot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.'));
				throw new CompletionException(
					new FlinkException(
						"Could not assign slot " + slot + " to execution " + this + " because it has already been assigned "));
			}

			return slot;
		});
}
 
源代码15 项目: armeria   文件: RefreshingAddressResolver.java
/**
 * Resolves {@code unresolvedAddress} through the hostname cache, sending DNS queries only when no entry
 * (or pending entry) exists for the hostname yet.
 *
 * <p>NOTE(review): the completion callback dereferences the cache entry without consulting the throwable
 * argument, so it presumably relies on {@code sendQueries} always completing the future normally and
 * reporting failures through {@code CacheEntry.cause()} - confirm.
 *
 * @param unresolvedAddress the address whose host string and port are resolved
 * @param promise           completed with the resolved address, or failed with the cause
 */
@Override
protected void doResolve(InetSocketAddress unresolvedAddress, Promise<InetSocketAddress> promise)
        throws Exception {
    requireNonNull(unresolvedAddress, "unresolvedAddress");
    requireNonNull(promise, "promise");
    if (resolverClosed) {
        promise.tryFailure(new IllegalStateException("resolver is closed already."));
        return;
    }

    final String host = unresolvedAddress.getHostString();
    final int port = unresolvedAddress.getPort();

    final CompletableFuture<CacheEntry> cached = cache.get(host);
    if (cached != null) {
        handleFromCache(cached, promise, port);
        return;
    }

    // Register a pending entry; if another caller registered one first, wait on that one instead.
    final CompletableFuture<CacheEntry> pending = new CompletableFuture<>();
    final CompletableFuture<CacheEntry> registeredFirst = cache.putIfAbsent(host, pending);
    if (registeredFirst != null) {
        handleFromCache(registeredFirst, promise, port);
        return;
    }

    final List<DnsQuestion> questions =
            dnsRecordTypes.stream()
                          .map(recordType -> DnsQuestionWithoutTrailingDot.of(host, recordType))
                          .collect(toImmutableList());
    sendQueries(questions, host, pending);

    pending.handle((entry, unused) -> {
        final Throwable cause = entry.cause();
        if (cause == null) {
            entry.scheduleRefresh(entry.ttlMillis());
            promise.trySuccess(new InetSocketAddress(entry.address(), port));
            return null;
        }

        // Keep a cacheable failure for negativeTtl seconds; drop any other failure right away.
        if (entry.hasCacheableCause() && negativeTtl > 0) {
            executor().schedule(() -> cache.remove(host), negativeTtl, TimeUnit.SECONDS);
        } else {
            cache.remove(host);
        }
        promise.tryFailure(cause);
        return null;
    });
}
 
源代码16 项目: flink   文件: JobCancellationHandler.java
/**
 * Handles a job termination request.
 *
 * <p>The termination mode is the first value of the termination-mode query parameter, or
 * {@code defaultTerminationMode} when the parameter is absent. Only {@code CANCEL} is served; {@code STOP}
 * is rejected with a redirect status and any other mode yields a failed future.
 *
 * @param request the request carrying the job id and the optional termination mode
 * @param gateway the gateway used to cancel the job
 * @return a future with an empty response body on success, or failed with a {@link RestHandlerException}
 *         (wrapped in a {@link CompletionException}) describing the failure
 * @throws RestHandlerException if the removed termination mode "stop" is requested
 */
@Override
public CompletableFuture<EmptyResponseBody> handleRequest(HandlerRequest<EmptyRequestBody, JobCancellationMessageParameters> request, RestfulGateway gateway) throws RestHandlerException {
	final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
	final List<TerminationModeQueryParameter.TerminationMode> requestedModes = request.getQueryParameter(TerminationModeQueryParameter.class);

	// picking the first termination mode value, or the default if none was given
	final TerminationModeQueryParameter.TerminationMode terminationMode =
		requestedModes.isEmpty() ? defaultTerminationMode : requestedModes.get(0);

	final CompletableFuture<Acknowledge> terminationFuture;

	switch (terminationMode) {
		case CANCEL:
			terminationFuture = gateway.cancelJob(jobId, timeout);
			break;
		case STOP:
			throw new RestHandlerException("The termination mode \"stop\" has been removed. For " +
			"an ungraceful shutdown, please use \"cancel\" instead. For a graceful shutdown, " +
			"please use \"jobs/:jobId/stop\" instead." , HttpResponseStatus.PERMANENT_REDIRECT);
		default:
			terminationFuture = FutureUtils.completedExceptionally(new RestHandlerException("Unknown termination mode " + terminationMode + '.', HttpResponseStatus.BAD_REQUEST));
	}

	return terminationFuture.handle(
		(Acknowledge ack, Throwable throwable) -> {
			if (throwable == null) {
				return EmptyResponseBody.getInstance();
			}

			final Throwable error = ExceptionUtils.stripCompletionException(throwable);

			// Map the failure to the REST error reported to the client.
			final RestHandlerException restError;
			if (error instanceof TimeoutException) {
				restError = new RestHandlerException(
					"Job cancellation timed out.",
					HttpResponseStatus.REQUEST_TIMEOUT, error);
			} else if (error instanceof FlinkJobNotFoundException) {
				restError = new RestHandlerException(
					"Job could not be found.",
					HttpResponseStatus.NOT_FOUND, error);
			} else {
				restError = new RestHandlerException(
					"Job cancellation failed: " + error.getMessage(),
					HttpResponseStatus.INTERNAL_SERVER_ERROR, error);
			}
			throw new CompletionException(restError);
		});
}
 
源代码17 项目: j2objc   文件: CompletableFutureTest.java
/**
 * Attaches {@code a} to {@code f} by delegating to {@link CompletableFuture#handle}.
 *
 * @param f the future to attach the function to
 * @param a the function receiving the result and the throwable of {@code f}
 * @return the dependent future returned by {@code f.handle(a)}
 */
public <T,U> CompletableFuture<U> handle
    (CompletableFuture<T> f,
     BiFunction<? super T,Throwable,? extends U> a) {
    final CompletableFuture<U> dependent = f.handle(a);
    return dependent;
}
 
源代码18 项目: samza   文件: AzureBlobOutputStream.java
/**
 * Asynchronously uploads the currently buffered bytes as one block via {@code stageBlock}, resets the
 * buffer, and records the upload future in {@code pendingUpload}.
 *
 * <p>Does nothing when there is no buffer or the buffer is empty.
 *
 * @throws RuntimeException when
 *            - blob's stageBlock fails after MAX_ATTEMPTs (reported through the upload future)
 *            - number of blocks exceeds MAX_BLOCKS_IN_AZURE_BLOB
 */
private synchronized void uploadBlockAsync() {
  if (!byteArrayOutputStream.isPresent()) {
    return;
  }
  long bufferedBytes = byteArrayOutputStream.get().size();
  if (bufferedBytes == 0) {
    return;
  }
  LOG.info("Blob: {} uploadBlock. Size:{}", blobAsyncClient.getBlobUrl().toString(), bufferedBytes);

  // Azure sdk requires block Id to be encoded and all blockIds of a blob to be of the same length
  // also, a block blob can have upto 50,000 blocks, hence using a 5 digit block id.
  String blockId = String.format("%05d", blockNum);
  String blockIdEncoded = Base64.getEncoder().encodeToString(blockId.getBytes());
  blockList.add(blockIdEncoded);

  // Take the buffered bytes and reset the buffer before handing the bytes to the async upload.
  byte[] localByte = byteArrayOutputStream.get().toByteArray();
  byteArrayOutputStream.get().reset();
  totalUploadedBlockSize += localByte.length;

  CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // call async stageblock and add to future
    final byte[] compressed = compression.compress(localByte);
    final int compressedSize = compressed.length;

    for (int attempt = 1; attempt <= MAX_ATTEMPT; attempt++) {
      try {
        ByteBuffer block = ByteBuffer.wrap(compressed, 0, compressedSize);
        metrics.updateCompressByteMetrics(compressedSize);
        LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, compressedSize);
        metrics.updateAzureUploadMetrics();
        // StageBlock generates exception on Failure.
        stageBlock(blockIdEncoded, block, compressedSize);
        return;
      } catch (Exception e) {
        String msg = "Upload block for blob: " + blobAsyncClient.getBlobUrl().toString()
            + " failed for blockid: " + blockId + " due to exception. AttemptCount: " + attempt;
        LOG.error(msg, e);
        // Give up once the last permitted attempt has failed.
        if (attempt == MAX_ATTEMPT) {
          throw new AzureException("Exceeded number of attempts. Max attempts is: " + MAX_ATTEMPT, e);
        }
      }
    }
  }, blobThreadPool);

  pendingUpload.add(future);

  future.handle((ignored, failure) -> {
    if (failure != null) {
      throw new AzureException("Blob upload failed for blob " + blobAsyncClient.getBlobUrl().toString()
          + " and block with id: " + blockId, failure);
    }
    LOG.info("Upload block for blob: {} with blockid: {} finished.", blobAsyncClient.getBlobUrl().toString(), blockId);
    // Only successful uploads are dropped from the pending set; failed ones stay in it.
    pendingUpload.remove(future);
    return ignored;
  });

  blockNum += 1;
  if (blockNum >= MAX_BLOCKS_IN_AZURE_BLOB) {
    throw new AzureException("Azure blob only supports 50000 blocks in a blob. Current number of blocks is " + blockNum);
  }
}
 
源代码19 项目: centraldogma   文件: Repository.java
/**
 * Merges the JSON files sequentially as specified in the {@link MergeQuery}.
 *
 * @param revision the revision to read the files at; normalized before the files are fetched
 * @param query    the query listing the merge sources
 * @return a future with the merged entry; it fails with the original cause when that cause is a
 *         {@link CentralDogmaException}, and with a {@link QueryExecutionException} wrapping it otherwise
 */
default <T> CompletableFuture<MergedEntry<T>> mergeFiles(Revision revision, MergeQuery<T> query) {
    requireNonNull(revision, "revision");
    requireNonNull(query, "query");

    final List<MergeSource> sources = query.mergeSources();
    // Only JSON files can currently be merged.
    for (MergeSource source : sources) {
        validateJsonFilePath(source.path(), "path");
    }

    final Revision normalizedRevision;
    try {
        normalizedRevision = normalizeNow(revision);
    } catch (Exception e) {
        return CompletableFutures.exceptionallyCompletedFuture(e);
    }

    // Optional sources are fetched with getOrNull(); all others with get().
    final List<CompletableFuture<Entry<?>>> entryFutures = new ArrayList<>(sources.size());
    for (MergeSource source : sources) {
        if (source.isOptional()) {
            entryFutures.add(getOrNull(normalizedRevision, source.path()));
        } else {
            entryFutures.add(get(normalizedRevision, source.path()));
        }
    }

    final CompletableFuture<MergedEntry<?>> mergedEntryFuture = mergeEntries(entryFutures, revision,
                                                                             query);
    final CompletableFuture<MergedEntry<T>> future = new CompletableFuture<>();
    mergedEntryFuture.handle((mergedEntry, cause) -> {
        if (cause == null) {
            future.complete(unsafeCast(mergedEntry));
            return null;
        }
        final Throwable reported =
                cause instanceof CentralDogmaException ? cause : new QueryExecutionException(cause);
        future.completeExceptionally(reported);
        return null;
    });

    return future;
}
 
源代码20 项目: Flink-CEPplus   文件: FutureUtils.java
/**
 * Attaches {@code handler} to {@code completableFuture}, choosing between synchronous and asynchronous
 * attachment based on whether the future is already done at the time of the call.
 *
 * <p>An already completed future gets the handler through {@link CompletableFuture#handle(BiFunction)};
 * a future that is still pending gets it through {@link CompletableFuture#handleAsync(BiFunction, Executor)}
 * with the given executor.
 *
 * @param completableFuture the completable future for which we want to call #handle.
 * @param executor the executor to run the handle function if the future is not yet done.
 * @param handler the handler function to call when the future is completed.
 * @param <IN> type of the handler input argument.
 * @param <OUT> type of the handler return value.
 * @return the new completion stage.
 */
public static <IN, OUT> CompletableFuture<OUT> handleAsyncIfNotDone(
	CompletableFuture<IN> completableFuture,
	Executor executor,
	BiFunction<? super IN, Throwable, ? extends OUT> handler) {
	if (completableFuture.isDone()) {
		return completableFuture.handle(handler);
	}
	return completableFuture.handleAsync(handler, executor);
}