java.util.concurrent.LinkedBlockingQueue#size() source code examples

The examples below show java.util.concurrent.LinkedBlockingQueue#size() as used in real open-source projects; the full source for each snippet can be found in the corresponding project on GitHub.
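Before the project examples, here is a minimal, self-contained sketch (not taken from any project below) of what size() actually reports: the number of elements currently in the queue. For a bounded queue, size() plus remainingCapacity() equals the capacity, though under concurrent modification both values are only momentary snapshots.

import java.util.concurrent.LinkedBlockingQueue;

public class SizeDemo {
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue with capacity 3.
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3);

        queue.put("a");
        queue.put("b");

        System.out.println(queue.size());              // 2
        System.out.println(queue.remainingCapacity()); // 1 (capacity 3 - size 2)

        queue.take();
        System.out.println(queue.size());              // 1
    }
}

size() is a cheap read of an internal counter, but in concurrent code the value may already be stale by the time it is used; the project examples below accordingly use it for progress reporting, capacity warnings, and initial sizing rather than for synchronization.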

Example 1  Project: ghidra   File: ThreadedXmlParserTest.java
@SuppressWarnings("unchecked")
@Test
public void testGoodXmlEarlyExit() throws Exception {

	ThreadedXmlPullParserImpl parser =
		new ThreadedXmlPullParserImpl(new ByteArrayInputStream(GOOD_XML.getBytes()),
			testName.getMethodName(), new TestErrorHandler(), false, 3);

	parser.start("doc");
	XmlElement projectXml = parser.start("project");
	assertNotNull(projectXml);
	assertEquals("foo", projectXml.getAttribute("name"));
	parser.end(projectXml);

	LinkedBlockingQueue<XmlElement> queue =
		(LinkedBlockingQueue<XmlElement>) getInstanceField("queue", parser);

	// wait until queue is filled
	while (queue.size() < 3) {
		Thread.yield();
	}

	assertTrue("parser should be running", parser.isParsing());
	parser.dispose();
	int count = 0;
	while (parser.isParsing()) {
		if (count++ > 20) {
			Assert.fail("parser should have shutdown");
		}
		Thread.sleep(1);
	}
	assertTrue("parser should be shutdown", !parser.isParsing());
}
 
Example 2  Project: distributedlog   File: DistributedLogTool.java
private int bkRecovery(final BookKeeperAdmin bkAdmin, final LinkedBlockingQueue<Long> ledgers,
                       final Set<BookieSocketAddress> bookieAddrs,
                       final boolean dryrun, final boolean skipOpenLedgers)
        throws InterruptedException, BKException {
    final AtomicInteger numPendings = new AtomicInteger(ledgers.size());
    final ExecutorService executorService = Executors.newCachedThreadPool();
    final CountDownLatch doneLatch = new CountDownLatch(concurrency);
    Runnable r = new Runnable() {
        @Override
        public void run() {
            while (!ledgers.isEmpty()) {
                long lid = -1L;
                try {
                    lid = ledgers.take();
                    System.out.println("Recovering ledger " + lid);
                    bkAdmin.recoverBookieData(lid, bookieAddrs, dryrun, skipOpenLedgers);
                    System.out.println("Recovered ledger completed : " + lid + ", " + numPendings.decrementAndGet() + " left");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    doneLatch.countDown();
                    break;
                } catch (BKException ke) {
                    System.out.println("Recovered ledger failed : " + lid + ", rc = " + BKException.getMessage(ke.getCode()));
                }
            }
            doneLatch.countDown();
        }
    };
    for (int i = 0; i < concurrency; i++) {
        executorService.submit(r);
    }
    doneLatch.await();
    SchedulerUtils.shutdownScheduler(executorService, 2, TimeUnit.MINUTES);
    return 0;
}
 
Example 3  Project: yuzhouwan   File: CollectionStuffTest.java
@Test
public void testQueue() throws Exception {
    LinkedBlockingQueue<Byte> lbq = new LinkedBlockingQueue<>();
    lbq.add(Byte.valueOf("1"));
    lbq.add(Byte.valueOf("2"));
    lbq.add(Byte.valueOf("3"));
    assertEquals(1, (byte) lbq.peek());
    assertEquals(1, (byte) lbq.peek());
    assertEquals(1, (byte) lbq.peek());

    Byte[] bufferList = new Byte[lbq.size()];
    Byte[] lbqList = lbq.toArray(bufferList);
    assertArrayEquals(bufferList, lbqList);
    assertSame(bufferList, lbqList);

    File file = new File("queue.txt");
    try (FileOutputStream fileChannel = new FileOutputStream(file)) {
        byte[] bytes = new byte[3];
        bytes[0] = Byte.parseByte("1");
        bytes[1] = Byte.parseByte("2");
        bytes[2] = Byte.parseByte("3");
        fileChannel.write(bytes);
        fileChannel.flush();
        fileChannel.close();

        try (FileReader fr = new FileReader(file)) {
            char[] chars = new char[3];
            assertEquals(3, fr.read(chars));
            assertEquals(1, chars[0]);
            assertEquals(2, chars[1]);
            assertEquals(3, chars[2]);

            assertEquals(1, (byte) lbq.remove());
            assertEquals(2, (byte) lbq.remove());
            assertEquals(3, (byte) lbq.remove());
        }
    } finally {
        retryDelete(file, 3);
    }
}
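The toArray(bufferList) / assertSame pair in the test above rests on the general Collection.toArray(T[]) contract: when the supplied array has at least queue.size() elements, the queue is copied into that array and the very same reference is returned. A minimal sketch of the idea, separate from the test above:

LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
q.add("x");
q.add("y");

// Array sized with size(): toArray fills it and returns the same reference.
String[] exact = new String[q.size()];
assert q.toArray(exact) == exact;

// Too small: toArray allocates and returns a new array instead.
String[] tooSmall = new String[0];
assert q.toArray(tooSmall) != tooSmall;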
 
Example 4  Project: phoenix   File: ConnectionCachingIT.java
long getNumCachedConnections(Connection conn) throws Exception {
  PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
  ConnectionQueryServices cqs = pConn.getQueryServices();
  // For whatever reason, we sometimes get a delegate here, and sometimes the real thing.
  if (cqs instanceof DelegateConnectionQueryServices) {
    cqs = ((DelegateConnectionQueryServices) cqs).getDelegate();
  }
  assertTrue("ConnectionQueryServices was a " + cqs.getClass(), cqs instanceof ConnectionQueryServicesImpl);
  ConnectionQueryServicesImpl cqsi = (ConnectionQueryServicesImpl) cqs;
  long cachedConnections = 0L;
  for (LinkedBlockingQueue<WeakReference<PhoenixConnection>> queue : cqsi.getCachedConnections()) {
    cachedConnections += queue.size();
  }
  return cachedConnections;
}
 
Example 5  Project: tez   File: AsyncDispatcherConcurrent.java
public void handle(TezAbstractEvent event) {
  if (stopped) {
    return;
  }
  if (blockNewEvents) {
    return;
  }
  drained = false;
  
  // offload to specific dispatcher if one exists
  Class<? extends Enum> type = event.getType().getDeclaringClass();
  AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(type);
  if (registeredDispatcher != null) {
    registeredDispatcher.getEventHandler().handle(event);
    return;
  }
  
  int index = numThreads > 1 ? event.getSerializingHash() % numThreads : 0;

 // no registered dispatcher. use internal dispatcher.
  LinkedBlockingQueue<Event> queue = eventQueues.get(index);
  /* all this method does is enqueue all the events onto the queue */
  int qSize = queue.size();
  if (qSize != 0 && qSize % 1000 == 0) {
    LOG.info("Size of event-queue is " + qSize);
  }
  int remCapacity = queue.remainingCapacity();
  if (remCapacity < 1000) {
    LOG.warn("Very low remaining capacity in the event-queue: "
        + remCapacity);
  }
  try {
    queue.put(event);
  } catch (InterruptedException e) {
    if (!stopped) {
      LOG.warn("AsyncDispatcher thread interrupted", e);
    }
    throw new YarnRuntimeException(e);
  }
}
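Here size() only drives periodic progress logging, while remainingCapacity() acts as a back-pressure warning before the blocking put(). A stripped-down sketch of that monitoring step, written as a hypothetical helper (the 1000 thresholds are taken from the example above; the method itself is not part of Tez):

// Hypothetical helper: report queue pressure before enqueuing an event.
static <E> void warnIfCongested(LinkedBlockingQueue<E> queue, org.slf4j.Logger log) {
    int qSize = queue.size();
    if (qSize != 0 && qSize % 1000 == 0) {
        log.info("Size of event-queue is " + qSize);
    }
    int remCapacity = queue.remainingCapacity();
    if (remCapacity < 1000) {
        log.warn("Very low remaining capacity in the event-queue: " + remCapacity);
    }
}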
 
Example 6  Project: ghidra   File: ThreadedXmlParserTest.java
@SuppressWarnings("unchecked")
@Test
public void testInterruptingParserThreadDoesNotDeadlockClientThread() throws Exception {
	final ThreadedXmlPullParserImpl parser =
		new ThreadedXmlPullParserImpl(new ByteArrayInputStream(GOOD_XML.getBytes()),
			testName.getMethodName(), new TestErrorHandler(), false, 3);

	parser.start("doc");
	XmlElement projectXml = parser.start("project");
	assertNotNull(projectXml);
	assertEquals("foo", projectXml.getAttribute("name"));
	parser.end(projectXml);

	LinkedBlockingQueue<XmlElement> queue =
		(LinkedBlockingQueue<XmlElement>) getInstanceField("queue", parser);

	// wait until queue is filled
	while (queue.size() < 3) {
		Thread.yield();
	}

	ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
	Thread[] threads = new Thread[threadGroup.activeCount() * 2];
	threadGroup.enumerate(threads);

	Thread parserThread = null;
	for (Thread thread : threads) {
		if (thread.getName().startsWith("XMLParser-")) {
			parserThread = thread;
			break;
		}
	}

	assertNotNull(parserThread);

	// 
	// Empty the queue and make sure that we don't deadlock
	//
	final CyclicBarrier startBarrier = new CyclicBarrier(1);
	final boolean[] container = new boolean[] { false };
	new Thread(() -> {
		try {
			startBarrier.await();
		}
		catch (Throwable e) {
			e.printStackTrace();
		}

		while (parser.hasNext()) {
			parser.next();
		}

		container[0] = true;
	}).start();

	// 
	// Interrupt the thread to make sure that this doesn't destroy the world (or deadlock)
	//
	parserThread.interrupt();

	startBarrier.await();// tell the 

	waitForFinish(container);

}
 
Example 7  Project: java-slack-sdk   File: AsyncRateLimitQueue.java
public Integer getCurrentActiveQueueSize(String methodNameWithSuffix) {
    LinkedBlockingQueue<Message> activeQueue = methodNameToActiveQueue.get(methodNameWithSuffix);
    return activeQueue != null ? activeQueue.size() : 0;
}
 
Example 8  Project: distributedlog   File: DLAuditor.java
static <T> void executeAction(final LinkedBlockingQueue<T> queue,
                              final int numThreads,
                              final Action<T> action) throws IOException {
    final CountDownLatch failureLatch = new CountDownLatch(1);
    final CountDownLatch doneLatch = new CountDownLatch(queue.size());
    final AtomicInteger numFailures = new AtomicInteger(0);
    final AtomicInteger completedThreads = new AtomicInteger(0);

    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
    try {
        for (int i = 0; i < numThreads; i++) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        T item = queue.poll();
                        if (null == item) {
                            break;
                        }
                        try {
                            action.execute(item);
                        } catch (IOException ioe) {
                            logger.error("Failed to execute action on item '{}'", item, ioe);
                            numFailures.incrementAndGet();
                            failureLatch.countDown();
                            break;
                        }
                        doneLatch.countDown();
                    }
                    if (numFailures.get() == 0 && completedThreads.incrementAndGet() == numThreads) {
                        failureLatch.countDown();
                    }
                }
            });
        }
        try {
            failureLatch.await();
            if (numFailures.get() > 0) {
                throw new IOException("Encountered " + numFailures.get() + " failures on executing action.");
            }
            doneLatch.await();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted on executing action", ie);
            throw new DLInterruptedException("Interrupted on executing action", ie);
        }
    } finally {
        executorService.shutdown();
    }
}
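One detail worth calling out: the CountDownLatch doneLatch is constructed from queue.size() before any worker thread starts polling, so the count matches the number of items exactly; calling size() after the workers start draining would undercount. The skeleton below isolates just that pattern (names are illustrative, not from DLAuditor, and the usual java.util.concurrent imports are assumed):

// Hypothetical skeleton: size the completion latch from the queue
// before worker threads start draining it.
static <T> void drainWithWorkers(final LinkedBlockingQueue<T> queue, int numThreads)
        throws InterruptedException {
    final CountDownLatch done = new CountDownLatch(queue.size());
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    try {
        for (int i = 0; i < numThreads; i++) {
            pool.submit(() -> {
                T item;
                while ((item = queue.poll()) != null) {
                    // ... process item ...
                    done.countDown();
                }
            });
        }
        done.await();
    } finally {
        pool.shutdown();
    }
}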
 
Example 9  Project: distributedlog   File: DistributedLogTool.java
@Override
protected int runBKCmd(ZooKeeperClient zkc, BookKeeperClient bkc) throws Exception {
    BookKeeperAdmin bkAdmin = new BookKeeperAdmin(bkc.get());
    try {
        if (query) {
            return bkQuery(bkAdmin, bookiesSrc);
        }
        if (fenceOnly) {
            return bkFence(bkc, ledgers, fenceRate);
        }
        if (!force) {
            System.out.println("Bookies : " + bookiesSrc);
            if (!IOUtils.confirmPrompt("Do you want to recover them: (Y/N)")) {
                return -1;
            }
        }
        if (!ledgers.isEmpty()) {
            System.out.println("Ledgers : " + ledgers);
            long numProcessed = 0;
            Iterator<Long> ledgersIter = ledgers.iterator();
            LinkedBlockingQueue<Long> ledgersToProcess = new LinkedBlockingQueue<Long>();
            while (ledgersIter.hasNext()) {
                long lid = ledgersIter.next();
                if (numPartitions <=0 || (numPartitions > 0 && lid % numPartitions == partition)) {
                    ledgersToProcess.add(lid);
                    ++numProcessed;
                }
                if (ledgersToProcess.size() == 10000) {
                    System.out.println("Processing " + numProcessed + " ledgers");
                    bkRecovery(ledgersToProcess, bookiesSrc, dryrun, skipOpenLedgers);
                    ledgersToProcess.clear();
                    System.out.println("Processed " + numProcessed + " ledgers");
                }
            }
            if (!ledgersToProcess.isEmpty()) {
                System.out.println("Processing " + numProcessed + " ledgers");
                bkRecovery(ledgersToProcess, bookiesSrc, dryrun, skipOpenLedgers);
                System.out.println("Processed " + numProcessed + " ledgers");
            }
            System.out.println("Done.");
            CountDownLatch latch = new CountDownLatch(1);
            latch.await();
            return 0;
        }
        return bkRecovery(bkAdmin, bookiesSrc, dryrun, skipOpenLedgers);
    } finally {
        bkAdmin.close();
    }
}
 
Example 10  Project: streamsupport   File: LBQSpliterator.java
private LBQSpliterator(LinkedBlockingQueue<E> queue) {
    this.queue = queue;
    this.est = queue.size();
    this.putLock = getPutLock(queue);
    this.takeLock = getTakeLock(queue);
}
 
Example 11  Project: mysql_perf_analyzer   File: AlertScanner.java
public void scan()
{
	Set<String> clusternames = frameworkContext.getDbInfoManager().getMyDatabases(appUser.getName(), false).getMyDbList();
	logger.info("Start scan alerts");
	LinkedBlockingQueue<DBInstanceInfo> dbqueue = new LinkedBlockingQueue<DBInstanceInfo>();
	for(String cl: clusternames)
	{
		DBCredential cred = DBUtils.findDBCredential(frameworkContext, cl, appUser);
		if(cred==null)
		{
			logger.info("No credential for group "+cl+", skip it");
			continue;//log the error
		}
		DBGroupInfo cls = frameworkContext.getDbInfoManager().findGroup(cl);
		if(cls==null)
		{
			logger.info("Group "+cl+" might have been deleted.");
			continue;
		}
		
		for(DBInstanceInfo dbinfo: cls.getInstances())
		{
			dbqueue.add(dbinfo);
		}
	}
		
	int mythreadcnt = this.threadCount;
	if(dbqueue.size()<mythreadcnt)mythreadcnt = dbqueue.size();
	Thread th[] = new Thread[mythreadcnt];
	for(int i=0;i<mythreadcnt;i++)
	{
		AlertScannerRunner runner = new 
				AlertScannerRunner(frameworkContext,
					dbqueue,
					appUser);
		th[i] = new Thread(runner);
		th[i].setName("AlertScannerRunner - "+i);
		th[i].start();
	}
	for (int i = 0; i < th.length; i++) {
		try {
			th[i].join();
		} catch (Exception ex) {
		}
	}

	logger.info("Done alert scanner");
	this.frameworkContext.getAutoScanner().getMetricDb().flush();//notify persistent store
}
 
Example 12  Project: mysql_perf_analyzer   File: MetricScanner.java
public void scan(int snap_id)
	{
		Set<String> clusternames = frameworkContext.getDbInfoManager().getMyDatabases(appUser.getName(), false).getMyDbList();
		logger.info("Start scan metrics");
		if (this.buffer == null)
		{
			logger.severe("Data buffer was not found. Scan cannot continue.");
			return;
		}
		LinkedBlockingQueue<DBInstanceInfo> dbqueue = new LinkedBlockingQueue<DBInstanceInfo>();
		for(String cl: clusternames)
		{
			DBCredential cred = DBUtils.findDBCredential(frameworkContext, cl, appUser);
			if(cred==null)
			{
				logger.info("No credential for group "+cl+", skip it");
				continue;//log the error
			}
			DBGroupInfo cls = frameworkContext.getDbInfoManager().findGroup(cl);
			if(cls==null)
			{
				logger.info("Group "+cl+" might have been deleted.");
				continue;
			}
			
			for(DBInstanceInfo dbinfo: cls.getInstances())
			{
				checkAndSetupMetricsBuffer(dbinfo);
				dbqueue.add(dbinfo);
			}
		}
			
		int mythreadcnt = this.threadCount;
		if(dbqueue.size()<mythreadcnt)mythreadcnt = dbqueue.size();
		Thread th[] = new Thread[mythreadcnt];
		metricsScannerRunners = new MetricScannerRunner[mythreadcnt];
		for(int i=0;i<mythreadcnt;i++)
		{
			MetricScannerRunner runner = new 
					MetricScannerRunner(frameworkContext,
						dbqueue,
						appUser,
						snap_id);
			runner.setBuffer(buffer);
//			runner.setBuiltinMetrics(builtinMetrics);
			th[i] = new Thread(runner);
			metricsScannerRunners[i] = runner;
			th[i].setName("MetricScannerRunner - "+i);
			th[i].start();
		}
		for (int i = 0; i < th.length; i++) {
			try {
				th[i].join();
			} catch (Exception ex) {
			}
		}
	
		logger.info("Done gather metrics");
		this.frameworkContext.getAutoScanner().getMetricDb().flush();//notify persistent store
	}
 
Example 13  Project: SeimiCrawler   File: DefaultLocalQueue.java
@Override
public long len(String crawlerName) {
    LinkedBlockingQueue<Request> queue = getQueue(crawlerName);
    return queue.size();
}
 
Example 14  Project: incubator-gobblin   File: AsyncWriterManagerTest.java
/**
 * In the presence of lots of failures, the manager should slow down
 * and not overwhelm the system.
 */
@Test (enabled=false)
public void testFlowControlWithWriteFailures()
    throws Exception {

  FlakyAsyncWriter flakyAsyncWriter =
      new FlakyAsyncWriter(org.apache.gobblin.test.ErrorManager.builder().errorType(ErrorManager.ErrorType.ALL).build());

  int maxOutstandingWrites = 2000;

  final AsyncWriterManager asyncWriterManager =
      AsyncWriterManager.builder().asyncDataWriter(flakyAsyncWriter).retriesEnabled(true).numRetries(5)
          .maxOutstandingWrites(maxOutstandingWrites).failureAllowanceRatio(1.0)  // ok to fail all the time
          .build();

  boolean verbose = false;
  if (verbose) {
    // Create a reporter for metrics. This reporter will write metrics to STDOUT.
    OutputStreamReporter.Factory.newBuilder().build(new Properties());
    // Start all metric reporters.
    RootMetricContext.get().startReporting();
  }
  final int load = 10000; // 10k records per sec
  final long tickDiffInNanos = (1000 * 1000 * 1000) / load;

  final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

  scheduler.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
      GenericRecord record = TestUtils.generateRandomAvroRecord();
      try {
        asyncWriterManager.write(record);
      } catch (IOException e) {
        log.error("Failure during write", e);
        Throwables.propagate(e);
      }
    }
  }, 0, tickDiffInNanos, TimeUnit.NANOSECONDS);

  LinkedBlockingQueue retryQueue = (LinkedBlockingQueue) asyncWriterManager.retryQueue.get();

  int sleepTime = 100;
  int totalTime = 10000;
  for (int i = 0; i < (totalTime / sleepTime); ++i) {
    Thread.sleep(sleepTime);
    int retryQueueSize = retryQueue.size();
    Assert.assertTrue(retryQueueSize <= (maxOutstandingWrites + 1),
        "Retry queue should never exceed the " + "maxOutstandingWrites. Found " + retryQueueSize);
    log.debug("Retry queue size = {}", retryQueue.size());
  }

  scheduler.shutdown();
  asyncWriterManager.commit();
  long recordsIn = asyncWriterManager.recordsIn.getCount();
  long recordsAttempted = asyncWriterManager.recordsAttempted.getCount();
  String msg = String.format("recordsIn = %d, recordsAttempted = %d.", recordsIn, recordsAttempted);
  log.info(msg);
  Assert.assertTrue(recordsAttempted > recordsIn, "There must have been a bunch of failures");
  Assert.assertTrue(retryQueue.size() == 0, "Retry queue should be empty");
}