Source-code examples for the class org.apache.flink.util.OptionalFailure

The examples below show how the org.apache.flink.util.OptionalFailure API is used in practice, collected from open-source projects; the original sources can be viewed on GitHub.
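
Before the project examples, here is a minimal, hedged sketch of the OptionalFailure API itself, assuming flink-core is on the classpath; the class name OptionalFailureSketch and the messages are illustrative, not taken from the examples below.

import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;

public class OptionalFailureSketch {
	public static void main(String[] args) {
		// Wrap a successfully computed value.
		OptionalFailure<Integer> ok = OptionalFailure.of(42);
		System.out.println(ok.isFailure());      // false
		System.out.println(ok.getUnchecked());   // 42

		// Wrap a failure instead of propagating the exception to the caller.
		OptionalFailure<Integer> failed =
			OptionalFailure.ofFailure(new FlinkException("accumulator could not be computed"));
		System.out.println(failed.isFailure());               // true
		System.out.println(failed.getFailureCause().getMessage());

		// createFrom evaluates a checked supplier; if the supplier throws,
		// the exception is captured as the failure cause instead of escaping.
		OptionalFailure<Integer> computed = OptionalFailure.createFrom(() -> {
			throw new FlinkException("computation failed");
		});
		System.out.println(computed.isFailure());             // true
	}
}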

Example 1  Project: Flink-CEPplus  File: AccumulatorHelper.java
/**
 * Merge two collections of accumulators. The second will be merged into the
 * first.
 *
 * @param target
 *            The collection of accumulators that will be updated
 * @param toMerge
 *            The collection of accumulators that will be merged into the
 *            other
 */
public static void mergeInto(Map<String, OptionalFailure<Accumulator<?, ?>>> target, Map<String, Accumulator<?, ?>> toMerge) {
	for (Map.Entry<String, Accumulator<?, ?>> otherEntry : toMerge.entrySet()) {
		OptionalFailure<Accumulator<?, ?>> ownAccumulator = target.get(otherEntry.getKey());
		if (ownAccumulator == null) {
			// Create initial counter (copy!)
			target.put(
				otherEntry.getKey(),
				wrapUnchecked(otherEntry.getKey(), () -> otherEntry.getValue().clone()));
		}
		else if (ownAccumulator.isFailure()) {
			continue;
		}
		else {
			Accumulator<?, ?> accumulator = ownAccumulator.getUnchecked();
			// Both should have the same type
			compareAccumulatorTypes(otherEntry.getKey(),
				accumulator.getClass(), otherEntry.getValue().getClass());
			// Merge target counter with other counter

			target.put(
				otherEntry.getKey(),
				wrapUnchecked(otherEntry.getKey(), () -> mergeSingle(accumulator, otherEntry.getValue().clone())));
		}
	}
}
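
A hedged usage sketch for mergeInto; the class name MergeIntoSketch and the accumulator name "records" are illustrative, everything else is the API shown above.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.util.OptionalFailure;

public class MergeIntoSketch {
	public static void main(String[] args) {
		// Existing result map, e.g. held on the coordinator side.
		Map<String, OptionalFailure<Accumulator<?, ?>>> target = new HashMap<>();

		// Accumulators freshly reported by a task.
		IntCounter counter = new IntCounter();
		counter.add(5);
		Map<String, Accumulator<?, ?>> fromTask = new HashMap<>();
		fromTask.put("records", counter);

		AccumulatorHelper.mergeInto(target, fromTask);

		// The target now holds a cloned copy of "records"; a second call with
		// another accumulator of the same name and type would be merged into it.
		System.out.println(target.get("records").getUnchecked().getLocalValue()); // 5
	}
}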
 
Example 2  Project: Flink-CEPplus  File: AccumulatorHelper.java
/**
 * Takes the serialized accumulator results and tries to deserialize them using the provided
 * class loader.
 * @param serializedAccumulators The serialized accumulator results.
 * @param loader The class loader to use.
 * @return The deserialized accumulator results.
 * @throws IOException If the accumulator values cannot be deserialized.
 * @throws ClassNotFoundException If a deserialized value's class cannot be found in the given class loader.
 */
public static Map<String, OptionalFailure<Object>> deserializeAccumulators(
		Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators,
		ClassLoader loader) throws IOException, ClassNotFoundException {

	if (serializedAccumulators == null || serializedAccumulators.isEmpty()) {
		return Collections.emptyMap();
	}

	Map<String, OptionalFailure<Object>> accumulators = new HashMap<>(serializedAccumulators.size());

	for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> entry : serializedAccumulators.entrySet()) {

		OptionalFailure<Object> value = null;
		if (entry.getValue() != null) {
			value = entry.getValue().deserializeValue(loader);
		}

		accumulators.put(entry.getKey(), value);
	}

	return accumulators;
}
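
A hedged round-trip sketch for deserializeAccumulators; the key "records", the class name, and the use of SerializedValue to produce input in the expected shape are illustrative.

import java.util.Collections;
import java.util.Map;

import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;

public class DeserializeAccumulatorsSketch {
	public static void main(String[] args) throws Exception {
		// Serialize one accumulator result, as it would be shipped by the cluster.
		SerializedValue<OptionalFailure<Object>> serialized =
			new SerializedValue<>(OptionalFailure.of((Object) 42));

		Map<String, SerializedValue<OptionalFailure<Object>>> wire =
			Collections.singletonMap("records", serialized);

		// Deserialize with the caller's class loader.
		Map<String, OptionalFailure<Object>> results =
			AccumulatorHelper.deserializeAccumulators(
				wire, DeserializeAccumulatorsSketch.class.getClassLoader());

		System.out.println(results.get("records").getUnchecked()); // 42
	}
}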
 
Example 3  File: StringifiedAccumulatorResult.java
private static StringifiedAccumulatorResult stringifyAccumulatorResult(
		String name,
		@Nullable OptionalFailure<Accumulator<?, ?>> accumulator) {
	if (accumulator == null) {
		return new StringifiedAccumulatorResult(name, "null", "null");
	}
	else if (accumulator.isFailure()) {
		return new StringifiedAccumulatorResult(
			name,
			"null",
			ExceptionUtils.stringifyException(accumulator.getFailureCause()));
	}
	else {
		Object localValue;
		String simpleName = "null";
		try {
			simpleName = accumulator.getUnchecked().getClass().getSimpleName();
			localValue = accumulator.getUnchecked().getLocalValue();
		}
		catch (RuntimeException exception) {
			LOG.error("Failed to stringify accumulator [" + name + "]", exception);
			localValue = ExceptionUtils.stringifyException(exception);
		}
		return new StringifiedAccumulatorResult(name, simpleName, localValue != null ? localValue.toString() : "null");
	}
}
 
Example 4  Project: flink  File: JobResultDeserializer.java
@SuppressWarnings("unchecked")
private Map<String, SerializedValue<OptionalFailure<Object>>> parseAccumulatorResults(
		final JsonParser p,
		final DeserializationContext ctxt) throws IOException {

	final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults = new HashMap<>();
	while (true) {
		final JsonToken jsonToken = p.nextToken();
		assertNotEndOfInput(p, jsonToken);
		if (jsonToken == JsonToken.END_OBJECT) {
			break;
		}
		final String accumulatorName = p.getValueAsString();
		p.nextValue();
		accumulatorResults.put(
			accumulatorName,
			(SerializedValue<OptionalFailure<Object>>) serializedValueDeserializer.deserialize(p, ctxt));
	}
	return accumulatorResults;
}
 
Example 5  Project: Flink-CEPplus  File: JobResultDeserializer.java
@SuppressWarnings("unchecked")
private Map<String, SerializedValue<OptionalFailure<Object>>> parseAccumulatorResults(
		final JsonParser p,
		final DeserializationContext ctxt) throws IOException {

	final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults = new HashMap<>();
	while (true) {
		final JsonToken jsonToken = p.nextToken();
		assertNotEndOfInput(p, jsonToken);
		if (jsonToken == JsonToken.END_OBJECT) {
			break;
		}
		final String accumulatorName = p.getValueAsString();
		p.nextValue();
		accumulatorResults.put(
			accumulatorName,
			(SerializedValue<OptionalFailure<Object>>) serializedValueDeserializer.deserialize(p, ctxt));
	}
	return accumulatorResults;
}
 
Example 6  Project: flink  File: StringifiedAccumulatorResult.java
private static StringifiedAccumulatorResult stringifyAccumulatorResult(
		String name,
		@Nullable OptionalFailure<Accumulator<?, ?>> accumulator) {
	if (accumulator == null) {
		return new StringifiedAccumulatorResult(name, "null", "null");
	}
	else if (accumulator.isFailure()) {
		return new StringifiedAccumulatorResult(
			name,
			"null",
			ExceptionUtils.stringifyException(accumulator.getFailureCause()));
	}
	else {
		Object localValue;
		String simpleName = "null";
		try {
			simpleName = accumulator.getUnchecked().getClass().getSimpleName();
			localValue = accumulator.getUnchecked().getLocalValue();
		}
		catch (RuntimeException exception) {
			LOG.error("Failed to stringify accumulator [" + name + "]", exception);
			localValue = ExceptionUtils.stringifyException(exception);
		}
		return new StringifiedAccumulatorResult(name, simpleName, localValue != null ? localValue.toString() : "null");
	}
}
 
Example 7  Project: flink  File: RestClusterClientTest.java
@Override
protected CompletableFuture<AsynchronousOperationResult<AsynchronousOperationInfo>> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, SavepointDisposalStatusMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
	final TriggerId actualTriggerId = request.getPathParameter(TriggerIdPathParameter.class);

	if (actualTriggerId.equals(triggerId)) {
		final OptionalFailure<AsynchronousOperationInfo> nextResponse = responses.poll();

		if (nextResponse != null) {
			if (nextResponse.isFailure()) {
				throw new RestHandlerException("Failure", HttpResponseStatus.BAD_REQUEST, nextResponse.getFailureCause());
			} else {
				return CompletableFuture.completedFuture(AsynchronousOperationResult.completed(nextResponse.getUnchecked()));
			}
		} else {
			throw new AssertionError();
		}
	} else {
		throw new AssertionError();
	}
}
 
Example 8  File: StringifiedAccumulatorResultTest.java
@Test
public void stringifyingResultsShouldIncorporateAccumulatorLocalValueDirectly() {
	final String name = "a";
	final int targetValue = 314159;
	final IntCounter acc = new IntCounter();
	acc.add(targetValue);
	final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = new HashMap<>();
	accumulatorMap.put(name, OptionalFailure.of(acc));

	final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);

	assertEquals(1, results.length);

	final StringifiedAccumulatorResult firstResult = results[0];
	assertEquals(name, firstResult.getName());
	assertEquals("IntCounter", firstResult.getType());
	assertEquals(Integer.toString(targetValue), firstResult.getValue());
}
 
Example 9  File: StringifiedAccumulatorResultTest.java
@Test
public void stringifyingResultsShouldReportNullLocalValueAsNonnullValueString() {
	final String name = "a";
	final NullBearingAccumulator acc = new NullBearingAccumulator();
	final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = new HashMap<>();
	accumulatorMap.put(name, OptionalFailure.of(acc));

	final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);

	assertEquals(1, results.length);

	// Note the use of a String with a content of "null" rather than a null value
	final StringifiedAccumulatorResult firstResult = results[0];
	assertEquals(name, firstResult.getName());
	assertEquals("NullBearingAccumulator", firstResult.getType());
	assertEquals("null", firstResult.getValue());
}
 
Example 10  File: StringifiedAccumulatorResultTest.java
@Test
public void stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() {
	final String name = "a";
	final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = new HashMap<>();
	accumulatorMap.put(name, null);

	final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);

	assertEquals(1, results.length);

	// Note the use of String values with content of "null" rather than null values
	final StringifiedAccumulatorResult firstResult = results[0];
	assertEquals(name, firstResult.getName());
	assertEquals("null", firstResult.getType());
	assertEquals("null", firstResult.getValue());
}
 
Example 11  File: StringifiedAccumulatorResultTest.java
@Test
public void stringifyingFailureResults() {
	final String name = "a";
	final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = new HashMap<>();
	accumulatorMap.put(name, OptionalFailure.ofFailure(new FlinkRuntimeException("Test")));

	final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);

	assertEquals(1, results.length);

	// Note the use of String values with content of "null" rather than null values
	final StringifiedAccumulatorResult firstResult = results[0];
	assertEquals(name, firstResult.getName());
	assertEquals("null", firstResult.getType());
	assertTrue(firstResult.getValue().startsWith("org.apache.flink.util.FlinkRuntimeException: Test"));
}
 
Example 12  Project: flink  File: StringifiedAccumulatorResultTest.java
@Test
public void stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() {
	final String name = "a";
	final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = new HashMap<>();
	accumulatorMap.put(name, null);

	final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);

	assertEquals(1, results.length);

	// Note the use of String values with content of "null" rather than null values
	final StringifiedAccumulatorResult firstResult = results[0];
	assertEquals(name, firstResult.getName());
	assertEquals("null", firstResult.getType());
	assertEquals("null", firstResult.getValue());
}
 
Example 14  Project: Flink-CEPplus  File: RestClusterClientTest.java
@Override
protected CompletableFuture<AsynchronousOperationResult<AsynchronousOperationInfo>> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, SavepointDisposalStatusMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
	final TriggerId actualTriggerId = request.getPathParameter(TriggerIdPathParameter.class);

	if (actualTriggerId.equals(triggerId)) {
		final OptionalFailure<AsynchronousOperationInfo> nextResponse = responses.poll();

		if (nextResponse != null) {
			if (nextResponse.isFailure()) {
				throw new RestHandlerException("Failure", HttpResponseStatus.BAD_REQUEST, nextResponse.getFailureCause());
			} else {
				return CompletableFuture.completedFuture(AsynchronousOperationResult.completed(nextResponse.getUnchecked()));
			}
		} else {
			throw new AssertionError();
		}
	} else {
		throw new AssertionError();
	}
}
 
Example 15  Project: Flink-CEPplus  File: RestClusterClientTest.java
@Test
public void testGetAccumulators() throws Exception {
	TestAccumulatorHandler accumulatorHandler = new TestAccumulatorHandler();

	try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(accumulatorHandler)) {
		RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());

		try {
			JobID id = new JobID();

			{
				Map<String, OptionalFailure<Object>> accumulators = restClusterClient.getAccumulators(id);
				assertNotNull(accumulators);
				assertEquals(1, accumulators.size());

				assertTrue(accumulators.containsKey("testKey"));
				assertEquals("testValue", accumulators.get("testKey").get().toString());
			}
		} finally {
			restClusterClient.shutdown();
		}
	}
}
 
Example 16  Project: flink  File: AccumulatorHelper.java
/**
 * Merge two collections of accumulators. The second will be merged into the
 * first.
 *
 * @param target
 *            The collection of accumulators that will be updated
 * @param toMerge
 *            The collection of accumulators that will be merged into the
 *            other
 */
public static void mergeInto(Map<String, OptionalFailure<Accumulator<?, ?>>> target, Map<String, Accumulator<?, ?>> toMerge) {
	for (Map.Entry<String, Accumulator<?, ?>> otherEntry : toMerge.entrySet()) {
		OptionalFailure<Accumulator<?, ?>> ownAccumulator = target.get(otherEntry.getKey());
		if (ownAccumulator == null) {
			// Create initial counter (copy!)
			target.put(
				otherEntry.getKey(),
				wrapUnchecked(otherEntry.getKey(), () -> otherEntry.getValue().clone()));
		}
		else if (ownAccumulator.isFailure()) {
			continue;
		}
		else {
			Accumulator<?, ?> accumulator = ownAccumulator.getUnchecked();
			// Both should have the same type
			compareAccumulatorTypes(otherEntry.getKey(),
				accumulator.getClass(), otherEntry.getValue().getClass());
			// Merge target counter with other counter

			target.put(
				otherEntry.getKey(),
				wrapUnchecked(otherEntry.getKey(), () -> mergeSingle(accumulator, otherEntry.getValue().clone())));
		}
	}
}
 
Example 17  Project: flink  File: AccumulatorHelper.java
/**
 * Takes the serialized accumulator results and tries to deserialize them using the provided
 * class loader.
 * @param serializedAccumulators The serialized accumulator results.
 * @param loader The class loader to use.
 * @return The deserialized accumulator results.
 * @throws IOException If the accumulator values cannot be deserialized.
 * @throws ClassNotFoundException If a deserialized value's class cannot be found in the given class loader.
 */
public static Map<String, OptionalFailure<Object>> deserializeAccumulators(
		Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators,
		ClassLoader loader) throws IOException, ClassNotFoundException {

	if (serializedAccumulators == null || serializedAccumulators.isEmpty()) {
		return Collections.emptyMap();
	}

	Map<String, OptionalFailure<Object>> accumulators = new HashMap<>(serializedAccumulators.size());

	for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> entry : serializedAccumulators.entrySet()) {

		OptionalFailure<Object> value = null;
		if (entry.getValue() != null) {
			value = entry.getValue().deserializeValue(loader);
		}

		accumulators.put(entry.getKey(), value);
	}

	return accumulators;
}
 
Example 19  Project: flink  File: AccumulatorHelper.java
/**
 * Takes the serialized accumulator results and tries to deserialize them using the provided
 * class loader, and then tries to unwrap the values unchecked.
 * @param serializedAccumulators The serialized accumulator results.
 * @param loader The class loader to use.
 * @return The deserialized and unwrapped accumulator results.
 */
public static Map<String, Object> deserializeAndUnwrapAccumulators(
	Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators,
	ClassLoader loader) throws IOException, ClassNotFoundException {

	Map<String, OptionalFailure<Object>> deserializedAccumulators = deserializeAccumulators(serializedAccumulators, loader);

	if (deserializedAccumulators.isEmpty()) {
		return Collections.emptyMap();
	}

	Map<String, Object> accumulators = new HashMap<>(serializedAccumulators.size());

	for (Map.Entry<String, OptionalFailure<Object>> entry : deserializedAccumulators.entrySet()) {
		accumulators.put(entry.getKey(), entry.getValue().getUnchecked());
	}

	return accumulators;
}
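
A hedged sketch of deserializeAndUnwrapAccumulators from the snippet above; unlike deserializeAccumulators (Example 17), the OptionalFailure wrapper is stripped, so a failed accumulator would surface as an exception from getUnchecked(). The key "records" and the class name are illustrative.

import java.util.Collections;
import java.util.Map;

import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;

public class DeserializeAndUnwrapSketch {
	public static void main(String[] args) throws Exception {
		Map<String, SerializedValue<OptionalFailure<Object>>> wire =
			Collections.singletonMap(
				"records", new SerializedValue<>(OptionalFailure.of((Object) 42)));

		// Only the plain values remain after unwrapping.
		Map<String, Object> results =
			AccumulatorHelper.deserializeAndUnwrapAccumulators(
				wire, DeserializeAndUnwrapSketch.class.getClassLoader());

		System.out.println(results.get("records")); // 42
	}
}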
 
Example 21  Project: flink  File: RestClusterClientTest.java
@Test
public void testGetAccumulators() throws Exception {
	TestAccumulatorHandler accumulatorHandler = new TestAccumulatorHandler();

	try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(accumulatorHandler)) {
		RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());

		try {
			JobID id = new JobID();

			{
				Map<String, OptionalFailure<Object>> accumulators = restClusterClient.getAccumulators(id);
				assertNotNull(accumulators);
				assertEquals(1, accumulators.size());

				assertTrue(accumulators.containsKey("testKey"));
				assertEquals("testValue", accumulators.get("testKey").get().toString());
			}
		} finally {
			restClusterClient.shutdown();
		}
	}
}
 
Example 22  Project: flink  File: StringifiedAccumulatorResultTest.java
@Test
public void stringifyingResultsShouldIncorporateAccumulatorLocalValueDirectly() {
	final String name = "a";
	final int targetValue = 314159;
	final IntCounter acc = new IntCounter();
	acc.add(targetValue);
	final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = new HashMap<>();
	accumulatorMap.put(name, OptionalFailure.of(acc));

	final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);

	assertEquals(1, results.length);

	final StringifiedAccumulatorResult firstResult = results[0];
	assertEquals(name, firstResult.getName());
	assertEquals("IntCounter", firstResult.getType());
	assertEquals(Integer.toString(targetValue), firstResult.getValue());
}
 
Example 24  Project: flink  File: StringifiedAccumulatorResultTest.java
@Test
public void stringifyingFailureResults() {
	final String name = "a";
	final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = new HashMap<>();
	accumulatorMap.put(name, OptionalFailure.ofFailure(new FlinkRuntimeException("Test")));

	final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);

	assertEquals(1, results.length);

	// Note the use of String values with content of "null" rather than null values
	final StringifiedAccumulatorResult firstResult = results[0];
	assertEquals(name, firstResult.getName());
	assertEquals("null", firstResult.getType());
	assertTrue(firstResult.getValue().startsWith("org.apache.flink.util.FlinkRuntimeException: Test"));
}
 
Example 25  Project: Flink-CEPplus  File: AccumulatorHelper.java
/**
 * Transform the Map with accumulators into a Map containing only the
 * results.
 */
public static Map<String, OptionalFailure<Object>> toResultMap(Map<String, Accumulator<?, ?>> accumulators) {
	Map<String, OptionalFailure<Object>> resultMap = new HashMap<>();
	for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) {
		resultMap.put(entry.getKey(), wrapUnchecked(entry.getKey(), () -> entry.getValue().getLocalValue()));
	}
	return resultMap;
}
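
A short hedged sketch for toResultMap; the accumulator name "records" and the class name ToResultMapSketch are illustrative.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.util.OptionalFailure;

public class ToResultMapSketch {
	public static void main(String[] args) {
		Map<String, Accumulator<?, ?>> accumulators = new HashMap<>();
		IntCounter counter = new IntCounter();
		counter.add(3);
		accumulators.put("records", counter);

		// Each accumulator is reduced to its local value, wrapped in OptionalFailure;
		// a RuntimeException thrown by getLocalValue() would become a failure entry.
		Map<String, OptionalFailure<Object>> results = AccumulatorHelper.toResultMap(accumulators);
		System.out.println(results.get("records").getUnchecked()); // 3
	}
}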
 
Example 26  Project: Flink-CEPplus  File: AccumulatorHelper.java
private static <R> OptionalFailure<R> wrapUnchecked(String name, Supplier<R> supplier) {
	return OptionalFailure.createFrom(() -> {
		try {
			return supplier.get();
		} catch (RuntimeException ex) {
			LOG.error("Unexpected error while handling accumulator [" + name + "]", ex);
			throw new FlinkException(ex);
		}
	});
}
 
Example 27  Project: Flink-CEPplus  File: JobExecutionResult.java
/**
 * Creates a new JobExecutionResult.
 *
 * @param jobID The job's ID.
 * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds
 * @param accumulators A map of all accumulators produced by the job.
 */
public JobExecutionResult(JobID jobID, long netRuntime, Map<String, OptionalFailure<Object>> accumulators) {
	super(jobID);
	this.netRuntime = netRuntime;

	if (accumulators != null) {
		this.accumulatorResults = accumulators;
	} else {
		this.accumulatorResults = Collections.emptyMap();
	}
}
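
A hedged sketch of constructing and reading a JobExecutionResult; the accessors getNetRuntime() and getAccumulatorResult() are assumed from the public Flink API rather than shown in the snippet above, and the accumulator name is illustrative.

import java.util.Collections;
import java.util.Map;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.util.OptionalFailure;

public class JobExecutionResultSketch {
	public static void main(String[] args) {
		Map<String, OptionalFailure<Object>> accumulators =
			Collections.singletonMap("records", OptionalFailure.of((Object) 7));

		// 1234 ms of net runtime and one accumulator result.
		JobExecutionResult result = new JobExecutionResult(new JobID(), 1234L, accumulators);

		System.out.println(result.getNetRuntime());   // 1234
		Integer records = result.getAccumulatorResult("records");
		System.out.println(records);                  // 7
	}
}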
 
Example 28  Project: Flink-CEPplus  File: CollectionExecutor.java
public JobExecutionResult execute(Plan program) throws Exception {
	long startTime = System.currentTimeMillis();

	initCache(program.getCachedFiles());
	Collection<? extends GenericDataSinkBase<?>> sinks = program.getDataSinks();
	for (Operator<?> sink : sinks) {
		execute(sink);
	}
	
	long endTime = System.currentTimeMillis();
	Map<String, OptionalFailure<Object>> accumulatorResults = AccumulatorHelper.toResultMap(accumulators);
	return new JobExecutionResult(null, endTime - startTime, accumulatorResults);
}
 
Example 29  File: StringifiedAccumulatorResult.java
/**
 * Flatten a map of accumulator names to Accumulator instances into an array of StringifiedAccumulatorResult values.
 */
public static StringifiedAccumulatorResult[] stringifyAccumulatorResults(Map<String, OptionalFailure<Accumulator<?, ?>>> accs) {
	if (accs == null || accs.isEmpty()) {
		return new StringifiedAccumulatorResult[0];
	}
	else {
		StringifiedAccumulatorResult[] results = new StringifiedAccumulatorResult[accs.size()];

		int i = 0;
		for (Map.Entry<String, OptionalFailure<Accumulator<?, ?>>> entry : accs.entrySet()) {
			results[i++] = stringifyAccumulatorResult(entry.getKey(), entry.getValue());
		}
		return results;
	}
}
 
Example 30  Project: Flink-CEPplus  File: JobResult.java
private JobResult(
		final JobID jobId,
		final ApplicationStatus applicationStatus,
		final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults,
		final long netRuntime,
		@Nullable final SerializedThrowable serializedThrowable) {

	checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0");

	this.jobId = requireNonNull(jobId);
	this.applicationStatus = requireNonNull(applicationStatus);
	this.accumulatorResults = requireNonNull(accumulatorResults);
	this.netRuntime = netRuntime;
	this.serializedThrowable = serializedThrowable;
}
 