The following examples show how the org.apache.flink.util.function.SupplierWithException API is used in practice, collected from open-source code.
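For context, SupplierWithException is Flink's checked-exception analogue of java.util.function.Supplier: a functional interface whose get() is allowed to throw. Its shape is essentially:

@FunctionalInterface
public interface SupplierWithException<R, E extends Throwable> {

    /** Gets the result of this supplier, possibly throwing a checked exception. */
    R get() throws E;
}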
@SuppressWarnings("unchecked")
DummyFlinkKafkaConsumer(
SupplierWithException<AbstractFetcher<T, ?>, Exception> testFetcherSupplier,
AbstractPartitionDiscoverer testPartitionDiscoverer,
boolean isAutoCommitEnabled,
long discoveryIntervalMillis,
List<String> topics) {
super(
topics,
null,
(KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class),
discoveryIntervalMillis,
false);
this.testFetcherSupplier = testFetcherSupplier;
this.testPartitionDiscoverer = testPartitionDiscoverer;
this.isAutoCommitEnabled = isAutoCommitEnabled;
}
/**
 * Runs the given supplier with the given ClassLoader as the thread's
 * {@link Thread#setContextClassLoader(ClassLoader) context class loader}.
 *
 * <p>The method makes sure to set the context class loader of the calling thread
 * back to what it was before once the supplier has completed.
 */
public static <R, E extends Throwable> R withContextClassLoader(
        final ClassLoader cl,
        final SupplierWithException<R, E> s) throws E {
    final Thread currentThread = Thread.currentThread();
    final ClassLoader oldClassLoader = currentThread.getContextClassLoader();
    try {
        currentThread.setContextClassLoader(cl);
        return s.get();
    } finally {
        currentThread.setContextClassLoader(oldClassLoader);
    }
}
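A hypothetical call site: run a resource lookup with a temporary context class loader. The resource name and the surrounding method are illustrative, and the static import assumes the helper above is the one from Flink's LambdaUtil; adjust the import if it lives elsewhere.

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

import static org.apache.flink.util.LambdaUtil.withContextClassLoader;

static byte[] readConfig(ClassLoader isolatedLoader) throws IOException {
    // The supplier observes isolatedLoader as the context class loader;
    // the previous loader is restored even if the read throws.
    return withContextClassLoader(isolatedLoader, () -> {
        InputStream in = Thread.currentThread()
            .getContextClassLoader()
            .getResourceAsStream("app.properties"); // hypothetical resource name
        if (in == null) {
            throw new FileNotFoundException("app.properties not on the class path");
        }
        try (in) {
            return in.readAllBytes();
        }
    });
}

Because the lambda throws IOException, the exception type parameter E is inferred as IOException, so the caller declares exactly that checked exception rather than a broad Exception.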
private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(
        long checkpointId,
        CheckpointStreamFactory primaryStreamFactory,
        CheckpointOptions checkpointOptions) {

    // Duplicate the stream into local recovery storage unless this is a savepoint;
    // savepoints always go through the primary stream factory only.
    return localRecoveryConfig.isLocalRecoveryEnabled() &&
        (CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType()) ?

        () -> CheckpointStreamWithResultProvider.createDuplicatingStream(
            checkpointId,
            CheckpointedStateScope.EXCLUSIVE,
            primaryStreamFactory,
            localRecoveryConfig.getLocalStateDirectoryProvider()) :

        () -> CheckpointStreamWithResultProvider.createSimpleStream(
            CheckpointedStateScope.EXCLUSIVE,
            primaryStreamFactory);
}
<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<V>, SE> getter,
        ThrowingConsumer<TtlValue<V>, CE> updater,
        ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
    TtlValue<V> ttlValue = getter.get();
    if (ttlValue == null) {
        // No value stored for this key.
        return null;
    } else if (expired(ttlValue)) {
        // The value outlived its TTL: clear the backing state and, unless
        // expired values are configured to remain visible, hide it from the caller.
        stateClear.run();
        if (!returnExpired) {
            return null;
        }
    } else if (updateTsOnRead) {
        // TTL is refreshed on read access: write the value back with a new timestamp.
        updater.accept(rewrapWithNewTs(ttlValue));
    }
    return ttlValue;
}
/**
 * Returns {@code requestValue} if it is not null, otherwise returns the query parameter value
 * if it is not null, otherwise returns the default value.
 */
public static <T> T fromRequestBodyOrQueryParameter(
        T requestValue,
        SupplierWithException<T, RestHandlerException> queryParameterExtractor,
        T defaultValue,
        Logger log) throws RestHandlerException {
    if (requestValue != null) {
        return requestValue;
    } else {
        T queryParameterValue = queryParameterExtractor.get();
        if (queryParameterValue != null) {
            log.warn("Configuring the job submission via query parameters is deprecated." +
                " Please migrate to submitting a JSON request instead.");
            return queryParameterValue;
        } else {
            return defaultValue;
        }
    }
}
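A hedged usage sketch for the helper above: resolve an integer setting from the request body first, then a query parameter, then a default. The parameter name, map layout, and enclosing method are hypothetical.

import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

private static final Logger LOG = LoggerFactory.getLogger("demo");

// Hypothetical resolution of a "parallelism" setting.
static Integer resolveParallelism(
        Integer fromBody,
        Map<String, List<String>> queryParams) throws RestHandlerException {
    return fromRequestBodyOrQueryParameter(
        fromBody,                 // wins when non-null
        () -> {                   // consulted only if the body value is null;
                                  // may throw RestHandlerException in a real handler
            List<String> values = queryParams.get("parallelism");
            return values == null || values.isEmpty() ? null : Integer.valueOf(values.get(0));
        },
        1,                        // default when neither source is set
        LOG);
}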
@SuppressWarnings("unchecked")
DummyFlinkKafkaConsumer(
SupplierWithException<AbstractFetcher<T, ?>, Exception> testFetcherSupplier,
AbstractPartitionDiscoverer testPartitionDiscoverer,
boolean isAutoCommitEnabled,
long discoveryIntervalMillis,
List<String> topics) {
super(
topics,
null,
(KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class),
discoveryIntervalMillis,
false);
this.testFetcherSupplier = testFetcherSupplier;
this.testPartitionDiscoverer = testPartitionDiscoverer;
this.isAutoCommitEnabled = isAutoCommitEnabled;
}
private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(
        long checkpointId,
        CheckpointStreamFactory primaryStreamFactory,
        CheckpointOptions checkpointOptions) {

    return localRecoveryConfig.isLocalRecoveryEnabled() && !checkpointOptions.getCheckpointType().isSavepoint() ?

        () -> CheckpointStreamWithResultProvider.createDuplicatingStream(
            checkpointId,
            CheckpointedStateScope.EXCLUSIVE,
            primaryStreamFactory,
            localRecoveryConfig.getLocalStateDirectoryProvider()) :

        () -> CheckpointStreamWithResultProvider.createSimpleStream(
            CheckpointedStateScope.EXCLUSIVE,
            primaryStreamFactory);
}
@SuppressWarnings("unchecked")
DummyFlinkKafkaConsumer(
SupplierWithException<AbstractFetcher<T, ?>, Exception> testFetcherSupplier,
AbstractPartitionDiscoverer testPartitionDiscoverer,
boolean isAutoCommitEnabled,
long discoveryIntervalMillis,
List<String> topics,
KafkaDeserializationSchema<T> mock) {
super(
topics,
null,
mock,
discoveryIntervalMillis,
false);
this.testFetcherSupplier = testFetcherSupplier;
this.testPartitionDiscoverer = testPartitionDiscoverer;
this.isAutoCommitEnabled = isAutoCommitEnabled;
}
TestingExecutor(
        List<SupplierWithException<TypedResult<List<Tuple2<Boolean, Row>>>, SqlExecutionException>> resultChanges,
        List<SupplierWithException<TypedResult<Integer>, SqlExecutionException>> snapshotResults,
        List<SupplierWithException<List<Row>, SqlExecutionException>> resultPages,
        BiConsumerWithException<String, String, SqlExecutionException> useCatalogConsumer,
        BiConsumerWithException<String, String, SqlExecutionException> useDatabaseConsumer,
        BiFunctionWithException<String, String, TableResult, SqlExecutionException> executeSqlConsumer,
        TriFunctionWithException<String, String, String, Void, SqlExecutionException> setSessionPropertyFunction,
        FunctionWithException<String, Void, SqlExecutionException> resetSessionPropertiesFunction) {
    this.resultChanges = resultChanges;
    this.snapshotResults = snapshotResults;
    this.resultPages = resultPages;
    this.useCatalogConsumer = useCatalogConsumer;
    this.useDatabaseConsumer = useDatabaseConsumer;
    this.executeSqlConsumer = executeSqlConsumer;
    this.setSessionPropertyFunction = setSessionPropertyFunction;
    this.resetSessionPropertiesFunction = resetSessionPropertiesFunction;
    helper = new SqlParserHelper();
    helper.registerTables();
}
@SuppressWarnings("unchecked")
DummyFlinkKafkaConsumer(
SupplierWithException<AbstractFetcher<T, ?>, Exception> testFetcherSupplier,
AbstractPartitionDiscoverer testPartitionDiscoverer,
boolean isAutoCommitEnabled,
long discoveryIntervalMillis) {
this(
testFetcherSupplier,
testPartitionDiscoverer,
isAutoCommitEnabled,
discoveryIntervalMillis,
Collections.singletonList("dummy-topic")
);
}
private FSDataOutputStream createOutputStream(
        final SupplierWithException<FSDataOutputStream, IOException> streamOpener) throws IOException {
    final SupplierWithException<OutStream, IOException> wrappedStreamOpener =
        () -> new OutStream(streamOpener.get(), this);
    return createStream(wrappedStreamOpener, openOutputStreams, true);
}

private FSDataInputStream createInputStream(
        final SupplierWithException<FSDataInputStream, IOException> streamOpener) throws IOException {
    final SupplierWithException<InStream, IOException> wrappedStreamOpener =
        () -> new InStream(streamOpener.get(), this);
    return createStream(wrappedStreamOpener, openInputStreams, false);
}
SnapshotAsynchronousPartCallable(
        @Nonnull SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier,
        @Nonnull ResourceGuard.Lease dbLease,
        @Nonnull Snapshot snapshot,
        @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
        @Nonnull List<RocksDbKvStateInfo> metaDataCopy,
        @Nonnull String logPathString) {
    this.checkpointStreamSupplier = checkpointStreamSupplier;
    this.dbLease = dbLease;
    this.snapshot = snapshot;
    this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
    this.metaData = fillMetaData(metaDataCopy);
    this.logPathString = logPathString;
}
@SuppressWarnings("deprecation")
private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {
return Stream.of(
Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),
Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),
Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),
Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),
Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState),
Tuple2.of(FoldingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createFoldingState)
).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
}
@SuppressWarnings("unchecked")
private IS createState() throws Exception {
SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass());
if (stateFactory == null) {
String message = String.format("State %s is not supported by %s",
stateDesc.getClass(), TtlStateFactory.class);
throw new FlinkRuntimeException(message);
}
IS state = stateFactory.get();
if (incrementalCleanup != null) {
incrementalCleanup.setTtlState((AbstractTtlState<K, N, ?, TTLSV, ?>) state);
}
return state;
}
<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<V>, SE> getter,
        ThrowingConsumer<TtlValue<V>, CE> updater,
        ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
    TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear);
    return ttlValue == null ? null : ttlValue.getUserValue();
}
/**
 * Returns a future which is completed with the result of the {@link SupplierWithException}.
 *
 * @param supplier to provide the future's value
 * @param executor to execute the supplier
 * @param <T> type of the result
 * @return Future which is completed with the value of the supplier
 */
public static <T> CompletableFuture<T> supplyAsync(SupplierWithException<T, ?> supplier, Executor executor) {
    return CompletableFuture.supplyAsync(
        () -> {
            try {
                return supplier.get();
            } catch (Throwable e) {
                throw new CompletionException(e);
            }
        },
        executor);
}
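A brief usage sketch of the helper above; the file name and executor parameter are illustrative.

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

static CompletableFuture<String> readMetadataAsync(ExecutorService ioExecutor) {
    // Files.readAllBytes throws IOException, which a plain java.util.function.Supplier
    // could not declare; a failure completes the future exceptionally via CompletionException.
    return supplyAsync(
        () -> new String(Files.readAllBytes(Paths.get("checkpoint-metadata")), StandardCharsets.UTF_8),
        ioExecutor);
}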
private SupplierWithException<Boolean, Exception> jobIsRunning(
        Supplier<CompletableFuture<? extends AccessExecutionGraph>> executionGraphFutureSupplier) {
    final Predicate<AccessExecution> runningOrFinished =
        ExecutionGraphTestUtils.isInExecutionState(ExecutionState.RUNNING)
            .or(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.FINISHED));
    final Predicate<AccessExecutionGraph> allExecutionsRunning =
        ExecutionGraphTestUtils.allExecutionsPredicate(runningOrFinished);
    return () -> {
        final AccessExecutionGraph executionGraph = executionGraphFutureSupplier.get().join();
        return allExecutionsRunning.test(executionGraph);
    };
}
/**
 * Executes the given supplier with the main thread executor until completion, returning
 * the result or rethrowing any exception. This method blocks until the execution is complete.
 */
public <U> U execute(@Nonnull SupplierWithException<U, Throwable> supplierWithException) {
    return CompletableFuture.supplyAsync(
            FunctionUtils.uncheckedSupplier(supplierWithException),
            mainThreadExecutor)
        .join();
}
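FunctionUtils.uncheckedSupplier adapts the throwing supplier to the plain java.util.function.Supplier that CompletableFuture.supplyAsync expects. A minimal sketch of such an adapter, under the assumption that wrapping in RuntimeException is acceptable (the real Flink helper may instead rethrow the original checked exception unchanged):

import java.util.function.Supplier;

static <T> Supplier<T> uncheckedSupplier(SupplierWithException<T, Throwable> supplier) {
    return () -> {
        try {
            return supplier.get();
        } catch (Throwable t) {
            // Wrap so the checked exception can cross the Supplier boundary;
            // join() later surfaces it inside a CompletionException.
            throw new RuntimeException(t);
        }
    };
}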
public static void waitUntilCondition(
        SupplierWithException<Boolean, Exception> condition,
        Deadline timeout,
        long retryIntervalMillis) throws Exception {
    // Return as soon as the condition holds; re-checking the deadline only after
    // the loop avoids a spurious timeout when the condition was just met.
    while (timeout.hasTimeLeft()) {
        if (condition.get()) {
            return;
        }
        Thread.sleep(Math.min(retryIntervalMillis, timeout.timeLeft().toMillis()));
    }
    throw new TimeoutException("Condition was not met in given timeout.");
}
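A hypothetical call site, polling until a job reaches RUNNING. The client and jobId are illustrative and assume a client whose getJobStatus returns a CompletableFuture<JobStatus>.

static void waitUntilJobRunning(ClusterClient<?> client, JobID jobId) throws Exception {
    // Poll every 100 ms, for at most 30 seconds, until the job reports RUNNING.
    waitUntilCondition(
        () -> client.getJobStatus(jobId).get() == JobStatus.RUNNING,
        Deadline.fromNow(Duration.ofSeconds(30)),
        100L);
}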
private static <X> X fetchMetric(
        final SupplierWithException<CompletableFuture<X>, IOException> clientOperation,
        final Predicate<X> predicate) throws InterruptedException, ExecutionException, TimeoutException {
    final CompletableFuture<X> responseFuture = FutureUtils.retrySuccessfulWithDelay(
        () -> {
            try {
                return clientOperation.get();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        },
        Time.seconds(1),
        Deadline.fromNow(Duration.ofSeconds(5)),
        predicate,
        new ScheduledExecutorServiceAdapter(scheduledExecutorService));
    return responseFuture.get(30, TimeUnit.SECONDS);
}
public DummyFlinkPulsarSource(
        SupplierWithException<PulsarFetcher<T>, Exception> testFetcherSupplier,
        PulsarMetadataReader discoverer,
        Properties properties) {
    super("a", "b", mock(DeserializationSchema.class), properties);
    this.testFetcherSupplier = testFetcherSupplier;
    this.discoverer = discoverer;
}
@SuppressWarnings("unchecked")
DummyFlinkKafkaConsumer(
SupplierWithException<AbstractFetcher<T, ?>, Exception> testFetcherSupplier,
AbstractPartitionDiscoverer testPartitionDiscoverer,
boolean isAutoCommitEnabled,
long discoveryIntervalMillis) {
this(
testFetcherSupplier,
testPartitionDiscoverer,
isAutoCommitEnabled,
discoveryIntervalMillis,
Collections.singletonList("dummy-topic")
);
}
private SupplierWithException<Boolean, Exception> isJobVertexBackPressured(final JobVertex sourceJobVertex) {
    return () -> {
        final OperatorBackPressureStatsResponse backPressureStatsResponse = dispatcherGateway
            .requestOperatorBackPressureStats(TEST_JOB_ID, sourceJobVertex.getID())
            .get();
        return backPressureStatsResponse.getOperatorBackPressureStats()
            .map(backPressureStats -> isBackPressured(backPressureStats))
            .orElse(false);
    };
}
/**
 * Runs the given supplier with the given ClassLoader as the thread's
 * {@link Thread#setContextClassLoader(ClassLoader) context class loader}.
 *
 * <p>The method makes sure to set the context class loader of the calling thread
 * back to what it was before once the supplier has completed.
 */
public static <R, E extends Throwable> R withContextClassLoader(
        final ClassLoader cl,
        final SupplierWithException<R, E> s) throws E {
    try (TemporaryClassLoaderContext tmpCl = new TemporaryClassLoaderContext(cl)) {
        return s.get();
    }
}
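This variant delegates the save/restore to an AutoCloseable scope instead of a manual try/finally, so try-with-resources guarantees restoration. A minimal sketch of what such a TemporaryClassLoaderContext can look like (the actual Flink class may differ in detail):

/** Sets a context class loader on construction and restores the previous one on close(). */
public final class TemporaryClassLoaderContext implements AutoCloseable {

    private final ClassLoader originalContextClassLoader;

    public TemporaryClassLoaderContext(ClassLoader newContextClassLoader) {
        final Thread thread = Thread.currentThread();
        this.originalContextClassLoader = thread.getContextClassLoader();
        thread.setContextClassLoader(newContextClassLoader);
    }

    @Override
    public void close() {
        Thread.currentThread().setContextClassLoader(originalContextClassLoader);
    }
}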