java.util.concurrent.CompletableFuture#exceptionally ( )源码实例Demo

下面列出了 java.util.concurrent.CompletableFuture#exceptionally() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: cyclops   文件: SimpleReactStream.java
/**
 * Waits for every future of the currently active stage, aggregates their results with
 * {@code collector}, and feeds the aggregate to {@code fn} as a single follow-on task.
 *
 * <p>If the combined future fails, the exception is passed to the error handler and the
 * stage falls back to a blocking collection of the active futures into a list.
 *
 * @param collector
 *            to perform aggregation / reduction operation on the results
 *            from active stage (e.g. to Collect into a List or String)
 * @param fn
 *            Function that receives the results of all currently active
 *            tasks as input
 * @return A new builder object that can be used to define the next stage in
 *         the dataflow
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
default <R1, R2> SimpleReactStream<R2> allOf(final Collector<? super U, ?, R1> collector, final Function<? super R1, ? extends R2> fn) {

    // Snapshot the futures of the last active stage and combine them into one future.
    final CompletableFuture[] array = lastActiveArray(getLastActive());
    final CompletableFuture cf = CompletableFuture.allOf(array);
    // Failure path: report the exception to the error handler, then block on the same futures.
    // NOTE(review): the handler is typed on Exception while exceptionally() supplies a Throwable
    // through the raw type - confirm non-Exception throwables cannot reach it.
    final Function<Exception, R2> f = (final Exception e) -> {
        BlockingStreamHelper.capture(e, getErrorHandler());
        return BlockingStreamHelper.block(this, Collectors.toList(), new EagerStreamWrapper(
                                                                                            Stream.of(array), getErrorHandler()));
    };
    final CompletableFuture onFail = cf.exceptionally(f);
    // Continuation on the task executor: aggregate the futures with the collector, apply fn,
    // and submit that work through a StageWithResults.
    final CompletableFuture onSuccess = onFail.thenApplyAsync((result) -> {
        return new StageWithResults(
                                    getTaskExecutor(), null,
                                    result).submit(() -> fn.apply(BlockingStreamHelper.aggregateResultsCompletable(collector, Stream.of(array)
                                                                                                                                    .collect(Collectors.toList()),
                                                                                                                   getErrorHandler())));
    } , getTaskExecutor());
    // The resulting future becomes the new last-active stage of the returned stream.
    return (SimpleReactStream<R2>) withLastActive(new EagerStreamWrapper(
                                                                         onSuccess, getErrorHandler()));

}
 
源代码2 项目: Flink-CEPplus   文件: Dispatcher.java
/**
 * Requests the execution graph of a job from its job master and, if that request fails,
 * falls back to the graph held in the archived execution graph store.
 *
 * @param jobId id of the job to look up
 * @param timeout timeout handed to the job master request
 * @return future with the execution graph; it fails with the original (unwrapped) error when
 *         the store has no entry for the job either
 */
@Override
public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
	final CompletableFuture<ArchivedExecutionGraph> liveGraphFuture = getJobMasterGatewayFuture(jobId)
		.thenCompose(gateway -> gateway.requestJob(timeout));

	return liveGraphFuture.exceptionally(
		failure -> {
			// The job master request failed: see whether the store knows the job.
			final ArchivedExecutionGraph archivedGraph = archivedExecutionGraphStore.get(jobId);
			if (archivedGraph != null) {
				return archivedGraph;
			}
			throw new CompletionException(ExceptionUtils.stripCompletionException(failure));
		});
}
 
源代码3 项目: Flink-CEPplus   文件: Execution.java
/**
 * Sends the stop RPC call to the task manager of the currently assigned slot.
 *
 * <p>Does nothing when no slot is assigned. The call is retried up to
 * {@code NUM_STOP_CALL_TRIES} times; a final failure is only logged.
 */
public void stop() {
	assertRunningInJobMasterMainThread();

	final LogicalSlot currentSlot = assignedResource;
	if (currentSlot == null) {
		return;
	}

	final TaskManagerGateway gateway = currentSlot.getTaskManagerGateway();
	final CompletableFuture<Acknowledge> stopFuture = FutureUtils.retry(
		() -> gateway.stopTask(attemptId, rpcTimeout),
		NUM_STOP_CALL_TRIES,
		vertex.getExecutionGraph().getJobMasterMainThreadExecutor());

	stopFuture.exceptionally(
		error -> {
			LOG.info("Stopping task was not successful.", error);
			return null;
		});
}
 
源代码4 项目: flink   文件: Dispatcher.java
/**
 * Requests the status of a job from its job master and, if that request fails, falls back
 * to the job details available in the archived execution graph store.
 *
 * @param jobId id of the job to look up
 * @param timeout timeout handed to the job master request
 * @return future with the job status; it fails with the original (unwrapped) error when the
 *         store has no details for the job either
 */
@Override
public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) {
	final CompletableFuture<JobStatus> liveStatusFuture = getJobMasterGatewayFuture(jobId)
		.thenCompose(gateway -> gateway.requestJobStatus(timeout));

	return liveStatusFuture.exceptionally(
		failure -> {
			// The job master request failed: see whether the store knows the job.
			final JobDetails archivedDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
			if (archivedDetails != null) {
				return archivedDetails.getStatus();
			}
			throw new CompletionException(ExceptionUtils.stripCompletionException(failure));
		});
}
 
源代码5 项目: flink   文件: Dispatcher.java
/**
 * Looks up the execution graph of a job: first through the job master, then, should that
 * fail, in the archived execution graph store.
 *
 * @param jobId id of the job to look up
 * @param timeout timeout handed to the job master request
 * @return future with the execution graph; it fails with the original (unwrapped) error when
 *         the store has no entry for the job either
 */
@Override
public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
	final CompletableFuture<JobMasterGateway> gatewayFuture = getJobMasterGatewayFuture(jobId);

	return gatewayFuture
		.thenCompose((JobMasterGateway gateway) -> gateway.requestJob(timeout))
		.exceptionally(
			(Throwable cause) -> {
				final ArchivedExecutionGraph stored = archivedExecutionGraphStore.get(jobId);
				if (stored != null) {
					return stored;
				}
				// Nothing stored either: surface the original failure.
				throw new CompletionException(ExceptionUtils.stripCompletionException(cause));
			});
}
 
源代码6 项目: JDA   文件: GuildImpl.java
/**
 * Requests the members of this guild matching the given prefix through the member chunk
 * manager and collects the delivered chunks into one list.
 *
 * @param prefix non-empty prefix passed to the chunk request
 * @param limit  maximum number of members, must be positive and at most 100
 * @return task that completes with all collected members once the last chunk was delivered;
 *         cancelling it cancels the underlying chunk request
 */
@Nonnull
@Override
@CheckReturnValue
public Task<List<Member>> retrieveMembersByPrefix(@Nonnull String prefix, int limit)
{
    Checks.notEmpty(prefix, "Prefix");
    Checks.positive(limit, "Limit");
    Checks.check(limit <= 100, "Limit must not be greater than 100");
    final MemberChunkManager manager = api.getClient().getChunkManager();

    final List<Member> members = new ArrayList<>(limit);
    final CompletableFuture<List<Member>> future = new CompletableFuture<>();
    final CompletableFuture<Void> request = manager.chunkGuild(this, prefix, limit, (isLast, chunk) -> {
        members.addAll(chunk);
        if (isLast)
        {
            future.complete(members);
        }
    });

    // Log failures of the result future.
    future.exceptionally(error -> {
        WebSocketClient.LOG.error("Encountered exception trying to handle member chunk response", error);
        return null;
    });

    return new GatewayTask<>(future, () -> request.cancel(false));
}
 
源代码7 项目: flink   文件: Dispatcher.java
/**
 * Looks up the status of a job: first through the job master, then, should that fail, in
 * the job details available in the archived execution graph store.
 *
 * @param jobId id of the job to look up
 * @param timeout timeout handed to the job master request
 * @return future with the job status; it fails with the original (unwrapped) error when the
 *         store has no details for the job either
 */
@Override
public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) {
	final CompletableFuture<JobMasterGateway> gatewayFuture = getJobMasterGatewayFuture(jobId);

	return gatewayFuture
		.thenCompose((JobMasterGateway gateway) -> gateway.requestJobStatus(timeout))
		.exceptionally(
			(Throwable cause) -> {
				final JobDetails stored = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
				if (stored != null) {
					return stored.getStatus();
				}
				// Nothing stored either: surface the original failure.
				throw new CompletionException(ExceptionUtils.stripCompletionException(cause));
			});
}
 
/**
 * Verifies that a failure set on the source future reaches an {@code exceptionally} handler
 * of the converted CompletableFuture, and that the handler's value becomes the result.
 */
@Test
public void forwardExceptionShouldPropagate() throws Exception {
    final String fallback = UUID.randomUUID().toString();

    SettableFuture<String> source = SettableFuture.create();
    CompletableFuture<String> converted = MoreFutures.toCompletableFuture(source);

    Exception expectedFailure = new IntentionalException();
    final AtomicReference<Throwable> observed = new AtomicReference<>();

    // Recover from any failure with the fallback value and remember what was seen.
    CompletableFuture<String> recovered = converted.exceptionally(error -> {
        observed.set(error);
        return fallback;
    });

    // Fail the source only after the handler has been attached.
    source.setException(expectedFailure);

    assertThat(recovered).isDone();
    assertThat(recovered).isNotCancelled();
    assertThat(recovered).isNotCompletedExceptionally();

    assertThat(recovered).isCompletedWithValue(fallback);
    assertThat(observed.get()).isSameAs(expectedFailure);
}
 
源代码9 项目: copycat   文件: OrderedCompletableFutureTest.java
/**
 * Tests ordered failure of future callbacks.
 *
 * <p>Callbacks that run on exceptional completion ({@code whenComplete}, {@code handle},
 * {@code exceptionally}) each bump a shared counter and assert their registration position;
 * success-only callbacks ({@code thenRun}, {@code thenAccept}) must never run.
 */
public void testOrderedFailure() throws Throwable {
  CompletableFuture<String> future = new OrderedCompletableFuture<>();
  AtomicInteger order = new AtomicInteger();
  future.whenComplete((r, e) -> assertEquals(1, order.incrementAndGet()));
  future.whenComplete((r, e) -> assertEquals(2, order.incrementAndGet()));
  future.handle((r, e) -> {
    assertEquals(3, order.incrementAndGet());
    return "bar";
  });
  future.thenRun(() -> fail());
  future.thenAccept(r -> fail());
  future.exceptionally(e -> {
    // Fourth callback to run on failure: the counter was already advanced to 3 by handle(),
    // so expecting 3 here (as before) could never hold.
    assertEquals(4, order.incrementAndGet());
    return "bar";
  });
  future.completeExceptionally(new RuntimeException("foo"));
}
 
源代码10 项目: besu   文件: FastSyncDownloader.java
/**
 * Starts the world state download and the chain download for the given state and links
 * them so that a failure of either one cancels the other.
 *
 * @param currentState the fast sync state supplying the pivot block header
 * @return a future yielding {@code currentState} once both downloads have completed, or an
 *     already failed future when the downloader is no longer running
 */
private CompletableFuture<FastSyncState> downloadChainAndWorldState(
    final FastSyncState currentState) {
  // Holding the monitor keeps stop from being called halfway through starting the two
  // downloads; otherwise a new download could be started after stop had already called
  // cancel.
  synchronized (this) {
    final boolean stopped = !running.get();
    if (stopped) {
      return CompletableFuture.failedFuture(
          new CancellationException("FastSyncDownloader stopped"));
    }

    final CompletableFuture<Void> worldStateFuture =
        worldStateDownloader.run(currentState.getPivotBlockHeader().get());
    final ChainDownloader chainDownloader = fastSyncActions.createChainDownloader(currentState);
    final CompletableFuture<Void> chainFuture = chainDownloader.start();

    // A failed chain download cancels the world state download ...
    chainFuture.exceptionally(
        failure -> {
          worldStateFuture.cancel(true);
          return null;
        });
    // ... and a failed world state download cancels the chain download.
    worldStateFuture.exceptionally(
        failure -> {
          chainDownloader.cancel();
          return null;
        });

    final CompletableFuture<Void> bothDone = CompletableFuture.allOf(worldStateFuture, chainFuture);
    return bothDone.thenApply(
        ignored -> {
          trailingPeerRequirements = Optional.empty();
          return currentState;
        });
  }
}
 
源代码11 项目: besu   文件: WorldStateDownloadProcess.java
/**
 * Starts the fetch-data pipeline and the completion pipeline and ties their outcomes
 * together.
 *
 * <p>When fetching finishes normally, {@code requestsToComplete} is closed; when it fails,
 * the completion pipeline is aborted. When the completion pipeline fails, the fetch-data
 * pipeline is aborted. Failures are logged unless their root cause is a cancellation.
 *
 * @param ethScheduler scheduler used to start both pipelines
 * @return the future of the completion pipeline
 */
public CompletableFuture<Void> start(final EthScheduler ethScheduler) {
  final CompletableFuture<Void> fetchFuture = ethScheduler.startPipeline(fetchDataPipeline);
  final CompletableFuture<Void> completionFuture = ethScheduler.startPipeline(completionPipeline);

  fetchFuture.whenComplete(
      (ignored, failure) -> {
        if (failure == null) {
          // No more data to fetch, so propagate the pipe closure onto the completion pipe.
          requestsToComplete.close();
          return;
        }
        final boolean cancelled =
            ExceptionUtils.rootCause(failure) instanceof CancellationException;
        if (!cancelled) {
          LOG.error("Pipeline failed", failure);
        }
        completionPipeline.abort();
      });

  completionFuture.exceptionally(
      failure -> {
        final boolean cancelled =
            ExceptionUtils.rootCause(failure) instanceof CancellationException;
        if (!cancelled) {
          LOG.error("Pipeline failed", failure);
        }
        fetchDataPipeline.abort();
        return null;
      });
  return completionFuture;
}
 
源代码12 项目: besu   文件: FastSyncDownloaderTest.java
/**
 * Asserts that {@code future} completed exceptionally with a {@code FastSyncException}
 * carrying {@code expectedError}.
 *
 * @param future the future expected to have failed
 * @param expectedError the error the failure must carry
 * @param <T> result type of the future (irrelevant, since the future must have failed)
 */
private <T> void assertCompletedExceptionally(
    final CompletableFuture<T> future, final FastSyncError expectedError) {
  assertThat(future).isCompletedExceptionally();
  future
      .exceptionally(
          actualError -> {
            assertThat(actualError)
                .isInstanceOf(FastSyncException.class)
                .extracting(ex -> ((FastSyncException) ex).getError())
                .isEqualTo(expectedError);
            return null;
          })
      // An AssertionError thrown inside the callback only fails the derived future. Without
      // join() that future is discarded and the check can never fail the test. The source
      // future is already complete here, so join() does not block.
      .join();
}
 
源代码13 项目: besu   文件: FutureUtils.java
/**
 * Forwards cancellation, and nothing but cancellation, from one future to another.
 *
 * <p>If <code>from</code> completes with a {@link java.util.concurrent.CancellationException},
 * {@link java.util.concurrent.Future#cancel(boolean)} is invoked on <code>to</code> with
 * interruption allowed. Normal completion and any other failure of <code>from</code> leave
 * <code>to</code> untouched.
 *
 * @param from the CompletableFuture to take cancellation from
 * @param to the CompletableFuture to propagate cancellation to
 */
public static void propagateCancellation(
    final CompletableFuture<?> from, final CompletableFuture<?> to) {
  from.exceptionally(
      failure -> {
        final boolean wasCancelled = failure instanceof CancellationException;
        if (wasCancelled) {
          to.cancel(true);
        }
        return null;
      });
}
 
源代码14 项目: enode   文件: JDBCPublishedVersionStore.java
/**
 * Reads the published version for the given processor and aggregate root.
 *
 * <p>Completes with 0 when the query yields no row. Query failures are logged and rethrown
 * as {@code IORuntimeException} (for {@code SQLException}) or {@code EnodeRuntimeException}
 * (for anything else).
 *
 * @param processorName         first query parameter
 * @param aggregateRootTypeName not used by the query
 * @param aggregateRootId       second query parameter
 * @return future with the published version
 */
@Override
public CompletableFuture<Integer> getPublishedVersionAsync(String processorName, String aggregateRootTypeName, String aggregateRootId) {
    final CompletableFuture<Integer> versionFuture = new CompletableFuture<>();
    final String query = String.format(SELECT_SQL, tableName);
    final JsonArray params = new JsonArray();
    params.add(processorName);
    params.add(aggregateRootId);
    sqlClient.querySingleWithParams(query, params, outcome -> {
        if (!outcome.succeeded()) {
            versionFuture.completeExceptionally(outcome.cause());
            return;
        }
        int version = 0;
        final boolean hasRow = outcome.result() != null && outcome.result().size() > 0;
        if (hasRow) {
            version = outcome.result().getInteger(0);
        }
        versionFuture.complete(version);
    });
    return versionFuture.exceptionally(error -> {
        if (!(error instanceof SQLException)) {
            logger.error("Get aggregate published version has unknown exception.", error);
            throw new EnodeRuntimeException(error);
        }
        final SQLException sqlError = (SQLException) error;
        logger.error("Get aggregate published version has sql exception.", sqlError);
        throw new IORuntimeException(error);
    });
}
 
源代码15 项目: JDA   文件: GuildImpl.java
/**
 * Requests the members with the given ids through the member chunk manager and collects the
 * delivered chunks into one list.
 *
 * @param includePresence whether presences are requested; requires the GUILD_PRESENCES intent
 * @param ids             ids of the members to request, at most 100; an empty array yields an
 *                        already completed task with an empty list
 * @return task that completes with all collected members once the last chunk was delivered;
 *         cancelling it cancels the underlying chunk request
 */
@Nonnull
@Override
public Task<List<Member>> retrieveMembersByIds(boolean includePresence, @Nonnull long... ids)
{
    Checks.notNull(ids, "ID Array");
    Checks.check(!includePresence || api.isIntent(GatewayIntent.GUILD_PRESENCES),
            "Cannot retrieve presences of members without GUILD_PRESENCES intent!");

    // Nothing to request: answer immediately.
    if (ids.length == 0)
    {
        return new GatewayTask<>(CompletableFuture.completedFuture(Collections.emptyList()), () -> {});
    }
    Checks.check(ids.length <= 100, "You can only request 100 members at once");

    final MemberChunkManager manager = api.getClient().getChunkManager();
    final List<Member> members = new ArrayList<>(ids.length);
    final CompletableFuture<List<Member>> future = new CompletableFuture<>();
    final CompletableFuture<Void> request = manager.chunkGuild(this, includePresence, ids, (isLast, chunk) -> {
        members.addAll(chunk);
        if (isLast)
        {
            future.complete(members);
        }
    });

    // Log failures of the result future.
    future.exceptionally(error -> {
        WebSocketClient.LOG.error("Encountered exception trying to handle member chunk response", error);
        return null;
    });

    return new GatewayTask<>(future, () -> request.cancel(false));
}
 
源代码16 项目: fabric-sdk-java   文件: EndorserClient.java
/**
 * Sends a discovery request through the discovery stub.
 *
 * @param signedRequest the signed discovery request
 * @return future with the response; already failed with a {@code PeerException} when this
 *         client is shut down, otherwise any failure is rethrown as a
 *         {@code CompletionException} whose message is prefixed with this client's description
 */
public CompletableFuture<Protocol.Response> sendDiscoveryRequestAsync(Protocol.SignedRequest signedRequest) {
    if (shutdown) {
        final CompletableFuture<Protocol.Response> failed = new CompletableFuture<>();
        failed.completeExceptionally(new PeerException("Shutdown " + toString()));
        return failed;
    }

    final CompletableFuture<Protocol.Response> response =
            CompletableFuturesExtra.toCompletableFuture(discoveryFutureStub.discover(signedRequest));
    return response.exceptionally(cause -> {
        final String message = format("%s %s", toString, cause.getMessage());
        throw new CompletionException(message, cause);
    });

}
 
源代码17 项目: cyclops   文件: FastFuture.java
/**
 * Wires this FastFuture to the outcome of the supplied CompletableFuture.
 *
 * <p>Two callbacks are registered on {@code cf}: a normally produced value is handed to
 * {@code set(..)}, and a failure is handed to {@code completedExceptionally(..)}.
 *
 * @param cf the CompletableFuture whose outcome is forwarded to this future
 * @return this FastFuture
 */
public FastFuture<T> populateFromCompletableFuture(final CompletableFuture<T> cf) {
    // Success path: forward the value.
    cf.thenAccept(i -> this.set(i));
    // Failure path: forward the throwable. The handler has to produce a T for exceptionally().
    cf.exceptionally(t -> {
        completedExceptionally(t);
        // NOTE(review): presumably join() rethrows here because this future was just failed,
        // which would fail the discarded derived future - confirm against FastFuture#join.
        return join();
    });
    return this;
}
 
源代码18 项目: enode   文件: MongoPublishedVersionStore.java
/**
 * Reads the published version for the given processor and aggregate root from the
 * published-version collection.
 *
 * <p>Requests a single document; completes with its {@code version} field, or with 0 when
 * the query finishes without delivering a document. Failures are logged and rethrown as
 * {@code IORuntimeException} (for {@code SQLException}) or {@code EnodeRuntimeException}
 * (for anything else).
 *
 * @param processorName         matched against the {@code processorName} field
 * @param aggregateRootTypeName not used by the query
 * @param aggregateRootId       matched against the {@code aggregateRootId} field
 * @return future with the published version
 */
@Override
public CompletableFuture<Integer> getPublishedVersionAsync(String processorName, String aggregateRootTypeName, String aggregateRootId) {
    final CompletableFuture<Integer> versionFuture = new CompletableFuture<>();
    final Bson filter = Filters.and(
            Filters.eq("processorName", processorName),
            Filters.eq("aggregateRootId", aggregateRootId)
    );
    mongoClient.getDatabase(configuration.getDatabaseName())
            .getCollection(configuration.getPublishedVersionCollectionName())
            .find(filter)
            .subscribe(new Subscriber<Document>() {
                // Value reported when the stream ends without a document.
                private Integer current = 0;

                @Override
                public void onSubscribe(Subscription subscription) {
                    subscription.request(1);
                }

                @Override
                public void onNext(Document found) {
                    current = found.getInteger("version");
                    versionFuture.complete(current);
                }

                @Override
                public void onError(Throwable error) {
                    versionFuture.completeExceptionally(error);
                }

                @Override
                public void onComplete() {
                    versionFuture.complete(current);
                }
            });
    return versionFuture.exceptionally(error -> {
        if (!(error instanceof SQLException)) {
            logger.error("Get aggregate published version has unknown exception.", error);
            throw new EnodeRuntimeException(error);
        }
        final SQLException sqlError = (SQLException) error;
        logger.error("Get aggregate published version has sql exception.", sqlError);
        throw new IORuntimeException(error);
    });
}
 
源代码19 项目: enode   文件: JDBCPublishedVersionStore.java
/**
 * Inserts (for version 1) or updates (for any later version) the published version of an
 * aggregate root.
 *
 * <p>An insert that hits the unique index - recognised by the configured SQL state and the
 * unique index name in the error message - is treated as a duplicate and yields 0. Any other
 * failure is logged and rethrown as {@code IORuntimeException} (for {@code SQLException}) or
 * {@code EnodeRuntimeException} (for anything else).
 *
 * @param processorName         processor the version belongs to
 * @param aggregateRootTypeName aggregate root type, stored on insert only
 * @param aggregateRootId       id of the aggregate root
 * @param publishedVersion      version to store; 1 triggers an insert, otherwise the row at
 *                              {@code publishedVersion - 1} is updated
 * @return future with the number of updated rows, or 0 for a duplicate insert
 */
@Override
public CompletableFuture<Integer> updatePublishedVersionAsync(String processorName, String aggregateRootTypeName, String aggregateRootId, int publishedVersion) {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    final boolean isInsert = publishedVersion == 1;
    final String sql;
    JsonArray array = new JsonArray();
    if (isInsert) {
        sql = String.format(INSERT_SQL, tableName);
        array.add(processorName);
        array.add(aggregateRootTypeName);
        array.add(aggregateRootId);
        array.add(1);
        array.add(new Date().toInstant());
    } else {
        sql = String.format(UPDATE_SQL, tableName);
        array.add(publishedVersion);
        array.add(new Date().toInstant());
        array.add(processorName);
        array.add(aggregateRootId);
        array.add(publishedVersion - 1);
    }
    sqlClient.updateWithParams(sql, array, x -> {
        if (x.succeeded()) {
            future.complete(x.result().getUpdated());
            return;
        }
        future.completeExceptionally(x.cause());
    });
    return future.exceptionally(throwable -> {
        if (throwable instanceof SQLException) {
            SQLException ex = (SQLException) throwable;
            // SQLException may carry neither an SQL state nor a message. Dereferencing them
            // blindly would replace the real error with a NullPointerException.
            final String state = ex.getSQLState();
            final String message = ex.getMessage();
            final boolean duplicateInsert = isInsert
                    && state != null && state.equals(sqlState)
                    && message != null && message.contains(uniqueIndexName);
            // insert duplicate return
            if (duplicateInsert) {
                return 0;
            }
            logger.error("Insert or update aggregate published version has sql exception.", ex);
            throw new IORuntimeException(throwable);
        }
        logger.error("Insert or update aggregate published version has unknown exception.", throwable);
        throw new EnodeRuntimeException(throwable);

    });
}
 
源代码20 项目: pulsar   文件: OwnedBundle.java
/**
 * It unloads the bundle by closing all topics concurrently under this bundle.
 *
 * <pre>
 * a. disable bundle ownership in memory and not in zk
 * b. close all the topics concurrently
 * c. delete ownership znode from zookeeper.
 * </pre>
 *
 * <p>Failures are reported through the returned future rather than thrown. If removing the
 * ownership fails, the bundle state is switched back to active.
 *
 * @param pulsar
 *            service giving access to the ownership cache and the broker service
 * @param timeout
 *            timeout for unloading bundle. It doesn't throw exception if it times out while waiting on closing all
 *            topics
 * @param timeoutUnit
 *            unit of {@code timeout}
 * @return future that completes once the ownership has been removed; it is already failed when the
 *         bundle is not active or the calling thread is interrupted while acquiring the lock
 */
public CompletableFuture<Void> handleUnloadRequest(PulsarService pulsar, long timeout, TimeUnit timeoutUnit) {
    long unloadBundleStartTime = System.nanoTime();
    // Need a per namespace ReentrantReadWriteLock
    // Here to do a writeLock to set the flag and proceed to check and close connections
    try {
        while (!this.nsLock.writeLock().tryLock(1, TimeUnit.SECONDS)) {
            // Using tryLock to avoid deadlocks caused by 2 threads trying to acquire 2 readlocks (eg: replicators)
            // while a handleUnloadRequest happens in the middle
            LOG.warn("Contention on OwnedBundle rw lock. Retrying to acquire lock write lock");
        }

        try {
            // set the flag locally s.t. no more producer/consumer to this namespace is allowed
            if (!IS_ACTIVE_UPDATER.compareAndSet(this, TRUE, FALSE)) {
                // An exception is thrown when the namespace is not in active state (i.e. another thread is
                // removing/have removed it)
                return FutureUtil.failedFuture(new IllegalStateException(
                        "Namespace is not active. ns:" + this.bundle + "; state:" + IS_ACTIVE_UPDATER.get(this)));
            }
        } finally {
            // no matter success or not, unlock
            this.nsLock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can still observe the
        // interruption; the failure itself is reported through the returned future.
        Thread.currentThread().interrupt();
        return FutureUtil.failedFuture(e);
    }

    AtomicInteger unloadedTopics = new AtomicInteger();
    LOG.info("Disabling ownership: {}", this.bundle);

    // close topics forcefully
    CompletableFuture<Void> future = pulsar.getNamespaceService().getOwnershipCache()
            .updateBundleState(this.bundle, false)
            .thenCompose(v -> pulsar.getBrokerService().unloadServiceUnit(bundle, true, timeout, timeoutUnit))
            .handle((numUnloadedTopics, ex) -> {
                if (ex != null) {
                    // ignore topic-close failure to unload bundle
                    LOG.error("Failed to close topics under namespace {}", bundle.toString(), ex);
                } else {
                    unloadedTopics.set(numUnloadedTopics);
                }
                return null;
            })
            .thenCompose(v -> {
                // delete ownership node on zk
                return pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle);
            }).thenAccept(v -> {
                double unloadBundleTime = TimeUnit.NANOSECONDS
                        .toMillis((System.nanoTime() - unloadBundleStartTime));
                LOG.info("Unloading {} namespace-bundle with {} topics completed in {} ms", this.bundle,
                        unloadedTopics, unloadBundleTime);
            });

    future.exceptionally(ex -> {
        // Failed to remove ownership node: enable namespace-bundle again so, it can serve new topics
        pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.bundle, true);
        return null;
    });
    return future;
}