下面列出了com.google.common.collect.EvictingQueue#add ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public void fail(T object) {
logger.info("Server {}:{} failed.", ((ThriftServer)object).getHost(),((ThriftServer)object).getPort());
boolean addToFail = false;
try {
EvictingQueue<Long> evictingQueue = failCountMap.get(object);
synchronized (evictingQueue) {
evictingQueue.add(System.currentTimeMillis());
if (evictingQueue.remainingCapacity() == 0 && evictingQueue.element() >= (System.currentTimeMillis() - failDuration)) {
addToFail = true;
}
}
} catch (ExecutionException e) {
logger.error("Ops.", e);
}
if (addToFail) {
failedList.put(object, Boolean.TRUE);
logger.info("Server {}:{} failed. Add to fail list.", ((ThriftServer)object).getHost(), ((ThriftServer)object).getPort());
}
}
/**
* <p>
* fail.
* </p>
*
* @param object a T object.
*/
public void fail(T object) {
logger.trace("server {} failed.", object);
boolean addToFail = false;
try {
EvictingQueue<Long> evictingQueue = failCountMap.get(object);
synchronized (evictingQueue) {
evictingQueue.add(System.currentTimeMillis());
if (evictingQueue.remainingCapacity() == 0
&& evictingQueue.element() >= System.currentTimeMillis() - failDuration) {
addToFail = true;
}
}
} catch (ExecutionException e) {
logger.error("Ops.", e);
}
if (addToFail) {
failedList.put(object, TRUE);
logger.trace("server {} failed. add to fail list.", object);
}
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
InternalHistogram histo = (InternalHistogram) aggregation;
List<? extends InternalHistogram.Bucket> buckets = histo.getBuckets();
InternalHistogram.Factory<? extends InternalHistogram.Bucket> factory = histo.getFactory();
List newBuckets = new ArrayList<>();
EvictingQueue<Double> lagWindow = EvictingQueue.create(lag);
int counter = 0;
for (InternalHistogram.Bucket bucket : buckets) {
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy);
InternalHistogram.Bucket newBucket = bucket;
counter += 1;
// Still under the initial lag period, add nothing and move on
Double lagValue;
if (counter <= lag) {
lagValue = Double.NaN;
} else {
lagValue = lagWindow.peek(); // Peek here, because we rely on add'ing to always move the window
}
// Normalize null's to NaN
if (thisBucketValue == null) {
thisBucketValue = Double.NaN;
}
// Both have values, calculate diff and replace the "empty" bucket
if (!Double.isNaN(thisBucketValue) && !Double.isNaN(lagValue)) {
double diff = thisBucketValue - lagValue;
List<InternalAggregation> aggs = new ArrayList<>(eagerTransform(bucket.getAggregations().asList(), AGGREGATION_TRANFORM_FUNCTION));
aggs.add(new InternalSimpleValue(name(), diff, formatter, new ArrayList<PipelineAggregator>(), metaData()));
newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), new InternalAggregations(
aggs), bucket.getKeyed(), bucket.getFormatter());
}
newBuckets.add(newBucket);
lagWindow.add(thisBucketValue);
}
return factory.create(newBuckets, histo);
}