下面列出了怎么用org.apache.commons.lang3.concurrent.ConcurrentUtils的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public <T> Future<T> execute(final Controller controller, final BackgroundActionRegistry registry, final BackgroundAction<T> action) {
if(log.isDebugEnabled()) {
log.debug(String.format("Run action %s in background", action));
}
// Add action to registry of controller. Will be removed automatically when stopped
registry.add(action);
action.init();
// Start background task
final Callable<T> command = new BackgroundCallable<T>(action, controller);
try {
final Future<T> task = concurrentExecutor.execute(command);
if(log.isInfoEnabled()) {
log.info(String.format("Scheduled background runnable %s for execution", action));
}
return task;
}
catch(RejectedExecutionException e) {
log.error(String.format("Error scheduling background task %s for execution. %s", action, e.getMessage()));
action.cancel();
action.cleanup();
return ConcurrentUtils.constantFuture(null);
}
}
/**
* Returns thread factory for producing threads associated with the specified
* group name. The group name-space is hierarchical, based on slash-delimited
* name segments,
*
* @param groupName group name
* @return thread factory
*/
public static AtriumGroupedThreadFactory groupedThreadFactory(String groupName) {
AtriumGroupedThreadFactory factory = FACTORIES.get(groupName);
if (factory != null) {
return factory;
}
// Find the parent group or root the group hierarchy under default group.
int i = groupName.lastIndexOf(DELIMITER);
if (i > 0) {
String name = groupName.substring(0, i);
ThreadGroup parentGroup = groupedThreadFactory(name).threadGroup();
factory = new AtriumGroupedThreadFactory(new ThreadGroup(parentGroup, groupName));
} else {
factory = new AtriumGroupedThreadFactory(new ThreadGroup(groupName));
}
return ConcurrentUtils.putIfAbsent(FACTORIES, groupName, factory);
}
private Future<ReplicationSnapshot> getValidReplicationInfo() {
try {
try {
lock.acquire();
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
ReplicationSnapshot replicationSnapshot = getReplicationInfoMasterConfig();
if (replicationSnapshot == null) {
replicationSnapshot = getAttributeBlocking(cluster, MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT);
}
if (!isReplicationInfoValid(replicationSnapshot)) {
final MySqlNode snapshotNode = getSnapshotNode();
final String dumpName = getDumpUniqueId() + ".sql";
if (MySqlClusterUtils.IS_MASTER.apply(snapshotNode)) {
return createMasterReplicationSnapshot(snapshotNode, dumpName);
} else {
return createSlaveReplicationSnapshot(snapshotNode, dumpName);
}
}
return ConcurrentUtils.constantFuture(replicationSnapshot);
} finally {
lock.release();
}
}
/**
* Returns thread factory for producing threads associated with the specified
* group name. The group name-space is hierarchical, based on slash-delimited
* name segments, e.g. {@code onos/intent}.
*
* @param groupName group name
* @return thread factory
*/
public static GroupedThreadFactory groupedThreadFactory(String groupName) {
GroupedThreadFactory factory = FACTORIES.get(groupName);
if (factory != null) {
return factory;
}
// Find the parent group or root the group hierarchy under default group.
int i = groupName.lastIndexOf(DELIMITER);
if (i > 0) {
String name = groupName.substring(0, i);
ThreadGroup parentGroup = groupedThreadFactory(name).threadGroup();
factory = new GroupedThreadFactory(new ThreadGroup(parentGroup, groupName));
} else {
factory = new GroupedThreadFactory(new ThreadGroup(groupName));
}
return ConcurrentUtils.putIfAbsent(FACTORIES, groupName, factory);
}
@Async
public Future<OperationStatus> startOperation(final String app, final String table, final String opid, OperationTypes operationType, String... params) {
Operable operable = operationFactory.getOperable(operationType, params);
try {
return operable.start(app, table, opid);
} catch (OperationException ex) {
LoggerFactory.getLogger(OperationExecutor.class.getName()).error(null, ex);
return ConcurrentUtils.constantFuture(OperationStatus.ERROR);
}
}
/**
* Will queue up the <code>BackgroundAction</code> to be run in a background thread
*
* @param action The runnable to execute in a secondary thread
*/
@Override
public <T> Future<T> background(final BackgroundAction<T> action) {
if(registry.contains(action)) {
log.warn(String.format("Skip duplicate background action %s found in registry", action));
return ConcurrentUtils.constantFuture(null);
}
return DefaultBackgroundExecutor.get().execute(this, registry, action);
}
@SuppressWarnings("unchecked")
@Override
public <T> FutureObject<T> saveAsync(Object object) {
ObjectHolder objectHolder = new ObjectHolder(object);
setIdIfNecessary(objectHolder);
MockStore.put(objectHolder.getId(), object, tx());
Future<?> futureId = ConcurrentUtils.constantFuture(objectHolder.getId());
return new FutureObject<T>(r, (Future<IdRef<T>>) futureId, (T) object);
}
@Test
public void testConstantFuture_Integer() throws Exception {
Future<Integer> test = ConcurrentUtils.constantFuture(5);
assertTrue(test.isDone());
assertSame(5, test.get());
assertFalse(test.isCancelled());
}
@Test
public void ConcurrentExceptionSample() throws ConcurrentException {
final Error err = new AssertionError("Test");
try {
ConcurrentUtils.handleCause(new ExecutionException(err));
fail("Error not thrown!");
} catch (final Error e) {
assertEquals("Wrong error", err, e);
}
}
/**
* Returns the managed {@code FileBasedConfigurationBuilder} for the current
* file name pattern. It is determined based on the evaluation of the file
* name pattern using the configured {@code ConfigurationInterpolator}. If
* this is the first access to this configuration file, the builder is
* created.
*
* @return the configuration builder for the configuration corresponding to
* the current evaluation of the file name pattern
* @throws ConfigurationException if the builder cannot be determined (e.g.
* due to missing initialization parameters)
*/
public FileBasedConfigurationBuilder<T> getManagedBuilder()
throws ConfigurationException
{
final Map<String, Object> params = getParameters();
final MultiFileBuilderParametersImpl multiParams =
MultiFileBuilderParametersImpl.fromParameters(params, true);
if (multiParams.getFilePattern() == null)
{
throw new ConfigurationException("No file name pattern is set!");
}
final String fileName = fetchFileName(multiParams);
FileBasedConfigurationBuilder<T> builder =
getManagedBuilders().get(fileName);
if (builder == null)
{
builder =
createInitializedManagedBuilder(fileName,
createManagedBuilderParameters(params, multiParams));
final FileBasedConfigurationBuilder<T> newBuilder =
ConcurrentUtils.putIfAbsent(getManagedBuilders(), fileName,
builder);
if (newBuilder == builder)
{
initListeners(newBuilder);
}
else
{
builder = newBuilder;
}
}
return builder;
}
@Override
public Future<RecordMetadata> send(ProducerRecord record) {
// Fake result: only for testing purpose
return ConcurrentUtils.constantFuture(new RecordMetadata(null, 0, 0, 0, 0, 0, 0));
}
@Override
public Future<Path> calcLeastCostPath(Node fromNode, Node toNode, double starttime, Person person, Vehicle vehicle) {
return ConcurrentUtils.constantFuture(delegate.calcLeastCostPath(fromNode, toNode, starttime, person, vehicle));
}
public Future<MatrixBlock> allocateBlockAsync() {
ExecutorService pool = LazyWriteBuffer.getUtilThreadPool();
return (pool != null) ? pool.submit(() -> allocateBlock()) : //async
ConcurrentUtils.constantFuture(allocateBlock()); //fallback sync
}
public Future<TransferStatus> submit(final TransferCallable runnable) throws BackgroundException {
return ConcurrentUtils.constantFuture(runnable.call());
}
public Future<MatrixBlock> allocateBlockAsync() {
ExecutorService pool = LazyWriteBuffer.getUtilThreadPool();
return (pool != null) ? pool.submit(() -> allocateBlock()) : //async
ConcurrentUtils.constantFuture(allocateBlock()); //fallback sync
}
@Override
public <T> FutureObject<T> fetchAsync(IdRef<T> id) {
T object = fetch(id);
Future<T> futureObject = ConcurrentUtils.constantFuture(object);
return new FutureObject<>(r, futureObject);
}
@Override
public FutureObject<Void> destroyAsync(IdRef<?> id) {
destroy(id);
Future<Void> future = ConcurrentUtils.constantFuture(null);
return new FutureObject<>(r, future);
}
@SuppressWarnings("unchecked")
private <T> FutureObject<T> saveEntityAsync(ObjectHolder objectHolder, Entity entity) {
Key key = datastore.put(entity);
Future<?> futureId = ConcurrentUtils.constantFuture(IdRefToKey.toIdRef(r, key, objectHolder.getModel()));
return new FutureObject<>(r, (Future<IdRef<T>>) futureId, (T) objectHolder.getObject());
}
@Override
public <T> FutureObject<T> fetchAsync(IdRef<T> id) {
T object = fetch(id);
Future<T> futureObject = ConcurrentUtils.constantFuture(object);
return new FutureObject<>(r, futureObject);
}
@Override
public FutureObject<Void> destroyAsync(IdRef<?> id) {
destroy(id);
Future<Void> future = ConcurrentUtils.constantFuture(null);
return new FutureObject<>(r, future);
}
/**
* Returns a future indicating whether this responsibility is qualified for execution. Subclasses
* which require asynchronous processing (like making a network call) to determine if they're
* qualified should override this method and throw an IllegalStateException from within
* {@link #isQualified()} (which should never be called).
*
* @return A future boolean indicating whether the responsibility is qualified.
*/
public Future<Boolean> isAsynchronouslyQualified() {
return ConcurrentUtils.constantFuture(isQualified());
}