下面列出了java.util.AbstractSet#java.util.concurrent.LinkedBlockingQueue 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public ThreadPoolExecutor build() {
BlockingQueue<Runnable> queue = null;
if (queueSize < 1) {
queue = new LinkedBlockingQueue<Runnable>();
} else {
queue = new ArrayBlockingQueue<Runnable>(queueSize);
}
threadFactory = createThreadFactory(threadFactory, threadNamePrefix, daemon);
if (rejectHandler == null) {
rejectHandler = defaultRejectHandler;
}
return new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory,
rejectHandler);
}
@Override
public StreamsResultSet readCurrent() {
BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
int batchCount = 0;
while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) {
StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue);
if (datum != null) {
++batchCount;
ComponentUtils.offerUntilSuccess(datum, batch);
}
}
boolean pullIsEmpty = batch.isEmpty() && this.datumQueue.isEmpty() && this.executor.isTerminated();
this.isComplete.set(this.previousPullWasEmpty && pullIsEmpty);
this.previousPullWasEmpty = pullIsEmpty;
return new StreamsResultSet(batch);
}
@Override
public Map<String, String> startAndWaitForData () throws IOException, InterruptedException
{
InetSocketAddress addr = new InetSocketAddress(port);
BlockingQueue<Map<String, String> > blockingQueue = new LinkedBlockingQueue<Map<String, String> >();
synchronized (lock)
{
sharedQueue = blockingQueue ;
server = HttpServer.create(addr, 0);
server.createContext("/", new HandlerMapParameter(blockingQueue));
httpThreadPool = Executors.newCachedThreadPool() ;
server.setExecutor(httpThreadPool);
server.start();
}
return blockingQueue.poll(10 * 60, TimeUnit.SECONDS);
}
/**
* Initialize the Executor that handles listener events. Only used by
* non-primary gateways
*/
private void initializeListenerExecutor()
{
// Create the ThreadGroups
final ThreadGroup loggerGroup = LogWriterImpl.createThreadGroup(
"Gateway Listener Group", getLogger());
// Create the Executor
ThreadFactory tf = new ThreadFactory() {
public Thread newThread(Runnable command) {
Thread thread = new Thread(loggerGroup, command, "Queued Gateway Listener Thread");
thread.setDaemon(true);
return thread;
}
};
LinkedBlockingQueue q = new LinkedBlockingQueue();
this._executor = new ThreadPoolExecutor(1, 1/*max unused*/,
120, TimeUnit.SECONDS, q, tf);
}
@Before
public void setup() {
eventFactory = new TestEventHolderFactory();
channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>();
byteBuffers = new LinkedBlockingQueue<>();
byteBuffers.add(ByteBuffer.allocate(4096));
events = new LinkedBlockingQueue<>();
logger = Mockito.mock(ComponentLog.class);
maxConnections = 1;
sslContext = null;
charset = StandardCharsets.UTF_8;
dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger,
maxConnections, sslContext, charset);
}
/**
*
* @param corePoolSize 最小线程数,包括空闲线程
* @param maxPoolSize 最大线程数
* @param keepAliveTime 当线程数大于核心时,终止多余的空闲线程等待新任务的最长时间
* @param cacheSize 执行队列大小
* @param prefix 线程池前缀名称
*/
public HandlerExecutor(int corePoolSize, int maxPoolSize, int keepAliveTime, int cacheSize, String prefix) {
TimeUnit unit = TimeUnit.MINUTES;
/**
* 任务队列
*/
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
/**
* 队例满到无法接受新任务时。直接抛弃
*/
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
if (prefix == null) {
prefix = "";
}
ThreadFactory threadFactory = new HandlerThreadFactory(prefix);
pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Test(timeout = 60000)
public void testWatchIndefinitely() throws Exception {
TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
ownerships.add(new ServerLoad("server1"));
final LinkedBlockingQueue<TreeSet<ServerLoad>> serverLoadNotifications =
new LinkedBlockingQueue<TreeSet<ServerLoad>>();
PlacementStateManager.PlacementCallback callback = new PlacementStateManager.PlacementCallback() {
@Override
public void callback(TreeSet<ServerLoad> serverLoads) {
serverLoadNotifications.add(serverLoads);
}
};
zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path before watching
zkPlacementStateManager.watch(callback);
// cannot verify the callback here as it may call before the verify is called
zkPlacementStateManager.saveOwnership(ownerships);
assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 1));
ServerLoad server2 = new ServerLoad("server2");
server2.addStream(new StreamLoad("hella-important-stream", 415));
ownerships.add(server2);
zkPlacementStateManager.saveOwnership(ownerships);
assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 2));
}
private void newBinding() {
requestQueue = new LinkedBlockingQueue<>();
responseQueue = new LinkedBlockingQueue<>();
final Flow<HttpRequest, HttpResponse, NotUsed> handler =
Flow.fromGraph(KillSwitches.<HttpRequest>single())
.mapAsync(1, request -> {
requestQueue.offer(request);
return responseQueue.take();
})
.mapMaterializedValue(killSwitch -> {
Objects.requireNonNull(killSwitchTrigger.peek())
.thenAccept(_void -> killSwitch.shutdown());
return NotUsed.getInstance();
});
binding = Http.get(actorSystem).bindAndHandle(handler, ConnectHttp.toHost("127.0.0.1", 0), mat)
.toCompletableFuture()
.join();
}
/**
* Create a PeriodicNotificationApplication.
* @param conf - Configuration object that specifies the parameters needed to create the application
* @return PeriodicNotificationApplication to periodically poll Rya Fluo for new results
* @throws PeriodicApplicationException
*/
public static PeriodicNotificationApplication getPeriodicApplication(final PeriodicNotificationApplicationConfiguration conf) throws PeriodicApplicationException {
final Properties kafkaConsumerProps = getKafkaConsumerProperties(conf);
final Properties kafkaProducerProps = getKafkaProducerProperties(conf);
final BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
final BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
final BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();
FluoClient fluo = null;
try {
final PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(conf);
fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf);
final NotificationCoordinatorExecutor coordinator = getCoordinator(conf.getCoordinatorThreads(), notifications);
addRegisteredNotices(coordinator, fluo.newSnapshot());
final KafkaExporterExecutor exporter = getExporter(conf.getExporterThreads(), kafkaProducerProps, bindingSets);
final PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, conf.getPrunerThreads(), bins);
final NotificationProcessorExecutor processor = getProcessor(storage, notifications, bins, bindingSets, conf.getProcessorThreads());
final KafkaNotificationProvider provider = getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), coordinator, kafkaConsumerProps);
return PeriodicNotificationApplication.builder().setCoordinator(coordinator).setProvider(provider).setExporter(exporter)
.setProcessor(processor).setPruner(pruner).build();
} catch (AccumuloException | AccumuloSecurityException e) {
throw new PeriodicApplicationException(e.getMessage());
}
}
@Override
public synchronized void start(StartContext context) throws StartException {
final String namePattern = "External Management Request Threads -- %t";
final ThreadFactory threadFactory = doPrivileged(new PrivilegedAction<ThreadFactory>() {
public ThreadFactory run() {
return new JBossThreadFactory(threadGroup, Boolean.FALSE, null, namePattern, null, null);
}
});
int poolSize = getPoolSize();
if (EnhancedQueueExecutor.DISABLE_HINT) {
final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(WORK_QUEUE_SIZE);
executorService = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS,
workQueue, threadFactory);
} else {
executorService = new EnhancedQueueExecutor.Builder()
.setCorePoolSize(poolSize)
.setMaximumPoolSize(poolSize)
.setKeepAliveTime(60L, TimeUnit.SECONDS)
.setMaximumQueueSize(WORK_QUEUE_SIZE)
.setThreadFactory(threadFactory)
.build();
}
}
private void start(final HttpConfig httpConfig) {
// init RestTemplate
OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
factory.setConnectTimeout((int) this.connectionTimeout.toMillis());
factory.setReadTimeout((int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT);
this.httpClient = new RestTemplate(factory);
// It could be initialized multiple times, so you need to control that.
if (RUNNING.compareAndSet(false, true)) {
// fetch all group configs.
this.fetchGroupConfig(ConfigGroupEnum.values());
// one thread for listener, another one for fetch configuration data.
this.executor = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
SoulThreadFactory.create("http-long-polling", true));
// start long polling.
this.executor.execute(new HttpLongPollingTask());
} else {
log.info("soul http long polling was started, executor=[{}]", executor);
}
}
@Test
public void onConnectedByCustomInterface() throws URISyntaxException, InterruptedException {
final BlockingQueue<String> events = new LinkedBlockingQueue<String>();
final Consumer consumer = new Consumer(new URI("ws://example.com:28080"));
final Channel channel = new Channel("CommentsChannel");
final Subscription subscription = consumer.getSubscriptions().create(channel, CustomSubscription.class);
final Subscription returned = subscription.onConnected(new Subscription.ConnectedCallback() {
@Override
public void call() {
events.offer("onConnected");
}
});
assertThat(returned, is(theInstance(subscription)));
consumer.getSubscriptions().notifyConnected(subscription.getIdentifier());
assertThat(events.take(), is("onConnected"));
}
public static LinkedBlockingQueue<Object> transformToBinary(LinkedBlockingQueue<Object> output) {
LinkedBlockingQueue<Object> ret = new LinkedBlockingQueue<>();
for (Object o : output) {
BaseRow row = ((StreamRecord<BaseRow>) o).getValue();
BinaryRow binaryRow;
if (row.isNullAt(0)) {
binaryRow = newRow(row.getString(2).toString(), row.getString(3) + "null");
} else if (row.isNullAt(2)) {
binaryRow = newRow(row.getString(0).toString(), row.getString(1) + "null");
} else {
String value1 = row.getString(1).toString();
String value2 = row.getString(3).toString();
binaryRow = newRow(row.getString(0).toString(), value1 + value2);
}
ret.add(new StreamRecord(binaryRow));
}
return ret;
}
@Test
public void shouldReadFromQueueForeverAndPushToSink() throws InterruptedException {
BlockingQueue<Records> queue = new LinkedBlockingQueue<>();
BqQueueWorker worker = new BqQueueWorker("bq-worker", successfulSink, queueConfig, committer, queue, workerState);
Records messages2 = mock(Records.class);
when(committer.acknowledge(any())).thenReturn(true);
queue.put(messages);
queue.put(messages2);
Thread workerThread = new Thread(worker);
workerThread.start();
await().atMost(10, TimeUnit.SECONDS).until(() -> queue.isEmpty());
workerState.closeWorker();
workerThread.join();
verify(successfulSink).push(messages);
verify(successfulSink).push(messages2);
}
/**
* Creates a new instance of ServerCommunicationSystem
*/
public ServerCommunicationSystem(ServerViewController controller, ServiceReplica replica) throws Exception {
super("Server CS");
this.controller = controller;
inQueue = new LinkedBlockingQueue<SystemMessage>(controller.getStaticConf().getInQueueSize());
// create a new conf, with updated port number for servers
// TOMConfiguration serversConf = new TOMConfiguration(conf.getProcessId(),
// Configuration.getHomeDir(), "hosts.config");
// serversConf.increasePortNumber();
serversConn = new ServersCommunicationLayer(controller, inQueue, replica);
messageHandler = new MessageHandler(controller.getStaticConf().getHmacAlgorithm());
// ******* EDUARDO BEGIN **************//
// if (manager.isInCurrentView() || manager.isInInitView()) {
clientsConn = CommunicationSystemServerSideFactory.getCommunicationSystemServerSide(controller);
// }
// ******* EDUARDO END **************//
// start();
}
@Before
public void setup() {
eventFactory = new TestEventHolderFactory();
channelHandlerFactory = new BeatsSocketChannelHandlerFactory<>();
byteBuffers = new LinkedBlockingQueue<>();
byteBuffers.add(ByteBuffer.allocate(4096));
events = new LinkedBlockingQueue<>();
logger = Mockito.mock(ComponentLog.class);
maxConnections = 1;
sslContext = null;
charset = StandardCharsets.UTF_8;
dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger,
maxConnections, sslContext, charset);
}
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<>() :
(queues < 0 ? new LinkedBlockingQueue<>()
: new LinkedBlockingQueue<>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
public static void start() {
if (!flag) {
synchronized (lockObj) {
if (!flag) {
flag = true;
executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), SoaThreadFactory.create("HeartBeatSevice", true));
doHt();
}
}
}
}
/**
* 获取对应插件缓存还未执行加载的Intent
*/
public static LinkedBlockingQueue<IntentRequest> getCachedIntent(String pkgName) {
if (TextUtils.isEmpty(pkgName)) {
return null;
}
return sIntentCacheMap.get(pkgName);
}
/**
* creates a websocket with client role
*
* @param listener The listener for this instance
* @param draft The draft which should be used
*/
public WebSocketImpl(WebSocketListener listener, Draft draft) {
if (listener == null || (draft == null && role
== Role.SERVER))// socket can be null because we want do be able to create the object without already having a bound channel
{ throw new IllegalArgumentException("parameters must not be null"); }
this.outQueue = new LinkedBlockingQueue<ByteBuffer>();
inQueue = new LinkedBlockingQueue<ByteBuffer>();
this.wsl = listener;
this.role = Role.CLIENT;
if (draft != null) { this.draft = draft.copyInstance(); }
}
@Test
public void test_BlockingQueue() {
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(16);
for (int i = 0; i < 16; i++) {
queue.add("idx-" + i);
}
List<String> list = new ArrayList<>();
queue.drainTo(list, 8);
Assert.assertEquals(queue.size(), list.size());
}
public MobileCommProcessor(Context mContext, SystemVoiceMediator mediator, Handler handler) {
super(mContext, mediator);
this.mHandler = handler;
mAppConfig = (AppConfig) ((Service) mContext).getApplication();
tPools = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
phoneCallListener();
// 注册收件箱内容观察者
mContext.getContentResolver().registerContentObserver(Uri.parse(PhoneContactUtils.SMS_URI_INBOX),
true, new SmsObserver(handler));
// 注册联系人内容观察者
mContext.getContentResolver().registerContentObserver(ContactsContract.Contacts.CONTENT_URI,
true, new ContactObserver(handler));
}
public static ExecutorService getCoprocessorPool() {
if (coprocessorPool != null) {
return coprocessorPool;
}
synchronized (HBaseConnection.class) {
if (coprocessorPool != null) {
return coprocessorPool;
}
KylinConfig config = KylinConfig.getInstanceFromEnv();
// copy from HConnectionImplementation.getBatchPool()
int maxThreads = config.getHBaseMaxConnectionThreads();
int coreThreads = config.getHBaseCoreConnectionThreads();
long keepAliveTime = config.getHBaseConnectionThreadPoolAliveSeconds();
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads * 100);
ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, //
Threads.newDaemonThreadFactory("kylin-coproc-"));
tpe.allowCoreThreadTimeOut(true);
logger.info("Creating coprocessor thread pool with max of {}, core of {}", maxThreads, coreThreads);
coprocessorPool = tpe;
return coprocessorPool;
}
}
@Override
public boolean start() {
if (running.getAndSet(true)) {
return false;
}
else {
queue = new LinkedBlockingQueue<Runnable>();
executorService.submit(queueRunnable);
return true;
}
}
public DeletionTrackerImpl(Configuration conf) {
super(conf);
this.dagCleanupService = new ThreadPoolExecutor(0, conf.getInt(TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT,
TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT), 10,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShuffleDeleteTracker #%d").build());
}
@PostConstruct
public void init() {
this.isActive = new AtomicBoolean(true);
try {
this.metadataUpdates = new LinkedBlockingQueue<String>();
} finally {
this.isActive.set(false);
}
}
@Test(timeout = TIMEOUT)
public void removeWhenIdentifierIsNotUnique() throws IOException, InterruptedException {
final BlockingQueue<String> events = new LinkedBlockingQueue<String>();
final MockWebServer mockWebServer = new MockWebServer();
final MockResponse response = new MockResponse();
response.withWebSocketUpgrade(new DefaultWebSocketListener() {
@Override
public void onMessage(BufferedSource payload, WebSocket.PayloadType type) throws IOException {
events.offer("onMessage:" + payload.readUtf8());
payload.close();
}
});
mockWebServer.enqueue(response);
final Consumer consumer = new Consumer(mockWebServer.url("/").uri());
final Subscriptions subscriptions = consumer.getSubscriptions();
final Subscription subscription1 = subscriptions.create(new Channel("CommentsChannel"));
// Channel is same as subscription1
final Subscription subscription2 = subscriptions.create(new Channel("CommentsChannel"));
consumer.connect();
events.take(); // WebSocketListener#onMessage (subscribe)
events.take(); // WebSocketListener#onMessage (subscribe)
subscriptions.remove(subscription1);
assertThat(subscriptions.contains(subscription1), is(false));
assertThat(subscriptions.contains(subscription2), is(true));
assertThat(events.take(), is("onMessage:" + Command.unsubscribe(subscription1.getIdentifier()).toJson()));
mockWebServer.shutdown();
}
public RemoteQueryListener() {
final ThreadGroup queryThreadGroup = new ThreadGroup(
"Query Executor Thread Group");
ThreadFactory queryThreadFactory = new ThreadFactory() {
public Thread newThread(Runnable command)
{
return new Thread(queryThreadGroup, command, queryThreadGroup.getName()
+ " Query Executor Thread ");
}
};
queryExecutorPool = new ThreadPoolExecutor(MAX_THREADS, MAX_THREADS, 15,
TimeUnit.SECONDS, new LinkedBlockingQueue(), queryThreadFactory);
}
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
/**
* Create a new ConcurrentQueryLoader
*
* @param monitor the Monitor to load queries to
* @param threads the number of threads to use
* @param queueSize the size of the buffer to hold queries in
*/
public ConcurrentQueryLoader(Monitor monitor, int threads, int queueSize) {
this.monitor = monitor;
this.queue = new LinkedBlockingQueue<>(queueSize);
this.executor = Executors.newFixedThreadPool(threads, new NamedThreadFactory("loader"));
this.shutdownLatch = new CountDownLatch(threads);
for (int i = 0; i < threads; i++) {
this.executor.submit(new Worker(queueSize / threads));
}
}