java.util.concurrent.BlockingQueue#size ( )源码实例Demo

下面列出了java.util.concurrent.BlockingQueue#size ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: Bats   文件: Node.java
protected void handleRequests(long windowId)
{
  /*
   * we prefer to cater to requests at the end of the window boundary.
   */
  try {
    BlockingQueue<OperatorRequest> requests = context.getRequests();
    int size;
    StatsListener.OperatorResponse response;
    if ((size = requests.size()) > 0) {
      while (size-- > 0) {
        //logger.debug("endwindow: " + t.getWindowId() + " lastprocessed: " + context.getLastProcessedWindowId());
        response = requests.remove().execute(operator, context.getId(), windowId);
        if (response != null) {
          commandResponse.add(response);
        }
      }
    }
  } catch (Error er) {
    throw er;
  } catch (RuntimeException re) {
    throw re;
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
 
源代码2 项目: Lottor   文件: RejectedPolicy.java
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
    if (threadName != null) {
        LOG.error("txTransaction Thread pool [{}] is exhausted, executor={}", threadName, executor.toString());
    }

    if (runnable instanceof RejectedRunnable) {
        ((RejectedRunnable) runnable).rejected();
    } else {
        if (!executor.isShutdown()) {
            BlockingQueue<Runnable> queue = executor.getQueue();
            int discardSize = queue.size() >> 1;
            for (int i = 0; i < discardSize; i++) {
                queue.poll();
            }

            try {
                queue.put(runnable);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
 
/**
 * Closes this appender instance. Before exiting, the implementation tries to
 * flush out buffered log events within configured shutdownTimeout seconds. If
 * that doesn't finish within configured shutdownTimeout, it would drop all
 * the buffered log events.
 */
@Override
public void stop() {
  threadPoolExecutor.shutdown();
  BlockingQueue<Runnable> taskQueue = threadPoolExecutor.getQueue();
  int bufferSizeBeforeShutdown = threadPoolExecutor.getQueue().size();
  boolean gracefulShutdown = true;
  try {
    gracefulShutdown = threadPoolExecutor.awaitTermination(shutdownTimeout, TimeUnit.SECONDS);
  }
  catch(InterruptedException e) {
    // we are anyways cleaning up
  }
  finally {
    int bufferSizeAfterShutdown = taskQueue.size();
    if(!gracefulShutdown || bufferSizeAfterShutdown > 0) {
      String errorMsg = "Kinesis Log4J Appender (" + name + ") waited for " + shutdownTimeout
                        + " seconds before terminating but could send only "
                        + (bufferSizeAfterShutdown - bufferSizeBeforeShutdown) + " logevents, it failed to send "
                        + bufferSizeAfterShutdown + " pending log events from it's processing queue";
      addError(errorMsg);
    }
  }
  client.close();
}
 
源代码4 项目: helix   文件: ZkTestHelper.java
public static boolean tryWaitZkEventsCleaned(RealmAwareZkClient zkclient) throws Exception {
  java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
  field.setAccessible(true);
  Object eventThread = field.get(zkclient);
  // System.out.println("field: " + eventThread);

  java.lang.reflect.Field field2 = getField(eventThread.getClass(), "_events");
  field2.setAccessible(true);
  BlockingQueue queue = (BlockingQueue) field2.get(eventThread);
  // System.out.println("field2: " + queue + ", " + queue.size());

  if (queue == null) {
    LOG.error("fail to get event-queue from zkclient. skip waiting");
    return false;
  }

  for (int i = 0; i < 20; i++) {
    if (queue.size() == 0) {
      return true;
    }
    Thread.sleep(100);
    System.out.println("pending zk-events in queue: " + queue);
  }
  return false;
}
 
源代码5 项目: streams   文件: BasicTasksTest.java
@Test
public void testProcessorTask() {
  int numMessages = 100;
  PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
  StreamsProcessorTask task = new StreamsProcessorTask(processor);
  StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
  task.setStreamsTaskCounter(counter);
  BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
  BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
  task.addOutputQueue(outQueue);
  task.addInputQueue(inQueue);
  assertEquals(numMessages, task.getInputQueues().get(0).size());
  ExecutorService service = Executors.newFixedThreadPool(1);
  service.submit(task);
  int attempts = 0;
  while(inQueue.size() != 0 && outQueue.size() != numMessages) {
    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
    ++attempts;
    if(attempts == 10) {
      fail("Processor task failed to output "+numMessages+" in a timely fashion.");
    }
  }
  task.stopTask();;
  service.shutdown();
  try {
    if(!service.awaitTermination(5, TimeUnit.SECONDS)){
      service.shutdownNow();
      fail("Service did not terminate.");
    }
    assertTrue("Task should have completed running in allotted time.", service.isTerminated());
  } catch (InterruptedException e) {
    fail("Test Interrupted.");
  }
  assertEquals(numMessages, processor.getMessageCount());
  assertEquals(numMessages, counter.getNumReceived());
  assertEquals(numMessages, counter.getNumEmitted());
  assertEquals(0, counter.getNumUnhandledErrors());
  assertEquals(0.0, counter.getErrorRate(), 0.0);
}
 
private static RejectedExecutionHandler buildRejectedExecutionHandler(final int size) {
    return (r, executor) -> {
        BlockingQueue<Runnable> queue = executor.getQueue();
        while (queue.size() >= size) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("metrics thread pool executor closed");
            }
            ((MetricsThreadPoolExecutor) executor).onRejected();
        }
        if (!executor.isShutdown()) {
            executor.execute(r);
        }
    };
}
 
源代码7 项目: luxun   文件: StreamFactory.java
private boolean isAllQueuesEmpty() {
       for (BlockingQueue<FetchedDataChunk> queue : queues) {
       	if (!queue.isEmpty()) {
       		if (queue.size() == 1 && queue.peek() != SHUTDOWN_COMMAND) {
       			return false;
       		} else if (queue.size() > 1) {
       			return false;
       		}
       	}
       }
       return true;
}
 
源代码8 项目: hbase   文件: BalancedQueueRpcExecutor.java
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
  int queueIndex = balancer.getNextQueue();
  BlockingQueue<CallRunner> queue = queues.get(queueIndex);
  // that means we can overflow by at most <num reader> size (5), that's ok
  if (queue.size() >= currentQueueLimit) {
    return false;
  }
  return queue.offer(callTask);
}
 
源代码9 项目: BootNettyRpc   文件: RejectedPolicyWithReport.java
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
    if (threadName != null) {
        LOG.error("Thread pool [{}] is exhausted, executor={}", threadName, executor.toString());
    }

    if (runnable instanceof RejectedRunnable) {
        ((RejectedRunnable) runnable).rejected(); // 交给用户来处理
    } else {
        if (!executor.isShutdown()) {
            BlockingQueue<Runnable> queue = executor.getQueue();
            // 舍弃1/2队列元素,例如7个单位的元素,舍弃3个
            int discardSize = queue.size() >> 1;
            for (int i = 0; i < discardSize; i++) {
                // 从头部移除并返问队列头部的元素
                queue.poll();
            }

            try {
                // 添加一个元素, 如果队列满,则阻塞
                queue.put(runnable);
            } catch (InterruptedException e) {
                // should not be interrupted
            }
        }
    }
}
 
源代码10 项目: Lottor   文件: DiscardedPolicy.java
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
    if (threadName != null) {
        LOG.error("txTransaction Thread pool [{}] is exhausted, executor={}", threadName, executor.toString());
    }

    if (!executor.isShutdown()) {
        BlockingQueue<Runnable> queue = executor.getQueue();
        int discardSize = queue.size() >> 1;
        for (int i = 0; i < discardSize; i++) {
            queue.poll();
        }
        queue.offer(runnable);
    }
}
 
源代码11 项目: streams   文件: BasicTasksTest.java
@Test
public void testMergeTask() {
  int numMessages = 100;
  int incoming = 5;
  StreamsMergeTask task = new StreamsMergeTask();
  BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
  task.addOutputQueue(outQueue);
  for(int i=0; i < incoming; ++i) {
    task.addInputQueue(createInputQueue(numMessages));
  }
  ExecutorService service = Executors.newFixedThreadPool(1);
  service.submit(task);
  int attempts = 0;
  while(outQueue.size() != incoming * numMessages ) {
    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
    ++attempts;
    if(attempts == 10) {
      assertEquals("Processor task failed to output " + (numMessages * incoming) + " in a timely fashion.", (numMessages * incoming), outQueue.size());
    }
  }
  task.stopTask();
  service.shutdown();
  try {
    if(!service.awaitTermination(5, TimeUnit.SECONDS)){
      service.shutdownNow();
      fail("Service did not terminate.");
    }
    assertTrue("Task should have completed running in allotted time.", service.isTerminated());
  } catch (InterruptedException e) {
    fail("Test Interrupted.");
  }
}
 
源代码12 项目: big-c   文件: FairCallQueue.java
/**
 * Returns the first non-empty queue with equal or lesser priority
 * than <i>startIdx</i>. Wraps around, searching a maximum of N
 * queues, where N is this.queues.size().
 *
 * @param startIdx the queue number to start searching at
 * @return the first non-empty queue with less priority, or null if
 * everything was empty
 */
private BlockingQueue<E> getFirstNonEmptyQueue(int startIdx) {
  final int numQueues = this.queues.size();
  for(int i=0; i < numQueues; i++) {
    int idx = (i + startIdx) % numQueues; // offset and wrap around
    BlockingQueue<E> queue = this.queues.get(idx);
    if (queue.size() != 0) {
      return queue;
    }
  }

  // All queues were empty
  return null;
}
 
源代码13 项目: flink   文件: SystemProcessingTimeService.java
@VisibleForTesting
int getNumTasksScheduled() {
	BlockingQueue<?> queue = timerService.getQueue();
	if (queue == null) {
		return 0;
	} else {
		return queue.size();
	}
}
 
源代码14 项目: rtmp-rtsp-stream-client-java   文件: SrsFlvMuxer.java
private BlockingQueue<SrsFlvFrame> resizeFlvTagCacheInternal(BlockingQueue<SrsFlvFrame> cache, int newSize) {
  if(newSize < cache.size() - cache.remainingCapacity()) {
    throw new RuntimeException("Can't fit current cache inside new cache size");
  }

  BlockingQueue<SrsFlvFrame> newQueue = new LinkedBlockingQueue<>(newSize);
  cache.drainTo(newQueue);
  return newQueue;
}
 
源代码15 项目: streams   文件: BasicTasksTest.java
@Test
public void testProviderTask() {
  int numMessages = 100;
  NumericMessageProvider provider = new NumericMessageProvider(numMessages);
  StreamsProviderTask task = new StreamsProviderTask(provider, false, null);
  BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
  task.addOutputQueue(outQueue);
  //Test that adding input queues to providers is not valid
  BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
  Exception exp = null;
  try {
    task.addInputQueue(inQueue);
  } catch (UnsupportedOperationException uoe) {
    exp = uoe;
  }
  assertNotNull(exp);

  ExecutorService service = Executors.newFixedThreadPool(1);
  service.submit(task);
  int attempts = 0;
  while(outQueue.size() != numMessages) {
    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
    if(attempts == 10) {
      fail("Provider task failed to output "+numMessages+" in a timely fashion.");
    }
  }
  service.shutdown();
  try {
    if(!service.awaitTermination(10, TimeUnit.SECONDS)){
      service.shutdownNow();
      fail("Service did not terminate.");
    }
    assertTrue("Task should have completed running in allotted time.", service.isTerminated());
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
}
 
源代码16 项目: HBase.MCC   文件: HTableStats.java
private static void updateStat(Boolean isPrimary, 
    long time, 
    AtomicLong maxTime, 
    AtomicLong primaryCount, 
    AtomicLong failoverCount, 
    BlockingQueue<Long> rollingAverage) {
  
  long max = maxTime.get();
  while (time > max) {
    if (!maxTime.compareAndSet(max, time)) {
      max = maxTime.get();
    } else {
      break;
    }
  }
  
  if (isPrimary != null) {
    if (isPrimary == true) {
      primaryCount.addAndGet(1);
    } else {
      failoverCount.addAndGet(1);
    }
  }
  
  rollingAverage.add(time);
  if (rollingAverage.size() > 100) {
    rollingAverage.poll();
  }
}
 
源代码17 项目: tomee   文件: KeepAliveServer.java
private void closeInactiveSessions() {

        if (!this.running.get()) {
            return;
        }

        final BlockingQueue<Runnable> queue = this.getQueue();
        if (queue == null) {
            return;
        }

        int backlog = queue.size();
        if (backlog <= 0) {
            return;
        }

        final long now = System.currentTimeMillis();

        final List<Session> current = new ArrayList<Session>();
        current.addAll(this.sessions.values());

        for (final Session session : current) {

            final Lock l = session.lock;

            if (l.tryLock()) {
                try {
                    if (now - session.lastRequest.get() > this.timeout) {

                        backlog--;

                        try {
                            session.close();
                        } catch (Throwable e) {
                            //Ignore
                        } finally {
                            this.removeSession(session);
                        }
                    }
                } finally {
                    l.unlock();
                }
            }

            if (backlog <= 0) {
                return;
            }
        }
    }
 
源代码18 项目: samza   文件: BlockingEnvelopeMap.java
/**
 * {@inheritDoc}
 */
@Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
  long stopTime = clock.currentTimeMillis() + timeout;
  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();

  metrics.incPoll();

  for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
    BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
    List<IncomingMessageEnvelope> outgoingList = new ArrayList<IncomingMessageEnvelope>(queue.size());

    if (queue.size() > 0) {
      queue.drainTo(outgoingList);
    } else if (timeout != 0) {
      IncomingMessageEnvelope envelope = null;

      // How long we can legally block (if timeout > 0)
      long timeRemaining = stopTime - clock.currentTimeMillis();

      if (timeout == SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES) {
        // Block until we get at least one message, or until we catch up to
        // the head of the stream.
        while (envelope == null && !isAtHead(systemStreamPartition)) {

          // Check for consumerFailure and throw exception
          if (this.failureCause != null) {
            String message = String.format("%s: Consumer has stopped.", this);
            throw new SamzaException(message, this.failureCause);
          }

          metrics.incBlockingPoll(systemStreamPartition);
          envelope = queue.poll(1000, TimeUnit.MILLISECONDS);
        }
      } else if (timeout > 0 && timeRemaining > 0) {
        // Block until we get at least one message.
        metrics.incBlockingTimeoutPoll(systemStreamPartition);
        envelope = queue.poll(timeRemaining, TimeUnit.MILLISECONDS);
      }

      // If we got a message, add it.
      if (envelope != null) {
        outgoingList.add(envelope);
        // Drain any remaining messages without blocking.
        queue.drainTo(outgoingList);
      }
    }

    if (outgoingList.size() > 0) {
      messagesToReturn.put(systemStreamPartition, outgoingList);
      subtractSizeOnQDrain(systemStreamPartition, outgoingList);
    }
  }

  return messagesToReturn;
}
 
源代码19 项目: Tomcat7.0.67   文件: ConnectionPool.java
/**
 * Closes the pool and all disconnects all idle connections
 * Active connections will be closed upon the {@link java.sql.Connection#close close} method is called
 * on the underlying connection instead of being returned to the pool
 * @param force - true to even close the active connections
 */
protected void close(boolean force) {
    //are we already closed
    if (this.closed) return;
    //prevent other threads from entering
    this.closed = true;
    //stop background thread
    if (poolCleaner!=null) {
        poolCleaner.stopRunning();
    }

    /* release all idle connections */
    BlockingQueue<PooledConnection> pool = (idle.size()>0)?idle:(force?busy:idle);
    while (pool.size()>0) {
        try {
            //retrieve the next connection
            PooledConnection con = pool.poll(1000, TimeUnit.MILLISECONDS);
            //close it and retrieve the next one, if one is available
            while (con != null) {
                //close the connection
                if (pool==idle)
                    release(con);
                else
                    abandon(con);
                if (pool.size()>0) {
                    con = pool.poll(1000, TimeUnit.MILLISECONDS);
                } else {
                    break;
                }
            } //while
        } catch (InterruptedException ex) {
            if (getPoolProperties().getPropagateInterruptState()) {
                Thread.currentThread().interrupt();
            }
        }
        if (pool.size()==0 && force && pool!=busy) pool = busy;
    }
    if (this.getPoolProperties().isJmxEnabled()) this.jmxPool = null;
    PoolProperties.InterceptorDefinition[] proxies = getPoolProperties().getJdbcInterceptorsAsArray();
    for (int i=0; i<proxies.length; i++) {
        try {
            JdbcInterceptor interceptor = proxies[i].getInterceptorClass().newInstance();
            interceptor.setProperties(proxies[i].getProperties());
            interceptor.poolClosed(this);
        }catch (Exception x) {
            log.debug("Unable to inform interceptor of pool closure.",x);
        }
    }
}
 
源代码20 项目: streamsupport   文件: JSR166TestCase.java
/**
 * Returns maximum number of tasks that can be submitted to given
 * pool (with bounded queue) before saturation (when submission
 * throws RejectedExecutionException).
 */
static final int saturatedSize(ThreadPoolExecutor pool) {
    BlockingQueue<Runnable> q = pool.getQueue();
    return pool.getMaximumPoolSize() + q.size() + q.remainingCapacity();
}