下面列出了java.util.concurrent.ConcurrentLinkedDeque#size ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private DedupValue updateCount(EventUniq eventEniq) {
ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq);
if (dedupValues == null || dedupValues.size() <= 0) {
LOG.warn("{} No dedup values found for {}, cannot update count", this.publishName, eventEniq);
return null;
} else {
DedupValue dedupValue = dedupValues.getLast();
dedupValue.setCount(dedupValue.getCount() + 1);
String updateMsg = String.format(
"%s Update count for dedup key %s, value %s and count %s", this.publishName, eventEniq,
dedupValue.getStateFieldValue(), dedupValue.getCount());
if (LOG.isDebugEnabled()) {
LOG.debug(updateMsg);
}
return dedupValue;
}
}
/**
* This ensures custom span handlers can see the actual exception thrown, not just the "error"
* tag value.
*/
void spanHandlerSeesError(Callable<Void> get) throws IOException {
ConcurrentLinkedDeque<Throwable> caughtThrowables = new ConcurrentLinkedDeque<>();
closeClient(client);
httpTracing = HttpTracing.create(tracingBuilder(Sampler.ALWAYS_SAMPLE)
.clearSpanHandlers()
.addSpanHandler(new SpanHandler() {
@Override public boolean end(TraceContext context, MutableSpan span, Cause cause) {
Throwable error = span.error();
if (error != null) {
caughtThrowables.add(error);
} else {
caughtThrowables.add(new RuntimeException("Unexpected additional call to end"));
}
return true;
}
})
// The blocking span handler goes after the error catcher, so we can assert on the errors.
.addSpanHandler(testSpanHandler)
.build());
client = newClient(server.getPort());
// If this passes, a span was reported with an error
checkReportsSpanOnTransportException(get);
assertThat(caughtThrowables)
.withFailMessage("Span finished with error, but caughtThrowables empty")
.isNotEmpty();
if (caughtThrowables.size() > 1) {
for (Throwable throwable : caughtThrowables) {
Logger.getAnonymousLogger().log(Level.SEVERE, "multiple calls to finish", throwable);
}
assertThat(caughtThrowables).hasSize(1);
}
}
void spanHandlerSeesError(String path) throws IOException {
ConcurrentLinkedDeque<Throwable> caughtThrowables = new ConcurrentLinkedDeque<>();
httpTracing = HttpTracing.create(tracingBuilder(Sampler.ALWAYS_SAMPLE)
.clearSpanHandlers()
.addSpanHandler(new SpanHandler() {
@Override public boolean end(TraceContext context, MutableSpan span, Cause cause) {
Throwable error = span.error();
if (error != null) {
caughtThrowables.add(error);
} else {
caughtThrowables.add(new RuntimeException("Unexpected additional call to end"));
}
return true;
}
})
// The blocking span handler goes after the error catcher, so we can assert on the errors.
.addSpanHandler(testSpanHandler)
.build());
init();
// If this passes, a span was reported with an error
httpStatusCodeTagMatchesResponse_onUncaughtException(path, ".*not ready");
assertThat(caughtThrowables)
.withFailMessage("Span finished with error, but caughtThrowables empty")
.isNotEmpty();
if (caughtThrowables.size() > 1) {
for (Throwable throwable : caughtThrowables) {
Logger.getAnonymousLogger().log(Level.SEVERE, "multiple calls to finish", throwable);
}
assertThat(caughtThrowables).hasSize(1);
}
}
private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
if (!offloadMutex.tryLock()) {
scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
100, TimeUnit.MILLISECONDS);
} else {
CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
unlockingPromise.whenComplete((res, ex) -> {
offloadMutex.unlock();
if (ex != null) {
finalPromise.completeExceptionally(ex);
} else {
finalPromise.complete(res);
}
});
if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
&& config.getLedgerOffloader().getOffloadPolicies() != null) {
long threshold = config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes();
long sizeSummed = 0;
long alreadyOffloadedSize = 0;
long toOffloadSize = 0;
ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
// go through ledger list from newest to oldest and build a list to offload in oldest to newest order
for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
long size = e.getValue().getSize();
sizeSummed += size;
boolean alreadyOffloaded = e.getValue().hasOffloadContext()
&& e.getValue().getOffloadContext().getComplete();
if (alreadyOffloaded) {
alreadyOffloadedSize += size;
} else if (sizeSummed > threshold) {
toOffloadSize += size;
toOffload.addFirst(e.getValue());
}
}
if (toOffload.size() > 0) {
log.info("[{}] Going to automatically offload ledgers {}"
+ ", total size = {}, already offloaded = {}, to offload = {}",
name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
sizeSummed, alreadyOffloadedSize, toOffloadSize);
} else {
// offloadLoop will complete immediately with an empty list to offload
log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
name, sizeSummed, alreadyOffloadedSize, threshold);
}
offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty());
}
}
}