下面列出了org.apache.hadoop.mapred.JobContextImpl#org.apache.flink.util.FlinkRuntimeException 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Removes the entropy marker string from the path, if the given file system is an
* entropy-injecting file system (implements {@link EntropyInjectingFileSystem}) and
* the entropy marker key is present. Otherwise, this returns the path as is.
*
* @param path The path to filter.
* @return The path without the marker string.
*/
public static Path removeEntropyMarkerIfPresent(FileSystem fs, Path path) {
final EntropyInjectingFileSystem efs = getEntropyFs(fs);
if (efs == null) {
return path;
}
else {
try {
return resolveEntropy(path, efs, false);
}
catch (IOException e) {
// this should never happen, because the path was valid before and we only remove characters.
// rethrow to silence the compiler
throw new FlinkRuntimeException(e.getMessage(), e);
}
}
}
@Test
public void testEndpointsMustBeUnique() throws Exception {
final RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
Tuple2.of(new TestHeaders(), testHandler),
Tuple2.of(new TestHeaders(), testUploadHandler)
);
assertThrows("REST handler registration",
FlinkRuntimeException.class,
() -> {
try (TestRestServerEndpoint restServerEndpoint = new TestRestServerEndpoint(serverConfig, handlers)) {
restServerEndpoint.start();
return null;
}
});
}
private State<T> findFinalStateAfterProceed(
ConditionContext context,
State<T> state,
T event) {
final Stack<State<T>> statesToCheck = new Stack<>();
statesToCheck.push(state);
try {
while (!statesToCheck.isEmpty()) {
final State<T> currentState = statesToCheck.pop();
for (StateTransition<T> transition : currentState.getStateTransitions()) {
if (transition.getAction() == StateTransitionAction.PROCEED &&
checkFilterCondition(context, transition.getCondition(), event)) {
if (transition.getTargetState().isFinal()) {
return transition.getTargetState();
} else {
statesToCheck.push(transition.getTargetState());
}
}
}
}
} catch (Exception e) {
throw new FlinkRuntimeException("Failure happened in filter function.", e);
}
return null;
}
public int getNumberOfAvailableSlotsForGroup(SlotSharingGroupId slotSharingGroupId, JobVertexID jobVertexId) {
final SlotSharingManager multiTaskSlotManager = slotSharingManagersMap.get(slotSharingGroupId);
if (multiTaskSlotManager != null) {
int availableSlots = 0;
for (SlotSharingManager.MultiTaskSlot multiTaskSlot : multiTaskSlotManager.getResolvedRootSlots()) {
if (!multiTaskSlot.contains(jobVertexId)) {
availableSlots++;
}
}
return availableSlots;
} else {
throw new FlinkRuntimeException("No MultiTaskSlotmanager registered under " + slotSharingGroupId + '.');
}
}
@Override
public UV getValue() {
if (deleted) {
return null;
} else {
if (userValue == null) {
try {
userValue = deserializeUserValue(dataInputView, rawValueBytes, valueSerializer);
} catch (IOException e) {
throw new FlinkRuntimeException("Error while deserializing the user value.", e);
}
}
return userValue;
}
}
private static void initializeBoundedBlockingPartitions(
ResultSubpartition[] subpartitions,
ResultPartition parent,
BoundedBlockingSubpartitionType blockingSubpartitionType,
int networkBufferSize,
FileChannelManager channelManager) {
int i = 0;
try {
for (i = 0; i < subpartitions.length; i++) {
final File spillFile = channelManager.createChannel().getPathFile();
subpartitions[i] = blockingSubpartitionType.create(i, parent, spillFile, networkBufferSize);
}
}
catch (IOException e) {
// undo all the work so that a failed constructor does not leave any resources
// in need of disposal
releasePartitionsQuietly(subpartitions, i);
// this is not good, we should not be forced to wrap this in a runtime exception.
// the fact that the ResultPartition and Task constructor (which calls this) do not tolerate any exceptions
// is incompatible with eager initialization of resources (RAII).
throw new FlinkRuntimeException(e);
}
}
public TaskSubmissionTestEnvironment build() throws Exception {
final TestingRpcService testingRpcService = new TestingRpcService();
final ShuffleEnvironment<?, ?> network = optionalShuffleEnvironment.orElseGet(() -> {
try {
return createShuffleEnvironment(resourceID,
localCommunication,
configuration,
testingRpcService,
mockShuffleEnvironment);
} catch (Exception e) {
throw new FlinkRuntimeException("Failed to build TaskSubmissionTestEnvironment", e);
}
});
return new TaskSubmissionTestEnvironment(
jobId,
jobMasterId,
slotSize,
jobMasterGateway,
configuration,
taskManagerActionListeners,
metricQueryServiceAddress,
testingRpcService,
network);
}
@Override
@SuppressWarnings("unchecked")
@Nonnull
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
if (stateFactory == null) {
String message = String.format("State %s is not supported by %s",
stateDesc.getClass(), TtlStateFactory.class);
throw new FlinkRuntimeException(message);
}
IS state = stateFactory.createInternalState(namespaceSerializer, stateDesc);
stateSnapshotFilters.put(stateDesc.getName(),
(StateSnapshotTransformer<Object>) getStateSnapshotTransformer(stateDesc, snapshotTransformFactory));
((MockInternalKvState<K, N, SV>) state).values = () -> stateValues
.computeIfAbsent(stateDesc.getName(), n -> new HashMap<>())
.computeIfAbsent(getCurrentKey(), k -> new HashMap<>());
return state;
}
@Test
public void testClusterShutdownWhenSubmissionFails() throws Exception {
// we're "listening" on this to be completed to verify that the cluster
// is being shut down from the ApplicationDispatcherBootstrap
final CompletableFuture<ApplicationStatus> externalShutdownFuture = new CompletableFuture<>();
final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
.setSubmitFunction(jobGraph -> {
throw new FlinkRuntimeException("Nope!");
})
.setClusterShutdownFunction((status) -> {
externalShutdownFuture.complete(status);
return CompletableFuture.completedFuture(Acknowledge.get());
});
ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3);
final CompletableFuture<Acknowledge> shutdownFuture =
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor);
// wait until the bootstrap "thinks" it's done
shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
// verify that the dispatcher is actually being shut down
assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(ApplicationStatus.FAILED));
}
@Override
@Nonnull
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
if (stateFactory == null) {
String message = String.format("State %s is not supported by %s",
stateDesc.getClass(), this.getClass());
throw new FlinkRuntimeException(message);
}
Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult = tryRegisterKvStateInformation(
stateDesc, namespaceSerializer, snapshotTransformFactory);
return stateFactory.createState(stateDesc, registerResult, RocksDBKeyedStateBackend.this);
}
/**
* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
* {@link StateHandleID}.
*/
private void downloadDataForAllStateHandles(
Map<StateHandleID, StreamStateHandle> stateHandleMap,
Path restoreInstancePath,
CloseableRegistry closeableRegistry) throws Exception {
try {
List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
List<CompletableFuture<Void>> futures = new ArrayList<>(runnables.size());
for (Runnable runnable : runnables) {
futures.add(CompletableFuture.runAsync(runnable, executorService));
}
FutureUtils.waitForAll(futures).get();
} catch (ExecutionException e) {
Throwable throwable = ExceptionUtils.stripExecutionException(e);
throwable = ExceptionUtils.stripException(throwable, RuntimeException.class);
if (throwable instanceof IOException) {
throw (IOException) throwable;
} else {
throw new FlinkRuntimeException("Failed to download data for state handles.", e);
}
}
}
@Override
public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) {
input.setBuffer(bytes);
int lastElementOffset = 0;
while (input.available() > 0) {
try {
long timestamp = nextElementLastAccessTimestamp();
if (!TtlUtils.expired(timestamp, ttl, currentTimestamp)) {
break;
}
lastElementOffset = input.getPosition();
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to deserialize list element for TTL compaction filter", e);
}
}
return lastElementOffset;
}
private static void ensureTruncateInitialized() throws FlinkRuntimeException {
if (truncateHandle == null) {
Method truncateMethod;
try {
truncateMethod = FileSystem.class.getMethod("truncate", Path.class, long.class);
}
catch (NoSuchMethodException e) {
throw new FlinkRuntimeException("Could not find a public truncate method on the Hadoop File System.");
}
if (!Modifier.isPublic(truncateMethod.getModifiers())) {
throw new FlinkRuntimeException("Could not find a public truncate method on the Hadoop File System.");
}
truncateHandle = truncateMethod;
}
}
@Override
public UV setValue(UV value) {
if (deleted) {
throw new IllegalStateException("The value has already been deleted.");
}
UV oldValue = getValue();
try {
userValue = value;
rawValueBytes = serializeValueNullSensitive(value, valueSerializer);
db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while putting data into RocksDB.", e);
}
return oldValue;
}
@Override
public void updateInternal(List<V> values) {
Preconditions.checkNotNull(values, "List of values to add cannot be null.");
if (!values.isEmpty()) {
try {
backend.db.put(
columnFamily,
writeOptions,
serializeCurrentKeyWithGroupAndNamespace(),
serializeValueList(values, elementSerializer, DELIMITER));
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
}
} else {
clear();
}
}
@Override
@Nonnull
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
if (stateFactory == null) {
String message = String.format("State %s is not supported by %s",
stateDesc.getClass(), this.getClass());
throw new FlinkRuntimeException(message);
}
StateTable<K, N, SV> stateTable = tryRegisterStateTable(
namespaceSerializer, stateDesc, getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory));
return stateFactory.createState(stateDesc, stateTable, getKeySerializer());
}
@Override
public Collection<SlotOffer> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> offers) {
hasReceivedSlotOffers.trigger();
final Collection<SlotInfo> slotInfos = Optional.ofNullable(registeredSlots.get(taskManagerLocation.getResourceID()))
.orElseThrow(() -> new FlinkRuntimeException("TaskManager not registered."));
int slotIndex = slotInfos.size();
for (SlotOffer offer : offers) {
slotInfos.add(new SimpleSlotContext(
offer.getAllocationId(),
taskManagerLocation,
slotIndex,
taskManagerGateway));
slotIndex++;
}
return offers;
}
@Override
public OUT getResult(TtlValue<ACC> accumulator) {
Preconditions.checkNotNull(updater, "State updater should be set in TtlAggregatingState");
Preconditions.checkNotNull(stateClear, "State clearing should be set in TtlAggregatingState");
ACC userAcc;
try {
userAcc = getWithTtlCheckAndUpdate(() -> accumulator, updater, stateClear);
} catch (Exception e) {
throw new FlinkRuntimeException("Failed to retrieve original internal aggregating state", e);
}
return userAcc == null ? null : original.getResult(userAcc);
}
@Override
public void invoke() throws Exception {
if (!failed && getEnvironment().getTaskInfo().getIndexOfThisSubtask() == 0) {
failed = true;
throw new FlinkRuntimeException(getClass().getSimpleName());
} else {
super.invoke();
}
}
public int getNumberOfSharedSlots(SlotSharingGroupId slotSharingGroupId) {
final SlotSharingManager multiTaskSlotManager = slotSharingManagersMap.get(slotSharingGroupId);
if (multiTaskSlotManager != null) {
return multiTaskSlotManager.getResolvedRootSlots().size();
} else {
throw new FlinkRuntimeException("No MultiTaskSlotManager registered under " + slotSharingGroupId + '.');
}
}
@Nonnull
private E deserializeElement(@Nonnull byte[] bytes) {
try {
final int numPrefixBytes = groupPrefixBytes.length;
inputView.setBuffer(bytes, numPrefixBytes, bytes.length - numPrefixBytes);
return byteOrderProducingSerializer.deserialize(inputView);
} catch (IOException e) {
throw new FlinkRuntimeException("Error while deserializing the element.", e);
}
}
private static void checkAllEndpointsAndHandlersAreUnique(final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers) {
// check for all handlers that
// 1) the instance is only registered once
// 2) only 1 handler is registered for each endpoint (defined by (version, method, url))
// technically the first check is redundant since a duplicate instance also returns the same headers which
// should fail the second check, but we get a better error message
final Set<String> uniqueEndpoints = new HashSet<>();
final Set<ChannelInboundHandler> distinctHandlers = Collections.newSetFromMap(new IdentityHashMap<>());
for (Tuple2<RestHandlerSpecification, ChannelInboundHandler> handler : handlers) {
boolean isNewHandler = distinctHandlers.add(handler.f1);
if (!isNewHandler) {
throw new FlinkRuntimeException("Duplicate REST handler instance found."
+ " Please ensure each instance is registered only once.");
}
final RestHandlerSpecification headers = handler.f0;
for (RestAPIVersion supportedAPIVersion : headers.getSupportedAPIVersions()) {
final String parameterizedEndpoint = supportedAPIVersion.toString() + headers.getHttpMethod() + headers.getTargetRestEndpointURL();
// normalize path parameters; distinct path parameters still clash at runtime
final String normalizedEndpoint = parameterizedEndpoint.replaceAll(":[\\w-]+", ":param");
boolean isNewEndpoint = uniqueEndpoints.add(normalizedEndpoint);
if (!isNewEndpoint) {
throw new FlinkRuntimeException(
String.format(
"REST handler registration overlaps with another registration for: version=%s, method=%s, url=%s.",
supportedAPIVersion, headers.getHttpMethod(), headers.getTargetRestEndpointURL()));
}
}
}
}
@Test
public void testOfGenericClassForFlink() {
try {
TypeInformation.of(Tuple3.class);
fail("should fail with an exception");
}
catch (FlinkRuntimeException e) {
// check that the error message mentions the TypeHint
assertNotEquals(-1, e.getMessage().indexOf("TypeHint"));
}
}
@Override
public void clear() {
try {
backend.db.delete(columnFamily, writeOptions, serializeCurrentKeyWithGroupAndNamespace());
} catch (RocksDBException e) {
throw new FlinkRuntimeException("Error while removing entry from RocksDB", e);
}
}
void updateInternal(byte[] key, SV valueToStore) {
try {
// write the new value to RocksDB
backend.db.put(columnFamily, writeOptions, key, getValueBytes(valueToStore));
}
catch (RocksDBException e) {
throw new FlinkRuntimeException("Error while adding value to RocksDB", e);
}
}
/**
* Capture the command-line standard output from the driver execution.
*
* @param args driver command-line arguments
* @return standard output from driver execution
* @throws Exception on error
*/
private String getSystemOutput(String[] args) throws Exception {
ByteArrayOutputStream output = new ByteArrayOutputStream();
// Configure object reuse mode
switch (mode) {
case CLUSTER:
case COLLECTION:
args = ArrayUtils.add(args, "--__disable_object_reuse");
break;
case CLUSTER_OBJECT_REUSE:
// object reuse is enabled by default when executing drivers
break;
default:
throw new FlinkRuntimeException("Unknown execution mode " + mode);
}
// Redirect stdout
PrintStream stdout = System.out;
System.setOut(new PrintStream(output));
Runner.main(args);
// Restore stdout
System.setOut(stdout);
return output.toString();
}
private boolean isFinalState(ComputationState state) {
State<T> stateObject = getState(state);
if (stateObject == null) {
throw new FlinkRuntimeException("State " + state.getCurrentStateName() + " does not exist in the NFA. NFA has states "
+ states.values());
}
return stateObject.isFinal();
}
public StreamIterationHead(Environment env) {
super(env);
final String iterationId = getConfiguration().getIterationId();
if (iterationId == null || iterationId.length() == 0) {
throw new FlinkRuntimeException("Missing iteration ID in the task configuration");
}
this.dataChannel = new ArrayBlockingQueue<>(1);
this.brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId,
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
this.iterationWaitTime = getConfiguration().getIterationWaitTime();
this.shouldWait = iterationWaitTime > 0;
}
@Override
public OUT getResult(TtlValue<ACC> accumulator) {
Preconditions.checkNotNull(updater, "State updater should be set in TtlAggregatingState");
Preconditions.checkNotNull(stateClear, "State clearing should be set in TtlAggregatingState");
ACC userAcc;
try {
userAcc = getWithTtlCheckAndUpdate(() -> accumulator, updater, stateClear);
} catch (Exception e) {
throw new FlinkRuntimeException("Failed to retrieve original internal aggregating state", e);
}
return userAcc == null ? null : original.getResult(userAcc);
}
/**
* Sets the key and key-group as prefix. This will serialize them into the buffer and the will be used to create
* composite keys with provided namespaces.
*
* @param key the key.
* @param keyGroupId the key-group id for the key.
*/
public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int keyGroupId) {
try {
serializeKeyGroupAndKey(key, keyGroupId);
} catch (IOException shouldNeverHappen) {
throw new FlinkRuntimeException(shouldNeverHappen);
}
}