The following are example usages of java.util.concurrent.BlockingQueue#contains(), taken from open-source projects.
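As a primer, here is a minimal, self-contained sketch of how contains(Object) behaves on a BlockingQueue (the class name BlockingQueueContainsDemo and the "job-*" strings are illustrative, not taken from the projects below). contains() performs a linear, equals()-based scan, so it is O(n) and, on a concurrently mutated queue, only a point-in-time answer.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueContainsDemo {
    public static void main(String[] args) {
        // Bounded queue; offer() returns false instead of blocking when it is full.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.offer("job-1");

        // contains(Object) scans the queue using equals().
        System.out.println(queue.contains("job-1")); // true
        System.out.println(queue.contains("job-2")); // false

        // remove(Object) is the usual companion call: it removes a single
        // matching element and reports whether anything was removed.
        System.out.println(queue.remove("job-1"));   // true
        System.out.println(queue.contains("job-1")); // false
    }
}

The project examples below use the same contains()/remove(Object) pairing to check whether a previously queued item or task is still pending.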
private static ScheduledEventHandle wakeUpAt(long timestamp, final BlockingQueue<ScheduledEvent> events) {
    final ScheduledEvent event = new ScheduledEvent(timestamp);
    if (!events.offer(event)) {
        throw new IllegalStateException("Event queue at capacity");
    }
    return new ScheduledEventHandle() {
        @Override
        public boolean isPending() {
            return events.contains(event);
        }

        @Override
        public boolean cancel() {
            return events.remove(event);
        }

        @Override
        public boolean isReferencedEvent(RuleEvent other) {
            return other == event;
        }
    };
}
@POST
@Path("/lock/{address}")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "[Lock account] Clear the cached unlocked account", notes = "Clear the cached unlocked account.")
public RpcClientResult lock(@ApiParam(name = "address", value = "account address", required = true) @PathParam("address") String address) {
    Account account = accountService.getAccount(address).getData();
    if (null == account) {
        return Result.getFailed(AccountErrorCode.ACCOUNT_NOT_EXIST).toRpcClientResult();
    }
    accountCacheService.removeAccount(account.getAddress());
    BlockingQueue<Runnable> queue = scheduler.getQueue();
    String addr = account.getAddress().toString();
    Runnable scheduledFuture = (Runnable) accountUnlockSchedulerMap.get(addr);
    // If a delayed re-lock task is still waiting in the scheduler's queue, cancel it.
    if (queue.contains(scheduledFuture)) {
        scheduler.remove(scheduledFuture);
        accountUnlockSchedulerMap.remove(addr);
    }
    Map<String, Boolean> map = new HashMap<>();
    map.put("value", true);
    return Result.getSuccess().setData(map).toRpcClientResult();
}
private void putBack(SparkServer sparkServer, BlockingQueue<SparkServer> servers) {
    if (!servers.contains(sparkServer)) {
        LOGGER.info("Spark server put back. Pool size: {}", servers.size());
        LOGGER.info("PUT state: {}", sparkServer.getEndpoint());
        logServers();
        long start = System.currentTimeMillis();
        sparkServer.stop();
        sparkServer.awaitStop();
        LOGGER.info("spark server has been cleared in {}ms.", System.currentTimeMillis() - start);
        try {
            servers.add(sparkServer);
        } catch (Exception e) {
            LOGGER.error("Can't add spark server", e);
            throw new TestFailException("Can't add spark server");
        }
    }
}
private void fetchListing(final ProcessContext context, final ProcessSession session, final FileTransfer transfer) throws IOException {
    BlockingQueue<FileInfo> queue = fileQueueRef.get();
    if (queue == null) {
        final boolean useNaturalOrdering = context.getProperty(FileTransfer.USE_NATURAL_ORDERING).asBoolean();
        queue = useNaturalOrdering ? new PriorityBlockingQueue<FileInfo>(25000) : new LinkedBlockingQueue<FileInfo>(25000);
        fileQueueRef.set(queue);
    }

    final StopWatch stopWatch = new StopWatch(true);
    final List<FileInfo> listing = transfer.getListing();
    final long millis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);

    int newItems = 0;
    mutuallyExclusiveTransferLock.lock();
    try {
        for (final FileInfo file : listing) {
            // Skip files that are already queued or currently being processed.
            if (!queue.contains(file) && !processing.contains(file)) {
                if (!queue.offer(file)) {
                    break;
                }
                newItems++;
            }
        }
    } finally {
        mutuallyExclusiveTransferLock.unlock();
    }

    getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
            new Object[]{millis, listing.size(), newItems});
}
@POST
@Path("/unlock/{address}")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "[Unlock] Unlock an account")
public RpcClientResult unlock(@ApiParam(name = "address", value = "account address", required = true)
                              @PathParam("address") String address,
                              @ApiParam(name = "form", value = "unlock form data", required = true)
                                      AccountUnlockForm form) {
    Account account = accountService.getAccount(address).getData();
    if (null == account) {
        return Result.getFailed(AccountErrorCode.ACCOUNT_NOT_EXIST).toRpcClientResult();
    }
    String addr = account.getAddress().toString();
    // If a scheduled re-lock task already exists, remove the previous task first.
    if (accountUnlockSchedulerMap.containsKey(addr)) {
        BlockingQueue<Runnable> queue = scheduler.getQueue();
        Runnable sf = (Runnable) accountUnlockSchedulerMap.get(addr);
        if (queue.contains(sf)) {
            scheduler.remove(sf);
            accountUnlockSchedulerMap.remove(addr);
        }
    }
    String password = form.getPassword();
    Integer unlockTime = form.getUnlockTime();
    try {
        account.unlock(password);
        accountCacheService.putAccount(account);
        if (null == unlockTime || unlockTime > AccountConstant.ACCOUNT_MAX_UNLOCK_TIME) {
            unlockTime = AccountConstant.ACCOUNT_MAX_UNLOCK_TIME;
        }
        if (unlockTime < 0) {
            unlockTime = 0;
        }
        // Automatically lock the account again after the given time.
        ScheduledFuture scheduledFuture = scheduler.schedule(() -> {
            accountCacheService.removeAccount(account.getAddress());
        }, unlockTime, TimeUnit.SECONDS);
        accountUnlockSchedulerMap.put(addr, scheduledFuture);
    } catch (NulsException e) {
        return Result.getFailed(AccountErrorCode.PASSWORD_IS_WRONG).toRpcClientResult();
    }
    Map<String, Boolean> map = new HashMap<>();
    map.put("value", true);
    return Result.getSuccess().setData(map).toRpcClientResult();
}