下面列出了怎么用org.apache.flink.util.OptionalFailure的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Folds the accumulators of {@code toMerge} into {@code target}, modifying {@code target} in place.
 *
 * <p>Per accumulator name: when {@code target} has no entry, a clone of the incoming accumulator
 * is stored; when the existing entry is a failure, it is left untouched; otherwise the two
 * accumulator classes are compared and the outcome of {@code mergeSingle} on the existing
 * accumulator and a clone of the incoming one is stored.
 *
 * @param target
 *            The collection of accumulators that will be updated
 * @param toMerge
 *            The collection of accumulators that will be merged into the other
 */
public static void mergeInto(Map<String, OptionalFailure<Accumulator<?, ?>>> target, Map<String, Accumulator<?, ?>> toMerge) {
	for (Map.Entry<String, Accumulator<?, ?>> incoming : toMerge.entrySet()) {
		final String name = incoming.getKey();
		final OptionalFailure<Accumulator<?, ?>> existing = target.get(name);

		if (existing == null) {
			// First time this name is seen: store a copy, never the caller's instance
			target.put(name, wrapUnchecked(name, () -> incoming.getValue().clone()));
			continue;
		}

		if (existing.isFailure()) {
			// An already recorded failure is kept as it is
			continue;
		}

		final Accumulator<?, ?> current = existing.getUnchecked();
		// Both sides are expected to be of the same type
		compareAccumulatorTypes(name, current.getClass(), incoming.getValue().getClass());
		// Replace the entry with the merge of the current accumulator and a copy of the incoming one
		target.put(name, wrapUnchecked(name, () -> mergeSingle(current, incoming.getValue().clone())));
	}
}
/**
 * Deserializes each serialized accumulator result with the supplied class loader.
 *
 * <p>A {@code null} or empty input produces an empty map. An entry whose serialized value is
 * {@code null} is carried over as a {@code null} value in the result.
 *
 * @param serializedAccumulators The serialized accumulator results.
 * @param loader The class loader to use.
 * @return The deserialized accumulator results.
 * @throws IOException propagated from {@code SerializedValue#deserializeValue}
 * @throws ClassNotFoundException propagated from {@code SerializedValue#deserializeValue}
 */
public static Map<String, OptionalFailure<Object>> deserializeAccumulators(
		Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators,
		ClassLoader loader) throws IOException, ClassNotFoundException {

	final boolean nothingToDeserialize = serializedAccumulators == null || serializedAccumulators.isEmpty();
	if (nothingToDeserialize) {
		return Collections.emptyMap();
	}

	final Map<String, OptionalFailure<Object>> result = new HashMap<>(serializedAccumulators.size());
	for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> serializedEntry : serializedAccumulators.entrySet()) {
		final SerializedValue<OptionalFailure<Object>> serialized = serializedEntry.getValue();
		final OptionalFailure<Object> deserialized =
			serialized == null ? null : serialized.deserializeValue(loader);
		result.put(serializedEntry.getKey(), deserialized);
	}
	return result;
}
/**
 * Converts one named accumulator into a {@link StringifiedAccumulatorResult}.
 *
 * <ul>
 *   <li>{@code null} accumulator: type and value are both the string {@code "null"}.</li>
 *   <li>failed accumulator: type is {@code "null"}, value is the stringified failure cause.</li>
 *   <li>otherwise: type is the accumulator's simple class name and value is the string form of
 *       its local value ({@code "null"} if the local value is {@code null}). A
 *       {@link RuntimeException} raised while reading either is logged and its stringified
 *       form becomes the value.</li>
 * </ul>
 *
 * @param name the accumulator name
 * @param accumulator the accumulator wrapper, may be {@code null}
 * @return the stringified representation, never carrying {@code null} strings
 */
private static StringifiedAccumulatorResult stringifyAccumulatorResult(
		String name,
		@Nullable OptionalFailure<Accumulator<?, ?>> accumulator) {

	if (accumulator == null) {
		return new StringifiedAccumulatorResult(name, "null", "null");
	}

	if (accumulator.isFailure()) {
		final String failureText = ExceptionUtils.stringifyException(accumulator.getFailureCause());
		return new StringifiedAccumulatorResult(name, "null", failureText);
	}

	// Stays "null" when the failure happens before the class name could be read
	String typeName = "null";
	Object value;
	try {
		final Accumulator<?, ?> unwrapped = accumulator.getUnchecked();
		typeName = unwrapped.getClass().getSimpleName();
		value = unwrapped.getLocalValue();
	}
	catch (RuntimeException exception) {
		LOG.error("Failed to stringify accumulator [" + name + "]", exception);
		value = ExceptionUtils.stringifyException(exception);
	}

	final String valueText = value == null ? "null" : value.toString();
	return new StringifiedAccumulatorResult(name, typeName, valueText);
}
/**
 * Reads accumulator name/value pairs from the parser until an {@code END_OBJECT} token is hit.
 *
 * <p>Each iteration takes the current token's string value as the accumulator name (presumably
 * a field-name token; confirm against the serializer), advances to the value and hands it to
 * {@code serializedValueDeserializer}.
 *
 * @param p the JSON parser to read from
 * @param ctxt the deserialization context passed on to the value deserializer
 * @return the parsed serialized accumulator results keyed by accumulator name
 * @throws IOException if reading from the parser or deserializing a value fails
 */
@SuppressWarnings("unchecked")
private Map<String, SerializedValue<OptionalFailure<Object>>> parseAccumulatorResults(
		final JsonParser p,
		final DeserializationContext ctxt) throws IOException {

	final Map<String, SerializedValue<OptionalFailure<Object>>> parsed = new HashMap<>();

	JsonToken token = p.nextToken();
	assertNotEndOfInput(p, token);
	while (token != JsonToken.END_OBJECT) {
		final String name = p.getValueAsString();
		p.nextValue();
		final SerializedValue<OptionalFailure<Object>> value =
			(SerializedValue<OptionalFailure<Object>>) serializedValueDeserializer.deserialize(p, ctxt);
		parsed.put(name, value);

		// Advance to the next name (or the closing token) and make sure input did not end early
		token = p.nextToken();
		assertNotEndOfInput(p, token);
	}

	return parsed;
}
/**
 * Reads accumulator name/value pairs from the parser until an {@code END_OBJECT} token is hit.
 *
 * <p>Each iteration takes the current token's string value as the accumulator name (presumably
 * a field-name token; confirm against the serializer), advances to the value and hands it to
 * {@code serializedValueDeserializer}.
 *
 * @param p the JSON parser to read from
 * @param ctxt the deserialization context passed on to the value deserializer
 * @return the parsed serialized accumulator results keyed by accumulator name
 * @throws IOException if reading from the parser or deserializing a value fails
 */
@SuppressWarnings("unchecked")
private Map<String, SerializedValue<OptionalFailure<Object>>> parseAccumulatorResults(
		final JsonParser p,
		final DeserializationContext ctxt) throws IOException {

	final Map<String, SerializedValue<OptionalFailure<Object>>> parsed = new HashMap<>();

	JsonToken token = p.nextToken();
	assertNotEndOfInput(p, token);
	while (token != JsonToken.END_OBJECT) {
		final String name = p.getValueAsString();
		p.nextValue();
		final SerializedValue<OptionalFailure<Object>> value =
			(SerializedValue<OptionalFailure<Object>>) serializedValueDeserializer.deserialize(p, ctxt);
		parsed.put(name, value);

		// Advance to the next name (or the closing token) and make sure input did not end early
		token = p.nextToken();
		assertNotEndOfInput(p, token);
	}

	return parsed;
}
/**
 * Converts one named accumulator into a {@link StringifiedAccumulatorResult}.
 *
 * <ul>
 *   <li>{@code null} accumulator: type and value are both the string {@code "null"}.</li>
 *   <li>failed accumulator: type is {@code "null"}, value is the stringified failure cause.</li>
 *   <li>otherwise: type is the accumulator's simple class name and value is the string form of
 *       its local value ({@code "null"} if the local value is {@code null}). A
 *       {@link RuntimeException} raised while reading either is logged and its stringified
 *       form becomes the value.</li>
 * </ul>
 *
 * @param name the accumulator name
 * @param accumulator the accumulator wrapper, may be {@code null}
 * @return the stringified representation, never carrying {@code null} strings
 */
private static StringifiedAccumulatorResult stringifyAccumulatorResult(
		String name,
		@Nullable OptionalFailure<Accumulator<?, ?>> accumulator) {

	if (accumulator == null) {
		return new StringifiedAccumulatorResult(name, "null", "null");
	}

	if (accumulator.isFailure()) {
		final String failureText = ExceptionUtils.stringifyException(accumulator.getFailureCause());
		return new StringifiedAccumulatorResult(name, "null", failureText);
	}

	// Stays "null" when the failure happens before the class name could be read
	String typeName = "null";
	Object value;
	try {
		final Accumulator<?, ?> unwrapped = accumulator.getUnchecked();
		typeName = unwrapped.getClass().getSimpleName();
		value = unwrapped.getLocalValue();
	}
	catch (RuntimeException exception) {
		LOG.error("Failed to stringify accumulator [" + name + "]", exception);
		value = ExceptionUtils.stringifyException(exception);
	}

	final String valueText = value == null ? "null" : value.toString();
	return new StringifiedAccumulatorResult(name, typeName, valueText);
}
/**
 * Answers a status request with the next entry of the {@code responses} queue.
 *
 * <p>The request's trigger id must equal the expected {@code triggerId} and the queue must still
 * hold a response; otherwise an {@link AssertionError} is raised. A queued failure is turned
 * into a {@code BAD_REQUEST} {@link RestHandlerException}, a queued value into a completed
 * operation result.
 *
 * @param request the incoming request carrying the trigger id path parameter
 * @param gateway the dispatcher gateway (not used by this implementation)
 * @return an already completed future holding the completed operation result
 * @throws RestHandlerException if the next queued response is a failure
 */
@Override
protected CompletableFuture<AsynchronousOperationResult<AsynchronousOperationInfo>> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, SavepointDisposalStatusMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
	final TriggerId requestedTriggerId = request.getPathParameter(TriggerIdPathParameter.class);
	if (!requestedTriggerId.equals(triggerId)) {
		throw new AssertionError();
	}

	final OptionalFailure<AsynchronousOperationInfo> queued = responses.poll();
	if (queued == null) {
		throw new AssertionError();
	}

	if (queued.isFailure()) {
		throw new RestHandlerException("Failure", HttpResponseStatus.BAD_REQUEST, queued.getFailureCause());
	}

	return CompletableFuture.completedFuture(AsynchronousOperationResult.completed(queued.getUnchecked()));
}
/**
 * Checks that a successful {@code IntCounter} entry is stringified with its name, its simple
 * class name as type and its counted value as value.
 */
@Test
public void stringifyingResultsShouldIncorporateAccumulatorLocalValueDirectly() {
	final int expectedValue = 314159;
	final String accumulatorName = "a";

	final IntCounter counter = new IntCounter();
	counter.add(expectedValue);

	final Map<String, OptionalFailure<Accumulator<?, ?>>> input = new HashMap<>();
	input.put(accumulatorName, OptionalFailure.of(counter));

	final StringifiedAccumulatorResult[] stringified = StringifiedAccumulatorResult.stringifyAccumulatorResults(input);
	assertEquals(1, stringified.length);

	final StringifiedAccumulatorResult only = stringified[0];
	assertEquals(accumulatorName, only.getName());
	assertEquals("IntCounter", only.getType());
	assertEquals(String.valueOf(expectedValue), only.getValue());
}
/**
 * Checks that a {@code NullBearingAccumulator} (expected by this test to expose a {@code null}
 * local value) is reported with the string {@code "null"} as its value.
 */
@Test
public void stringifyingResultsShouldReportNullLocalValueAsNonnullValueString() {
	final String accumulatorName = "a";

	final Map<String, OptionalFailure<Accumulator<?, ?>>> input = new HashMap<>();
	final NullBearingAccumulator nullBearing = new NullBearingAccumulator();
	input.put(accumulatorName, OptionalFailure.of(nullBearing));

	final StringifiedAccumulatorResult[] stringified = StringifiedAccumulatorResult.stringifyAccumulatorResults(input);
	assertEquals(1, stringified.length);

	// The value is a String with the content "null", not a null reference
	final StringifiedAccumulatorResult only = stringified[0];
	assertEquals(accumulatorName, only.getName());
	assertEquals("NullBearingAccumulator", only.getType());
	assertEquals("null", only.getValue());
}
/**
 * Checks that a map entry whose accumulator is {@code null} is reported with the string
 * {@code "null"} for both type and value.
 */
@Test
public void stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() {
	final String accumulatorName = "a";

	final Map<String, OptionalFailure<Accumulator<?, ?>>> input = new HashMap<>();
	input.put(accumulatorName, null);

	final StringifiedAccumulatorResult[] stringified = StringifiedAccumulatorResult.stringifyAccumulatorResults(input);
	assertEquals(1, stringified.length);

	// Type and value are Strings with the content "null", not null references
	final StringifiedAccumulatorResult only = stringified[0];
	assertEquals(accumulatorName, only.getName());
	assertEquals("null", only.getType());
	assertEquals("null", only.getValue());
}
/**
 * Checks that a failed accumulator entry is reported with type {@code "null"} and a value that
 * starts with the failure's class name and message.
 */
@Test
public void stringifyingFailureResults() {
	final String accumulatorName = "a";

	final Map<String, OptionalFailure<Accumulator<?, ?>>> input = new HashMap<>();
	input.put(accumulatorName, OptionalFailure.ofFailure(new FlinkRuntimeException("Test")));

	final StringifiedAccumulatorResult[] stringified = StringifiedAccumulatorResult.stringifyAccumulatorResults(input);
	assertEquals(1, stringified.length);

	// The type is a String with the content "null", not a null reference
	final StringifiedAccumulatorResult only = stringified[0];
	assertEquals(accumulatorName, only.getName());
	assertEquals("null", only.getType());

	final String renderedFailure = only.getValue();
	assertTrue(renderedFailure.startsWith("org.apache.flink.util.FlinkRuntimeException: Test"));
}
/**
 * Checks that a map entry whose accumulator is {@code null} is reported with the string
 * {@code "null"} for both type and value.
 */
@Test
public void stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() {
	final String accumulatorName = "a";

	final Map<String, OptionalFailure<Accumulator<?, ?>>> input = new HashMap<>();
	input.put(accumulatorName, null);

	final StringifiedAccumulatorResult[] stringified = StringifiedAccumulatorResult.stringifyAccumulatorResults(input);
	assertEquals(1, stringified.length);

	// Type and value are Strings with the content "null", not null references
	final StringifiedAccumulatorResult only = stringified[0];
	assertEquals(accumulatorName, only.getName());
	assertEquals("null", only.getType());
	assertEquals("null", only.getValue());
}
/**
 * Answers a status request with the next entry of the {@code responses} queue.
 *
 * <p>The request's trigger id must equal the expected {@code triggerId} and the queue must still
 * hold a response; otherwise an {@link AssertionError} is raised. A queued failure is turned
 * into a {@code BAD_REQUEST} {@link RestHandlerException}, a queued value into a completed
 * operation result.
 *
 * @param request the incoming request carrying the trigger id path parameter
 * @param gateway the dispatcher gateway (not used by this implementation)
 * @return an already completed future holding the completed operation result
 * @throws RestHandlerException if the next queued response is a failure
 */
@Override
protected CompletableFuture<AsynchronousOperationResult<AsynchronousOperationInfo>> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, SavepointDisposalStatusMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
	final TriggerId requestedTriggerId = request.getPathParameter(TriggerIdPathParameter.class);
	if (!requestedTriggerId.equals(triggerId)) {
		throw new AssertionError();
	}

	final OptionalFailure<AsynchronousOperationInfo> queued = responses.poll();
	if (queued == null) {
		throw new AssertionError();
	}

	if (queued.isFailure()) {
		throw new RestHandlerException("Failure", HttpResponseStatus.BAD_REQUEST, queued.getFailureCause());
	}

	return CompletableFuture.completedFuture(AsynchronousOperationResult.completed(queued.getUnchecked()));
}
/**
 * Answers a status request with the next entry of the {@code responses} queue.
 *
 * <p>The request's trigger id must equal the expected {@code triggerId} and the queue must still
 * hold a response; otherwise an {@link AssertionError} is raised. A queued failure is turned
 * into a {@code BAD_REQUEST} {@link RestHandlerException}, a queued value into a completed
 * operation result.
 *
 * @param request the incoming request carrying the trigger id path parameter
 * @param gateway the dispatcher gateway (not used by this implementation)
 * @return an already completed future holding the completed operation result
 * @throws RestHandlerException if the next queued response is a failure
 */
@Override
protected CompletableFuture<AsynchronousOperationResult<AsynchronousOperationInfo>> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, SavepointDisposalStatusMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
	final TriggerId requestedTriggerId = request.getPathParameter(TriggerIdPathParameter.class);
	if (!requestedTriggerId.equals(triggerId)) {
		throw new AssertionError();
	}

	final OptionalFailure<AsynchronousOperationInfo> queued = responses.poll();
	if (queued == null) {
		throw new AssertionError();
	}

	if (queued.isFailure()) {
		throw new RestHandlerException("Failure", HttpResponseStatus.BAD_REQUEST, queued.getFailureCause());
	}

	return CompletableFuture.completedFuture(AsynchronousOperationResult.completed(queued.getUnchecked()));
}
/**
 * Fetches accumulators through a {@code RestClusterClient} talking to a test endpoint backed by
 * a {@code TestAccumulatorHandler} and expects exactly one entry, {@code testKey -> testValue}.
 */
@Test
public void testGetAccumulators() throws Exception {
	final TestAccumulatorHandler handler = new TestAccumulatorHandler();

	try (TestRestServerEndpoint endpoint = createRestServerEndpoint(handler)) {
		final RestClusterClient<?> client = createRestClusterClient(endpoint.getServerAddress().getPort());

		try {
			final JobID jobId = new JobID();
			final Map<String, OptionalFailure<Object>> fetched = client.getAccumulators(jobId);

			assertNotNull(fetched);
			assertEquals(1, fetched.size());
			assertEquals(true, fetched.containsKey("testKey"));

			final OptionalFailure<Object> testEntry = fetched.get("testKey");
			assertEquals("testValue", testEntry.get().toString());
		} finally {
			// The client is shut down even when an assertion above fails
			client.shutdown();
		}
	}
}
/**
 * Folds the accumulators of {@code toMerge} into {@code target}, modifying {@code target} in place.
 *
 * <p>Per accumulator name: when {@code target} has no entry, a clone of the incoming accumulator
 * is stored; when the existing entry is a failure, it is left untouched; otherwise the two
 * accumulator classes are compared and the outcome of {@code mergeSingle} on the existing
 * accumulator and a clone of the incoming one is stored.
 *
 * @param target
 *            The collection of accumulators that will be updated
 * @param toMerge
 *            The collection of accumulators that will be merged into the other
 */
public static void mergeInto(Map<String, OptionalFailure<Accumulator<?, ?>>> target, Map<String, Accumulator<?, ?>> toMerge) {
	for (Map.Entry<String, Accumulator<?, ?>> incoming : toMerge.entrySet()) {
		final String name = incoming.getKey();
		final OptionalFailure<Accumulator<?, ?>> existing = target.get(name);

		if (existing == null) {
			// First time this name is seen: store a copy, never the caller's instance
			target.put(name, wrapUnchecked(name, () -> incoming.getValue().clone()));
			continue;
		}

		if (existing.isFailure()) {
			// An already recorded failure is kept as it is
			continue;
		}

		final Accumulator<?, ?> current = existing.getUnchecked();
		// Both sides are expected to be of the same type
		compareAccumulatorTypes(name, current.getClass(), incoming.getValue().getClass());
		// Replace the entry with the merge of the current accumulator and a copy of the incoming one
		target.put(name, wrapUnchecked(name, () -> mergeSingle(current, incoming.getValue().clone())));
	}
}
/**
 * Deserializes each serialized accumulator result with the supplied class loader.
 *
 * <p>A {@code null} or empty input produces an empty map. An entry whose serialized value is
 * {@code null} is carried over as a {@code null} value in the result.
 *
 * @param serializedAccumulators The serialized accumulator results.
 * @param loader The class loader to use.
 * @return The deserialized accumulator results.
 * @throws IOException propagated from {@code SerializedValue#deserializeValue}
 * @throws ClassNotFoundException propagated from {@code SerializedValue#deserializeValue}
 */
public static Map<String, OptionalFailure<Object>> deserializeAccumulators(
		Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators,
		ClassLoader loader) throws IOException, ClassNotFoundException {

	final boolean nothingToDeserialize = serializedAccumulators == null || serializedAccumulators.isEmpty();
	if (nothingToDeserialize) {
		return Collections.emptyMap();
	}

	final Map<String, OptionalFailure<Object>> result = new HashMap<>(serializedAccumulators.size());
	for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> serializedEntry : serializedAccumulators.entrySet()) {
		final SerializedValue<OptionalFailure<Object>> serialized = serializedEntry.getValue();
		final OptionalFailure<Object> deserialized =
			serialized == null ? null : serialized.deserializeValue(loader);
		result.put(serializedEntry.getKey(), deserialized);
	}
	return result;
}
/**
 * Folds the accumulators of {@code toMerge} into {@code target}, modifying {@code target} in place.
 *
 * <p>Per accumulator name: when {@code target} has no entry, a clone of the incoming accumulator
 * is stored; when the existing entry is a failure, it is left untouched; otherwise the two
 * accumulator classes are compared and the outcome of {@code mergeSingle} on the existing
 * accumulator and a clone of the incoming one is stored.
 *
 * @param target
 *            The collection of accumulators that will be updated
 * @param toMerge
 *            The collection of accumulators that will be merged into the other
 */
public static void mergeInto(Map<String, OptionalFailure<Accumulator<?, ?>>> target, Map<String, Accumulator<?, ?>> toMerge) {
	for (Map.Entry<String, Accumulator<?, ?>> incoming : toMerge.entrySet()) {
		final String name = incoming.getKey();
		final OptionalFailure<Accumulator<?, ?>> existing = target.get(name);

		if (existing == null) {
			// First time this name is seen: store a copy, never the caller's instance
			target.put(name, wrapUnchecked(name, () -> incoming.getValue().clone()));
			continue;
		}

		if (existing.isFailure()) {
			// An already recorded failure is kept as it is
			continue;
		}

		final Accumulator<?, ?> current = existing.getUnchecked();
		// Both sides are expected to be of the same type
		compareAccumulatorTypes(name, current.getClass(), incoming.getValue().getClass());
		// Replace the entry with the merge of the current accumulator and a copy of the incoming one
		target.put(name, wrapUnchecked(name, () -> mergeSingle(current, incoming.getValue().clone())));
	}
}
/**
 * Takes the serialized accumulator results, deserializes them using the provided class loader
 * and then unwraps each value via {@code OptionalFailure#getUnchecked}.
 *
 * <p>{@code deserializeAccumulators} keeps a {@code null} entry for every {@code null}
 * serialized value; such entries are passed through as {@code null} values here instead of
 * failing with a {@link NullPointerException}.
 *
 * <p>NOTE(review): unwrapping an entry that holds a failure presumably throws an unchecked
 * exception from {@code getUnchecked}; confirm against {@code OptionalFailure}.
 *
 * @param serializedAccumulators The serialized accumulator results, may be {@code null} or empty.
 * @param loader The class loader to use.
 * @return The deserialized and unwrapped accumulator results; empty if there was nothing to
 *         deserialize.
 * @throws IOException propagated from {@code deserializeAccumulators}
 * @throws ClassNotFoundException propagated from {@code deserializeAccumulators}
 */
public static Map<String, Object> deserializeAndUnwrapAccumulators(
		Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators,
		ClassLoader loader) throws IOException, ClassNotFoundException {

	Map<String, OptionalFailure<Object>> deserializedAccumulators = deserializeAccumulators(serializedAccumulators, loader);

	if (deserializedAccumulators.isEmpty()) {
		return Collections.emptyMap();
	}

	// Size the result from the map that is actually iterated
	Map<String, Object> accumulators = new HashMap<>(deserializedAccumulators.size());

	for (Map.Entry<String, OptionalFailure<Object>> entry : deserializedAccumulators.entrySet()) {
		OptionalFailure<Object> wrapped = entry.getValue();
		// A null wrapper stems from a null serialized value; keep it null rather than dereferencing it
		accumulators.put(entry.getKey(), wrapped == null ? null : wrapped.getUnchecked());
	}

	return accumulators;
}
/**
 * Reads accumulator name/value pairs from the parser until an {@code END_OBJECT} token is hit.
 *
 * <p>Each iteration takes the current token's string value as the accumulator name (presumably
 * a field-name token; confirm against the serializer), advances to the value and hands it to
 * {@code serializedValueDeserializer}.
 *
 * @param p the JSON parser to read from
 * @param ctxt the deserialization context passed on to the value deserializer
 * @return the parsed serialized accumulator results keyed by accumulator name
 * @throws IOException if reading from the parser or deserializing a value fails
 */
@SuppressWarnings("unchecked")
private Map<String, SerializedValue<OptionalFailure<Object>>> parseAccumulatorResults(
		final JsonParser p,
		final DeserializationContext ctxt) throws IOException {

	final Map<String, SerializedValue<OptionalFailure<Object>>> parsed = new HashMap<>();

	JsonToken token = p.nextToken();
	assertNotEndOfInput(p, token);
	while (token != JsonToken.END_OBJECT) {
		final String name = p.getValueAsString();
		p.nextValue();
		final SerializedValue<OptionalFailure<Object>> value =
			(SerializedValue<OptionalFailure<Object>>) serializedValueDeserializer.deserialize(p, ctxt);
		parsed.put(name, value);

		// Advance to the next name (or the closing token) and make sure input did not end early
		token = p.nextToken();
		assertNotEndOfInput(p, token);
	}

	return parsed;
}
/**
 * Fetches accumulators through a {@code RestClusterClient} talking to a test endpoint backed by
 * a {@code TestAccumulatorHandler} and expects exactly one entry, {@code testKey -> testValue}.
 */
@Test
public void testGetAccumulators() throws Exception {
	final TestAccumulatorHandler handler = new TestAccumulatorHandler();

	try (TestRestServerEndpoint endpoint = createRestServerEndpoint(handler)) {
		final RestClusterClient<?> client = createRestClusterClient(endpoint.getServerAddress().getPort());

		try {
			final JobID jobId = new JobID();
			final Map<String, OptionalFailure<Object>> fetched = client.getAccumulators(jobId);

			assertNotNull(fetched);
			assertEquals(1, fetched.size());
			assertEquals(true, fetched.containsKey("testKey"));

			final OptionalFailure<Object> testEntry = fetched.get("testKey");
			assertEquals("testValue", testEntry.get().toString());
		} finally {
			// The client is shut down even when an assertion above fails
			client.shutdown();
		}
	}
}
/**
 * Checks that a successful {@code IntCounter} entry is stringified with its name, its simple
 * class name as type and its counted value as value.
 */
@Test
public void stringifyingResultsShouldIncorporateAccumulatorLocalValueDirectly() {
	final int expectedValue = 314159;
	final String accumulatorName = "a";

	final IntCounter counter = new IntCounter();
	counter.add(expectedValue);

	final Map<String, OptionalFailure<Accumulator<?, ?>>> input = new HashMap<>();
	input.put(accumulatorName, OptionalFailure.of(counter));

	final StringifiedAccumulatorResult[] stringified = StringifiedAccumulatorResult.stringifyAccumulatorResults(input);
	assertEquals(1, stringified.length);

	final StringifiedAccumulatorResult only = stringified[0];
	assertEquals(accumulatorName, only.getName());
	assertEquals("IntCounter", only.getType());
	assertEquals(String.valueOf(expectedValue), only.getValue());
}
/**
 * Checks that a map entry whose accumulator is {@code null} is reported with the string
 * {@code "null"} for both type and value.
 */
@Test
public void stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() {
	final String accumulatorName = "a";

	final Map<String, OptionalFailure<Accumulator<?, ?>>> input = new HashMap<>();
	input.put(accumulatorName, null);

	final StringifiedAccumulatorResult[] stringified = StringifiedAccumulatorResult.stringifyAccumulatorResults(input);
	assertEquals(1, stringified.length);

	// Type and value are Strings with the content "null", not null references
	final StringifiedAccumulatorResult only = stringified[0];
	assertEquals(accumulatorName, only.getName());
	assertEquals("null", only.getType());
	assertEquals("null", only.getValue());
}
/**
 * Checks that a failed accumulator entry is reported with type {@code "null"} and a value that
 * starts with the failure's class name and message.
 */
@Test
public void stringifyingFailureResults() {
	final String accumulatorName = "a";

	final Map<String, OptionalFailure<Accumulator<?, ?>>> input = new HashMap<>();
	input.put(accumulatorName, OptionalFailure.ofFailure(new FlinkRuntimeException("Test")));

	final StringifiedAccumulatorResult[] stringified = StringifiedAccumulatorResult.stringifyAccumulatorResults(input);
	assertEquals(1, stringified.length);

	// The type is a String with the content "null", not a null reference
	final StringifiedAccumulatorResult only = stringified[0];
	assertEquals(accumulatorName, only.getName());
	assertEquals("null", only.getType());

	final String renderedFailure = only.getValue();
	assertTrue(renderedFailure.startsWith("org.apache.flink.util.FlinkRuntimeException: Test"));
}
/**
 * Builds a map from accumulator name to that accumulator's local value, each wrapped in an
 * {@link OptionalFailure} by {@code wrapUnchecked}.
 *
 * @param accumulators the accumulators keyed by name
 * @return a new map with one wrapped result per accumulator
 */
public static Map<String, OptionalFailure<Object>> toResultMap(Map<String, Accumulator<?, ?>> accumulators) {
	final Map<String, OptionalFailure<Object>> results = new HashMap<>();
	// The local value is read inside the supplier so that a RuntimeException is handled by wrapUnchecked
	accumulators.forEach((name, accumulator) ->
		results.put(name, wrapUnchecked(name, () -> accumulator.getLocalValue())));
	return results;
}
/**
 * Evaluates {@code supplier} through {@code OptionalFailure.createFrom}.
 *
 * <p>A {@link RuntimeException} thrown by the supplier is logged together with the accumulator
 * name and rethrown as a {@code FlinkException} carrying it as cause. Presumably
 * {@code createFrom} converts that exception into a failure result; confirm against
 * {@code OptionalFailure}.
 *
 * @param name the accumulator name, used only for the log message
 * @param supplier computes the value to wrap
 * @param <R> the type of the wrapped value
 * @return the wrapper produced by {@code OptionalFailure.createFrom}
 */
private static <R> OptionalFailure<R> wrapUnchecked(String name, Supplier<R> supplier) {
	return OptionalFailure.createFrom(() -> {
		try {
			return supplier.get();
		}
		catch (RuntimeException unexpected) {
			final String message = "Unexpected error while handling accumulator [" + name + "]";
			LOG.error(message, unexpected);
			throw new FlinkException(unexpected);
		}
	});
}
/**
 * Creates a new JobExecutionResult.
 *
 * <p>When {@code accumulators} is {@code null}, an empty map is stored instead; otherwise the
 * given map instance itself is kept.
 *
 * @param jobID The job's ID.
 * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds
 * @param accumulators A map of all accumulators produced by the job, may be {@code null}.
 */
public JobExecutionResult(JobID jobID, long netRuntime, Map<String, OptionalFailure<Object>> accumulators) {
	super(jobID);
	this.netRuntime = netRuntime;
	this.accumulatorResults = accumulators == null
		? Collections.<String, OptionalFailure<Object>>emptyMap()
		: accumulators;
}
/**
 * Runs the given plan by executing each of its data sinks in iteration order.
 *
 * <p>The reported runtime is the {@code System.currentTimeMillis()} difference measured from
 * before the cache is initialized until after the last sink has been executed. The result
 * carries a {@code null} job id and the results of the {@code accumulators} member.
 *
 * @param program the plan to execute
 * @return the execution result with the measured runtime and the accumulator results
 * @throws Exception if initializing the cache or executing a sink fails
 */
public JobExecutionResult execute(Plan program) throws Exception {
	final long startedAt = System.currentTimeMillis();

	initCache(program.getCachedFiles());

	for (Operator<?> sink : program.getDataSinks()) {
		execute(sink);
	}

	// Taken before the accumulators are converted, so the conversion is not part of the runtime
	final long netRuntime = System.currentTimeMillis() - startedAt;

	return new JobExecutionResult(null, netRuntime, AccumulatorHelper.toResultMap(accumulators));
}
/**
 * Flattens a map of accumulator names to accumulators into an array of
 * {@link StringifiedAccumulatorResult} values, one per map entry in the map's iteration order.
 *
 * @param accs the accumulators keyed by name, may be {@code null}
 * @return the stringified results; an empty array if {@code accs} is {@code null} or empty
 */
public static StringifiedAccumulatorResult[] stringifyAccumulatorResults(Map<String, OptionalFailure<Accumulator<?, ?>>> accs) {
	if (accs == null || accs.isEmpty()) {
		return new StringifiedAccumulatorResult[0];
	}

	return accs.entrySet().stream()
		.map(entry -> stringifyAccumulatorResult(entry.getKey(), entry.getValue()))
		.toArray(StringifiedAccumulatorResult[]::new);
}
/**
 * Creates a job result after validating its arguments.
 *
 * @param jobId the job id, must not be {@code null}
 * @param applicationStatus the application status, must not be {@code null}
 * @param accumulatorResults the serialized accumulator results, must not be {@code null}
 * @param netRuntime the net runtime, must be greater than or equal to 0
 * @param serializedThrowable the serialized failure, may be {@code null}
 */
private JobResult(
		final JobID jobId,
		final ApplicationStatus applicationStatus,
		final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults,
		final long netRuntime,
		@Nullable final SerializedThrowable serializedThrowable) {

	// Validation happens up front, in the same order as the checks were originally performed
	checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0");
	requireNonNull(jobId);
	requireNonNull(applicationStatus);
	requireNonNull(accumulatorResults);

	this.jobId = jobId;
	this.applicationStatus = applicationStatus;
	this.accumulatorResults = accumulatorResults;
	this.netRuntime = netRuntime;
	this.serializedThrowable = serializedThrowable;
}