下面列出了怎么用java.util.concurrent.ArrayBlockingQueue的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
protected void doStart() throws Exception
{
super.doStart();
_threadsStarted.set(0);
if (_jobs==null)
{
_jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
:new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
}
int threads=_threadsStarted.get();
while (isRunning() && threads<_minThreads)
{
startThread(threads);
threads=_threadsStarted.get();
}
}
@Override
public void start() {
LOG.info(
"Using {} as call queue; handlerCount={}; maxQueueLength={}; rsReportHandlerCount={}; "
+ "rsReportMaxQueueLength={}",
this.getClass().getSimpleName(), handlerCount, maxQueueLength, rsReportHandlerCount,
rsRsreportMaxQueueLength);
this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(maxQueueLength),
Threads.newDaemonThreadFactory("MasterFifoRpcScheduler.call.handler"),
new ThreadPoolExecutor.CallerRunsPolicy());
this.rsReportExecutor = new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(rsRsreportMaxQueueLength),
Threads.newDaemonThreadFactory("MasterFifoRpcScheduler.RSReport.handler"),
new ThreadPoolExecutor.CallerRunsPolicy());
}
@Test
public void test_threadPool() {
int total = 4;
ThreadPoolExecutor executor = new ThreadPoolExecutor(total, total, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
for (int i = 0; i < total; i++) {
executor.submit(new Task(i));
}
try {
executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (Exception e) {
//
}
stopped = true;
executor.shutdown();
}
/**
* submit(callable).get() throws InterruptedException if interrupted
*/
public void testInterruptedSubmit() throws InterruptedException {
final CountDownLatch submitted = new CountDownLatch(1);
final CountDownLatch quittingTime = new CountDownLatch(1);
final Callable<Void> awaiter = new CheckedCallable<Void>() {
public Void realCall() throws InterruptedException {
assertTrue(quittingTime.await(2*LONG_DELAY_MS, MILLISECONDS));
return null;
}};
final ExecutorService p
= new ThreadPoolExecutor(1,1,60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p, quittingTime)) {
Thread t = newStartedThread(new CheckedInterruptedRunnable() {
public void realRun() throws Exception {
Future<Void> future = p.submit(awaiter);
submitted.countDown();
future.get();
}});
await(submitted);
t.interrupt();
awaitTermination(t);
}
}
@Setup(Level.Trial)
public void setUp() {
switch (implementation) {
case PARAM_UNSAFE:
//queue = new MpmcArrayQueue<>(CAPACITY);
break;
case PARAM_AFU:
//queue = new MpmcAtomicArrayQueue<>(CAPACITY);
break;
case PARAM_JDK:
queue = new ArrayBlockingQueue<>(CAPACITY);
break;
default:
throw new UnsupportedOperationException("Unsupported implementation " + implementation);
}
}
@Override
public Statement apply(final Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
ShadowLooper layoutLooper =
Shadows.shadowOf(
(Looper)
Whitebox.invokeMethod(ComponentTree.class, "getDefaultLayoutThreadLooper"));
mMessageQueue = new ArrayBlockingQueue<>(100);
LayoutLooperThread layoutLooperThread = new LayoutLooperThread(layoutLooper, mMessageQueue);
layoutLooperThread.start();
try {
base.evaluate();
} finally {
mMessageQueue.add(new Message(MessageType.QUIT));
}
}
};
}
/**
* getPoolSize increases, but doesn't overestimate, when threads
* become active
*/
public void testGetPoolSize() throws InterruptedException {
final CountDownLatch done = new CountDownLatch(1);
final ThreadPoolExecutor p =
new ThreadPoolExecutor(1, 1,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p, done)) {
assertEquals(0, p.getPoolSize());
final CountDownLatch threadStarted = new CountDownLatch(1);
p.execute(new CheckedRunnable() {
public void realRun() throws InterruptedException {
threadStarted.countDown();
assertEquals(1, p.getPoolSize());
await(done);
}});
await(threadStarted);
assertEquals(1, p.getPoolSize());
}
}
public ControlledProgressTracker(List<IntArrayList> items) {
if (items.size() == 0) {
canProgress = false;
} else {
canProgress = true;
this.progressItems = new ArrayList<>(items.size());
for (int i = 0; i < items.size(); i++) {
List<Integer> taskLists = items.get(i);
Queue<Integer> progressQueue = new ArrayBlockingQueue<>(taskLists.size());
progressQueue.addAll(taskLists);
progressItems.add(progressQueue);
for (int t : taskLists) {
invertedItems.put(t, i);
}
}
}
}
@Override
public void startBroker() {
commitThreadSize = soaConfig.getCommitThreadSize();
executorRun = new ThreadPoolExecutor(commitThreadSize + 1, commitThreadSize + 1, 10L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(50), SoaThreadFactory.create("commit-run", Thread.MAX_PRIORITY - 1, true),
new ThreadPoolExecutor.CallerRunsPolicy());
soaConfig.registerChanged(new Runnable() {
@Override
public void run() {
if (commitThreadSize != soaConfig.getCommitThreadSize()) {
commitThreadSize = soaConfig.getCommitThreadSize();
executorRun.setCorePoolSize(commitThreadSize + 1);
executorRun.setMaximumPoolSize(commitThreadSize + 1);
}
}
});
executorRun.execute(() -> {
commitOffset();
});
}
@Override
public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
if(this.messageQueue == null){
int fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
this.messageQueue = new ArrayBlockingQueue<>(fetchSize);
}
this.folderName = context.getProperty(FOLDER).getValue();
Message emailMessage = this.receiveMessage(context);
if (emailMessage != null) {
this.transfer(emailMessage, context, processSession);
} else {
//No new messages found, yield the processor
context.yield();
}
}
public ProcessPoolOfficeManager(File officeHome, UnoUrl[] unoUrls, String[] runAsArgs, File templateProfileDir, File workDir,
long retryTimeout, long taskQueueTimeout, long taskExecutionTimeout, int maxTasksPerProcess,
ProcessManager processManager) {
this.taskQueueTimeout = taskQueueTimeout;
pool = new ArrayBlockingQueue<PooledOfficeManager>(unoUrls.length);
pooledManagers = new PooledOfficeManager[unoUrls.length];
for (int i = 0; i < unoUrls.length; i++) {
PooledOfficeManagerSettings settings = new PooledOfficeManagerSettings(unoUrls[i]);
settings.setRunAsArgs(runAsArgs);
settings.setTemplateProfileDir(templateProfileDir);
settings.setWorkDir(workDir);
settings.setOfficeHome(officeHome);
settings.setRetryTimeout(retryTimeout);
settings.setTaskExecutionTimeout(taskExecutionTimeout);
settings.setMaxTasksPerProcess(maxTasksPerProcess);
settings.setProcessManager(processManager);
pooledManagers[i] = new PooledOfficeManager(settings);
}
logger.info("ProcessManager implementation is " + processManager.getClass().getSimpleName());
}
public ParallelLoadOperation(final ResourceSet parent, final IProject project) {
this.parent = parent;
if (queueSize == -1) {
this.resourceQueue = new LinkedBlockingQueue<Triple<URI, Resource, Throwable>>();
} else if (queueSize == 0) {
this.resourceQueue = new SynchronousQueue<Triple<URI, Resource, Throwable>>();
} else {
this.resourceQueue = new ArrayBlockingQueue<Triple<URI, Resource, Throwable>>(queueSize);
}
this.resourceSetProvider = new ThreadLocal<ResourceSet>() {
@Override
protected ResourceSet initialValue() {
ResourceSet resourceSet = getResourceSetProvider().get(project);
BuildPhases.setIndexing(resourceSet, BuildPhases.isIndexing(parent));
DirectLinkingSourceLevelURIsAdapter.setSourceLevelUris(resourceSet, DirectLinkingSourceLevelURIsAdapter.findInstalledAdapter(parent).getSourceLevelURIs());
resourceSet.getLoadOptions().putAll(parent.getLoadOptions());
// we are not loading as part of a build
resourceSet.getLoadOptions().remove(ResourceDescriptionsProvider.NAMED_BUILDER_SCOPE);
resourceSet.setURIConverter(parent.getURIConverter());
return resourceSet;
}
};
this.executor = Executors.newFixedThreadPool(nThreads, new ThreadFactoryBuilder().setNameFormat("parallel-load-operation-%d").build()); //$NON-NLS-1$
this.waitTime = getTimeout();
}
@Test
public void testAnyExceptionWrappedInParameterException() {
class App {
@Option(names = "-queue", type = String.class, split = ",")
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
}
CommandLine cmd = new CommandLine(new App());
cmd.getCommandSpec().parser().collectErrors(true);
cmd.parseArgs("-queue", "a,b,c");
ParseResult parseResult = cmd.getParseResult();
assertTrue(parseResult.unmatched().isEmpty());
assertEquals(1, parseResult.errors().size());
assertTrue(parseResult.errors().get(0) instanceof ParameterException);
assertTrue(parseResult.errors().get(0).getCause() instanceof NoSuchMethodException);
assertEquals("NoSuchMethodException: java.util.concurrent.ArrayBlockingQueue.<init>() while processing argument at or before arg[1] 'a,b,c' in [-queue, a,b,c]: java.lang.NoSuchMethodException: java.util.concurrent.ArrayBlockingQueue.<init>()",
parseResult.errors().get(0).getMessage());
assertEquals("java.util.concurrent.ArrayBlockingQueue.<init>()", parseResult.errors().get(0).getCause().getMessage());
}
/**
* Interior removal of elements used by an iterator will cause it
* to be untracked.
*/
public void interiorRemovalOfElementsUsedByIterator() {
boolean fair = rnd.nextBoolean();
int capacity = rnd.nextInt(10, 20);
ArrayBlockingQueue q = new ArrayBlockingQueue(capacity, fair);
randomizePutIndex(q);
q.add(0);
for (int i = 1; i < 2 * capacity; i++) {
q.add(i);
Integer[] elts = { -1, -2, -3 };
for (Integer elt : elts) q.add(elt);
assertEquals(q.remove(), i - 1);
Iterator it = q.iterator();
assertEquals(it.next(), i);
assertEquals(it.next(), elts[0]);
Collections.shuffle(Arrays.asList(elts));
assertTrue(q.remove(elts[0]));
assertTrue(q.remove(elts[1]));
assertEquals(trackedIterators(q), Collections.singletonList(it));
assertTrue(q.remove(elts[2]));
assertNull(itrs(q));
assertEquals(it.next(), -2);
assertIteratorExhausted(it);
assertTrue(isDetached(it));
}
}
/**
* drainTo empties full queue, unblocking a waiting put.
*/
public void testDrainToWithActivePut() throws InterruptedException {
final ArrayBlockingQueue q = populatedQueue(SIZE);
Thread t = new Thread(new CheckedRunnable() {
public void realRun() throws InterruptedException {
q.put(new Integer(SIZE + 1));
}});
t.start();
ArrayList l = new ArrayList();
q.drainTo(l);
assertTrue(l.size() >= SIZE);
for (int i = 0; i < SIZE; ++i)
assertEquals(l.get(i), new Integer(i));
t.join();
assertTrue(q.size() + l.size() >= SIZE);
}
/**
* offer transfers elements across Executor tasks
*/
public void testOfferInExecutor() {
final ArrayBlockingQueue q = new ArrayBlockingQueue(2);
q.add(one);
q.add(two);
final CheckedBarrier threadsStarted = new CheckedBarrier(2);
final ExecutorService executor = Executors.newFixedThreadPool(2);
try (PoolCleaner cleaner = cleaner(executor)) {
executor.execute(new CheckedRunnable() {
public void realRun() throws InterruptedException {
assertFalse(q.offer(three));
threadsStarted.await();
assertTrue(q.offer(three, LONG_DELAY_MS, MILLISECONDS));
assertEquals(0, q.remainingCapacity());
}});
executor.execute(new CheckedRunnable() {
public void realRun() throws InterruptedException {
threadsStarted.await();
assertEquals(0, q.remainingCapacity());
assertSame(one, q.take());
}});
}
}
private void doTest(final HelloContinuation helloPort) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(6));
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch controlDoneSignal = new CountDownLatch(5);
CountDownLatch helloDoneSignal = new CountDownLatch(5);
executor.execute(new ControlWorker(helloPort, "Fred", startSignal, controlDoneSignal));
executor.execute(new HelloWorker(helloPort, "Fred", "", startSignal, helloDoneSignal));
executor.execute(new ControlWorker(helloPort, "Barry", startSignal, controlDoneSignal));
executor.execute(new HelloWorker(helloPort, "Barry", "Jameson", startSignal, helloDoneSignal));
executor.execute(new ControlWorker(helloPort, "Harry", startSignal, controlDoneSignal));
executor.execute(new HelloWorker(helloPort, "Harry", "", startSignal, helloDoneSignal));
executor.execute(new ControlWorker(helloPort, "Rob", startSignal, controlDoneSignal));
executor.execute(new HelloWorker(helloPort, "Rob", "Davidson", startSignal, helloDoneSignal));
executor.execute(new ControlWorker(helloPort, "James", startSignal, controlDoneSignal));
executor.execute(new HelloWorker(helloPort, "James", "ServiceMix", startSignal, helloDoneSignal));
startSignal.countDown();
controlDoneSignal.await(100, TimeUnit.SECONDS);
helloDoneSignal.await(100, TimeUnit.SECONDS);
executor.shutdownNow();
assertEquals("Not all invocations have been resumed", 0, controlDoneSignal.getCount());
assertEquals("Not all invocations have completed", 0, helloDoneSignal.getCount());
helloPort.sayHi("Dan1", "to:100");
helloPort.sayHi("Dan2", "to:100");
helloPort.sayHi("Dan3", "to:100");
}
/**
* This test tests the CheckedQueue.add method. It creates a queue of one
* {@code String}, gets the checked queue, and attempt to add an Integer to
* the checked queue.
*/
@Test(expectedExceptions = ClassCastException.class)
public void testAddFail2() {
ArrayBlockingQueue<String> abq = new ArrayBlockingQueue(1);
Queue q = Collections.checkedQueue(abq, String.class);
q.add(0);
}
public MetricTrackingWindmillServerStub(
WindmillServerStub server, MemoryMonitor gcThrashingMonitor, boolean useStreamingRequests) {
this.server = server;
this.gcThrashingMonitor = gcThrashingMonitor;
this.readQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
this.readPool = new ArrayList<>(NUM_THREADS);
this.useStreamingRequests = useStreamingRequests;
}
/**
* This test tests the CheckedQueue.add method. It creates a queue of one
* {@code String}, gets the checked queue, and attempt to add an Integer to
* the checked queue.
*/
@Test(expectedExceptions = ClassCastException.class)
public void testAddFail2() {
ArrayBlockingQueue<String> abq = new ArrayBlockingQueue(1);
Queue q = Collections.checkedQueue(abq, String.class);
q.add(0);
}
public DataPointQueue(int batchPutBufferSize, int multiFieldBatchPutBufferSize, int waitCloseTimeLimit, boolean backpressure) {
if (batchPutBufferSize <= 0) {
batchPutBufferSize = 1;
}
if (multiFieldBatchPutBufferSize <= 0) {
multiFieldBatchPutBufferSize = 1;
}
this.pointQueue = new ArrayBlockingQueue<Point>(batchPutBufferSize);
this.multiFieldPointQueue = new ArrayBlockingQueue<MultiFieldPoint>(multiFieldBatchPutBufferSize);
this.waitCloseTimeLimit = waitCloseTimeLimit;
this.backpressure = backpressure;
}
/**
* timed invokeAll(empty collection) returns empty collection
*/
public void testTimedInvokeAll2() throws Exception {
final ExecutorService e =
new CustomTPE(2, 2,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(e)) {
List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(),
MEDIUM_DELAY_MS, MILLISECONDS);
assertTrue(r.isEmpty());
}
}
public AsyncEventHandler(int queueCapacity,
int numWorkers,
int maxConnections,
int connectionsPerRoute,
int validateAfter,
long closeTimeout,
TimeUnit closeTimeoutUnit) {
queueCapacity = validateInput("queueCapacity", queueCapacity, DEFAULT_QUEUE_CAPACITY);
numWorkers = validateInput("numWorkers", numWorkers, DEFAULT_NUM_WORKERS);
maxConnections = validateInput("maxConnections", maxConnections, DEFAULT_MAX_CONNECTIONS);
connectionsPerRoute = validateInput("connectionsPerRoute", connectionsPerRoute, DEFAULT_MAX_PER_ROUTE);
validateAfter = validateInput("validateAfter", validateAfter, DEFAULT_VALIDATE_AFTER_INACTIVITY);
this.httpClient = OptimizelyHttpClient.builder()
.withMaxTotalConnections(maxConnections)
.withMaxPerRoute(connectionsPerRoute)
.withValidateAfterInactivity(validateAfter)
.build();
this.workerExecutor = new ThreadPoolExecutor(numWorkers, numWorkers,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(queueCapacity),
new NamedThreadFactory("optimizely-event-dispatcher-thread-%s", true));
this.closeTimeout = closeTimeout;
this.closeTimeoutUnit = closeTimeoutUnit;
}
private WorkerTaskWriter newWorkerTaskWriter(PluginWriterParameter writerParameter, ArrayBlockingQueue queue) {
return new WorkerTaskWriter(
workerConfig(),
taskInfo(),
new TaskStatusListenerAdapter() {
@Override
public void onFailure(TaskStatusEvent event, Throwable cause) {
statusListener.onFailure(event, cause);
}
},
queue,
new WorkerTaskWriterContext(workerTaskContext, writerParameter)
);
}
/**
* completed submit of runnable returns successfully
*/
public void testSubmitRunnable() throws Exception {
final ExecutorService e =
new CustomTPE(2, 2,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(e)) {
Future<?> future = e.submit(new NoOpRunnable());
future.get();
assertTrue(future.isDone());
}
}
@Override
public boolean handleReceivedChannelMessage(ChannelMessage currentMessage) {
int src = router.mainTaskOfExecutor(instancePlan.getThisWorker(),
CommunicationContext.DEFAULT_DESTINATION);
RoutingParameters routingParameters;
if (routingParametersCache.containsKey(src)) {
routingParameters = routingParametersCache.get(src);
} else {
routingParameters = sendRoutingParameters(src, CommunicationContext.DEFAULT_DESTINATION);
}
ArrayBlockingQueue<OutMessage> pendingSendMessages = pendingSendMessagesPerSource.get(src);
// create a send message to keep track of the serialization at the initial stage
// the sub-edge is 0
int di = -1;
if (routingParameters.getExternalRoutes().size() > 0) {
di = routingParameters.getDestinationId();
}
OutMessage sendMessage = new OutMessage(src,
currentMessage.getHeader().getEdge(),
di, CommunicationContext.DEFAULT_DESTINATION, currentMessage.getHeader().getFlags(),
routingParameters.getInternalRoutes(),
routingParameters.getExternalRoutes(), dataType, this.keyType, delegate,
CommunicationContext.EMPTY_OBJECT);
sendMessage.getChannelMessages().offer(currentMessage);
// we need to update here
if (!currentMessage.isOutCountUpdated()) {
currentMessage.incrementRefCount(routingParameters.getExternalRoutes().size());
currentMessage.setOutCountUpdated(true);
}
// this is a complete message
sendMessage.setSendState(OutMessage.SendState.SERIALIZED);
// now try to put this into pending
return pendingSendMessages.offer(sendMessage);
}
public static ThreadPoolExecutor getThreadPoolExecutor(String name,
int corePoolSize,
int maximumPoolSize,
long keepAliveTimeInSec) {
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(name)
.setDaemon(true)
.build();
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTimeInSec,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(maximumPoolSize), threadFactory);
executor.allowCoreThreadTimeOut(true);
executor.setRejectedExecutionHandler((r, e) -> log.debug("RejectedExecutionHandler called"));
return executor;
}
/**
* remove(null), contains(null) always return false
*/
public void testNeverContainsNull() {
Collection<?>[] qs = {
new ArrayBlockingQueue<Object>(10),
populatedQueue(2),
};
for (Collection<?> q : qs) {
assertFalse(q.contains(null));
assertFalse(q.remove(null));
}
}
/**
* Constructor throws if corePoolSize argument is less than zero
*/
public void testConstructor6() {
try {
new CustomTPE(-1, 1, 1L, SECONDS,
new ArrayBlockingQueue<Runnable>(10),
new SimpleThreadFactory());
shouldThrow();
} catch (IllegalArgumentException success) {}
}
public RoundRobinDispatcher(
final int quantum,
final int poolSize) {
if (quantum < 10 || poolSize < 1) {
throw new IllegalArgumentException();
}
this.timeQuantum = quantum;
this.pool = new WorkersPool(poolSize);
workingQueue = new ArrayBlockingQueue<Worker>(poolSize);
waitingQueue = new LinkedList<Process>();
proc2Worker = new MutualHashMap<Process, Worker>();
factor = LoadFactor.FULL;
}