下面列出了 java.util.concurrent.LinkedBlockingQueue#add() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds a map from stream name to space usage for every log in the namespace.
 *
 * <p>The names returned by {@code namespace.getLogs()} are copied into a queue, which is
 * handed to {@code executeAction} together with an action that measures one stream at a
 * time and records the result.
 *
 * @param uri       uri of the namespace; only used in the progress log message
 * @param namespace namespace whose logs are enumerated and measured
 * @return a map, sorted by stream name, holding the usage computed for each stream
 * @throws IOException declared because the namespace and action calls made here may raise it
 */
private Map<String, Long> calculateStreamSpaceUsage(
        final URI uri, final Namespace namespace)
        throws IOException {
    // Snapshot all stream names up front so the action runner can drain them.
    final LinkedBlockingQueue<String> pending = new LinkedBlockingQueue<String>();
    for (Iterator<String> names = namespace.getLogs(); names.hasNext();) {
        pending.add(names.next());
    }

    final AtomicInteger processed = new AtomicInteger(0);
    final Map<String, Long> usageByStream = new ConcurrentSkipListMap<String, Long>();

    // The second argument (10) is passed through to executeAction unchanged;
    // presumably the number of workers - confirm against executeAction.
    executeAction(pending, 10, new Action<String>() {
        @Override
        public void execute(String stream) throws IOException {
            usageByStream.put(stream, calculateStreamSpaceUsage(namespace, stream));
            final int count = processed.incrementAndGet();
            // Emit a progress line every 1000 streams.
            if (count % 1000 == 0) {
                logger.info("Calculated {} streams from uri {}.", processed.get(), uri);
            }
        }
    });
    return usageByStream;
}
/**
 * Enumerates every log of the namespace and collects the ledgers of each one.
 *
 * <p>The log names are copied into a queue that is handed to {@code executeAction};
 * the action calls {@code collectLedgersFromStream} for each stream, passing the
 * caller-supplied {@code ledgers} set.
 *
 * @param uri       uri of the namespace; only used in log messages
 * @param namespace namespace whose logs are enumerated
 * @param ledgers   set passed to {@code collectLedgersFromStream} for every stream
 * @throws IOException declared because the namespace and action calls made here may raise it
 */
private void collectLedgersFromDL(final URI uri,
                                  final Namespace namespace,
                                  final Set<Long> ledgers) throws IOException {
    logger.info("Enumerating {} to collect streams.", uri);
    Iterator<String> streams = namespace.getLogs();
    final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
    while (streams.hasNext()) {
        streamQueue.add(streams.next());
    }
    // Log the collected names themselves: the iterator is exhausted at this point and
    // logging it would only print the iterator object, not the streams.
    logger.info("Collected {} streams from uri {} : {}",
            new Object[] { streamQueue.size(), uri, streamQueue });
    executeAction(streamQueue, 10, new Action<String>() {
        @Override
        public void execute(String stream) throws IOException {
            collectLedgersFromStream(namespace, stream, ledgers);
        }
    });
}
/**
 * Schedules a two-element sequence with a cron expression, connects it to a pre-filled
 * bounded queue, throttles the connected stream to one element per two seconds, and
 * asserts that more than 1500 ms pass between the last upstream emission and completion.
 */
@Test
public void backpressureScheduledCron() {
    captured = "";
    diff = System.currentTimeMillis();

    // Capacity is 1: add() fills the queue, so the following offer() is rejected.
    final LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1);
    bounded.add("10");
    bounded.offer("10");

    ReactiveSeq.range(0, Integer.MAX_VALUE)
            .limit(2)
            .peek(tick -> diff = System.currentTimeMillis())
            .map(number -> number.toString())
            .schedule("* * * * * ?", scheduled)
            .connect(bounded)
            .onePer(2, TimeUnit.SECONDS)
            .peek(item -> System.out.println("BQ " + bounded))
            .peek(System.out::println)
            .forEach(item -> captured = item);

    final long elapsedMillis = System.currentTimeMillis() - diff;
    assertThat(elapsedMillis, greaterThan(1500L));
}
/**
 * Dismissing by cancel id must reach the queued keyboard operation that carries that id
 * and must leave the currently presented operation (which has a different id) untouched.
 */
public void testDismissingQueuedKeyboard() {
    final Integer cancelId = 42;

    // Operation that is currently being presented; its cancel id does not match.
    final PresentKeyboardOperation presentedOp = mock(PresentKeyboardOperation.class);
    doReturn(true).when(presentedOp).isExecuting();
    doReturn(96).when(presentedOp).getCancelID();
    csm.currentlyPresentedKeyboardOperation = presentedOp;

    // Operation waiting in the queue; its cancel id is the one being dismissed.
    final PresentKeyboardOperation queuedOp = mock(PresentKeyboardOperation.class);
    doReturn(true).when(queuedOp).isExecuting();
    doReturn(cancelId).when(queuedOp).getCancelID();

    final LinkedBlockingQueue<Runnable> pendingOperations = new LinkedBlockingQueue<>();
    pendingOperations.add(queuedOp);
    csm.operationQueue = pendingOperations;

    csm.dismissKeyboard(cancelId);

    // Only the queued operation should have been asked to dismiss.
    verify(presentedOp, times(0)).dismissKeyboard();
    verify(queuedOp, times(1)).dismissKeyboard();
}
/**
 * drainTo(c) empties the queue into another collection c, preserving element order,
 * and the queue remains usable afterwards.
 */
public void testDrainTo() {
    LinkedBlockingQueue q = populatedQueue(SIZE);
    ArrayList l = new ArrayList();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(SIZE, l.size());
    for (int i = 0; i < SIZE; ++i) {
        // Expected value first; Integer.valueOf replaces the deprecated constructor.
        assertEquals(Integer.valueOf(i), l.get(i));
    }

    // Refill with two elements and drain again into the cleared list.
    q.add(zero);
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(zero));
    assertTrue(q.contains(one));
    l.clear();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(2, l.size());
    for (int i = 0; i < 2; ++i) {
        assertEquals(Integer.valueOf(i), l.get(i));
    }
}
/**
 * A timed offer issued from one executor task succeeds once another task
 * takes an element from the full queue.
 */
public void testOfferInExecutor() {
    // Start with a full queue so that the untimed offer below must fail.
    final LinkedBlockingQueue queue = new LinkedBlockingQueue(2);
    queue.add(one);
    queue.add(two);

    final CheckedBarrier bothRunning = new CheckedBarrier(2);
    final ExecutorService pool = Executors.newFixedThreadPool(2);
    try (PoolCleaner cleaner = cleaner(pool)) {
        // Producer: fails immediately, then waits for room after both tasks have started.
        pool.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                assertFalse(queue.offer(three));
                bothRunning.await();
                assertTrue(queue.offer(three, LONG_DELAY_MS, MILLISECONDS));
                assertEquals(0, queue.remainingCapacity());
            }
        });

        // Consumer: removes the head once both tasks have started.
        pool.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                bothRunning.await();
                assertSame(one, queue.take());
            }
        });
    }
}
/**
 * Runs the renew-lease task four times against one connection holding one scanner and
 * checks the sizes of the connection and scanner queues, plus the iterator's last
 * reported lease status, after each run.
 */
@Test
public void testRenewLeaseTaskBehaviorOnError() throws Exception {
    // Register a single connection with the queue the task works on.
    final PhoenixConnection connection = DriverManager
            .getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))
            .unwrap(PhoenixConnection.class);
    final LinkedBlockingQueue<WeakReference<PhoenixConnection>> connections = new LinkedBlockingQueue<>();
    connections.add(new WeakReference<PhoenixConnection>(connection));

    // Register a single scanner whose renewals are scripted by the arguments below.
    final int numLeaseRenewals = 4;
    final int lockNotAcquiredAt = 1;
    final int thresholdNotReachedCount = 2;
    final int failLeaseRenewalAt = 3;
    final RenewLeaseOnlyTableIterator scanner = new RenewLeaseOnlyTableIterator(
            numLeaseRenewals, thresholdNotReachedCount, lockNotAcquiredAt, failLeaseRenewalAt);
    final LinkedBlockingQueue<WeakReference<TableResultIterator>> scanners = connection.getScanners();
    scanners.add(new WeakReference<TableResultIterator>(scanner));

    final RenewLeaseTask renewTask = new RenewLeaseTask(connections);
    assertTrue(connections.size() == 1);
    assertTrue(scanners.size() == 1);

    // First run: lock not acquired, both queues unchanged.
    renewTask.run();
    assertTrue(connections.size() == 1);
    assertTrue(scanners.size() == 1);
    assertEquals(LOCK_NOT_ACQUIRED, scanner.getLastRenewLeaseStatus());

    // Second run: renew lease skipped but the scanner is still in the queue.
    renewTask.run();
    assertTrue(scanners.size() == 1);
    assertTrue(connections.size() == 1);
    assertEquals(THRESHOLD_NOT_REACHED, scanner.getLastRenewLeaseStatus());

    // Third run: the only connection is not added back because of the error.
    renewTask.run();
    assertTrue(scanners.size() == 0);
    assertTrue(connections.size() == 0);

    // Fourth run, after closing the connection: both queues stay empty.
    connection.close();
    renewTask.run();
    assertTrue(scanners.size() == 0);
    assertTrue("Closing the connection should have removed it from the queue", connections.size() == 0);
}
/**
 * Creates a new ChannelWriterOutputView that writes to the given channel and buffers data
 * in the given memory segments. When {@code memory} is null, no segments are added and the
 * view relies solely on whatever the writer's return queue provides; per the original
 * contract, this variant blocks if that queue contains no buffers.
 *
 * @param writer The writer to write to; must not be null.
 * @param memory The memory used to buffer data, or null, to utilize solely the return queue.
 * @param segmentSize The size every supplied memory segment must have.
 * @throws NullPointerException if {@code writer} is null.
 * @throws IllegalArgumentException if a supplied segment's size differs from {@code segmentSize}.
 */
public ChannelWriterOutputView(BlockChannelWriter<MemorySegment> writer, List<MemorySegment> memory, int segmentSize) {
    super(segmentSize, HEADER_LENGTH);

    if (writer == null) {
        throw new NullPointerException();
    }
    this.writer = writer;

    if (memory != null) {
        this.numSegments = memory.size();

        // Hand the segments to the writer's return queue, last list element first.
        final LinkedBlockingQueue<MemorySegment> returnQueue = writer.getReturnQueue();
        for (int idx = memory.size(); idx-- > 0;) {
            final MemorySegment segment = memory.get(idx);
            if (segment.size() != segmentSize) {
                throw new IllegalArgumentException("The supplied memory segments are not of the specified size.");
            }
            returnQueue.add(segment);
        }
    } else {
        this.numSegments = 0;
    }

    // Fetch the first segment; an I/O failure here is treated as a bug.
    try {
        advance();
    } catch (IOException ioex) {
        throw new RuntimeException("BUG: IOException occurred while getting first block for ChannelWriterOutputView.", ioex);
    }
}
/**
 * Returns the first address of the comma-separated {@code CrailConstants.NAMENODE_ADDRESS}
 * list as a URI.
 *
 * <p>Every token is still passed through {@code URI.create}, so a malformed entry
 * anywhere in the list raises the same exception as before.
 *
 * @return the URI of the first listed address, or null when the list holds no tokens
 */
public static URI getPrimaryNameNode() {
    URI primary = null;
    final StringTokenizer addresses = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
    while (addresses.hasMoreTokens()) {
        final URI candidate = URI.create(addresses.nextToken());
        // Only the first parsed address is kept; later ones are parsed and discarded.
        if (primary == null) {
            primary = candidate;
        }
    }
    return primary;
}
/**
 * The iterator returns elements in FIFO (insertion) order.
 */
public void testIteratorOrdering() {
    final LinkedBlockingQueue queue = new LinkedBlockingQueue(3);
    queue.add(one);
    queue.add(two);
    queue.add(three);
    assertEquals(0, queue.remainingCapacity());

    // The n-th element visited must equal n.
    int visited = 0;
    final Iterator cursor = queue.iterator();
    while (cursor.hasNext()) {
        visited++;
        assertEquals(visited, cursor.next());
    }
    assertEquals(3, visited);
}
/**
 * The queue goes from empty to full as elements are added, and a full queue
 * rejects a further offer.
 */
public void testEmptyFull() {
    final LinkedBlockingQueue queue = new LinkedBlockingQueue(2);

    // Freshly created: empty with the full capacity available.
    assertTrue(queue.isEmpty());
    assertEquals("should have room for 2", 2, queue.remainingCapacity());

    queue.add(one);
    assertFalse(queue.isEmpty());

    queue.add(two);
    assertFalse(queue.isEmpty());

    // Now full: no capacity left and offer() reports failure.
    assertEquals(0, queue.remainingCapacity());
    assertFalse(queue.offer(three));
}
/**
 * clear() removes all elements and restores the full remaining capacity.
 */
public void testClear() {
    final LinkedBlockingQueue queue = populatedQueue(SIZE);

    queue.clear();
    assertTrue(queue.isEmpty());
    assertEquals(0, queue.size());
    assertEquals(SIZE, queue.remainingCapacity());

    // The queue accepts elements again after being cleared ...
    queue.add(one);
    assertFalse(queue.isEmpty());
    assertTrue(queue.contains(one));

    // ... and can be cleared a second time.
    queue.clear();
    assertTrue(queue.isEmpty());
}
/**
 * Parses the comma-separated {@code CrailConstants.NAMENODE_ADDRESS} list and returns
 * its first entry as a URI.
 *
 * @return the URI created from the first token, or null when there are no tokens
 */
public static URI getPrimaryNameNode() {
    final LinkedBlockingQueue<URI> parsed = new LinkedBlockingQueue<URI>();
    // Every token is converted, so an invalid entry anywhere in the list still fails.
    for (StringTokenizer tokens = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
            tokens.hasMoreTokens();) {
        parsed.add(URI.create(tokens.nextToken()));
    }
    // poll() yields the head of the queue, or null if nothing was parsed.
    return parsed.poll();
}
/**
 * Adds a not-yet-received message to the message queue of its receiver.
 *
 * <p>The receiver's queue is created on first use. The same string key is used for the
 * lookup and for the insertion, so an existing queue is always found again.
 *
 * @param chatting the message to enqueue; its receiver user id selects the queue
 * @author Feng
 */
public void addChatting(Chatting chatting) {
    // Build the key once: the table is populated with the id's string form, so the
    // lookup must use that same form rather than the raw id.
    final String receiverKey = chatting.getReceiverUserId() + "";
    LinkedBlockingQueue<Chatting> chattingQueue = chattingHashtable.get(receiverKey);
    if (chattingQueue == null) {
        chattingQueue = new LinkedBlockingQueue<Chatting>();
        chattingHashtable.put(receiverKey, chattingQueue);
    }
    chattingQueue.add(chatting);
}
/**
 * Folds the molecules in {@code mcssList} into a single common-substructure candidate.
 *
 * <p>Starting from the first list entry as the seed, each following entry (after
 * {@code removeHydrogens}) is compared with the current seed; the first fragment returned
 * by {@code getMCSS} becomes the next seed. The loop stops early when a comparison yields
 * no fragments. Any exception raised inside the loop is logged and not rethrown.
 *
 * @return a queue holding the final seed, or an empty queue if the seed is null or an
 *         exception interrupted the computation
 */
private synchronized LinkedBlockingQueue<IAtomContainer> singleSolution() {
    LOGGER.debug("Calling MCSSTask " + taskNumber + " with " + mcssList.size() + " items");
    final LinkedBlockingQueue<IAtomContainer> result = new LinkedBlockingQueue<>();
    final long taskStart = getInstance().getTimeInMillis();
    IAtomContainer seed = mcssList.get(0);
    long stepStart = taskStart;
    try {
        for (int index = 1; index < mcssList.size(); index++) {
            final IAtomContainer target = removeHydrogens(mcssList.get(index));
            final BaseMapping comparison = new Isomorphism(seed, target, DEFAULT, atomMatcher, bondMatcher);
            comparison.setChemFilters(true, true, true);
            final Collection<Fragment> fragments = getMCSS(comparison);

            LOGGER.debug("comparison for task " + taskNumber + " has " + fragments.size()
                    + " unique matches of size " + comparison.getFirstAtomMapping().getCount());
            LOGGER.debug("MCSS for task " + taskNumber + " has " + seed.getAtomCount() + " atoms, and " + seed.getBondCount() + " bonds");
            LOGGER.debug("Target for task " + taskNumber + " has " + target.getAtomCount() + " atoms, and " + target.getBondCount() + " bonds");

            // Report how long this comparison step took, then restart the step timer.
            final long stepEnd = getInstance().getTimeInMillis();
            LOGGER.debug("Task " + taskNumber + " index " + index + " took " + (stepEnd - stepStart) + "ms");
            stepStart = stepEnd;

            // Nothing in common with this target: keep the current seed and stop.
            if (fragments.isEmpty()) {
                break;
            }
            seed = fragments.iterator().next().getContainer();
        }

        if (seed != null) {
            result.add(seed);
            final long taskEnd = getInstance().getTimeInMillis();
            LOGGER.debug("Done: task " + taskNumber + " took " + (taskEnd - taskStart) + "ms");
            LOGGER.debug(" and mcss has " + seed.getAtomCount() + " atoms, and " + seed.getBondCount() + " bonds");
        }
    } catch (Exception e) {
        LOGGER.error("ERROR IN MCS Thread: ", e.getMessage());
    }
    return result;
}
/**
 * Deletes every allocator pool found under the allocation pool path of the namespace uri.
 *
 * <p>Unless forced, the user is prompted for confirmation first. The pool paths are put
 * into a queue that is drained by {@code concurrency} worker tasks; the method waits for
 * all workers before shutting both executors down.
 *
 * @return always 0
 * @throws Exception if listing the pools fails or the wait for the workers is interrupted
 */
@Override
protected int runCmd() throws Exception {
    String rootPath = getUri().getPath() + "/" + allocationPoolPath;
    final ScheduledExecutorService allocationExecutor = Executors.newSingleThreadScheduledExecutor();
    ExecutorService executorService = Executors.newFixedThreadPool(concurrency);
    checkArgument(getNamespace() instanceof BKDistributedLogNamespace);
    BKDistributedLogNamespace bkns = (BKDistributedLogNamespace) getNamespace();
    final ZooKeeperClient zkc = ((BKNamespaceDriver) bkns.getNamespaceDriver()).getWriterZKC();
    final BookKeeperClient bkc = ((BKNamespaceDriver) bkns.getNamespaceDriver()).getReaderBKC();
    try {
        List<String> pools = zkc.get().getChildren(rootPath, false);
        final LinkedBlockingQueue<String> poolsToDelete = new LinkedBlockingQueue<String>();
        if (getForce() || IOUtils.confirmPrompt("Are you sure you want to delete allocator pools : " + pools)) {
            for (String pool : pools) {
                poolsToDelete.add(rootPath + "/" + pool);
            }
            final CountDownLatch doneLatch = new CountDownLatch(concurrency);
            for (int i = 0; i < concurrency; i++) {
                final int tid = i;
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // poll() returns null once the queue is drained, which ends the loop.
                            String poolPath;
                            while ((poolPath = poolsToDelete.poll()) != null) {
                                try {
                                    LedgerAllocator allocator =
                                            LedgerAllocatorUtils.createLedgerAllocatorPool(poolPath, 0, getConf(),
                                                    zkc, bkc,
                                                    allocationExecutor);
                                    allocator.delete();
                                    System.out.println("Deleted allocator pool : " + poolPath + " .");
                                } catch (IOException ioe) {
                                    System.err.println("Failed to delete allocator pool "
                                            + poolPath + " : " + ioe.getMessage());
                                }
                            }
                        } finally {
                            // Always release the latch: an unchecked exception in this worker
                            // would otherwise leave the caller blocked in await() forever.
                            doneLatch.countDown();
                        }
                        System.out.println("Thread " + tid + " is done.");
                    }
                });
            }
            doneLatch.await();
        }
    } finally {
        executorService.shutdown();
        allocationExecutor.shutdown();
    }
    return 0;
}
/**
 * Scans all database instances visible to the application user for alerts.
 *
 * <p>The instances of every group that has a credential and still exists are queued,
 * then drained by up to {@code threadCount} {@code AlertScannerRunner} threads. The
 * method waits for all runners and finally flushes the metric store. If the calling
 * thread is interrupted while waiting, it keeps waiting for the remaining runners and
 * restores its interrupt status before returning.
 */
public void scan() {
    Set<String> clusternames = frameworkContext.getDbInfoManager().getMyDatabases(appUser.getName(), false).getMyDbList();
    logger.info("Start scan alerts");

    // Collect every instance that should be scanned.
    LinkedBlockingQueue<DBInstanceInfo> dbqueue = new LinkedBlockingQueue<DBInstanceInfo>();
    for (String cl : clusternames) {
        DBCredential cred = DBUtils.findDBCredential(frameworkContext, cl, appUser);
        if (cred == null) {
            logger.info("No credential for group " + cl + ", skip it");
            continue;
        }
        DBGroupInfo cls = frameworkContext.getDbInfoManager().findGroup(cl);
        if (cls == null) {
            logger.info("Group " + cl + " might have been deleted.");
            continue;
        }
        for (DBInstanceInfo dbinfo : cls.getInstances()) {
            dbqueue.add(dbinfo);
        }
    }

    // Never start more runners than there are instances to scan.
    int mythreadcnt = Math.min(this.threadCount, dbqueue.size());
    Thread[] th = new Thread[mythreadcnt];
    for (int i = 0; i < mythreadcnt; i++) {
        AlertScannerRunner runner = new AlertScannerRunner(frameworkContext, dbqueue, appUser);
        th[i] = new Thread(runner);
        th[i].setName("AlertScannerRunner - " + i);
        th[i].start();
    }

    boolean interrupted = false;
    try {
        for (Thread worker : th) {
            try {
                worker.join();
            } catch (InterruptedException ex) {
                // Move on to the next runner as before, but remember the interrupt
                // instead of silently discarding it.
                interrupted = true;
            }
        }
        logger.info("Done alert scanner");
        this.frameworkContext.getAutoScanner().getMetricDb().flush(); // notify persistent store
    } finally {
        if (interrupted) {
            // Restore the interrupt status so callers can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * A message still sitting in the processor's internal queue when the processor is
 * unscheduled and stopped must be transferred as exactly one flow file carrying the
 * expected content and attributes.
 */
@Test
public void testOnStoppedFinish() throws Exception {
    testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
    testRunner.assertValid();

    // Build the message that will be injected into the processor's queue.
    final MqttMessage rawMessage = new MqttMessage();
    final byte[] payload = ByteBuffer.wrap("testMessage".getBytes()).array();
    rawMessage.setPayload(payload);
    rawMessage.setQos(2);
    final MQTTQueueMessage queuedMessage = new MQTTQueueMessage("testTopic", rawMessage);

    // Schedule and connect the processor.
    final ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
    processor.onScheduled(testRunner.getProcessContext());
    reconnect(processor);
    Thread.sleep(PUBLISH_WAIT_MS);
    assertTrue(isConnected(processor));
    processor.processSessionFactory = testRunner.getProcessSessionFactory();

    // The queue is a private field, so reach it through reflection.
    final Field queueField = ConsumeMQTT.class.getDeclaredField("mqttQueue");
    queueField.setAccessible(true);
    final LinkedBlockingQueue<MQTTQueueMessage> internalQueue =
            (LinkedBlockingQueue<MQTTQueueMessage>) queueField.get(processor);
    internalQueue.add(queuedMessage);

    processor.onUnscheduled(testRunner.getProcessContext());
    processor.onStopped(testRunner.getProcessContext());

    testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
    assertProvenanceEvents(1);

    final List<MockFlowFile> transferred = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
    final MockFlowFile result = transferred.get(0);
    result.assertContentEquals("testMessage");
    result.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
    result.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
    result.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
    result.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
    result.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
/**
 * Injects one message into the processor's internal queue, then unschedules and stops
 * the processor, and verifies that the message ends up as a single flow file with the
 * expected content and attributes.
 */
@Test
public void testOnStoppedFinish() throws Exception {
    testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
    testRunner.assertValid();

    // Message to be placed directly into the processor's queue.
    final MqttMessage mqttPayload = new MqttMessage();
    mqttPayload.setPayload(ByteBuffer.wrap("testMessage".getBytes()).array());
    mqttPayload.setQos(2);
    final MQTTQueueMessage pendingMessage = new MQTTQueueMessage("testTopic", mqttPayload);

    // Bring the processor up; this variant passes the process context to reconnect.
    final ConsumeMQTT consumer = (ConsumeMQTT) testRunner.getProcessor();
    consumer.onScheduled(testRunner.getProcessContext());
    reconnect(consumer, testRunner.getProcessContext());
    Thread.sleep(PUBLISH_WAIT_MS);
    assertTrue(isConnected(consumer));
    consumer.processSessionFactory = testRunner.getProcessSessionFactory();

    // Access the private queue field reflectively and enqueue the message.
    final Field mqttQueueField = ConsumeMQTT.class.getDeclaredField("mqttQueue");
    mqttQueueField.setAccessible(true);
    final LinkedBlockingQueue<MQTTQueueMessage> mqttQueue =
            (LinkedBlockingQueue<MQTTQueueMessage>) mqttQueueField.get(consumer);
    mqttQueue.add(pendingMessage);

    consumer.onUnscheduled(testRunner.getProcessContext());
    consumer.onStopped(testRunner.getProcessContext());

    testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
    assertProvenanceEvents(1);

    final List<MockFlowFile> output = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
    final MockFlowFile first = output.get(0);
    first.assertContentEquals("testMessage");
    first.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
    first.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
    first.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
    first.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
    first.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
/**
 * Only one conditional append per writerId may be outstanding at a time, so that its
 * status can be resolved in the event of a reconnect.
 *
 * <p>The mocked connection answers each append only after a value becomes available in
 * the {@code replies} queue: {@code true} produces a DataAppended reply, {@code false}
 * a ConditionalCheckFailed reply. Each write is therefore expected to block until the
 * test supplies its reply.
 */
@Test(timeout = 10000)
public void testOnlyOneWriteAtATime() throws ConnectionFailedException, SegmentSealedException {
    MockConnectionFactoryImpl connectionFactory = new MockConnectionFactoryImpl();
    MockController controller = new MockController("localhost", 0, connectionFactory, true);
    ConditionalOutputStreamFactory streamFactory = new ConditionalOutputStreamFactoryImpl(controller, connectionFactory);
    Segment segment = new Segment("scope", "testWrite", 1);
    ConditionalOutputStream cOut = streamFactory.createConditionalOutputStream(
            segment,
            DelegationTokenProviderFactory.create("token", controller, segment),
            EventWriterConfig.builder().build());
    ByteBuffer data = ByteBuffer.allocate(10);

    ClientConnection connection = Mockito.mock(ClientConnection.class);
    PravegaNodeUri location = new PravegaNodeUri("localhost", 0);
    connectionFactory.provideConnection(location, connection);
    setupAppend(connectionFactory, segment, connection, location);

    // Each sendAsync waits for the next scripted outcome before replying.
    LinkedBlockingQueue<Boolean> replies = new LinkedBlockingQueue<>();
    Mockito.doAnswer(invocation -> {
        ConditionalAppend append = (ConditionalAppend) invocation.getArgument(0);
        ReplyProcessor processor = connectionFactory.getProcessor(location);
        if (replies.take()) {
            processor.process(new WireCommands.DataAppended(append.getRequestId(), append.getWriterId(), append.getEventNumber(), 0, -1));
        } else {
            processor.process(new WireCommands.ConditionalCheckFailed(append.getWriterId(), append.getEventNumber(),
                    append.getRequestId()));
        }
        return null;
    }).when(connection).sendAsync(any(ConditionalAppend.class), any(ClientConnection.CompletedCallback.class));

    // Replies queued in advance: these two writes complete without waiting on the test.
    replies.add(true);
    replies.add(false);
    assertTrue(cOut.write(data, 0));
    assertFalse(cOut.write(data, 1));

    // A single write blocks until its reply is supplied.
    AssertExtensions.assertBlocks(() -> {
        assertTrue(cOut.write(data, 2));
    }, () -> {
        replies.add(true);
    });
    AssertExtensions.assertBlocks(() -> {
        assertFalse(cOut.write(data, 3));
    }, () -> {
        replies.add(false);
    });

    // Two writes in flight: replies are consumed in order (success, then failure).
    AssertExtensions.assertBlocks(() -> {
        assertTrue(cOut.write(data, 4));
    }, () -> {
        AssertExtensions.assertBlocks(() -> {
            assertFalse(cOut.write(data, 5));
        }, () -> {
            replies.add(true);
            replies.add(false);
        });
    });

    // Same again with the outcomes reversed (failure, then success).
    AssertExtensions.assertBlocks(() -> {
        assertFalse(cOut.write(data, 6));
    }, () -> {
        AssertExtensions.assertBlocks(() -> {
            assertTrue(cOut.write(data, 7));
        }, () -> {
            replies.add(false);
            replies.add(true);
        });
    });
}