下面列出了java.util.concurrent.LinkedBlockingQueue#size ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@SuppressWarnings("unchecked")
@Test
public void testGoodXmlEarlyExit() throws Exception {
ThreadedXmlPullParserImpl parser =
new ThreadedXmlPullParserImpl(new ByteArrayInputStream(GOOD_XML.getBytes()),
testName.getMethodName(), new TestErrorHandler(), false, 3);
parser.start("doc");
XmlElement projectXml = parser.start("project");
assertNotNull(projectXml);
assertEquals("foo", projectXml.getAttribute("name"));
parser.end(projectXml);
LinkedBlockingQueue<XmlElement> queue =
(LinkedBlockingQueue<XmlElement>) getInstanceField("queue", parser);
// wait until queue is filled
while (queue.size() < 3) {
Thread.yield();
}
assertTrue("parser should be running", parser.isParsing());
parser.dispose();
int count = 0;
while (parser.isParsing()) {
if (count++ > 20) {
Assert.fail("parser should have shutdown");
}
Thread.sleep(1);
}
assertTrue("parser should be shutdown", !parser.isParsing());
}
private int bkRecovery(final BookKeeperAdmin bkAdmin, final LinkedBlockingQueue<Long> ledgers,
final Set<BookieSocketAddress> bookieAddrs,
final boolean dryrun, final boolean skipOpenLedgers)
throws InterruptedException, BKException {
final AtomicInteger numPendings = new AtomicInteger(ledgers.size());
final ExecutorService executorService = Executors.newCachedThreadPool();
final CountDownLatch doneLatch = new CountDownLatch(concurrency);
Runnable r = new Runnable() {
@Override
public void run() {
while (!ledgers.isEmpty()) {
long lid = -1L;
try {
lid = ledgers.take();
System.out.println("Recovering ledger " + lid);
bkAdmin.recoverBookieData(lid, bookieAddrs, dryrun, skipOpenLedgers);
System.out.println("Recovered ledger completed : " + lid + ", " + numPendings.decrementAndGet() + " left");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
doneLatch.countDown();
break;
} catch (BKException ke) {
System.out.println("Recovered ledger failed : " + lid + ", rc = " + BKException.getMessage(ke.getCode()));
}
}
doneLatch.countDown();
}
};
for (int i = 0; i < concurrency; i++) {
executorService.submit(r);
}
doneLatch.await();
SchedulerUtils.shutdownScheduler(executorService, 2, TimeUnit.MINUTES);
return 0;
}
@Test
public void testQueue() throws Exception {
LinkedBlockingQueue<Byte> lbq = new LinkedBlockingQueue<>();
lbq.add(Byte.valueOf("1"));
lbq.add(Byte.valueOf("2"));
lbq.add(Byte.valueOf("3"));
assertEquals(1, (byte) lbq.peek());
assertEquals(1, (byte) lbq.peek());
assertEquals(1, (byte) lbq.peek());
Byte[] bufferList = new Byte[lbq.size()];
Byte[] lbqList = lbq.toArray(bufferList);
assertArrayEquals(bufferList, lbqList);
assertSame(bufferList, lbqList);
File file = new File("queue.txt");
try (FileOutputStream fileChannel = new FileOutputStream(file)) {
byte[] bytes = new byte[3];
bytes[0] = Byte.parseByte("1");
bytes[1] = Byte.parseByte("2");
bytes[2] = Byte.parseByte("3");
fileChannel.write(bytes);
fileChannel.flush();
fileChannel.close();
try (FileReader fr = new FileReader(file)) {
char[] chars = new char[3];
assertEquals(3, fr.read(chars));
assertEquals(1, chars[0]);
assertEquals(2, chars[1]);
assertEquals(3, chars[2]);
assertEquals(1, (byte) lbq.remove());
assertEquals(2, (byte) lbq.remove());
assertEquals(3, (byte) lbq.remove());
}
} finally {
retryDelete(file, 3);
}
}
long getNumCachedConnections(Connection conn) throws Exception {
PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
ConnectionQueryServices cqs = pConn.getQueryServices();
// For whatever reason, we sometimes get a delegate here, and sometimes the real thing.
if (cqs instanceof DelegateConnectionQueryServices) {
cqs = ((DelegateConnectionQueryServices) cqs).getDelegate();
}
assertTrue("ConnectionQueryServices was a " + cqs.getClass(), cqs instanceof ConnectionQueryServicesImpl);
ConnectionQueryServicesImpl cqsi = (ConnectionQueryServicesImpl) cqs;
long cachedConnections = 0L;
for (LinkedBlockingQueue<WeakReference<PhoenixConnection>> queue : cqsi.getCachedConnections()) {
cachedConnections += queue.size();
}
return cachedConnections;
}
public void handle(TezAbstractEvent event) {
if (stopped) {
return;
}
if (blockNewEvents) {
return;
}
drained = false;
// offload to specific dispatcher if one exists
Class<? extends Enum> type = event.getType().getDeclaringClass();
AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(type);
if (registeredDispatcher != null) {
registeredDispatcher.getEventHandler().handle(event);
return;
}
int index = numThreads > 1 ? event.getSerializingHash() % numThreads : 0;
// no registered dispatcher. use internal dispatcher.
LinkedBlockingQueue<Event> queue = eventQueues.get(index);
/* all this method does is enqueue all the events onto the queue */
int qSize = queue.size();
if (qSize !=0 && qSize %1000 == 0) {
LOG.info("Size of event-queue is " + qSize);
}
int remCapacity = queue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue: "
+ remCapacity);
}
try {
queue.put(event);
} catch (InterruptedException e) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", e);
}
throw new YarnRuntimeException(e);
}
}
@SuppressWarnings("unchecked")
@Test
public void testInterruptingParserThreadDoesNotDeadlockClientThread() throws Exception {
final ThreadedXmlPullParserImpl parser =
new ThreadedXmlPullParserImpl(new ByteArrayInputStream(GOOD_XML.getBytes()),
testName.getMethodName(), new TestErrorHandler(), false, 3);
parser.start("doc");
XmlElement projectXml = parser.start("project");
assertNotNull(projectXml);
assertEquals("foo", projectXml.getAttribute("name"));
parser.end(projectXml);
LinkedBlockingQueue<XmlElement> queue =
(LinkedBlockingQueue<XmlElement>) getInstanceField("queue", parser);
// wait until queue is filled
while (queue.size() < 3) {
Thread.yield();
}
ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
Thread[] threads = new Thread[threadGroup.activeCount() * 2];
threadGroup.enumerate(threads);
Thread parserThread = null;
for (Thread thread : threads) {
if (thread.getName().startsWith("XMLParser-")) {
parserThread = thread;
break;
}
}
assertNotNull(parserThread);
//
// Empty the queue and make sure that we don't deadlock
//
final CyclicBarrier startBarrier = new CyclicBarrier(1);
final boolean[] container = new boolean[] { false };
new Thread(() -> {
try {
startBarrier.await();
}
catch (Throwable e) {
e.printStackTrace();
}
while (parser.hasNext()) {
parser.next();
}
container[0] = true;
}).start();
//
// Interrupt the thread to make sure that this doesn't destroy the world (or deadlock)
//
parserThread.interrupt();
startBarrier.await();// tell the
waitForFinish(container);
}
public Integer getCurrentActiveQueueSize(String methodNameWithSuffix) {
LinkedBlockingQueue<Message> activeQueue = methodNameToActiveQueue.get(methodNameWithSuffix);
return activeQueue != null ? activeQueue.size() : 0;
}
static <T> void executeAction(final LinkedBlockingQueue<T> queue,
final int numThreads,
final Action<T> action) throws IOException {
final CountDownLatch failureLatch = new CountDownLatch(1);
final CountDownLatch doneLatch = new CountDownLatch(queue.size());
final AtomicInteger numFailures = new AtomicInteger(0);
final AtomicInteger completedThreads = new AtomicInteger(0);
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
try {
for (int i = 0; i < numThreads; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
while (true) {
T item = queue.poll();
if (null == item) {
break;
}
try {
action.execute(item);
} catch (IOException ioe) {
logger.error("Failed to execute action on item '{}'", item, ioe);
numFailures.incrementAndGet();
failureLatch.countDown();
break;
}
doneLatch.countDown();
}
if (numFailures.get() == 0 && completedThreads.incrementAndGet() == numThreads) {
failureLatch.countDown();
}
}
});
}
try {
failureLatch.await();
if (numFailures.get() > 0) {
throw new IOException("Encountered " + numFailures.get() + " failures on executing action.");
}
doneLatch.await();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
logger.warn("Interrupted on executing action", ie);
throw new DLInterruptedException("Interrupted on executing action", ie);
}
} finally {
executorService.shutdown();
}
}
@Override
protected int runBKCmd(ZooKeeperClient zkc, BookKeeperClient bkc) throws Exception {
BookKeeperAdmin bkAdmin = new BookKeeperAdmin(bkc.get());
try {
if (query) {
return bkQuery(bkAdmin, bookiesSrc);
}
if (fenceOnly) {
return bkFence(bkc, ledgers, fenceRate);
}
if (!force) {
System.out.println("Bookies : " + bookiesSrc);
if (!IOUtils.confirmPrompt("Do you want to recover them: (Y/N)")) {
return -1;
}
}
if (!ledgers.isEmpty()) {
System.out.println("Ledgers : " + ledgers);
long numProcessed = 0;
Iterator<Long> ledgersIter = ledgers.iterator();
LinkedBlockingQueue<Long> ledgersToProcess = new LinkedBlockingQueue<Long>();
while (ledgersIter.hasNext()) {
long lid = ledgersIter.next();
if (numPartitions <=0 || (numPartitions > 0 && lid % numPartitions == partition)) {
ledgersToProcess.add(lid);
++numProcessed;
}
if (ledgersToProcess.size() == 10000) {
System.out.println("Processing " + numProcessed + " ledgers");
bkRecovery(ledgersToProcess, bookiesSrc, dryrun, skipOpenLedgers);
ledgersToProcess.clear();
System.out.println("Processed " + numProcessed + " ledgers");
}
}
if (!ledgersToProcess.isEmpty()) {
System.out.println("Processing " + numProcessed + " ledgers");
bkRecovery(ledgersToProcess, bookiesSrc, dryrun, skipOpenLedgers);
System.out.println("Processed " + numProcessed + " ledgers");
}
System.out.println("Done.");
CountDownLatch latch = new CountDownLatch(1);
latch.await();
return 0;
}
return bkRecovery(bkAdmin, bookiesSrc, dryrun, skipOpenLedgers);
} finally {
bkAdmin.close();
}
}
static <T> void executeAction(final LinkedBlockingQueue<T> queue,
final int numThreads,
final Action<T> action) throws IOException {
final CountDownLatch failureLatch = new CountDownLatch(1);
final CountDownLatch doneLatch = new CountDownLatch(queue.size());
final AtomicInteger numFailures = new AtomicInteger(0);
final AtomicInteger completedThreads = new AtomicInteger(0);
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
try {
for (int i = 0 ; i < numThreads; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
while (true) {
T item = queue.poll();
if (null == item) {
break;
}
try {
action.execute(item);
} catch (IOException ioe) {
logger.error("Failed to execute action on item '{}'", item, ioe);
numFailures.incrementAndGet();
failureLatch.countDown();
break;
}
doneLatch.countDown();
}
if (numFailures.get() == 0 && completedThreads.incrementAndGet() == numThreads) {
failureLatch.countDown();
}
}
});
}
try {
failureLatch.await();
if (numFailures.get() > 0) {
throw new IOException("Encountered " + numFailures.get() + " failures on executing action.");
}
doneLatch.await();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
logger.warn("Interrupted on executing action", ie);
throw new DLInterruptedException("Interrupted on executing action", ie);
}
} finally {
executorService.shutdown();
}
}
private LBQSpliterator(LinkedBlockingQueue<E> queue) {
this.queue = queue;
this.est = queue.size();
this.putLock = getPutLock(queue);
this.takeLock = getTakeLock(queue);
}
public void scan()
{
Set<String> clusternames = frameworkContext.getDbInfoManager().getMyDatabases(appUser.getName(), false).getMyDbList();
logger.info("Start scan alerts");
LinkedBlockingQueue<DBInstanceInfo> dbqueue = new LinkedBlockingQueue<DBInstanceInfo>();
for(String cl: clusternames)
{
DBCredential cred = DBUtils.findDBCredential(frameworkContext, cl, appUser);
if(cred==null)
{
logger.info("No credential for group "+cl+", skip it");
continue;//log the error
}
DBGroupInfo cls = frameworkContext.getDbInfoManager().findGroup(cl);
if(cls==null)
{
logger.info("Group "+cl+" might have been deleted.");
continue;
}
for(DBInstanceInfo dbinfo: cls.getInstances())
{
dbqueue.add(dbinfo);
}
}
int mythreadcnt = this.threadCount;
if(dbqueue.size()<mythreadcnt)mythreadcnt = dbqueue.size();
Thread th[] = new Thread[mythreadcnt];
for(int i=0;i<mythreadcnt;i++)
{
AlertScannerRunner runner = new
AlertScannerRunner(frameworkContext,
dbqueue,
appUser);
th[i] = new Thread(runner);
th[i].setName("AlertScannerRunner - "+i);
th[i].start();
}
for(int i=0;i<th.length;i++)try{th[i].join();}catch(Exception ex){}
logger.info("Done alert scanner");
this.frameworkContext.getAutoScanner().getMetricDb().flush();//notify persistent store
}
public void scan(int snap_id)
{
Set<String> clusternames = frameworkContext.getDbInfoManager().getMyDatabases(appUser.getName(), false).getMyDbList();
logger.info("Start scan metrics");
if (this.buffer == null)
{
logger.severe("Data buffer was not found. Scan cannot continue.");
return;
}
LinkedBlockingQueue<DBInstanceInfo> dbqueue = new LinkedBlockingQueue<DBInstanceInfo>();
for(String cl: clusternames)
{
DBCredential cred = DBUtils.findDBCredential(frameworkContext, cl, appUser);
if(cred==null)
{
logger.info("No credential for group "+cl+", skip it");
continue;//log the error
}
DBGroupInfo cls = frameworkContext.getDbInfoManager().findGroup(cl);
if(cls==null)
{
logger.info("Group "+cl+" might have been deleted.");
continue;
}
for(DBInstanceInfo dbinfo: cls.getInstances())
{
checkAndSetupMetricsBuffer(dbinfo);
dbqueue.add(dbinfo);
}
}
int mythreadcnt = this.threadCount;
if(dbqueue.size()<mythreadcnt)mythreadcnt = dbqueue.size();
Thread th[] = new Thread[mythreadcnt];
metricsScannerRunners = new MetricScannerRunner[mythreadcnt];
for(int i=0;i<mythreadcnt;i++)
{
MetricScannerRunner runner = new
MetricScannerRunner(frameworkContext,
dbqueue,
appUser,
snap_id);
runner.setBuffer(buffer);
// runner.setBuiltinMetrics(builtinMetrics);
th[i] = new Thread(runner);
metricsScannerRunners[i] = runner;
th[i].setName("MetricScannerRunner - "+i);
th[i].start();
}
for(int i=0;i<th.length;i++)try{th[i].join();}catch(Exception ex){}
logger.info("Done gather metrics");
this.frameworkContext.getAutoScanner().getMetricDb().flush();//notify persistent store
}
@Override
public long len(String crawlerName) {
LinkedBlockingQueue<Request> queue = getQueue(crawlerName);
return queue.size();
}
/**
* In the presence of lots of failures, the manager should slow down
* and not overwhelm the system.
*/
@Test (enabled=false)
public void testFlowControlWithWriteFailures()
throws Exception {
FlakyAsyncWriter flakyAsyncWriter =
new FlakyAsyncWriter(org.apache.gobblin.test.ErrorManager.builder().errorType(ErrorManager.ErrorType.ALL).build());
int maxOutstandingWrites = 2000;
final AsyncWriterManager asyncWriterManager =
AsyncWriterManager.builder().asyncDataWriter(flakyAsyncWriter).retriesEnabled(true).numRetries(5)
.maxOutstandingWrites(maxOutstandingWrites).failureAllowanceRatio(1.0) // ok to fail all the time
.build();
boolean verbose = false;
if (verbose) {
// Create a reporter for metrics. This reporter will write metrics to STDOUT.
OutputStreamReporter.Factory.newBuilder().build(new Properties());
// Start all metric reporters.
RootMetricContext.get().startReporting();
}
final int load = 10000; // 10k records per sec
final long tickDiffInNanos = (1000 * 1000 * 1000) / load;
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
GenericRecord record = TestUtils.generateRandomAvroRecord();
try {
asyncWriterManager.write(record);
} catch (IOException e) {
log.error("Failure during write", e);
Throwables.propagate(e);
}
}
}, 0, tickDiffInNanos, TimeUnit.NANOSECONDS);
LinkedBlockingQueue retryQueue = (LinkedBlockingQueue) asyncWriterManager.retryQueue.get();
int sleepTime = 100;
int totalTime = 10000;
for (int i = 0; i < (totalTime / sleepTime); ++i) {
Thread.sleep(sleepTime);
int retryQueueSize = retryQueue.size();
Assert.assertTrue(retryQueueSize <= (maxOutstandingWrites + 1),
"Retry queue should never exceed the " + "maxOutstandingWrites. Found " + retryQueueSize);
log.debug("Retry queue size = {}", retryQueue.size());
}
scheduler.shutdown();
asyncWriterManager.commit();
long recordsIn = asyncWriterManager.recordsIn.getCount();
long recordsAttempted = asyncWriterManager.recordsAttempted.getCount();
String msg = String.format("recordsIn = %d, recordsAttempted = %d.", recordsIn, recordsAttempted);
log.info(msg);
Assert.assertTrue(recordsAttempted > recordsIn, "There must have been a bunch of failures");
Assert.assertTrue(retryQueue.size() == 0, "Retry queue should be empty");
}