下面列出了 java.util.concurrent.BlockingQueue#clear() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Verifies that {@code clear()} empties a completely filled {@code PushPullBlockingQueue}.
 *
 * <p>The queue is filled with the values 0..cap-1 and a probe set holding the first cap/10
 * values is confirmed to be present. After {@code clear()} the queue must no longer contain
 * the probe set, must report size 0 and empty, and {@code poll()} must return {@code null}.
 */
@Test
public void testClear() {
    final int cap = 100;
    final BlockingQueue<Integer> dbq = new PushPullBlockingQueue<Integer>(cap);
    for (int i = 0; i < cap; i++) {
        dbq.offer(Integer.valueOf(i));
    }

    // Parameterized instead of a raw HashSet, so the assignment is type-checked
    // and compiles without an unchecked warning.
    final Set<Integer> si = new HashSet<Integer>(cap);
    for (int i = 0; i < cap / 10; i++) {
        si.add(Integer.valueOf(i));
    }
    Assert.assertTrue(dbq.containsAll(si));

    dbq.clear();

    Assert.assertFalse(dbq.containsAll(si));
    Assert.assertEquals(0, dbq.size());
    Assert.assertTrue(dbq.isEmpty());
    Assert.assertNull(dbq.poll());
}
/**
 * Tests that a non-perpetual task only outputs articles published after a given time.
 *
 * <p>With the test feed, 20 entries are returned in the batch while only 15 are expected in
 * the queue. The same numbers are expected on a second pull, after the first batch has been
 * recorded in {@code PREVIOUSLY_SEEN}.
 *
 * @throws Exception Exception
 */
@Test(enabled = false)
public void testNonPerpetualTimeFramedPull() throws Exception {
    com.healthmarketscience.common.util.resource.Handler.init();

    final String feedLocation = "resource:///test_rss_xml/economist1.xml";
    BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
    DateTime cutoff = new DateTime()
            .withYear(2014)
            .withDayOfMonth(5)
            .withMonthOfYear(9)
            .withZone(DateTimeZone.UTC);
    RssStreamProviderTask task = new RssStreamProviderTask(datums, "fake url", cutoff, 10000, false);

    // First pull.
    Set<String> entries = task.queueFeedEntries(new URL(feedLocation));
    assertEquals(datums.size(), 15);
    assertEquals(entries.size(), 20);
    assertTrue(datums.size() < entries.size());
    task.PREVIOUSLY_SEEN.put("fake url", entries);

    // Second pull: previously seen articles are still expected to be output.
    datums.clear();
    entries = task.queueFeedEntries(new URL(feedLocation));
    assertEquals(datums.size(), 15);
    assertEquals(entries.size(), 20);
    assertTrue(datums.size() < entries.size());
}
/**
 * Enables audit logging and checks that no event arrives in the blocking syslog server queue
 * within the wait window. Audit logging is switched off again afterwards, and the system
 * properties tracked in {@code properties} are removed.
 *
 * @throws Exception if toggling audit logging or applying an update fails
 */
@Test
public void testAuditLoggingToSyslog() throws Exception {
    final BlockingQueue<SyslogServerEventIF> queue = BlockedSyslogServerEventHandler.getQueue();
    // Start from an empty queue so only events produced below are observed.
    queue.clear();

    try {
        setAuditlogEnabled(true);
        // Enabling the audit log is itself an auditable event,
        // but we don't expect a message in the TCP syslog server.
        final SyslogServerEventIF received = queue.poll(1 * ADJUSTED_SECOND, TimeUnit.MILLISECONDS);
        Assert.assertNull("No message was expected in the syslog, because TCP syslog server is used", received);
    } finally {
        setAuditlogEnabled(false);
    }

    for (Long property : properties) {
        CoreUtils.applyUpdate(
                Util.createRemoveOperation(
                        PathAddress.pathAddress().append(SYSTEM_PROPERTY, Long.toString(property))),
                managementClient.getControllerClient());
    }
    properties.clear();
}
/**
 * Verifies {@code containsAll} on an {@code MPMCBlockingQueue}.
 *
 * <p>It must be true for a subset of the queued values, false once a value that was never
 * queued (-1) is added to the probe set, and false for the original subset after the queue
 * has been cleared.
 */
@Test
public void testContainsAll() {
    final int cap = 100;
    final BlockingQueue<Integer> dbq = new MPMCBlockingQueue<>(cap);
    for (int i = 0; i < cap; i++) {
        dbq.offer(Integer.valueOf(i));
    }

    // Parameterized instead of a raw HashSet, so the assignment is type-checked
    // and compiles without an unchecked warning.
    final Set<Integer> si = new HashSet<>(10);
    for (int i = 0; i < cap / 10; i++) {
        si.add(Integer.valueOf(i));
    }
    Assert.assertTrue(dbq.containsAll(si));

    si.add(Integer.valueOf(-1));
    Assert.assertFalse(dbq.containsAll(si));

    // Set has only remove(Object); the boxed value is spelled out to make that explicit.
    si.remove(Integer.valueOf(-1));
    dbq.clear();
    Assert.assertFalse(dbq.containsAll(si));
}
/**
 * Tests that perpetual streams do not output previously seen articles, while articles from
 * a feed that has not been seen yet are still queued.
 *
 * @throws Exception Exception
 */
@Test(enabled = false)
public void testPerpetualNoTimeFramePull() throws Exception {
    com.healthmarketscience.common.util.resource.Handler.init();

    final String firstFeed = "resource:///test_rss_xml/economist1.xml";
    final String secondFeed = "resource:///test_rss_xml/economist2.xml";
    BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
    RssStreamProviderTask task =
            new RssStreamProviderTask(datums, "fake url", new DateTime().minusYears(10), 10000, true);

    // First pull: every entry in the batch is expected in the queue.
    Set<String> entries = task.queueFeedEntries(new URL(firstFeed));
    assertEquals(entries.size(), datums.size(), "Expected batch size to be the same as amount of queued datums");
    task.PREVIOUSLY_SEEN.put("fake url", entries);

    // Same feed again: previously seen articles must not be output.
    datums.clear();
    entries = task.queueFeedEntries(new URL(firstFeed));
    assertEquals( datums.size(), 0 );
    assertEquals( entries.size(), 20 );
    task.PREVIOUSLY_SEEN.put("fake url", entries);

    // Different feed: urls that were not seen before aren't blocked.
    datums.clear();
    entries = task.queueFeedEntries(new URL(secondFeed));
    assertEquals(entries.size(), datums.size());
    assertEquals( datums.size(), 25);
    assertEquals( entries.size(), 25);
}
/**
 * Verifies that {@code clear()} empties a completely filled {@code DisruptorBlockingQueue}.
 *
 * <p>The queue is filled with the values 0..cap-1 and a probe set holding the first cap/10
 * values is confirmed to be present. After {@code clear()} the queue must no longer contain
 * the probe set, must report size 0 and empty, and {@code poll()} must return {@code null}.
 */
@Test
public void testClear() {
    final int cap = 100;
    final BlockingQueue<Integer> dbq = new DisruptorBlockingQueue<Integer>(cap);
    for (int i = 0; i < cap; i++) {
        dbq.offer(Integer.valueOf(i));
    }

    // Parameterized instead of a raw HashSet, so the assignment is type-checked
    // and compiles without an unchecked warning.
    final Set<Integer> si = new HashSet<Integer>(cap);
    for (int i = 0; i < cap / 10; i++) {
        si.add(Integer.valueOf(i));
    }
    Assert.assertTrue(dbq.containsAll(si));

    dbq.clear();

    Assert.assertFalse(dbq.containsAll(si));
    Assert.assertEquals(0, dbq.size());
    Assert.assertTrue(dbq.isEmpty());
    Assert.assertNull(dbq.poll());
}
/**
 * Verifies {@code containsAll} on a {@code PushPullBlockingQueue}.
 *
 * <p>It must be true for a subset of the queued values, false once a value that was never
 * queued (-1) is added to the probe set, and false for the original subset after the queue
 * has been cleared.
 */
@Test
public void testContainsAll() {
    final int cap = 100;
    final BlockingQueue<Integer> dbq = new PushPullBlockingQueue<Integer>(cap);
    for (int i = 0; i < cap; i++) {
        dbq.offer(Integer.valueOf(i));
    }

    // Parameterized instead of a raw HashSet, so the assignment is type-checked
    // and compiles without an unchecked warning.
    final Set<Integer> si = new HashSet<Integer>(10);
    for (int i = 0; i < cap / 10; i++) {
        si.add(Integer.valueOf(i));
    }
    Assert.assertTrue(dbq.containsAll(si));

    si.add(Integer.valueOf(-1));
    Assert.assertFalse(dbq.containsAll(si));

    // Set has only remove(Object); the boxed value is spelled out to make that explicit.
    si.remove(Integer.valueOf(-1));
    dbq.clear();
    Assert.assertFalse(dbq.containsAll(si));
}
/**
 * Exercises {@code BrokerFastFailure#cleanExpiredRequestInQueue} in three situations: an empty
 * queue, a queue holding a plain {@link Runnable}, and a queue holding two request tasks of
 * which the first was queued 100 ms before the second.
 *
 * @throws Exception if the test is interrupted while sleeping
 */
@Test
public void testCleanExpiredRequestInQueue() throws Exception {
    BrokerFastFailure fastFailure = new BrokerFastFailure(null);
    BlockingQueue<Runnable> pending = new LinkedBlockingQueue<>();

    // Empty queue stays empty.
    fastFailure.cleanExpiredRequestInQueue(pending, 1);
    assertThat(pending.size()).isZero();

    // Normal Runnable: expected to be left in the queue.
    Runnable noop = new Runnable() {
        @Override
        public void run() {
        }
    };
    pending.add(noop);
    assertThat(pending.size()).isEqualTo(1);
    fastFailure.cleanExpiredRequestInQueue(pending, 1);
    assertThat(pending.size()).isEqualTo(1);
    pending.clear();

    // With expired request: the older task is expected to be dropped, the fresh one kept.
    RequestTask expiredRequest = new RequestTask(noop, null, null);
    pending.add(new FutureTaskExt<>(expiredRequest, null));
    TimeUnit.MILLISECONDS.sleep(100);

    RequestTask requestTask = new RequestTask(noop, null, null);
    pending.add(new FutureTaskExt<>(requestTask, null));
    assertThat(pending.size()).isEqualTo(2);

    fastFailure.cleanExpiredRequestInQueue(pending, 100);
    assertThat(pending.size()).isEqualTo(1);
    assertThat(((FutureTaskExt) pending.peek()).getRunnable()).isEqualTo(requestTask);
}
/**
 * Discards the file queue currently held in {@code fileQueueRef}, if any, and resets the
 * reference to {@code null}. Both steps run while holding {@code listingLock}.
 *
 * @param context the process context passed in by the caller (not read by this method)
 */
@OnScheduled
public void onScheduled(final ProcessContext context) {
    listingLock.lock();
    try {
        final BlockingQueue<FileInfo> staleQueue = fileQueueRef.get();
        if (staleQueue != null) {
            staleQueue.clear();
        }

        // create new queue on next listing, in case queue type needs to change
        fileQueueRef.set(null);
    } finally {
        listingLock.unlock();
    }
}
/**
 * Discards the file queue currently held in {@code fileQueueRef}, if any, and resets the
 * reference to {@code null}. Both steps run while holding {@code listingLock}.
 *
 * @param context the process context passed in by the caller (not read by this method)
 */
@OnScheduled
public void onScheduled(final ProcessContext context) {
    listingLock.lock();
    try {
        final BlockingQueue<FileInfo> staleQueue = fileQueueRef.get();
        if (staleQueue != null) {
            staleQueue.clear();
        }

        // create new queue on next listing, in case queue type needs to change
        fileQueueRef.set(null);
    } finally {
        listingLock.unlock();
    }
}
/**
 * Opens a JMX connection, giving up after the supplied timeout.
 *
 * <p>The connect call runs in a task handed to {@code ExecuteThreadUtil}. Its outcome (the
 * connector, or the failure) is passed back through a one-slot queue that this method waits
 * on via {@code BlockingQueueUtil.getResult}. When the wait times out, the slot is kept
 * occupied so that a connector arriving later is closed by the worker instead of being
 * parked in a queue nobody reads.
 *
 * @param url
 *            JMX service address
 * @param jmxUser
 *            JMX user name; {@code null} to connect without credentials
 * @param jmxPassword
 *            JMX password; {@code null} to connect without credentials
 * @param timeout
 *            maximum time to wait for the connection
 * @param unit
 *            unit of {@code timeout}
 * @return the connected {@link JMXConnector}; {@code null} only if the hand-off yields a
 *         value that is neither a connector nor a {@link Throwable}
 * @throws SocketTimeoutException
 *             if no result is available within the timeout
 * @throws IOException
 *             if the connect attempt failed; the original failure is the cause
 * @throws Exception
 *             anything else raised while waiting for the result
 */
public static JMXConnector connectWithTimeout( final JMXServiceURL url,String jmxUser,String jmxPassword, long timeout, TimeUnit unit) throws Exception {
    final BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(1);
    ExecuteThreadUtil.execute(() -> {
        try {
            JMXConnector connector;
            if (jmxUser != null && jmxPassword != null) {
                Map<String, Object> env = new HashMap<>();
                String[] credentials = new String[] { jmxUser, jmxPassword };
                env.put(JMXConnector.CREDENTIALS, credentials);
                connector = JMXConnectorFactory.connect(url, env);
            } else {
                connector = JMXConnectorFactory.connect(url, null);
            }
            // A full slot means the caller has already given up: close instead of leaking.
            if (!blockingQueue.offer(connector)) {
                connector.close();
            }
        } catch (Throwable t) {
            blockingQueue.offer(t);
        }
    });

    Object result = BlockingQueueUtil.getResult(blockingQueue, timeout, unit);
    if (result == null) {
        // Timed out. The queue must NOT be cleared here: an empty slot lets a late
        // offer(connector) succeed, and that connector would never be closed.
        final Object timedOutMarker = new Object();
        if (!blockingQueue.offer(timedOutMarker)) {
            // The slot is already taken. If the worker delivered just after the wait
            // ended, use that outcome; anything else is left in place as a blocker.
            Object late = blockingQueue.peek();
            if (late instanceof JMXConnector || late instanceof Throwable) {
                result = late;
            }
        }
    }

    if (result instanceof JMXConnector) {
        return (JMXConnector) result;
    } else if (result == null) {
        throw new SocketTimeoutException("Connect timed out: " + url);
    } else if (result instanceof Throwable) {
        throw new IOException("JMX Connect Failed : " + url, ((Throwable) result));
    }
    return null;
}
/**
 * Opens a JMX connection, giving up after the supplied timeout.
 *
 * <p>The connect call runs in a task handed to {@code ExecuteThreadUtil}. Its outcome (the
 * connector, or the failure) is passed back through a one-slot queue that this method waits
 * on via {@code BlockingQueueUtil.getResult}. When the wait times out, the slot is kept
 * occupied so that a connector arriving later is closed by the worker instead of being
 * parked in a queue nobody reads.
 *
 * @param url
 *            JMX service address
 * @param jmxUser
 *            JMX user name; {@code null} to connect without credentials
 * @param jmxPassword
 *            JMX password; {@code null} to connect without credentials
 * @param timeout
 *            maximum time to wait for the connection
 * @param unit
 *            unit of {@code timeout}
 * @return the connected {@link JMXConnector}; {@code null} only if the hand-off yields a
 *         value that is neither a connector nor a {@link Throwable}
 * @throws SocketTimeoutException
 *             if no result is available within the timeout
 * @throws IOException
 *             if the connect attempt failed; the original failure is the cause
 * @throws Exception
 *             anything else raised while waiting for the result
 */
public static JMXConnector connectWithTimeout( final JMXServiceURL url,String jmxUser,String jmxPassword, long timeout, TimeUnit unit) throws Exception {
    final BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(1);
    ExecuteThreadUtil.execute(() -> {
        try {
            JMXConnector connector;
            if (jmxUser != null && jmxPassword != null) {
                Map<String, Object> env = new HashMap<>();
                String[] credentials = new String[] { jmxUser, jmxPassword };
                env.put(JMXConnector.CREDENTIALS, credentials);
                connector = JMXConnectorFactory.connect(url, env);
            } else {
                connector = JMXConnectorFactory.connect(url, null);
            }
            // A full slot means the caller has already given up: close instead of leaking.
            if (!blockingQueue.offer(connector)) {
                connector.close();
            }
        } catch (Throwable t) {
            blockingQueue.offer(t);
        }
    });

    Object result = BlockingQueueUtil.getResult(blockingQueue, timeout, unit);
    if (result == null) {
        // Timed out. The queue must NOT be cleared here: an empty slot lets a late
        // offer(connector) succeed, and that connector would never be closed.
        final Object timedOutMarker = new Object();
        if (!blockingQueue.offer(timedOutMarker)) {
            // The slot is already taken. If the worker delivered just after the wait
            // ended, use that outcome; anything else is left in place as a blocker.
            Object late = blockingQueue.peek();
            if (late instanceof JMXConnector || late instanceof Throwable) {
                result = late;
            }
        }
    }

    if (result instanceof JMXConnector) {
        return (JMXConnector) result;
    } else if (result == null) {
        throw new SocketTimeoutException("Connect timed out: " + url);
    } else if (result instanceof Throwable) {
        throw new IOException("JMX Connect Failed : " + url, ((Throwable) result));
    }
    return null;
}
/**
 * Aborts an in-flight generation run: empties the request queue, cancels the generator task
 * (interrupting it if it is running), and then invokes the {@code afterGenerate} hook for
 * every context still in {@code afterGenerateQueue}.
 *
 * @param requestQueue pending file system access requests; cleared by this call
 * @param afterGenerateQueue contexts whose {@code afterGenerate} hook is to be run
 * @param generatorResult handle of the generator task to cancel
 */
private void cancelProcessing(BlockingQueue<FileSystemAccessRequest> requestQueue, Queue<ParallelBuildContext> afterGenerateQueue, ListenableFuture<?> generatorResult) {
    // Free space in the queue so that producers waiting in put() are processed.
    requestQueue.clear();

    // Stop processing of resources immediately.
    generatorResult.cancel(true);

    for (ParallelBuildContext pending : afterGenerateQueue) {
        try {
            getGenerator2().afterGenerate(
                    pending.resource,
                    pending.synchronousFileSystemAccess,
                    pending.getGeneratorContext());
        } catch (Exception hookFailure) {
            // A failing hook is logged and does not stop the remaining hooks.
            logger.error("Error running afterGenerate hook", hookFailure);
        }
    }
}
/**
 * Starts a CDC processor, saves an event and waits until it is published, then stops the
 * processor. After a pause the processor is started again, a second event is saved, and the
 * test waits for it while excluding the event ids of the first save.
 *
 * @throws InterruptedException if the pause between the two runs is interrupted
 */
@Test
public void shouldReadNewEventsOnly() throws InterruptedException {
    BlockingQueue<PublishedEvent> publishedEvents = new LinkedBlockingDeque<>();
    CdcProcessor<PublishedEvent> cdcProcessor = createCdcProcessor();

    // First run: collect everything the processor publishes.
    cdcProcessor.start(event -> {
        publishedEvents.add(event);
        onEventSent(event);
    });

    String accountCreatedEventData = generateAccountCreatedEvent();
    EntityIdVersionAndEventIds entityIdVersionAndEventIds = saveEvent(localAggregateCrud, accountCreatedEventData);
    waitForEvent(
            publishedEvents,
            entityIdVersionAndEventIds.getEntityVersion(),
            LocalDateTime.now().plusSeconds(10),
            accountCreatedEventData);
    cdcProcessor.stop();

    Thread.sleep(10000);
    publishedEvents.clear();

    // Second run.
    cdcProcessor.start(event -> {
        publishedEvents.add(event);
        onEventSent(event);
    });

    // Event ids from the first save are passed as exclusions to the second wait.
    List<String> excludedIds = entityIdVersionAndEventIds.getEventIds()
            .stream()
            .map(Int128::asString)
            .collect(Collectors.toList());

    accountCreatedEventData = generateAccountCreatedEvent();
    entityIdVersionAndEventIds = saveEvent(localAggregateCrud, accountCreatedEventData);
    waitForEventExcluding(
            publishedEvents,
            entityIdVersionAndEventIds.getEntityVersion(),
            LocalDateTime.now().plusSeconds(10),
            accountCreatedEventData,
            excludedIds);
    cdcProcessor.stop();
}
/**
 * Exercises {@code BrokerFastFailure#cleanExpiredRequestInQueue} in three situations: an empty
 * queue, a queue holding a plain {@link Runnable}, and a queue holding two request tasks of
 * which the first was queued 100 ms before the second.
 *
 * @throws Exception if the test is interrupted while sleeping
 */
@Test
public void testCleanExpiredRequestInQueue() throws Exception {
    BrokerFastFailure fastFailure = new BrokerFastFailure(null);
    BlockingQueue<Runnable> pending = new LinkedBlockingQueue<>();

    // Empty queue stays empty.
    fastFailure.cleanExpiredRequestInQueue(pending, 1);
    assertThat(pending.size()).isZero();

    // Normal Runnable: expected to be left in the queue.
    Runnable noop = new Runnable() {
        @Override
        public void run() {
        }
    };
    pending.add(noop);
    assertThat(pending.size()).isEqualTo(1);
    fastFailure.cleanExpiredRequestInQueue(pending, 1);
    assertThat(pending.size()).isEqualTo(1);
    pending.clear();

    // With expired request: the older task is expected to be dropped, the fresh one kept.
    RequestTask expiredRequest = new RequestTask(noop, null, null);
    pending.add(new FutureTaskExt<>(expiredRequest, null));
    TimeUnit.MILLISECONDS.sleep(100);

    RequestTask requestTask = new RequestTask(noop, null, null);
    pending.add(new FutureTaskExt<>(requestTask, null));
    assertThat(pending.size()).isEqualTo(2);

    fastFailure.cleanExpiredRequestInQueue(pending, 100);
    assertThat(pending.size()).isEqualTo(1);
    assertThat(((FutureTaskExt) pending.peek()).getRunnable()).isEqualTo(requestTask);
}
/**
 * Checks that a {@code GrowableArrayBlockingQueue} created with an initial size of 4 accepts
 * a fifth element, is empty after {@code clear()}, accepts timed offers afterwards, and
 * hands its elements over in insertion order through {@code drainTo}.
 *
 * @throws Exception if a timed offer is interrupted
 */
@Test
public void growArray() throws Exception {
    BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(4);
    assertNull(queue.poll());

    // One element more than the initial size.
    for (int value = 1; value <= 5; value++) {
        assertTrue(queue.offer(value));
    }
    assertEquals(queue.size(), 5);

    queue.clear();
    assertEquals(queue.size(), 0);

    for (int value = 1; value <= 3; value++) {
        assertTrue(queue.offer(value, 1, TimeUnit.SECONDS));
    }
    assertEquals(queue.size(), 3);

    List<Integer> drained = new ArrayList<>();
    queue.drainTo(drained);
    assertEquals(queue.size(), 0);
    assertEquals(drained, Lists.newArrayList(1, 2, 3));
}
/**
 * Releases and removes every event held in the temporary queue of the given bucket, for each
 * shadow partitioned region colocated with {@code partitionedRegion}.
 *
 * <p>NOTE(review): when a shadow region has no sender, event processor or queue, the method
 * returns and the remaining child regions are not visited. Confirm whether skipping only
 * that region was intended.
 *
 * @param bucketId id of the bucket whose temporary queue is cleared
 */
protected void clearAllTempQueueForShadowPR(final int bucketId) {
    for (PartitionedRegion child : ColocationHelper.getColocatedChildRegions(partitionedRegion)) {
        if (!child.isShadowPR()) {
            continue;
        }

        ParallelGatewaySenderImpl sender = child.getParallelGatewaySender();
        if (sender == null) {
            return;
        }
        AbstractGatewaySenderEventProcessor processor = sender.getEventProcessor();
        if (processor == null) {
            return;
        }
        ConcurrentParallelGatewaySenderQueue senderQueue =
                (ConcurrentParallelGatewaySenderQueue) processor.getQueue();
        if (senderQueue == null) {
            return;
        }

        BlockingQueue<GatewaySenderEventImpl> bucketTempQueue = senderQueue.getBucketTmpQueue(bucketId);
        if (bucketTempQueue == null) {
            continue;
        }
        // Release and clear while holding the queue's monitor.
        synchronized (bucketTempQueue) {
            for (GatewaySenderEventImpl queued : bucketTempQueue) {
                queued.release();
            }
            bucketTempQueue.clear();
        }
    }
}
/**
 * Tests that messages on all levels are logged when the syslog handler's level is set to
 * "TRACE", and that nothing else is left in the syslog queue afterwards.
 *
 * @throws Exception if the handler cannot be reconfigured or a log check fails
 */
@Test
public void testAllLevelLogs() throws Exception {
    final BlockingQueue<SyslogServerEventIF> syslogEvents = BlockedSyslogServerEventHandler.getQueue();
    executeOperation(Operations.createWriteAttributeOperation(SYSLOG_HANDLER_ADDR, "level", "TRACE"));

    // Drop whatever was queued before the log messages are produced.
    syslogEvents.clear();
    makeLogs();

    for (Level expectedLevel : LoggingServiceActivator.LOG_LEVELS) {
        testLog(syslogEvents, expectedLevel);
    }
    Assert.assertTrue("No other message was expected in syslog.", syslogEvents.isEmpty());
}
/**
 * Returns the inspection info of a container, served from the cache when available.
 *
 * <p>On a cache miss the docker inspect call runs in a task handed to
 * {@code ExecuteThreadUtil}, and this method waits up to 45 seconds for its outcome. Callers
 * asking for the same container id are serialized on the interned id. The cache is checked
 * again once the lock is held, so a caller that waited for another caller's lookup reuses its
 * result instead of querying docker again.
 *
 * @param containerId id of the container to inspect
 * @return the container info, or {@code null} if the lookup timed out or failed
 */
public static ContainerInfo getContainerInfo(String containerId){
    final String cacheKey = "containerInfoCacheKey" + containerId;
    ContainerInfo cached = (ContainerInfo) getCache(cacheKey);
    if (cached != null) {
        return cached;
    }
    synchronized (containerId.intern()) {
        // Another caller may have filled the cache while this one waited for the lock.
        cached = (ContainerInfo) getCache(cacheKey);
        if (cached != null) {
            return cached;
        }
        try {
            int timeOut = 45;
            final BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(1);
            // Run the docker call asynchronously; the outcome comes back through the queue.
            ExecuteThreadUtil.execute(() -> {
                try {
                    ContainerInfo inspected = docker.inspectContainer(containerId);
                    setCache(cacheKey, inspected);
                    blockingQueue.offer(inspected);
                } catch (Throwable t) {
                    blockingQueue.offer(t);
                }
            });
            // Wait at most 45 seconds.
            Object result = BlockingQueueUtil.getResult(blockingQueue, timeOut, TimeUnit.SECONDS);
            blockingQueue.clear();
            if (result instanceof ContainerInfo) {
                return (ContainerInfo) result;
            } else if (result == null) {
                log.error("docker 容器Info获取{}秒超时:{}",timeOut,containerId);
                return null;
            } else if (result instanceof Throwable) {
                // Passed as a Throwable so the logger records the stack trace.
                log.error("docker 容器Info获取异常", (Throwable) result);
                return null;
            } else {
                log.error("未知结果类型:{}",result);
                return null;
            }
        } catch (Exception e) {
            log.error("",e);
            return null;
        }
    }
}
/**
 * Returns the container list for the given list parameter, served from the cache when
 * available.
 *
 * <p>On a cache miss the docker list call runs in a task handed to {@code ExecuteThreadUtil},
 * and this method waits up to 45 seconds for its outcome. A timeout, a failure or an
 * unexpected result type is logged and answered with a new empty list.
 *
 * @param containersParam the list parameter passed to docker; its value is part of the cache key
 * @return the cached or freshly listed containers, or an empty list if the lookup did not succeed
 */
public static List<Container> getContainers(DockerClient.ListContainersParam containersParam) {
    final String cacheKey = "getContainer" + containersParam.value();
    final List<Container> cached = (List<Container>) CacheByTimeUtil.getCache(cacheKey);
    if (cached != null) {
        return cached;
    }

    try {
        final int timeOut = 45;
        final BlockingQueue<Object> mailbox = new ArrayBlockingQueue<>(1);

        // Run the docker call asynchronously; the outcome comes back through the mailbox.
        ExecuteThreadUtil.execute(() -> {
            try {
                List<Container> listed = docker.listContainers(containersParam);
                setCache(cacheKey, listed);
                mailbox.offer(listed);
            } catch (Throwable failure) {
                mailbox.offer(failure);
            }
        });

        // Wait for the outcome, bounded by the timeout.
        final Object outcome = BlockingQueueUtil.getResult(mailbox, timeOut, TimeUnit.SECONDS);
        mailbox.clear();

        if (outcome instanceof List) {
            return (List<Container>) outcome;
        }

        // Every remaining case is logged and answered with an empty list.
        if (outcome == null) {
            log.error("docker 容器 List 获取{}秒超",timeOut);
        } else if (outcome instanceof Throwable) {
            log.error("docker 容器 List 获取异常",outcome);
        } else {
            log.error("未知结果类型:{}",outcome);
        }
        return new ArrayList<>();
    } catch (Exception e) {
        log.error("",e);
        return new ArrayList<>();
    }
}