下面列出了java.util.concurrent.BlockingQueue#size ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
protected void handleRequests(long windowId)
{
/*
* we prefer to cater to requests at the end of the window boundary.
*/
try {
BlockingQueue<OperatorRequest> requests = context.getRequests();
int size;
StatsListener.OperatorResponse response;
if ((size = requests.size()) > 0) {
while (size-- > 0) {
//logger.debug("endwindow: " + t.getWindowId() + " lastprocessed: " + context.getLastProcessedWindowId());
response = requests.remove().execute(operator, context.getId(), windowId);
if (response != null) {
commandResponse.add(response);
}
}
}
} catch (Error er) {
throw er;
} catch (RuntimeException re) {
throw re;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
if (threadName != null) {
LOG.error("txTransaction Thread pool [{}] is exhausted, executor={}", threadName, executor.toString());
}
if (runnable instanceof RejectedRunnable) {
((RejectedRunnable) runnable).rejected();
} else {
if (!executor.isShutdown()) {
BlockingQueue<Runnable> queue = executor.getQueue();
int discardSize = queue.size() >> 1;
for (int i = 0; i < discardSize; i++) {
queue.poll();
}
try {
queue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* Closes this appender instance. Before exiting, the implementation tries to
* flush out buffered log events within configured shutdownTimeout seconds. If
* that doesn't finish within configured shutdownTimeout, it would drop all
* the buffered log events.
*/
@Override
public void stop() {
threadPoolExecutor.shutdown();
BlockingQueue<Runnable> taskQueue = threadPoolExecutor.getQueue();
int bufferSizeBeforeShutdown = threadPoolExecutor.getQueue().size();
boolean gracefulShutdown = true;
try {
gracefulShutdown = threadPoolExecutor.awaitTermination(shutdownTimeout, TimeUnit.SECONDS);
}
catch(InterruptedException e) {
// we are anyways cleaning up
}
finally {
int bufferSizeAfterShutdown = taskQueue.size();
if(!gracefulShutdown || bufferSizeAfterShutdown > 0) {
String errorMsg = "Kinesis Log4J Appender (" + name + ") waited for " + shutdownTimeout
+ " seconds before terminating but could send only "
+ (bufferSizeAfterShutdown - bufferSizeBeforeShutdown) + " logevents, it failed to send "
+ bufferSizeAfterShutdown + " pending log events from it's processing queue";
addError(errorMsg);
}
}
client.close();
}
public static boolean tryWaitZkEventsCleaned(RealmAwareZkClient zkclient) throws Exception {
java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
field.setAccessible(true);
Object eventThread = field.get(zkclient);
// System.out.println("field: " + eventThread);
java.lang.reflect.Field field2 = getField(eventThread.getClass(), "_events");
field2.setAccessible(true);
BlockingQueue queue = (BlockingQueue) field2.get(eventThread);
// System.out.println("field2: " + queue + ", " + queue.size());
if (queue == null) {
LOG.error("fail to get event-queue from zkclient. skip waiting");
return false;
}
for (int i = 0; i < 20; i++) {
if (queue.size() == 0) {
return true;
}
Thread.sleep(100);
System.out.println("pending zk-events in queue: " + queue);
}
return false;
}
@Test
public void testProcessorTask() {
int numMessages = 100;
PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
StreamsProcessorTask task = new StreamsProcessorTask(processor);
StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
task.setStreamsTaskCounter(counter);
BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
task.addOutputQueue(outQueue);
task.addInputQueue(inQueue);
assertEquals(numMessages, task.getInputQueues().get(0).size());
ExecutorService service = Executors.newFixedThreadPool(1);
service.submit(task);
int attempts = 0;
while(inQueue.size() != 0 && outQueue.size() != numMessages) {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
++attempts;
if(attempts == 10) {
fail("Processor task failed to output "+numMessages+" in a timely fashion.");
}
}
task.stopTask();;
service.shutdown();
try {
if(!service.awaitTermination(5, TimeUnit.SECONDS)){
service.shutdownNow();
fail("Service did not terminate.");
}
assertTrue("Task should have completed running in allotted time.", service.isTerminated());
} catch (InterruptedException e) {
fail("Test Interrupted.");
}
assertEquals(numMessages, processor.getMessageCount());
assertEquals(numMessages, counter.getNumReceived());
assertEquals(numMessages, counter.getNumEmitted());
assertEquals(0, counter.getNumUnhandledErrors());
assertEquals(0.0, counter.getErrorRate(), 0.0);
}
private static RejectedExecutionHandler buildRejectedExecutionHandler(final int size) {
return (r, executor) -> {
BlockingQueue<Runnable> queue = executor.getQueue();
while (queue.size() >= size) {
if (executor.isShutdown()) {
throw new RejectedExecutionException("metrics thread pool executor closed");
}
((MetricsThreadPoolExecutor) executor).onRejected();
}
if (!executor.isShutdown()) {
executor.execute(r);
}
};
}
private boolean isAllQueuesEmpty() {
for (BlockingQueue<FetchedDataChunk> queue : queues) {
if (!queue.isEmpty()) {
if (queue.size() == 1 && queue.peek() != SHUTDOWN_COMMAND) {
return false;
} else if (queue.size() > 1) {
return false;
}
}
}
return true;
}
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
int queueIndex = balancer.getNextQueue();
BlockingQueue<CallRunner> queue = queues.get(queueIndex);
// that means we can overflow by at most <num reader> size (5), that's ok
if (queue.size() >= currentQueueLimit) {
return false;
}
return queue.offer(callTask);
}
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
if (threadName != null) {
LOG.error("Thread pool [{}] is exhausted, executor={}", threadName, executor.toString());
}
if (runnable instanceof RejectedRunnable) {
((RejectedRunnable) runnable).rejected(); // 交给用户来处理
} else {
if (!executor.isShutdown()) {
BlockingQueue<Runnable> queue = executor.getQueue();
// 舍弃1/2队列元素,例如7个单位的元素,舍弃3个
int discardSize = queue.size() >> 1;
for (int i = 0; i < discardSize; i++) {
// 从头部移除并返问队列头部的元素
queue.poll();
}
try {
// 添加一个元素, 如果队列满,则阻塞
queue.put(runnable);
} catch (InterruptedException e) {
// should not be interrupted
}
}
}
}
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
if (threadName != null) {
LOG.error("txTransaction Thread pool [{}] is exhausted, executor={}", threadName, executor.toString());
}
if (!executor.isShutdown()) {
BlockingQueue<Runnable> queue = executor.getQueue();
int discardSize = queue.size() >> 1;
for (int i = 0; i < discardSize; i++) {
queue.poll();
}
queue.offer(runnable);
}
}
@Test
public void testMergeTask() {
int numMessages = 100;
int incoming = 5;
StreamsMergeTask task = new StreamsMergeTask();
BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
task.addOutputQueue(outQueue);
for(int i=0; i < incoming; ++i) {
task.addInputQueue(createInputQueue(numMessages));
}
ExecutorService service = Executors.newFixedThreadPool(1);
service.submit(task);
int attempts = 0;
while(outQueue.size() != incoming * numMessages ) {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
++attempts;
if(attempts == 10) {
assertEquals("Processor task failed to output " + (numMessages * incoming) + " in a timely fashion.", (numMessages * incoming), outQueue.size());
}
}
task.stopTask();
service.shutdown();
try {
if(!service.awaitTermination(5, TimeUnit.SECONDS)){
service.shutdownNow();
fail("Service did not terminate.");
}
assertTrue("Task should have completed running in allotted time.", service.isTerminated());
} catch (InterruptedException e) {
fail("Test Interrupted.");
}
}
/**
* Returns the first non-empty queue with equal or lesser priority
* than <i>startIdx</i>. Wraps around, searching a maximum of N
* queues, where N is this.queues.size().
*
* @param startIdx the queue number to start searching at
* @return the first non-empty queue with less priority, or null if
* everything was empty
*/
private BlockingQueue<E> getFirstNonEmptyQueue(int startIdx) {
final int numQueues = this.queues.size();
for(int i=0; i < numQueues; i++) {
int idx = (i + startIdx) % numQueues; // offset and wrap around
BlockingQueue<E> queue = this.queues.get(idx);
if (queue.size() != 0) {
return queue;
}
}
// All queues were empty
return null;
}
@VisibleForTesting
int getNumTasksScheduled() {
BlockingQueue<?> queue = timerService.getQueue();
if (queue == null) {
return 0;
} else {
return queue.size();
}
}
private BlockingQueue<SrsFlvFrame> resizeFlvTagCacheInternal(BlockingQueue<SrsFlvFrame> cache, int newSize) {
if(newSize < cache.size() - cache.remainingCapacity()) {
throw new RuntimeException("Can't fit current cache inside new cache size");
}
BlockingQueue<SrsFlvFrame> newQueue = new LinkedBlockingQueue<>(newSize);
cache.drainTo(newQueue);
return newQueue;
}
@Test
public void testProviderTask() {
int numMessages = 100;
NumericMessageProvider provider = new NumericMessageProvider(numMessages);
StreamsProviderTask task = new StreamsProviderTask(provider, false, null);
BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
task.addOutputQueue(outQueue);
//Test that adding input queues to providers is not valid
BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
Exception exp = null;
try {
task.addInputQueue(inQueue);
} catch (UnsupportedOperationException uoe) {
exp = uoe;
}
assertNotNull(exp);
ExecutorService service = Executors.newFixedThreadPool(1);
service.submit(task);
int attempts = 0;
while(outQueue.size() != numMessages) {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
if(attempts == 10) {
fail("Provider task failed to output "+numMessages+" in a timely fashion.");
}
}
service.shutdown();
try {
if(!service.awaitTermination(10, TimeUnit.SECONDS)){
service.shutdownNow();
fail("Service did not terminate.");
}
assertTrue("Task should have completed running in allotted time.", service.isTerminated());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void updateStat(Boolean isPrimary,
long time,
AtomicLong maxTime,
AtomicLong primaryCount,
AtomicLong failoverCount,
BlockingQueue<Long> rollingAverage) {
long max = maxTime.get();
while (time > max) {
if (!maxTime.compareAndSet(max, time)) {
max = maxTime.get();
} else {
break;
}
}
if (isPrimary != null) {
if (isPrimary == true) {
primaryCount.addAndGet(1);
} else {
failoverCount.addAndGet(1);
}
}
rollingAverage.add(time);
if (rollingAverage.size() > 100) {
rollingAverage.poll();
}
}
private void closeInactiveSessions() {
if (!this.running.get()) {
return;
}
final BlockingQueue<Runnable> queue = this.getQueue();
if (queue == null) {
return;
}
int backlog = queue.size();
if (backlog <= 0) {
return;
}
final long now = System.currentTimeMillis();
final List<Session> current = new ArrayList<Session>();
current.addAll(this.sessions.values());
for (final Session session : current) {
final Lock l = session.lock;
if (l.tryLock()) {
try {
if (now - session.lastRequest.get() > this.timeout) {
backlog--;
try {
session.close();
} catch (Throwable e) {
//Ignore
} finally {
this.removeSession(session);
}
}
} finally {
l.unlock();
}
}
if (backlog <= 0) {
return;
}
}
}
/**
* {@inheritDoc}
*/
@Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
long stopTime = clock.currentTimeMillis() + timeout;
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
metrics.incPoll();
for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
List<IncomingMessageEnvelope> outgoingList = new ArrayList<IncomingMessageEnvelope>(queue.size());
if (queue.size() > 0) {
queue.drainTo(outgoingList);
} else if (timeout != 0) {
IncomingMessageEnvelope envelope = null;
// How long we can legally block (if timeout > 0)
long timeRemaining = stopTime - clock.currentTimeMillis();
if (timeout == SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES) {
// Block until we get at least one message, or until we catch up to
// the head of the stream.
while (envelope == null && !isAtHead(systemStreamPartition)) {
// Check for consumerFailure and throw exception
if (this.failureCause != null) {
String message = String.format("%s: Consumer has stopped.", this);
throw new SamzaException(message, this.failureCause);
}
metrics.incBlockingPoll(systemStreamPartition);
envelope = queue.poll(1000, TimeUnit.MILLISECONDS);
}
} else if (timeout > 0 && timeRemaining > 0) {
// Block until we get at least one message.
metrics.incBlockingTimeoutPoll(systemStreamPartition);
envelope = queue.poll(timeRemaining, TimeUnit.MILLISECONDS);
}
// If we got a message, add it.
if (envelope != null) {
outgoingList.add(envelope);
// Drain any remaining messages without blocking.
queue.drainTo(outgoingList);
}
}
if (outgoingList.size() > 0) {
messagesToReturn.put(systemStreamPartition, outgoingList);
subtractSizeOnQDrain(systemStreamPartition, outgoingList);
}
}
return messagesToReturn;
}
/**
* Closes the pool and all disconnects all idle connections
* Active connections will be closed upon the {@link java.sql.Connection#close close} method is called
* on the underlying connection instead of being returned to the pool
* @param force - true to even close the active connections
*/
protected void close(boolean force) {
//are we already closed
if (this.closed) return;
//prevent other threads from entering
this.closed = true;
//stop background thread
if (poolCleaner!=null) {
poolCleaner.stopRunning();
}
/* release all idle connections */
BlockingQueue<PooledConnection> pool = (idle.size()>0)?idle:(force?busy:idle);
while (pool.size()>0) {
try {
//retrieve the next connection
PooledConnection con = pool.poll(1000, TimeUnit.MILLISECONDS);
//close it and retrieve the next one, if one is available
while (con != null) {
//close the connection
if (pool==idle)
release(con);
else
abandon(con);
if (pool.size()>0) {
con = pool.poll(1000, TimeUnit.MILLISECONDS);
} else {
break;
}
} //while
} catch (InterruptedException ex) {
if (getPoolProperties().getPropagateInterruptState()) {
Thread.currentThread().interrupt();
}
}
if (pool.size()==0 && force && pool!=busy) pool = busy;
}
if (this.getPoolProperties().isJmxEnabled()) this.jmxPool = null;
PoolProperties.InterceptorDefinition[] proxies = getPoolProperties().getJdbcInterceptorsAsArray();
for (int i=0; i<proxies.length; i++) {
try {
JdbcInterceptor interceptor = proxies[i].getInterceptorClass().newInstance();
interceptor.setProperties(proxies[i].getProperties());
interceptor.poolClosed(this);
}catch (Exception x) {
log.debug("Unable to inform interceptor of pool closure.",x);
}
}
}
/**
* Returns maximum number of tasks that can be submitted to given
* pool (with bounded queue) before saturation (when submission
* throws RejectedExecutionException).
*/
static final int saturatedSize(ThreadPoolExecutor pool) {
BlockingQueue<Runnable> q = pool.getQueue();
return pool.getMaximumPoolSize() + q.size() + q.remainingCapacity();
}