下面列出了如何使用 java.util.concurrent.SynchronousQueue API 类的实例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Builds a new, empty work queue matching this queue type.
 *
 * @param options   option source; read only for {@code SYNCHRONOUS}, where the boolean option
 *                  {@code prefix + ".QueueFair"} (default {@code false}) is passed to the queue
 * @param prefix    prefix prepended to the option key
 * @param queueSize requested capacity for the {@code ARRAY} and {@code LINKED} types; any value
 *                  below 1 is replaced by 1
 * @return the newly created queue
 * @throws IllegalArgumentException if this constant is not handled by the switch
 */
public BlockingQueue<Runnable> create(final Options options, final String prefix, final int queueSize) {
    // The bounded JDK queues reject non-positive capacities, so fall back to a single slot.
    final int capacity = Math.max(queueSize, 1);
    switch (this) {
        case ARRAY:
            return new ArrayBlockingQueue<>(capacity);
        case LINKED:
            return new LinkedBlockingQueue<>(capacity);
        case PRIORITY:
            // The requested size is not used for priority queues.
            return new PriorityBlockingQueue<>();
        case SYNCHRONOUS:
            return new SynchronousQueue<>(options.get(prefix + ".QueueFair", false));
        default:
            // The Options class will throw an error if the user supplies an unknown enum string.
            // Reaching this branch therefore means a new QueueType element was added without a
            // matching case above.
            throw new IllegalArgumentException("Unknown QueueType type: " + this);
    }
}
/**
 * Creates a new, empty queue of the same implementation class as {@code orig}.
 *
 * <p>Configuration that can be read back from the original is carried over: the capacity of
 * {@link ArrayBlockingQueue}, {@link LinkedBlockingQueue} and {@link LinkedBlockingDeque}, and
 * the comparator of {@link PriorityQueue} and {@link PriorityBlockingQueue}. Fairness settings
 * are not exposed by the JDK queues, so the copies use the default policy. Capacity is computed
 * as {@code size() + remainingCapacity()}, which is two separate reads of the original.
 *
 * @param orig the queue whose type (and, where possible, configuration) should be mirrored
 * @param <T>  the element type
 * @return a new empty queue; a {@link LinkedList} when the type of {@code orig} is not recognised
 */
@SuppressWarnings({"unchecked", "rawtypes"})
protected static <T> Queue<T> createSimilarQueue(Queue<T> orig) {
    if (orig instanceof ArrayBlockingQueue) {
        ArrayBlockingQueue<T> source = (ArrayBlockingQueue<T>) orig;
        return new ArrayBlockingQueue<T>(source.size() + source.remainingCapacity());
    } else if (orig instanceof ArrayDeque) {
        return new ArrayDeque<T>();
    } else if (orig instanceof ConcurrentLinkedQueue) {
        return new ConcurrentLinkedQueue<T>();
    } else if (orig instanceof DelayQueue) {
        // DelayQueue requires E extends Delayed, which T does not guarantee; stay raw here.
        return new DelayQueue();
    } else if (orig instanceof LinkedBlockingDeque) {
        LinkedBlockingDeque<T> source = (LinkedBlockingDeque<T>) orig;
        // For an unbounded deque this sums to Integer.MAX_VALUE, i.e. the default capacity.
        return new LinkedBlockingDeque<T>(source.size() + source.remainingCapacity());
    } else if (orig instanceof LinkedBlockingQueue) {
        LinkedBlockingQueue<T> source = (LinkedBlockingQueue<T>) orig;
        // For an unbounded queue this sums to Integer.MAX_VALUE, i.e. the default capacity.
        return new LinkedBlockingQueue<T>(source.size() + source.remainingCapacity());
    } else if (orig instanceof PriorityBlockingQueue) {
        PriorityBlockingQueue<T> source = (PriorityBlockingQueue<T>) orig;
        // 11 is the JDK's default initial capacity; a null comparator means natural ordering.
        return new PriorityBlockingQueue<T>(11, source.comparator());
    } else if (orig instanceof PriorityQueue) {
        PriorityQueue<T> source = (PriorityQueue<T>) orig;
        return new PriorityQueue<T>(11, source.comparator());
    } else if (orig instanceof SynchronousQueue) {
        return new SynchronousQueue<T>();
    } else {
        return new LinkedList<T>();
    }
}
/**
 * drainTo(c, n) empties up to n elements of queue into c
 */
public void testDrainToN() throws InterruptedException {
    final SynchronousQueue q = new SynchronousQueue();
    // Two producers, each calling put() with a distinct element.
    Thread producerOne = newStartedThread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            q.put(one);
        }});
    Thread producerTwo = newStartedThread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            q.put(two);
        }});
    ArrayList sink = new ArrayList();
    // Drain one element at a time; retry while no producer has an element on offer yet.
    for (int expectedSize = 1; expectedSize <= 2; expectedSize++) {
        int drained = q.drainTo(sink, 1);
        while (drained == 0) {
            Thread.yield();
            drained = q.drainTo(sink, 1);
        }
        assertEquals(1, drained);
        assertEquals(expectedSize, sink.size());
    }
    assertTrue(sink.contains(one));
    assertTrue(sink.contains(two));
    awaitTermination(producerOne);
    awaitTermination(producerTwo);
}
/**
 * Creates an executor with exactly one worker thread and a {@link SynchronousQueue} hand-off,
 * using {@code DaemonThreadFactory.daemonThreadFactory} to create the thread.
 *
 * <p>When a task is rejected and the executor is not shut down, the submitting thread blocks
 * in {@code put} until the worker takes the task. Tasks rejected after shutdown are dropped
 * silently. If the submitter is interrupted while waiting, the task is dropped and the
 * thread's interrupt status is restored so the caller can observe it.
 *
 * @param keepAliveTime keep-alive time in milliseconds, passed to the executor
 * @return the configured executor
 */
public static ThreadPoolExecutor newSingleDaemonThreadExecutor(long keepAliveTime) {
    return new ThreadPoolExecutor(1,
            1,
            keepAliveTime,
            TimeUnit.MILLISECONDS,
            new SynchronousQueue<>(),
            DaemonThreadFactory.daemonThreadFactory,
            (r, exe) -> {
                if (!exe.isShutdown()) {
                    try {
                        // Block the submitter until the worker is ready to take the task.
                        exe.getQueue().put(r);
                    } catch (InterruptedException e) {
                        // Do not swallow the interrupt: the catch cleared the flag, so set it
                        // again for the code that submitted the task.
                        Thread.currentThread().interrupt();
                    }
                }
            });
}
/**
 * Demo driver: keeps submitting tasks that each sleep for ten seconds to a pool with one core
 * thread, an unbounded maximum size and a {@link SynchronousQueue}, printing the submission
 * index before each submit. The pool is never shut down and the loop runs up to
 * {@code Integer.MAX_VALUE} iterations.
 *
 * @param args ignored
 */
@SuppressWarnings("unused")
public static void main(String[] args) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    int index = 0;
    while (index < Integer.MAX_VALUE) {
        System.out.println("index: " + index);
        Runnable sleeper = () -> {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        // The returned future is intentionally not used.
        Future<?> future = pool.submit(sleeper);
        ++index;
    }
}
/**
 * Feeds a series of empty collections, maps, enumerations and iterators to the
 * {@code testEmpty*} helpers defined elsewhere in this file.
 *
 * @param args not used in this method
 * @throws Throwable whatever the helper checks propagate
 */
void test(String[] args) throws Throwable {
// Empty collections, including a SynchronousQueue.
// NOTE(review): emptyList/emptySet/emptyMap/emptyEnumeration/emptyIterator are presumably
// static imports from java.util.Collections - confirm against the file's imports.
testEmptyCollection(emptyList());
testEmptyCollection(emptySet());
testEmptyCollection(new SynchronousQueue<Object>());
testEmptyMap(emptyMap());
// Enumerations and iterators obtained from a Hashtable that has no entries.
Hashtable<?,?> emptyTable = new Hashtable<>();
testEmptyEnumeration(emptyTable.keys());
testEmptyEnumeration(emptyTable.elements());
testEmptyIterator(emptyTable.keySet().iterator());
testEmptyIterator(emptyTable.values().iterator());
testEmptyIterator(emptyTable.entrySet().iterator());
// The same emptyEnumeration() call assigned to a concretely typed and a wildcard-typed local.
final Enumeration<EmptyIterator> finalEmptyTyped = emptyEnumeration();
testEmptyEnumeration(finalEmptyTyped);
final Enumeration<?> finalEmptyAbstract = emptyEnumeration();
testEmptyEnumeration(finalEmptyAbstract);
testEmptyIterator(emptyIterator());
}
/**
 * Runs a consumer and a producer on a two-thread pool. The consumer first checks that an
 * untimed poll() returns null, then, after both tasks have met at the barrier, does a timed
 * poll() that must return the element the producer put() and leave the queue empty.
 *
 * @param fair fairness flag passed to the SynchronousQueue constructor
 */
public void testPollInExecutor(boolean fair) {
    final SynchronousQueue q = new SynchronousQueue(fair);
    final CheckedBarrier threadsStarted = new CheckedBarrier(2);
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    try (PoolCleaner cleaner = cleaner(executor)) {
        final CheckedRunnable consumer = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                assertNull(q.poll());
                threadsStarted.await();
                assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
                assertTrue(q.isEmpty());
            }};
        final CheckedRunnable producer = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadsStarted.await();
                q.put(one);
            }};
        executor.execute(consumer);
        executor.execute(producer);
    }
}
/**
 * Creates the executor service: a pool with no core threads, no upper bound on the number of
 * threads, and a {@link SynchronousQueue} hand-off, whose threads come from a daemon thread
 * factory layered on {@code MoreExecutors.platformThreadFactory()}.
 *
 * @param options not read by this method
 * @return the new executor service
 */
@SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only.
@Override
public ExecutorService create(PipelineOptions options) {
    ThreadFactoryBuilder daemonFactoryBuilder = new ThreadFactoryBuilder();
    daemonFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
    daemonFactoryBuilder.setDaemon(true);

    /* The SDK requires an unbounded thread pool because a step may create X writers
     * each requiring their own thread to perform the writes otherwise a writer may
     * block causing deadlock for the step because the writers buffer is full.
     * Also, the MapTaskExecutor launches the steps in reverse order and completes
     * them in forward order thus requiring enough threads so that each step's writers
     * can be active.
     */
    final int corePoolSize = 0;
    // Allow an unlimited number of re-usable threads.
    final int maximumPoolSize = Integer.MAX_VALUE;
    // Keep non-core threads alive forever.
    final long keepAliveNanos = Long.MAX_VALUE;
    return new ThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            keepAliveNanos,
            TimeUnit.NANOSECONDS,
            new SynchronousQueue<>(),
            daemonFactoryBuilder.build());
}
/**
 * Verifies that the executors passed to {@code setThreadPools} are the ones stored in the
 * private {@code threadPool} and {@code scheduledThreadPool} fields of
 * {@code ServerLocatorImpl} after {@code initialize()}.
 *
 * <p>Both executors are created by this test and are shut down in a {@code finally} block so
 * that no worker threads outlive it, whether the assertions pass or fail.
 */
@Test
public void testThreadPoolInjection() throws Exception {
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(1);
    try {
        ServerLocator serverLocator = new ServerLocatorImpl(false);
        serverLocator.setThreadPools(threadPool, scheduledThreadPool);

        // The fields are private, so read them reflectively.
        Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
        Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");

        serverLocator.initialize();

        threadPoolField.setAccessible(true);
        scheduledThreadPoolField.setAccessible(true);
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadPoolField.get(serverLocator);
        ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator);

        assertEquals(threadPool, tpe);
        assertEquals(scheduledThreadPool, stpe);
    } finally {
        // Release any threads started on the injected pools.
        threadPool.shutdownNow();
        scheduledThreadPool.shutdownNow();
    }
}
/**
 * a deserialized serialized queue is usable
 *
 * Also asserts that a default-constructed queue is serial-equal to one built with
 * fair == false and not serial-equal to one built with fair == true.
 */
public void testSerialization() {
    final SynchronousQueue defaultQueue = new SynchronousQueue();
    final SynchronousQueue unfairQueue = new SynchronousQueue(false);
    final SynchronousQueue fairQueue = new SynchronousQueue(true);
    assertSerialEquals(defaultQueue, unfairQueue);
    assertNotSerialEquals(defaultQueue, fairQueue);
    for (SynchronousQueue original : new SynchronousQueue[] { defaultQueue, unfairQueue, fairQueue }) {
        SynchronousQueue copy = serialClone(original);
        assertNotSame(original, copy);
        assertSerialEquals(original, copy);
        // The copy must behave like any idle SynchronousQueue: empty, zero capacity,
        // and offer() fails because no consumer is waiting.
        assertTrue(copy.isEmpty());
        assertEquals(0, copy.size());
        assertEquals(0, copy.remainingCapacity());
        assertFalse(copy.offer(zero));
    }
}
/**
 * Creates the thread-pool work queue selected by the
 * {@code ThunderConstant.THREAD_POOL_QUEUE_ATTRIBUTE_NAME} property.
 *
 * <p>The capacity used for the bounded queue types is {@code ThunderConstant.CPUS} multiplied
 * by the {@code ThunderConstant.THREAD_POOL_QUEUE_CAPACITY_ATTRIBUTE_NAME} property; it is not
 * used for the synchronous queue.
 *
 * @return a new queue of the configured type, never {@code null}
 * @throws IllegalArgumentException if the configured type has no matching case below
 */
private static BlockingQueue<Runnable> createBlockingQueue() {
    String queue = properties.getString(ThunderConstant.THREAD_POOL_QUEUE_ATTRIBUTE_NAME);
    ThreadQueueType queueType = ThreadQueueType.fromString(queue);
    int queueCapacity = ThunderConstant.CPUS * properties.getInteger(ThunderConstant.THREAD_POOL_QUEUE_CAPACITY_ATTRIBUTE_NAME);
    switch (queueType) {
        case LINKED_BLOCKING_QUEUE:
            return new LinkedBlockingQueue<Runnable>(queueCapacity);
        case ARRAY_BLOCKING_QUEUE:
            return new ArrayBlockingQueue<Runnable>(queueCapacity);
        case SYNCHRONOUS_QUEUE:
            return new SynchronousQueue<Runnable>();
        default:
            // Fail here with a clear message instead of returning null, which would only
            // surface later as a NullPointerException wherever the queue is used.
            throw new IllegalArgumentException("Unsupported thread pool queue type: " + queueType
                    + " (configured value: " + queue + ")");
    }
}
/**
 * Creates an executor with a fixed number of worker threads and a {@link SynchronousQueue}
 * hand-off, using {@code DaemonThreadFactory.daemonThreadFactory} to create the threads.
 *
 * <p>When a task is rejected and the executor is not shut down, the submitting thread blocks
 * in {@code put} until a worker takes the task. Tasks rejected after shutdown are dropped
 * silently. If the submitter is interrupted while waiting, the task is dropped and the
 * thread's interrupt status is restored so the caller can observe it.
 *
 * @param nThreads      core and maximum pool size
 * @param keepAliveTime keep-alive time in milliseconds, passed to the executor
 * @return the configured executor
 */
public static ThreadPoolExecutor newFixedDaemonThreadPool(int nThreads, long keepAliveTime) {
    return new ThreadPoolExecutor(nThreads,
            nThreads,
            keepAliveTime,
            TimeUnit.MILLISECONDS,
            new SynchronousQueue<>(),
            DaemonThreadFactory.daemonThreadFactory,
            (r, exe) -> {
                if (!exe.isShutdown()) {
                    try {
                        // Block the submitter until a worker is ready to take the task.
                        exe.getQueue().put(r);
                    } catch (InterruptedException e) {
                        // Do not swallow the interrupt: the catch cleared the flag, so set it
                        // again for the code that submitted the task.
                        Thread.currentThread().interrupt();
                    }
                }
            }
    );
}
/**
 * Calling remove() on the iterator of a SynchronousQueue must throw IllegalStateException.
 *
 * @param fair fairness flag passed to the SynchronousQueue constructor
 */
public void testIteratorRemove(boolean fair) {
    final SynchronousQueue queue = new SynchronousQueue(fair);
    final Iterator queueIterator = queue.iterator();
    try {
        queueIterator.remove();
        shouldThrow();
    } catch (IllegalStateException expected) {
        // This is the outcome the test requires.
    }
}
/**
 * Wraps a {@link ThreadPoolExecutor} chosen from the given queue size.
 * Max pool size is 256 and pool threads time out after 60 seconds in every case.
 * @param queueSize -1 (or any negative value) for an unbounded {@link LinkedBlockingQueue} with
 * a core pool size of 256; 0 for a {@link SynchronousQueue} with a core pool size of 32;
 * greater than 0 for an {@link ArrayBlockingQueue} of the given size with a core pool size of 32.
 */
public ExecutorThreadPool(int queueSize)
{
    this(newExecutorForQueueSize(queueSize));
}

/**
 * Builds the executor described in the constructor documentation for the given queue size.
 */
private static ThreadPoolExecutor newExecutorForQueueSize(int queueSize)
{
    if (queueSize < 0)
    {
        return new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }
    if (queueSize == 0)
    {
        return new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }
    return new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize));
}
/**
 * Builds a fixed-size thread pool from the parameters carried by {@code url}.
 *
 * <p>The queue parameter selects the work queue: 0 gives a {@link SynchronousQueue}, a
 * negative value an unbounded {@link LinkedBlockingQueue}, and a positive value a
 * {@link LinkedBlockingQueue} bounded to that size.
 *
 * @param url source of the thread name, thread count and queue size parameters
 * @return the configured executor
 */
public Executor getExecutor(URL url) {
    final String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
    final int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
    final int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

    final BlockingQueue<Runnable> workQueue;
    if (queues == 0) {
        workQueue = new SynchronousQueue<Runnable>();
    } else if (queues < 0) {
        workQueue = new LinkedBlockingQueue<Runnable>();
    } else {
        workQueue = new LinkedBlockingQueue<Runnable>(queues);
    }

    // Core and maximum size are equal, with a keep-alive of zero milliseconds.
    return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, workQueue,
            new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
/**
 * Stores the supplied collaborators, registers the query meters and the fetch timer, validates
 * the thread counts and creates the executors.
 *
 * @param threadCount       size passed to the "index-manager" pool; must not be 0
 * @param mutateThreadCount size passed to the "index-manager-mutate" pool; must not be 0
 * @param facetThreadCount  size passed to the "facet-execution" pool; when below 1 no facet
 *                          executor is created and the field is set to {@code null}
 * @throws RuntimeException if {@code threadCount} or {@code mutateThreadCount} is 0
 */
public IndexManager(IndexServer indexServer, ClusterStatus clusterStatus, BlurFilterCache filterCache,
        int maxHeapPerRowFetch, int fetchCount, int threadCount, int mutateThreadCount, int facetThreadCount,
        DeepPagingCache deepPagingCache, MemoryAllocationWatcher memoryAllocationWatcher, QueryStatusManager statusManager) {
    // Collaborators.
    _statusManager = statusManager;
    _memoryAllocationWatcher = memoryAllocationWatcher;
    _deepPagingCache = deepPagingCache;
    _indexServer = indexServer;
    _clusterStatus = clusterStatus;
    _filterCache = filterCache;

    // Metrics.
    MetricName externalQueriesName = new MetricName(ORG_APACHE_BLUR, BLUR, "External Queries/s");
    MetricName internalQueriesName = new MetricName(ORG_APACHE_BLUR, BLUR, "Internal Queries/s");
    MetricName fetchTimerName = new MetricName(ORG_APACHE_BLUR, BLUR, "Fetch Timer");
    _queriesExternalMeter = Metrics.newMeter(externalQueriesName, "External Queries/s", TimeUnit.SECONDS);
    _queriesInternalMeter = Metrics.newMeter(internalQueriesName, "Internal Queries/s", TimeUnit.SECONDS);
    _fetchTimer = Metrics.newTimer(fetchTimerName, TimeUnit.MICROSECONDS, TimeUnit.SECONDS);

    // Thread-count validation.
    if (threadCount == 0) {
        throw new RuntimeException("Thread Count cannot be 0.");
    }
    _threadCount = threadCount;
    if (mutateThreadCount == 0) {
        throw new RuntimeException("Mutate Thread Count cannot be 0.");
    }
    _mutateThreadCount = mutateThreadCount;
    _fetchCount = fetchCount;
    _maxHeapPerRowFetch = maxHeapPerRowFetch;

    // Executors; the facet pool is optional and uses a SynchronousQueue.
    _executor = Executors.newThreadPool("index-manager", _threadCount);
    _mutateExecutor = Executors.newThreadPool("index-manager-mutate", _mutateThreadCount);
    _facetExecutor = facetThreadCount >= 1
            ? Executors.newThreadPool(new SynchronousQueue<Runnable>(), "facet-execution", facetThreadCount)
            : null;
    LOG.info("Init Complete");
}
/**
 * Creates the asynchronous task pool: {@code size} core threads, no upper bound on the number
 * of threads, a 60 millisecond keep-alive and a {@link SynchronousQueue} hand-off.
 *
 * <p>Worker threads are daemon threads named
 * {@code "Nukkit Asynchronous Task Handler #N"}, where N comes from an incrementing counter.
 * They are plain {@link Thread} instances; the thread factory no longer uses double-brace
 * initialization, which created an anonymous {@code Thread} subclass for this purpose.
 *
 * @param server the server stored in this pool
 * @param size   the core pool size, also stored in this pool
 */
public AsyncPool(Server server, int size) {
    this.currentThread = new AtomicInteger();
    this.size = size;
    this.pool = new ThreadPoolExecutor(size, Integer.MAX_VALUE,
            60, TimeUnit.MILLISECONDS, new SynchronousQueue<>(),
            runnable -> {
                Thread worker = new Thread(runnable);
                worker.setDaemon(true);
                worker.setName(String.format("Nukkit Asynchronous Task Handler #%s", currentThread.incrementAndGet()));
                return worker;
            }
    );
    this.server = server;
}
/**
 * Starts up the compiler thread and waits for it to return the processing environment.
 *
 * <p>May be called only once: it fails the state check if an executor service already exists.
 */
protected void start() {
    checkState(executorService == null, "Cannot restart a Model");
    // Create the request queue before the runner is handed to the executor.
    requestQueue = new SynchronousQueue<>();
    executorService = Executors.newSingleThreadExecutor();
    CompilerRunner runner = new CompilerRunner();
    executorService.execute(runner);
    // NOTE(review): presumably blocks until the runner has published the environment - confirm.
    processingEnv = runner.getProcessingEnvironment();
}
protected AdbMessageManager(AdbConnection conn) {
this.openStreams = new HashMap<>();
this.conn = conn;
this.msgQueue = new LinkedBlockingQueue<>();
// 三个线程处理消息
executorService = new ThreadPoolExecutor(5, Integer.MAX_VALUE,
0, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>());
executorService.execute(getMessageHandler());
executorService.execute(getMessageHandler());
executorService.execute(getMessageHandler());
}
/**
 * Builds a thread pool from the settings in {@code conf}.
 *
 * <p>The queue setting selects the work queue: 0 gives a {@link SynchronousQueue}, a negative
 * value an unbounded {@link LinkedBlockingQueue}, and a positive value a
 * {@link LinkedBlockingQueue} bounded to that size. The maximum pool size defaults to
 * {@code Integer.MAX_VALUE} when not configured.
 *
 * @param conf source of the thread name, core size, maximum size, queue size and keep-alive
 *             (milliseconds) settings
 * @return the configured executor
 */
public Executor getExecutor(AppConf conf) {
    final String name = conf.get(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
    final int cores = conf.getInt(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
    final int threads = conf.getInt(Constants.THREADS_KEY, Integer.MAX_VALUE);
    final int queues = conf.getInt(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
    final int alive = conf.getInt(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

    final BlockingQueue<Runnable> workQueue;
    if (queues == 0) {
        workQueue = new SynchronousQueue<Runnable>();
    } else if (queues < 0) {
        workQueue = new LinkedBlockingQueue<Runnable>();
    } else {
        workQueue = new LinkedBlockingQueue<Runnable>(queues);
    }

    return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, workQueue,
            new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
}
/**
 * Sets up lifecycle monitoring, and argument registry.
 *
 * <p>Subclasses must call up to onCreate(). This onCreate method does not call start() it is the
 * subclasses responsibility to call start if it desires.
 *
 * <p>Also creates the executor service used by this instrumentation: a pool with no core
 * threads, no upper bound, a zero keep-alive and a {@link SynchronousQueue}, whose threads are
 * named after {@code MonitoringInstrumentation}.
 */
@Override
public void onCreate(Bundle arguments) {
    Log.i(TAG, "Instrumentation started!");
    logUncaughtExceptions();
    // Multidex must be installed early otherwise we could call into code that has
    // landed in a different dex split.
    installMultidex();

    // Publish this instrumentation and its monitors through the registries.
    InstrumentationRegistry.registerInstance(this, arguments);
    androidx.test.InstrumentationRegistry.registerInstance(this, arguments);
    ActivityLifecycleMonitorRegistry.registerInstance(lifecycleMonitor);
    ApplicationLifecycleMonitorRegistry.registerInstance(applicationMonitor);
    IntentMonitorRegistry.registerInstance(intentMonitor);

    handlerForMainLooper = new Handler(Looper.getMainLooper());

    // Threads come from the default factory and are renamed after this class.
    final ThreadFactory namedThreadFactory =
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable runnable) {
                    Thread thread = Executors.defaultThreadFactory().newThread(runnable);
                    thread.setName(MonitoringInstrumentation.class.getSimpleName());
                    return thread;
                }
            };
    executorService =
            new ThreadPoolExecutor(
                    0 /* corePoolSize */,
                    Integer.MAX_VALUE,
                    0L /* keepAliveTime */,
                    TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    namedThreadFactory);

    Looper.myQueue().addIdleHandler(idleHandler);
    super.onCreate(arguments);
    specifyDexMakerCacheProperty();
    setupDexmakerClassloader();
    useDefaultInterceptingActivityFactory();
}
SelectAccount(Account[] accounts, SynchronousQueue<String> queue) {
this.queue = queue;
accountNames = new String[accounts.length];
for (int i = 0; i < accounts.length; i++) {
accountNames[i] = accounts[i].name;
}
}
/**
 * Instantiates a new async executor service impl.
 *
 * <p>The pool is built from {@code MIN_THREAD_POOL_SIZE}, {@code MAX_THREAD_POOL_SIZE} and
 * {@code KEEP_ALIVE_TIME} (seconds) with a {@link SynchronousQueue}. If the installed thread
 * factory is a {@code CustomizableThreadFactory}, it is given the name prefix
 * {@code "AnkushProgressAwareThread_"} and its daemon flag is set to true.
 */
public AsyncExecutorServiceImpl() {
    super(MIN_THREAD_POOL_SIZE, MAX_THREAD_POOL_SIZE, KEEP_ALIVE_TIME,
            TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
            new CustomizableThreadFactory());
    final ThreadFactory installed = this.getThreadFactory();
    if (!(installed instanceof CustomizableThreadFactory)) {
        return;
    }
    final CustomizableThreadFactory namedFactory = (CustomizableThreadFactory) installed;
    namedFactory.setThreadNamePrefix("AnkushProgressAwareThread_");
    namedFactory.setDaemon(true);
}
/**
 * Returns {@code q} unchanged when it is a {@link SynchronousQueue} or a
 * {@code SynchronousQueueNoSpin}; otherwise returns a new {@code SynchronousQueueNoSpin}
 * (this includes a {@code null} argument).
 *
 * @param q the candidate queue
 * @return {@code q} itself or a new {@code SynchronousQueueNoSpin}
 */
private static BlockingQueue<Runnable> initQ(BlockingQueue<Runnable> q) {
    final boolean isHandOffQueue = q instanceof SynchronousQueue || q instanceof SynchronousQueueNoSpin;
    if (!isHandOffQueue) {
        return new SynchronousQueueNoSpin<Runnable>();
    }
    return q;
}
/**
 * Fixed-size thread pool without a queue: tasks are handed off through a
 * {@link SynchronousQueue}.
 *
 * @param corePoolSize the pool size, used as both the core and the maximum size
 * @return the thread pool executor
 */
public static ThreadPoolExecutor newFixedThreadPool(int corePoolSize) {
    final int maximumPoolSize = corePoolSize;
    final long keepAliveMillis = 0;
    final SynchronousQueue<Runnable> handOff = new SynchronousQueue<Runnable>();
    return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveMillis, TimeUnit.MILLISECONDS, handOff);
}
/**
 * Returns a new {@link ThreadPoolExecutor} to create runtime worker threads.
 *
 * <p>Each call takes the next value of the worker counter to form the factory name
 * {@code "runtimeworker-N"}. The executor uses a {@link SynchronousQueue} and lets core
 * threads time out.
 *
 * @return a new {@link ThreadPoolExecutor} for runtime worker threads
 */
static ThreadPoolExecutor createWorkerThreadPoolExecutor() {
    final RuntimeWorkerThreadFactory threadFactory =
            new RuntimeWorkerThreadFactory("runtimeworker-" + runtimeWorkerCount.incrementAndGet());
    final ThreadPoolExecutor workerPool = new ThreadPoolExecutor(
            WORKER_THREAD_CORE_SIZE,
            WORKER_THREAD_POOL_SIZE,
            WORKER_THREAD_POOL_TTL,
            TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            threadFactory);
    workerPool.allowCoreThreadTimeOut(true);
    return workerPool;
}
/**
 * Stores the database settings and creates the {@link SynchronousQueue} kept in {@code queue}.
 *
 * @param name                stored as the database name
 * @param dbFile              stored as the database file; may be {@code null}
 * @param numWorkers          stored as the worker count
 * @param walModeCheckpointMs checkpoint interval in milliseconds, stored converted to
 *                            nanoseconds; a {@code DbCheckpointer} is created only when it is
 *                            greater than 0, otherwise the checkpointer is {@code null}
 */
Db(String name, @Nullable File dbFile, int numWorkers, long walModeCheckpointMs) {
    this.name = name;
    this.dbFile = dbFile;
    this.numWorkers = numWorkers;
    this.walModeCheckpointNs = TimeUnit.MILLISECONDS.toNanos(walModeCheckpointMs);
    this.queue = new SynchronousQueue<>();
    // Created last, after all other fields have been assigned.
    if (walModeCheckpointMs > 0) {
        this.checkpointer = new DbCheckpointer();
    } else {
        this.checkpointer = null;
    }
}
/**
 * Calling addAll with the queue itself as the argument must throw IllegalArgumentException.
 *
 * @param fair fairness flag passed to the SynchronousQueue constructor
 */
public void testAddAll_self(boolean fair) {
    final SynchronousQueue queue = new SynchronousQueue(fair);
    try {
        queue.addAll(queue);
        shouldThrow();
    } catch (IllegalArgumentException expected) {
        // This is the outcome the test requires.
    }
}
/**
 * Verifies that the consumer is nacked exactly once when the thread calling
 * {@code receiveMessage} is interrupted.
 *
 * <p>The receiver is backed by a {@link SynchronousQueue} that nothing reads from in this
 * test. The test waits for the receiving thread to finish with a bounded {@code join}
 * instead of a fixed sleep, so the verification does not depend on scheduling speed.
 */
@Test
public void interruptReceive() throws Exception {
    SynchronousQueue<MessageReplyConsumerBundle> queue = new SynchronousQueue<>();
    MessageReceiver messageReceiver = new MessageReceiverImpl(queue);
    PubsubMessage message = PubsubMessage.newBuilder().setMessageId("1234").build();
    AckReplyConsumer consumer = mock(AckReplyConsumer.class);
    Thread t = new Thread(() -> messageReceiver.receiveMessage(message, consumer));
    t.start();
    t.interrupt();
    // Wait for the receiver thread to complete; the bound keeps the test from hanging
    // if the receiver never returns.
    t.join(5000);
    verify(consumer, times(1)).nack();
}
/**
 * A builder configured with a bounded queue of size 0 and a fixed size of 1 must produce an
 * executor with core and maximum pool size 1, a {@link SynchronousQueue} as its work queue,
 * and core-thread timeout disabled.
 */
@Test
void emptyBoundedQueueEqualsNoQueue() {
    final ThreadPoolExecutor pool = ThreadPoolExecutors.builder()
            .boundedQueue(0)
            .fixedSize(1)
            .keepAlive(Duration.ofMinutes(1))
            .build();

    // Pool sizing.
    assertThat(pool.getCorePoolSize(), is(1));
    assertThat(pool.getMaximumPoolSize(), is(1));
    // A zero-capacity bounded queue is represented by a hand-off queue.
    assertThat(pool.getQueue(), is(instanceOf(SynchronousQueue.class)));
    assertThat(pool.allowsCoreThreadTimeOut(), is(false));
}