The following examples show how the java.util.concurrent.BlockingQueue API is used in practice; each snippet below is excerpted from real open-source project code.
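Before the project excerpts, here is a minimal, self-contained sketch of the core BlockingQueue operations (put/take block, offer/poll do not). The class and variable names are illustrative only and are not taken from the projects quoted below.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueSketch {
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue: put() blocks when full, take() blocks when empty.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put("item-" + i);         // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag and stop producing
            }
        });
        producer.start();

        for (int i = 0; i < 5; i++) {
            System.out.println("consumed " + queue.take()); // blocks until an element arrives
        }
        producer.join();

        // Non-blocking variants: offer() returns false when full, poll() returns null when empty.
        System.out.println("offer: " + queue.offer("extra"));
        System.out.println("poll:  " + queue.poll());
    }
}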
private void cleanupExpiredSockets() {
for (final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values()) {
final List<EndpointConnection> connections = new ArrayList<>();
EndpointConnection connection;
while ((connection = connectionQueue.poll()) != null) {
// If the socket has been idle longer than the expiration threshold, shut it down.
final long lastUsed = connection.getLastTimeUsed();
if (lastUsed < System.currentTimeMillis() - idleExpirationMillis) {
try {
connection.getSocketClientProtocol().shutdown(connection.getPeer());
} catch (final Exception e) {
logger.debug("Failed to shut down {} using {} due to {}",
connection.getSocketClientProtocol(), connection.getPeer(), e);
}
terminate(connection);
} else {
connections.add(connection);
}
}
connectionQueue.addAll(connections);
}
}
public void shutdown() {
shutdown = true;
taskExecutor.shutdown();
peerSelector.clear();
for (final EndpointConnection conn : activeConnections) {
conn.getPeer().getCommunicationsSession().interrupt();
}
for (final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values()) {
EndpointConnection state;
while ((state = connectionQueue.poll()) != null) {
terminate(state);
}
}
}
private CommunicationWorker[] scheduleWaitFor(Map<Integer, INodeInstance> nodes) {
BlockingQueue<INodeInstance> tasks;
tasks = new ArrayBlockingQueue<>(nodes.size() * 2);
tasks.addAll(nodes.values());
CommunicationWorker[] workers = new CommunicationWorker[numThreads];
workers[0] = new CommunicationWorker(tasks);
doneSignal = new CountDownLatch(numThreads - 1);
for (int i = 1; i < numThreads; i++) {
workers[i] = new CommunicationWorker(tasks);
threads.submit(workers[i]);
}
return workers;
}
@Test
public void testPeek() {
final int cap = 10;
BlockingQueue<Integer> dbq = new PushPullBlockingQueue<Integer>(cap);
try {
Assert.assertNull(dbq.peek());
} catch(NoSuchElementException nsex) {
Assert.fail();
}
for(int i=0; i<cap; i++) {
dbq.offer(Integer.valueOf(i));
Assert.assertEquals(Integer.valueOf(0), dbq.peek());
}
for(int i=0; i<cap; i++) {
Assert.assertEquals(Integer.valueOf(i), dbq.peek());
dbq.poll(); // count up values checking peeks
}
}
public SocketChannelDispatcher(final EventFactory<E> eventFactory,
final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory,
final BlockingQueue<ByteBuffer> bufferPool,
final BlockingQueue<E> events,
final ComponentLog logger,
final int maxConnections,
final SSLContext sslContext,
final SslContextFactory.ClientAuth clientAuth,
final Charset charset) {
this.eventFactory = eventFactory;
this.handlerFactory = handlerFactory;
this.bufferPool = bufferPool;
this.events = events;
this.logger = logger;
this.maxConnections = maxConnections;
this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
this.sslContext = sslContext;
this.clientAuth = clientAuth;
this.charset = charset;
if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() != maxConnections) {
throw new IllegalArgumentException(
"A pool of available ByteBuffers equal to the maximum number of connections is required");
}
}
@Test
public void testPeek() {
final int cap = 10;
BlockingQueue<Integer> dbq = new MPMCBlockingQueue<>(cap);
try {
Assert.assertNull(dbq.peek());
} catch(NoSuchElementException nsex) {
Assert.fail();
}
for(int i=0; i<cap; i++) {
dbq.offer(Integer.valueOf(i));
Assert.assertEquals(Integer.valueOf(0), dbq.peek());
}
for(int i=0; i<cap; i++) {
Assert.assertEquals(Integer.valueOf(i), dbq.peek());
dbq.poll(); // count up values checking peeks
}
}
private void testScheduleDelay( boolean shouldThrow ) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException,
BrokenBarrierException, TimeoutException
{
BlockingQueue<DaemonEntry> queue = new LinkedBlockingQueue<>( );
ExecutorService executor = Executors.newSingleThreadExecutor( );
DaemonScheduler scheduler = new DaemonScheduler( queue, executor );
try
{
DaemonEntry entry = getDaemonEntry( "JUNIT" );
TestDaemon testDaemon = (TestDaemon) entry.getDaemon( );
testDaemon.setRunThrows( shouldThrow );
Instant start = Instant.now( );
scheduler.schedule( entry, 500L, TimeUnit.MILLISECONDS );
assertFalse( testDaemon.hasRun( ) );
testDaemon.go( );
assertTrue( 500L <= Duration.between( start, Instant.now( ) ).toMillis( ) );
testDaemon.waitForCompletion( );
assertTrue( testDaemon.hasRun( ) );
}
finally
{
scheduler.shutdown( );
}
}
/**
* Print the binlog position log.
*
* @param logPositions
*/
private void debugLogPosition(ConcurrentLinkedQueue<LogPosition> logPositions) {
if (LogUtils.debug.isDebugEnabled()) {
Iterator<LogPosition> liter = logPositions.iterator();
boolean isHead = true;
int count = 0;
while (liter.hasNext()) {
LogPosition lp = liter.next();
if (isHead) {
LogUtils.debug.debug(host + " truncLogPosQueue logPositions head is " + lp);
isHead = false;
}
count++;
}
LogUtils.debug.debug(host + " truncLogPosQueue logPositions queue size " + count);
BlockingQueue<Object> queue = this.throttler;
if (queue != null) {
LogUtils.debug.debug(host + " throttler queue size " + queue.size());
}
}
}
private void drainQueue(BlockingQueue<Pair<AbstractDocument, Future>> queue, int threshold, long sleepTime,
TimeUnit sleepUnit, List<Pair<AbstractDocument, Future>> failedFutures) {
while (queue.remainingCapacity() < threshold) {
if (sleepTime > 0) {
Pair<AbstractDocument, Future> topElement = queue.peek();
if (topElement != null) {
try {
topElement.getSecond().get(sleepTime, sleepUnit);
} catch (Exception te) {
failedFutures.add(topElement);
}
queue.poll();
}
}
}
}
@Test
public void shouldPushMessageToQueue() throws InterruptedException {
BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> queue = new LinkedBlockingQueue<>();
queueSink = new OffsetMapQueueSink(queue, queueConfig);
Records messages = new Records(Collections.singletonList(new Record(offsetInfo, new HashMap<>())));
Status status = queueSink.push(messages);
assertTrue(status.isSuccess());
assertEquals(1, queue.size());
Map<TopicPartition, OffsetAndMetadata> partitionsCommitOffset = queue.take();
assertEquals(1, partitionsCommitOffset.size());
Map.Entry<TopicPartition, OffsetAndMetadata> offset = partitionsCommitOffset.entrySet().iterator().next();
assertEquals(offset.getKey().topic(), "default-topic");
assertEquals(offset.getKey().partition(), 0);
assertEquals(offset.getValue().offset(), 1);
}
@Test(timeout = TIMEOUT)
public void shouldFireOnFailureWhenInternalServerErrorReceived() throws InterruptedException, IOException {
final MockWebServer mockWebServer = new MockWebServer();
final MockResponse response = new MockResponse();
response.setResponseCode(500);
response.setStatus("HTTP/1.1 500 Internal Server Error");
mockWebServer.enqueue(response);
mockWebServer.start();
final URI uri = mockWebServer.url("/").uri();
final Connection connection = new Connection(uri, new Consumer.Options());
final BlockingQueue<String> events = new LinkedBlockingQueue<String>();
connection.setListener(new DefaultConnectionListener() {
@Override
public void onFailure(Exception e) {
events.offer("onFailed");
}
});
connection.open();
assertThat(events.take(), is("onFailed"));
mockWebServer.shutdown();
}
@Test
public void testOffer() {
final int cap = 16;
BlockingQueue<Integer> dbq = new PushPullBlockingQueue<Integer>(cap);
for(int i=0; i<cap; i++) {
dbq.offer(Integer.valueOf(i));
}
Assert.assertFalse(dbq.offer(Integer.valueOf(cap)));
for(int i=0; i<cap; i++) {
Assert.assertEquals(Integer.valueOf(i), dbq.poll());
}
}
public Result remove(IpRule ipRule, BlockingQueue<UpdateMessage> workLoad) {
try {
InetAddress address = InetAddress.getByAddress(ipRule.getIpAddress());
log.debug("address {}", address.getHostAddress());
if (!rules.containsKey(address)) {
return new Result(false, "nothing to remove for address " + address.getHostAddress());
} else {
workLoad.put(UpdateMessage.removeIpRule(address));
rules.remove(address);
}
} catch (UnknownHostException | InterruptedException e) {
log.error("addIpRule couldn't add {}", ipRule, e);
return new Result(false, e.getMessage());
}
return new Result(true, "");
}
@POST
@Path("/lock/{address}")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "[Lock account] Clear the cached unlocked account", notes = "Clear the cached unlocked account.")
public RpcClientResult lock(@ApiParam(name = "address", value = "account address", required = true) @PathParam("address") String address) {
Account account = accountService.getAccount(address).getData();
if (null == account) {
return Result.getFailed(AccountErrorCode.ACCOUNT_NOT_EXIST).toRpcClientResult();
}
accountCacheService.removeAccount(account.getAddress());
BlockingQueue<Runnable> queue = scheduler.getQueue();
String addr = account.getAddress().toString();
Runnable scheduledFuture = (Runnable) accountUnlockSchedulerMap.get(addr);
if (queue.contains(scheduledFuture)) {
scheduler.remove(scheduledFuture);
accountUnlockSchedulerMap.remove(addr);
}
Map<String, Boolean> map = new HashMap<>();
map.put("value", true);
return Result.getSuccess().setData(map).toRpcClientResult();
}
protected ChannelDispatcher createChannelReader(final String protocol, final BlockingQueue<ByteBuffer> bufferPool,
final BlockingQueue<RawSyslogEvent> events, final int maxConnections,
final SSLContextService sslContextService, final Charset charset) throws IOException {
final EventFactory<RawSyslogEvent> eventFactory = new RawSyslogEventFactory();
if (UDP_VALUE.getValue().equals(protocol)) {
return new DatagramChannelDispatcher(eventFactory, bufferPool, events, getLogger());
} else {
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
if (sslContextService != null) {
sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
}
final ChannelHandlerFactory<RawSyslogEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, charset);
}
}
/**
* Get the blocking queue of the thread pool.
*/
public BlockingQueue<Runnable> getWorkQueue() {
if (this.workQueue != null) {
return this.workQueue;
} else {
if (this.workQueueFrom == null) {
return null;
} else {
try {
Class<?> clz = Class.forName(workQueueFrom);
Object instance = clz.newInstance();
this.workQueue = (BlockingQueue<Runnable>) instance;
return this.workQueue;
} catch (Exception e) {
throw new ConfigurationException("无法读取包路径'" + workQueueFrom + "'来作为'" + BlockingQueue.class + "'实例。", e);
}
}
}
}
/**
* Finish all writes (block until it is done)
*/
public synchronized void flushWrites() {
if (service != null) {
BlockingQueue<Runnable> queue = service.getQueue();
final long milli = 10; // period to sleep between checking for empty queue
while (!service.isTerminated() && queue.peek() != null) {
try {
Thread.sleep(milli);
} catch (InterruptedException e) {
// interruption is ignored here; keep waiting until the queue drains or the service terminates
}
}
}
flushDatasets();
}
public void addFile(String fileName){
int hashCode = Math.abs(fileName.hashCode());
int index = hashCode % readFileThreadNum;
BlockingQueue<String> readQueue = threadReadFileMap.get(index);
if(readQueue == null){
readQueue = new LinkedBlockingQueue<>();
threadReadFileMap.put(index, readQueue);
}
readQueue.offer(fileName);
}
@Test
public void testIfPublisherIgnoresStaleEventsAndContinuesWithNextFlowWhenDeliveryQueueIsNotEmpty()
throws InterruptedException {
FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
FanOutRecordsPublisher.RecordFlow recordFlow =
new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001");
final int[] totalRecordsRetrieved = { 0 };
BlockingQueue<BatchUniqueIdentifier> ackQueue = new LinkedBlockingQueue<>();
fanOutRecordsPublisher.subscribe(new Subscriber<RecordsRetrieved>() {
@Override public void onSubscribe(Subscription subscription) {}
@Override public void onNext(RecordsRetrieved recordsRetrieved) {
totalRecordsRetrieved[0]++;
// Enqueue the ack for bursty delivery
ackQueue.add(recordsRetrieved.batchUniqueIdentifier());
// Send stale event periodically
}
@Override public void onError(Throwable throwable) {}
@Override public void onComplete() {}
});
IntStream.rangeClosed(1, 10).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(
new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), i + "", recordFlow.getSubscribeToShardId()),
recordFlow));
BatchUniqueIdentifier batchUniqueIdentifierQueued;
int count = 0;
// Now that we have allowed up to 10 elements to queue up, send a pair of good and stale acks to verify
// that records are delivered as expected.
while(count++ < 10 && (batchUniqueIdentifierQueued = ackQueue.take()) != null) {
final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued;
fanOutRecordsPublisher
.evictAckedEventAndScheduleNextEvent(() -> batchUniqueIdentifierFinal);
fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent(
() -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow"));
}
assertEquals(10, totalRecordsRetrieved[0]);
}
@Override
public void init() throws Exception {
final String iterationId = getConfiguration().getIterationId();
if (iterationId == null || iterationId.length() == 0) {
throw new Exception("Missing iteration ID in the task configuration");
}
final String brokerID = StreamIterationHead.createBrokerIdString(getEnvironment().getJobID(), iterationId,
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
final long iterationWaitTime = getConfiguration().getIterationWaitTime();
LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);
@SuppressWarnings("unchecked")
BlockingQueue<StreamRecord<IN>> dataChannel =
(BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);
LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
RecordPusher<IN> headOperator = new RecordPusher<>();
headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));
this.headOperator = headOperator;
// call super.init() last because that needs this.headOperator to be set up
super.init();
}
private Class<? extends BlockingQueue<String>> getBlockingQueueClassInstance(String className) {
try {
return (Class<? extends BlockingQueue<String>>) Class.forName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
static Executor getExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, boolean allowCoreThreadTimeOut) {
JobExecutor singleExecutor = new JobExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue, threadFactory);
if (Build.VERSION.SDK_INT >= 9)
singleExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
return singleExecutor;
}
TasksAcceptor(
UnicastProcessor<Task> tasksToProcess,
ConcurrentMap<String, BlockingQueue<Task>> idToCompletedTasksMap,
ConcurrentMap<String, RSocket> idToRSocketMap) {
this.tasksToProcess = tasksToProcess;
this.idToCompletedTasksMap = idToCompletedTasksMap;
this.idToRSocketMap = idToRSocketMap;
}
@Override
public Message poll() {
// If it's null, or we hit the end, reset it.
if (consumerIdIterator == null || !consumerIdIterator.hasNext()) {
consumerIdIterator = messageBuffer.keySet().iterator();
}
// Try every buffer until we hit the end.
Message returnMsg = null;
while (returnMsg == null && consumerIdIterator.hasNext()) {
// Advance iterator
final VirtualSpoutIdentifier nextConsumerId = consumerIdIterator.next();
// Find our buffer
final BlockingQueue<Message> queue = messageBuffer.get(nextConsumerId);
// No buffer exists for this consumer id?
if (queue == null) {
logger.debug("Non-existent queue found, resetting iterator.");
consumerIdIterator = messageBuffer.keySet().iterator();
continue;
}
returnMsg = queue.poll();
}
return returnMsg;
}
public PeriodicQueryPrunerExecutor(final PeriodicQueryResultStorage periodicStorage, final FluoClient client, final int numThreads,
final BlockingQueue<NodeBin> bins) {
Preconditions.checkArgument(numThreads > 0);
this.periodicStorage = periodicStorage;
this.numThreads = numThreads;
executor = Executors.newFixedThreadPool(numThreads);
this.bins = bins;
this.client = client;
this.pruners = new ArrayList<>();
}
private void schedulerExecution(Map<Integer, INodeInstance> nodes) {
BlockingQueue<INodeInstance> tasks;
tasks = new ArrayBlockingQueue<>(nodes.size() * 2);
tasks.addAll(nodes.values());
for (INodeInstance node : tasks) {
node.prepare(config);
}
doneSignal = new CountDownLatch(numThreads);
for (int i = 0; i < numThreads; i++) {
threads.execute(new StreamWorker(tasks));
}
}
@Test
public void notify_logDeleted_doesNotExist() throws Exception {
// The signal that will kill the working thread.
final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
// The queue used to feed work.
final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
// The queue work is written to.
final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
// Start the worker that will be tested.
final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
logEventWorker.start();
try {
// Write a unit of work that indicates a log was deleted. Since it was never created,
// this will not cause anything to be written to the QueryEvent queue.
logEventQueue.offer(LogEvent.delete("rya"));
// Show that nothing was written to the QueryEvent queue.
assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
} finally {
shutdownSignal.set(true);
logEventWorker.join();
}
}
@Test
public void testAsSynchronousQueue() {
final int cap = 1;
BlockingQueue<Integer> dbq = new MPMCBlockingQueue<>(cap);
while(dbq.offer(Integer.valueOf(0)));
Assert.assertFalse(dbq.offer(Integer.valueOf(10)));
Assert.assertEquals(2, dbq.size());
Assert.assertEquals(Integer.valueOf(0), dbq.poll());
}
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
// write out the version of the serialized data for future use
out.writeByte(2);
final boolean hasExec = handler.executor != null && handler.executor != JNDIContext.globalExecutor();
out.writeBoolean(hasExec);
if (hasExec) {
out.writeInt(handler.executor.getMaximumPoolSize());
final BlockingQueue<Runnable> queue = handler.executor.getQueue();
out.writeInt(queue.size() + queue.remainingCapacity());
}
handler.client.setMetaData(metaData);
handler.client.writeExternal(out);
final EJBMetaDataImpl ejb = handler.ejb;
out.writeObject(ejb.homeClass);
out.writeObject(ejb.remoteClass);
out.writeObject(ejb.keyClass);
out.writeByte(ejb.type);
out.writeUTF(ejb.deploymentID);
out.writeShort(ejb.deploymentCode);
handler.server.setMetaData(metaData);
handler.server.writeExternal(out);
/// out.writeObject( handler.primaryKey );
}
ReportingThreadExecutionPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("Rejected Task - " + r);
}
});
}