类org.apache.flink.util.function.SupplierWithException源码实例Demo

下面列出了怎么用org.apache.flink.util.function.SupplierWithException的API类实例代码及写法,或者点击链接到github查看源代码。

@SuppressWarnings("unchecked")
DummyFlinkKafkaConsumer(
	SupplierWithException<AbstractFetcher<T, ?>, Exception> testFetcherSupplier,
	AbstractPartitionDiscoverer testPartitionDiscoverer,
	boolean isAutoCommitEnabled,
	long discoveryIntervalMillis,
	List<String> topics) {

	super(
		topics,
		null,
		(KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class),
		discoveryIntervalMillis,
		false);

	this.testFetcherSupplier = testFetcherSupplier;
	this.testPartitionDiscoverer = testPartitionDiscoverer;
	this.isAutoCommitEnabled = isAutoCommitEnabled;
}
 
源代码2 项目: Flink-CEPplus   文件: LambdaUtil.java
/**
 * Runs the given runnable with the given ClassLoader as the thread's
 * {@link Thread#setContextClassLoader(ClassLoader) context class loader}.
 *
 * <p>The method will make sure to set the context class loader of the calling thread
 * back to what it was before after the runnable completed.
 */
public static <R, E extends Throwable> R withContextClassLoader(
		final ClassLoader cl,
		final SupplierWithException<R, E> s) throws E {

	final Thread currentThread = Thread.currentThread();
	final ClassLoader oldClassLoader = currentThread.getContextClassLoader();

	try {
		currentThread.setContextClassLoader(cl);
		return s.get();
	}
	finally {
		currentThread.setContextClassLoader(oldClassLoader);
	}
}
 
源代码3 项目: Flink-CEPplus   文件: RocksFullSnapshotStrategy.java
private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(
	long checkpointId,
	CheckpointStreamFactory primaryStreamFactory,
	CheckpointOptions checkpointOptions) {

	return localRecoveryConfig.isLocalRecoveryEnabled() &&
		(CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType()) ?

		() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
			checkpointId,
			CheckpointedStateScope.EXCLUSIVE,
			primaryStreamFactory,
			localRecoveryConfig.getLocalStateDirectoryProvider()) :

		() -> CheckpointStreamWithResultProvider.createSimpleStream(
			CheckpointedStateScope.EXCLUSIVE,
			primaryStreamFactory);
}
 
源代码4 项目: Flink-CEPplus   文件: AbstractTtlDecorator.java
<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
	SupplierWithException<TtlValue<V>, SE> getter,
	ThrowingConsumer<TtlValue<V>, CE> updater,
	ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
	TtlValue<V> ttlValue = getter.get();
	if (ttlValue == null) {
		return null;
	} else if (expired(ttlValue)) {
		stateClear.run();
		if (!returnExpired) {
			return null;
		}
	} else if (updateTsOnRead) {
		updater.accept(rewrapWithNewTs(ttlValue));
	}
	return ttlValue;
}
 
源代码5 项目: Flink-CEPplus   文件: HandlerRequestUtils.java
/**
 * Returns {@code requestValue} if it is not null, otherwise returns the query parameter value
 * if it is not null, otherwise returns the default value.
 */
public static <T> T fromRequestBodyOrQueryParameter(
		T requestValue,
		SupplierWithException<T, RestHandlerException> queryParameterExtractor,
		T defaultValue,
		Logger log) throws RestHandlerException {
	if (requestValue != null) {
		return requestValue;
	} else {
		T queryParameterValue = queryParameterExtractor.get();
		if (queryParameterValue != null) {
			log.warn("Configuring the job submission via query parameters is deprecated." +
				" Please migrate to submitting a JSON request instead.");
			return queryParameterValue;
		} else {
			return defaultValue;
		}
	}
}
 
源代码6 项目: flink   文件: FlinkKafkaConsumerBaseTest.java
@SuppressWarnings("unchecked")
DummyFlinkKafkaConsumer(
	SupplierWithException<AbstractFetcher<T, ?>, Exception> testFetcherSupplier,
	AbstractPartitionDiscoverer testPartitionDiscoverer,
	boolean isAutoCommitEnabled,
	long discoveryIntervalMillis,
	List<String> topics) {

	super(
		topics,
		null,
		(KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class),
		discoveryIntervalMillis,
		false);

	this.testFetcherSupplier = testFetcherSupplier;
	this.testPartitionDiscoverer = testPartitionDiscoverer;
	this.isAutoCommitEnabled = isAutoCommitEnabled;
}
 
源代码7 项目: flink   文件: RocksFullSnapshotStrategy.java
private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(
	long checkpointId,
	CheckpointStreamFactory primaryStreamFactory,
	CheckpointOptions checkpointOptions) {

	return localRecoveryConfig.isLocalRecoveryEnabled() && !checkpointOptions.getCheckpointType().isSavepoint() ?

		() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
			checkpointId,
			CheckpointedStateScope.EXCLUSIVE,
			primaryStreamFactory,
			localRecoveryConfig.getLocalStateDirectoryProvider()) :

		() -> CheckpointStreamWithResultProvider.createSimpleStream(
			CheckpointedStateScope.EXCLUSIVE,
			primaryStreamFactory);
}
 
源代码8 项目: flink   文件: AbstractTtlDecorator.java
<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
	SupplierWithException<TtlValue<V>, SE> getter,
	ThrowingConsumer<TtlValue<V>, CE> updater,
	ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
	TtlValue<V> ttlValue = getter.get();
	if (ttlValue == null) {
		return null;
	} else if (expired(ttlValue)) {
		stateClear.run();
		if (!returnExpired) {
			return null;
		}
	} else if (updateTsOnRead) {
		updater.accept(rewrapWithNewTs(ttlValue));
	}
	return ttlValue;
}
 
源代码9 项目: flink   文件: HandlerRequestUtils.java
/**
 * Returns {@code requestValue} if it is not null, otherwise returns the query parameter value
 * if it is not null, otherwise returns the default value.
 */
public static <T> T fromRequestBodyOrQueryParameter(
		T requestValue,
		SupplierWithException<T, RestHandlerException> queryParameterExtractor,
		T defaultValue,
		Logger log) throws RestHandlerException {
	if (requestValue != null) {
		return requestValue;
	} else {
		T queryParameterValue = queryParameterExtractor.get();
		if (queryParameterValue != null) {
			log.warn("Configuring the job submission via query parameters is deprecated." +
				" Please migrate to submitting a JSON request instead.");
			return queryParameterValue;
		} else {
			return defaultValue;
		}
	}
}
 
源代码10 项目: flink   文件: FlinkKafkaConsumerBaseTest.java
@SuppressWarnings("unchecked")
DummyFlinkKafkaConsumer(
	SupplierWithException<AbstractFetcher<T, ?>, Exception> testFetcherSupplier,
	AbstractPartitionDiscoverer testPartitionDiscoverer,
	boolean isAutoCommitEnabled,
	long discoveryIntervalMillis,
	List<String> topics,
	KafkaDeserializationSchema<T> mock) {

	super(
		topics,
		null,
		mock,
		discoveryIntervalMillis,
		false);

	this.testFetcherSupplier = testFetcherSupplier;
	this.testPartitionDiscoverer = testPartitionDiscoverer;
	this.isAutoCommitEnabled = isAutoCommitEnabled;
}
 
源代码11 项目: flink   文件: TestingExecutor.java
TestingExecutor(
		List<SupplierWithException<TypedResult<List<Tuple2<Boolean, Row>>>, SqlExecutionException>> resultChanges,
		List<SupplierWithException<TypedResult<Integer>, SqlExecutionException>> snapshotResults,
		List<SupplierWithException<List<Row>, SqlExecutionException>> resultPages,
		BiConsumerWithException<String, String, SqlExecutionException> useCatalogConsumer,
		BiConsumerWithException<String, String, SqlExecutionException> useDatabaseConsumer,
		BiFunctionWithException<String, String, TableResult, SqlExecutionException> executeSqlConsumer,
		TriFunctionWithException<String, String, String, Void, SqlExecutionException> setSessionPropertyFunction,
		FunctionWithException<String, Void, SqlExecutionException> resetSessionPropertiesFunction) {
	this.resultChanges = resultChanges;
	this.snapshotResults = snapshotResults;
	this.resultPages = resultPages;
	this.useCatalogConsumer = useCatalogConsumer;
	this.useDatabaseConsumer = useDatabaseConsumer;
	this.executeSqlConsumer = executeSqlConsumer;
	this.setSessionPropertyFunction = setSessionPropertyFunction;
	this.resetSessionPropertiesFunction = resetSessionPropertiesFunction;
	helper = new SqlParserHelper();
	helper.registerTables();
}
 
源代码12 项目: flink   文件: RocksFullSnapshotStrategy.java
private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(
	long checkpointId,
	CheckpointStreamFactory primaryStreamFactory,
	CheckpointOptions checkpointOptions) {

	return localRecoveryConfig.isLocalRecoveryEnabled() && !checkpointOptions.getCheckpointType().isSavepoint() ?

		() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
			checkpointId,
			CheckpointedStateScope.EXCLUSIVE,
			primaryStreamFactory,
			localRecoveryConfig.getLocalStateDirectoryProvider()) :

		() -> CheckpointStreamWithResultProvider.createSimpleStream(
			CheckpointedStateScope.EXCLUSIVE,
			primaryStreamFactory);
}
 
源代码13 项目: flink   文件: AbstractTtlDecorator.java
<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
	SupplierWithException<TtlValue<V>, SE> getter,
	ThrowingConsumer<TtlValue<V>, CE> updater,
	ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
	TtlValue<V> ttlValue = getter.get();
	if (ttlValue == null) {
		return null;
	} else if (expired(ttlValue)) {
		stateClear.run();
		if (!returnExpired) {
			return null;
		}
	} else if (updateTsOnRead) {
		updater.accept(rewrapWithNewTs(ttlValue));
	}
	return ttlValue;
}
 
源代码14 项目: flink   文件: HandlerRequestUtils.java
/**
 * Returns {@code requestValue} if it is not null, otherwise returns the query parameter value
 * if it is not null, otherwise returns the default value.
 */
public static <T> T fromRequestBodyOrQueryParameter(
		T requestValue,
		SupplierWithException<T, RestHandlerException> queryParameterExtractor,
		T defaultValue,
		Logger log) throws RestHandlerException {
	if (requestValue != null) {
		return requestValue;
	} else {
		T queryParameterValue = queryParameterExtractor.get();
		if (queryParameterValue != null) {
			log.warn("Configuring the job submission via query parameters is deprecated." +
				" Please migrate to submitting a JSON request instead.");
			return queryParameterValue;
		} else {
			return defaultValue;
		}
	}
}
 
源代码15 项目: Flink-CEPplus   文件: FlinkKafkaConsumerBaseTest.java
@SuppressWarnings("unchecked")
DummyFlinkKafkaConsumer(
		SupplierWithException<AbstractFetcher<T, ?>, Exception> testFetcherSupplier,
		AbstractPartitionDiscoverer testPartitionDiscoverer,
		boolean isAutoCommitEnabled,
		long discoveryIntervalMillis) {
	this(
		testFetcherSupplier,
		testPartitionDiscoverer,
		isAutoCommitEnabled,
		discoveryIntervalMillis,
		Collections.singletonList("dummy-topic")
		);
}
 
private FSDataOutputStream createOutputStream(
		final SupplierWithException<FSDataOutputStream, IOException> streamOpener) throws IOException {

	final SupplierWithException<OutStream, IOException> wrappedStreamOpener =
			() -> new OutStream(streamOpener.get(), this);

	return createStream(wrappedStreamOpener, openOutputStreams, true);
}
 
private FSDataInputStream createInputStream(
		final SupplierWithException<FSDataInputStream, IOException> streamOpener) throws IOException {

	final SupplierWithException<InStream, IOException> wrappedStreamOpener =
			() -> new InStream(streamOpener.get(), this);

	return createStream(wrappedStreamOpener, openInputStreams, false);
}
 
源代码18 项目: Flink-CEPplus   文件: RocksFullSnapshotStrategy.java
SnapshotAsynchronousPartCallable(
	@Nonnull SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier,
	@Nonnull ResourceGuard.Lease dbLease,
	@Nonnull Snapshot snapshot,
	@Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
	@Nonnull List<RocksDbKvStateInfo> metaDataCopy,
	@Nonnull String logPathString) {

	this.checkpointStreamSupplier = checkpointStreamSupplier;
	this.dbLease = dbLease;
	this.snapshot = snapshot;
	this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
	this.metaData = fillMetaData(metaDataCopy);
	this.logPathString = logPathString;
}
 
源代码19 项目: Flink-CEPplus   文件: TtlStateFactory.java
@SuppressWarnings("deprecation")
private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {
	return Stream.of(
		Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),
		Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),
		Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),
		Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),
		Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState),
		Tuple2.of(FoldingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createFoldingState)
	).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
}
 
源代码20 项目: Flink-CEPplus   文件: TtlStateFactory.java
@SuppressWarnings("unchecked")
private IS createState() throws Exception {
	SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass());
	if (stateFactory == null) {
		String message = String.format("State %s is not supported by %s",
			stateDesc.getClass(), TtlStateFactory.class);
		throw new FlinkRuntimeException(message);
	}
	IS state = stateFactory.get();
	if (incrementalCleanup != null) {
		incrementalCleanup.setTtlState((AbstractTtlState<K, N, ?, TTLSV, ?>) state);
	}
	return state;
}
 
源代码21 项目: Flink-CEPplus   文件: AbstractTtlDecorator.java
<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate(
	SupplierWithException<TtlValue<V>, SE> getter,
	ThrowingConsumer<TtlValue<V>, CE> updater,
	ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
	TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear);
	return ttlValue == null ? null : ttlValue.getUserValue();
}
 
源代码22 项目: Flink-CEPplus   文件: FutureUtils.java
/**
 * Returns a future which is completed with the result of the {@link SupplierWithException}.
 *
 * @param supplier to provide the future's value
 * @param executor to execute the supplier
 * @param <T> type of the result
 * @return Future which is completed with the value of the supplier
 */
public static <T> CompletableFuture<T> supplyAsync(SupplierWithException<T, ?> supplier, Executor executor) {
	return CompletableFuture.supplyAsync(
		() -> {
			try {
				return supplier.get();
			} catch (Throwable e) {
				throw new CompletionException(e);
			}
		},
		executor);
}
 
源代码23 项目: Flink-CEPplus   文件: TaskExecutorITCase.java
private SupplierWithException<Boolean, Exception> jobIsRunning(Supplier<CompletableFuture<? extends AccessExecutionGraph>> executionGraphFutureSupplier) {
	final Predicate<AccessExecution> runningOrFinished = ExecutionGraphTestUtils.isInExecutionState(ExecutionState.RUNNING).or(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.FINISHED));
	final Predicate<AccessExecutionGraph> allExecutionsRunning = ExecutionGraphTestUtils.allExecutionsPredicate(runningOrFinished);

	return () -> {
		final AccessExecutionGraph executionGraph = executionGraphFutureSupplier.get().join();
		return allExecutionsRunning.test(executionGraph);
	};
}
 
/**
 * Executes the given supplier with the main thread executor until completion, returns the result or a exception.
 * This method blocks until the execution is complete.
 */
public <U> U execute(@Nonnull SupplierWithException<U, Throwable> supplierWithException) {
	return CompletableFuture.supplyAsync(
		FunctionUtils.uncheckedSupplier(supplierWithException),
		mainThreadExecutor)
		.join();
}
 
源代码25 项目: Flink-CEPplus   文件: CommonTestUtils.java
public static void waitUntilCondition(SupplierWithException<Boolean, Exception> condition, Deadline timeout, long retryIntervalMillis) throws Exception {
	while (timeout.hasTimeLeft() && !condition.get()) {
		Thread.sleep(Math.min(retryIntervalMillis, timeout.timeLeft().toMillis()));
	}

	if (!timeout.hasTimeLeft()) {
		throw new TimeoutException("Condition was not met in given timeout.");
	}
}
 
源代码26 项目: Flink-CEPplus   文件: MetricsAvailabilityITCase.java
private static <X> X fetchMetric(final SupplierWithException<CompletableFuture<X>, IOException> clientOperation, final Predicate<X> predicate) throws InterruptedException, ExecutionException, TimeoutException {
	final CompletableFuture<X> responseFuture = FutureUtils.retrySuccessfulWithDelay(() -> {
			try {
				return clientOperation.get();
			} catch (IOException e) {
				throw new RuntimeException(e);
			}
		},
		Time.seconds(1),
		Deadline.fromNow(Duration.ofSeconds(5)),
		predicate,
		new ScheduledExecutorServiceAdapter(scheduledExecutorService));

	return responseFuture.get(30, TimeUnit.SECONDS);
}
 
源代码27 项目: pulsar-flink   文件: FlinkPulsarSourceTest.java
public DummyFlinkPulsarSource(
        SupplierWithException<PulsarFetcher<T>, Exception> testFetcherSupplier,
        PulsarMetadataReader discoverer,
        Properties properties) {
    super("a", "b", mock(DeserializationSchema.class), properties);
    this.testFetcherSupplier = testFetcherSupplier;
    this.discoverer = discoverer;
}
 
源代码28 项目: flink   文件: FlinkKafkaConsumerBaseTest.java
@SuppressWarnings("unchecked")
DummyFlinkKafkaConsumer(
		SupplierWithException<AbstractFetcher<T, ?>, Exception> testFetcherSupplier,
		AbstractPartitionDiscoverer testPartitionDiscoverer,
		boolean isAutoCommitEnabled,
		long discoveryIntervalMillis) {
	this(
		testFetcherSupplier,
		testPartitionDiscoverer,
		isAutoCommitEnabled,
		discoveryIntervalMillis,
		Collections.singletonList("dummy-topic")
		);
}
 
源代码29 项目: flink   文件: BackPressureITCase.java
private SupplierWithException<Boolean, Exception> isJobVertexBackPressured(final JobVertex sourceJobVertex) {
	return () -> {
		final OperatorBackPressureStatsResponse backPressureStatsResponse = dispatcherGateway
			.requestOperatorBackPressureStats(TEST_JOB_ID, sourceJobVertex.getID())
			.get();

		return backPressureStatsResponse.getOperatorBackPressureStats()
			.map(backPressureStats -> isBackPressured(backPressureStats))
			.orElse(false);
	};
}
 
源代码30 项目: flink   文件: LambdaUtil.java
/**
 * Runs the given runnable with the given ClassLoader as the thread's
 * {@link Thread#setContextClassLoader(ClassLoader) context class loader}.
 *
 * <p>The method will make sure to set the context class loader of the calling thread
 * back to what it was before after the runnable completed.
 */
public static <R, E extends Throwable> R withContextClassLoader(
		final ClassLoader cl,
		final SupplierWithException<R, E> s) throws E {

	try (TemporaryClassLoaderContext tmpCl = new TemporaryClassLoaderContext(cl)) {
		return s.get();
	}
}
 
 类所在包
 类方法
 同包方法