下面列出了怎么用org.apache.commons.lang3.mutable.MutableLong的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Builds a {@code MySession} whose expiration-time suggestion routes every
 * {@code longValue()} read through {@code execBlockableSessionOp}, passing it the
 * supplied latches.
 *
 * @param opStarted latch handed to {@code execBlockableSessionOp}
 * @param opBlocker latch handed to {@code execBlockableSessionOp}
 * @param opCompleted latch handed to {@code execBlockableSessionOp}
 * @return the newly constructed session
 */
protected Session newSessionWithBlockableMutableLong(final CountDownLatch opStarted,
        final CountDownLatch opBlocker, final CountDownLatch opCompleted) {
    // The Maintenance implementation forces the concrete MySession type here;
    // an interface would not be enough.
    final String sessionId = nextUuid();
    final MutableLong blockableExpiration = new MutableLong(System.currentTimeMillis()) {
        // Bridge to the real MutableLong.longValue(): inside the Callable below,
        // "super" would no longer refer to MutableLong.
        private long superLongValue() {
            return super.longValue();
        }

        @Override
        public long longValue() {
            final Callable<Long> readRealValue = new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    return superLongValue();
                }
            };
            final Long value =
                    execBlockableSessionOp(opStarted, opBlocker, opCompleted, readRealValue);
            return value;
        }
    };
    return new MySession(sessionComponent, sessionId, threadLocalManager, idManager,
            sessionComponent, sessionListener, sessionComponent.getInactiveInterval(),
            new MyNonPortableSession(), blockableExpiration, null);
}
/**
 * Counts tuples per window: every input is mapped to a timestamped tuple carrying the
 * value {@code 1L}, and the mapped stream is fed into a windowed operator created with
 * a {@code SumLong} accumulation.
 *
 * @param opts options forwarded to {@code addOperator}
 * @return the stream returned by {@code addOperator}
 */
@Override
public <STREAM extends WindowedStream<Tuple.WindowedTuple<Long>>> STREAM count(Option... opts)
{
  Function.MapFunction<T, Tuple<Long>> toUnitTuple = new Function.MapFunction<T, Tuple<Long>>()
  {
    @Override
    public Tuple<Long> f(T input)
    {
      // Reuse the input's own timestamp when it has one; otherwise stamp with "now".
      final long timestamp;
      if (input instanceof Tuple.TimestampedTuple) {
        timestamp = ((Tuple.TimestampedTuple)input).getTimestamp();
      } else {
        timestamp = System.currentTimeMillis();
      }
      return new Tuple.TimestampedTuple<>(timestamp, 1L);
    }
  };

  WindowedStream<Tuple<Long>> unitStream = map(toUnitTuple);
  WindowedOperatorImpl<Long, MutableLong, Long> summingOperator = createWindowedOperator(new SumLong());
  return unitStream.addOperator(summingOperator, summingOperator.input, summingOperator.output, opts);
}
/**
 * Decides whether the given entry may be pushed.
 * <p>
 * Checks, in order: (1) a cached end barrier for the entry's region in
 * {@code canPushUnder}; (2) whether the entry's sequence id directly follows the last
 * pushed sequence id recorded in {@code pushed}; (3) otherwise delegates to the
 * row-based {@code canPush} overload.
 *
 * @param entry the entry to check
 * @param firstCellInEdit first cell of the edit; its row is cloned for the fallback check
 * @return {@code true} if the entry may be pushed
 * @throws IOException declared by this method; SOURCE does not show which call raises it
 */
public boolean canPush(Entry entry, Cell firstCellInEdit) throws IOException {
  final String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
  final long sequenceId = entry.getKey().getSequenceId();

  final Long endBarrier = canPushUnder.getIfPresent(regionName);
  if (endBarrier != null) {
    if (sequenceId < endBarrier.longValue()) {
      LOG.trace("{} is before the end barrier {}, pass", entry, endBarrier);
      return true;
    }
    LOG.debug("{} is beyond the previous end barrier {}, remove from cache", entry,
      endBarrier);
    // Past the last known safe point: the cached barrier is no longer useful.
    canPushUnder.invalidate(regionName);
  }

  // Covers the case where the region is currently open on this server: a continuous
  // sequence id is safe to replicate. A gap may mean the region moved to another RS and
  // back, so the barrier has to be re-checked via the row-based overload.
  final MutableLong lastPushed = pushed.getUnchecked(regionName);
  if (sequenceId != lastPushed.longValue() + 1) {
    return canPush(entry, CellUtil.cloneRow(firstCellInEdit));
  }
  LOG.trace("The sequence id for {} is continuous, pass", entry);
  lastPushed.increment();
  return true;
}
/**
 * Discards the contents of every file in every bucket and reports how many bytes were
 * released in total. The work runs inside {@code ifNotClosed}.
 * <p>
 * Each bucket's read lock is released in a {@code finally} block so that an exception
 * thrown while discarding file contents cannot leave the bucket locked.
 *
 * @return the sum of the values returned by {@code discardFileContents()}
 */
@Override
public long freeMemory() {
    MutableLong totalBytesReleased = new MutableLong(0);
    ifNotClosed(() -> {
        for (FileBucket bucket : fileBuckets) {
            bucket.lockRead();
            try {
                for (FileInfo fileInfo : bucket.getFiles()) {
                    long bytesReleased = fileInfo.discardFileContents();
                    // The cached-size counter shrinks by exactly what was released.
                    updateSizeOfCachedFileContents(-bytesReleased);
                    totalBytesReleased.add(bytesReleased);
                }
            } finally {
                bucket.unlockRead();
            }
        }
    });
    return totalBytesReleased.longValue();
}
/**
 * Claims a slot in the ring buffer, wraps the given key and edits in an
 * {@code FSWALEntry}, stamps the entry with the mvcc write entry, loads it into the
 * claimed slot and publishes that slot.
 *
 * @param hri region the edits belong to
 * @param key WAL key; its mvcc is used to begin the write entry
 * @param edits the edits to append
 * @param inMemstore passed through to the {@code FSWALEntry} constructor
 * @param ringBuffer ring buffer the entry is loaded into and published on
 * @return the ring buffer sequence (txid) claimed for this entry
 * @throws IOException if this log is already closed
 */
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException {
if (this.closed) {
throw new IOException(
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
}
// The txid is captured in a holder because it is assigned inside the action handed to
// mvcc.begin(...).
// NOTE(review): presumably the slot is claimed inside begin() so that txid order matches
// mvcc write-number order — confirm against MultiVersionConcurrencyControl.
MutableLong txidHolder = new MutableLong();
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
txidHolder.setValue(ringBuffer.next());
});
long txid = txidHolder.longValue();
// The current RPC call is attached only when it is a ServerCall with a non-null cell
// scanner; otherwise null is passed.
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry);
} finally {
// The claimed slot is published even if building or loading the entry failed.
ringBuffer.publish(txid);
}
return txid;
}
/**
 * Builds a {@code MySession} whose {@code getLastAccessedTime()} routes every call
 * through {@code execBlockableSessionOp}, passing it the supplied latches.
 *
 * @param opStarted latch handed to {@code execBlockableSessionOp}
 * @param opBlocker latch handed to {@code execBlockableSessionOp}
 * @param opCompleted latch handed to {@code execBlockableSessionOp}
 * @return the newly constructed session
 */
protected Session newSessionWithBlockableGetLastAccessedTimeImpl(final CountDownLatch opStarted,
        final CountDownLatch opBlocker, final CountDownLatch opCompleted) {
    // The getActiveUserCount() implementation forces the concrete MySession type here;
    // an interface would not be enough.
    final String sessionId = nextUuid();
    return new MySession(sessionComponent, sessionId, threadLocalManager, idManager,
            sessionComponent, sessionListener, sessionComponent.getInactiveInterval(),
            new MyNonPortableSession(), new MutableLong(System.currentTimeMillis()), null) {

        @Override
        public long getLastAccessedTime() {
            final Callable<Long> readRealValue = new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    return superGetLastAccessedTime();
                }
            };
            final Long lastAccessed =
                    execBlockableSessionOp(opStarted, opBlocker, opCompleted, readRealValue);
            return lastAccessed;
        }

        // Bridge to MySession.getLastAccessedTime(): inside the Callable above,
        // "super" would no longer refer to MySession.
        private long superGetLastAccessedTime() {
            return super.getLastAccessedTime();
        }
    };
}
/**
 * Dequeues twice from a freshly set-up, empty queue manager and checks that no query
 * comes back either time.
 */
@Test
public void testSimpleRemoveEmpty()
{
  WindowEndQueueManager<Query, Void> queueManager = new WindowEndQueueManager<>();
  queueManager.setup(null);
  queueManager.beginWindow(0);

  for (int attempt = 0; attempt < 2; attempt++) {
    QueryBundle<Query, Void, MutableLong> bundle = queueManager.dequeue();
    Query dequeued = null;
    if (bundle != null) {
      dequeued = bundle.getQuery();
    }
    Assert.assertEquals("The queries must match.", null, dequeued);
  }

  queueManager.endWindow();
  queueManager.teardown();
}
/**
 * Enqueues a single query, then dequeues twice: the first dequeue must return that
 * query, the second must return nothing.
 */
@Test
public void testSimpleAddOneRemove()
{
  WindowEndQueueManager<Query, Void> queueManager = new WindowEndQueueManager<>();
  queueManager.setup(null);
  queueManager.beginWindow(0);

  Query enqueued = new MockQuery("1");
  queueManager.enqueue(enqueued, null, new MutableLong(1L));

  Query firstDequeued = queueManager.dequeue().getQuery();

  QueryBundle<Query, Void, MutableLong> secondBundle = queueManager.dequeue();
  Query secondDequeued = null;
  if (secondBundle != null) {
    secondDequeued = secondBundle.getQuery();
  }

  queueManager.endWindow();
  queueManager.teardown();

  Assert.assertEquals("The queries must match.", enqueued, firstDequeued);
  Assert.assertEquals("The queries must match.", null, secondDequeued);
}
/**
 * Enqueues {@code totalTuples / batchSize} batches of {@code batchSize} mock queries,
 * sleeping for one millisecond after an enqueue with probability {@code waitMillisProb}.
 * <p>
 * If the thread is interrupted while sleeping, the interrupt flag is restored before the
 * exception is rethrown wrapped in a {@code RuntimeException}, so code further up the
 * stack can still observe the interruption.
 */
@Override
public void run()
{
  int numLoops = totalTuples / batchSize;

  // tupleCounter is advanced by both loops, so the query ids skip one value between
  // consecutive batches. This is kept as-is to preserve the ids produced.
  for (int loopCounter = 0, tupleCounter = 0; loopCounter < numLoops; loopCounter++, tupleCounter++) {
    for (int batchCounter = 0; batchCounter < batchSize; batchCounter++, tupleCounter++) {
      queueManager.enqueue(new MockQuery(tupleCounter + ""), null, new MutableLong(1L));

      if (rand.nextDouble() < waitMillisProb) {
        try {
          Thread.sleep(1);
        } catch (InterruptedException ex) {
          // Catching InterruptedException clears the flag; set it again before bailing out.
          Thread.currentThread().interrupt();
          throw new RuntimeException(ex);
        }
      }
    }
  }
}
/**
 * Creates a windowed operator with a {@code SumAccumulation} and (re)initialises the
 * test's storage fields. Data and retraction storage are spillable when
 * {@code useSpillable} is set and in-memory otherwise; window state storage is always
 * in-memory.
 *
 * @return the configured operator
 */
private WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator()
{
  final WindowedOperatorImpl<Long, MutableLong, Long> operator = new WindowedOperatorImpl<>();

  // TODO: We don't yet support Spillable data structures for window state storage because SpillableMapImpl does not yet support iterating over all keys.
  windowStateStorage = new InMemoryWindowedStorage<>();

  if (!useSpillable) {
    plainDataStorage = new InMemoryWindowedStorage<>();
    plainRetractionStorage = new InMemoryWindowedStorage<>();
  } else {
    sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore);

    SpillableWindowedPlainStorage<MutableLong> spillableData = new SpillableWindowedPlainStorage<>();
    spillableData.setSpillableComplexComponent(sccImpl);
    plainDataStorage = spillableData;

    SpillableWindowedPlainStorage<Long> spillableRetraction = new SpillableWindowedPlainStorage<>();
    spillableRetraction.setSpillableComplexComponent(sccImpl);
    plainRetractionStorage = spillableRetraction;

    operator.addComponent("SpillableComplexComponent", sccImpl);
  }

  operator.setDataStorage(plainDataStorage);
  operator.setRetractionStorage(plainRetractionStorage);
  operator.setWindowStateStorage(windowStateStorage);
  operator.setAccumulation(new SumAccumulation());
  return operator;
}
/**
 * Configures a windowed operator one step at a time and checks, after each step, whether
 * validation is expected to fail (with the given reason) or to pass.
 */
@Test
public void testValidation() throws Exception
{
  final WindowedOperatorImpl<Long, MutableLong, Long> operator = new WindowedOperatorImpl<>();

  // Required pieces, added one at a time.
  verifyValidationFailure(operator, "nothing is configured");

  operator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
  verifyValidationFailure(operator, "data storage is not set");

  operator.setDataStorage(new InMemoryWindowedStorage<MutableLong>());
  verifyValidationFailure(operator, "accumulation is not set");

  operator.setAccumulation(new SumAccumulation());
  operator.validate();

  // Accumulating-and-retracting needs retraction storage.
  final TriggerOption accumulatingAndRetracting = new TriggerOption().accumulatingAndRetractingFiredPanes();
  operator.setTriggerOption(accumulatingAndRetracting);
  verifyValidationFailure(operator, "retracting storage is not set for ACCUMULATING_AND_RETRACTING");

  operator.setRetractionStorage(new InMemoryWindowedStorage<Long>());
  operator.validate();

  // firingOnlyUpdatedPanes combined with discarding.
  final TriggerOption discardingUpdatedOnly = new TriggerOption().discardingFiredPanes().firingOnlyUpdatedPanes();
  operator.setTriggerOption(discardingUpdatedOnly);
  verifyValidationFailure(operator, "DISCARDING is not valid for option firingOnlyUpdatedPanes");

  // firingOnlyUpdatedPanes combined with accumulating, but without retraction storage.
  final TriggerOption accumulatingUpdatedOnly = new TriggerOption().accumulatingFiredPanes().firingOnlyUpdatedPanes();
  operator.setTriggerOption(accumulatingUpdatedOnly);
  operator.setRetractionStorage(null);
  verifyValidationFailure(operator, "retracting storage is not set for option firingOnlyUpdatedPanes");
}
/**
 * Computes a mask of {@code 2 * n + 1} values from differences of {@code phi2}, where
 * {@code n} is {@code LinesUtil.MASK_SIZE(LinesUtil.MAX_SIZE_MASK_2, sigma)} truncated to
 * an int. The first and last elements are computed separately from the interior ones.
 * <p>
 * {@code num} is an out-parameter: callers expect it to be modified (it stands in for a
 * pointer argument), which is why a Commons Lang {@code MutableLong} is passed. On
 * return it holds {@code n}.
 *
 * @param num
 *          receives {@code n}
 * @param sigma
 *          the sigma passed to {@code LinesUtil.MASK_SIZE} and {@code phi2}
 * @return the mask, of length {@code 2 * n + 1}
 */
public double[] compute_gauss_mask_2(MutableLong num, double sigma) {
  /* Error < 0.001 on each side */
  final double maskSize = LinesUtil.MASK_SIZE(LinesUtil.MAX_SIZE_MASK_2, sigma);
  final int half = (int) maskSize;
  final double[] mask = new double[2 * half + 1];

  // Interior elements: indices 1 .. 2 * half - 1.
  for (int offset = 1 - half; offset < half; offset++) {
    mask[half + offset] = phi2(-offset + 0.5, sigma) - phi2(-offset - 0.5, sigma);
  }

  // The two border elements use a single phi2 term each.
  mask[0] = -phi2(half - 0.5, sigma);
  mask[2 * half] = phi2(-half + 0.5, sigma);

  num.setValue(half);
  return mask;
}
/**
 * With sliding windows of 1000 ms duration sliding every 200 ms, a tuple stamped
 * {@code BASE + 1600} must be assigned to five windows, beginning at
 * {@code BASE + 800, 1000, 1200, 1400, 1600}, each lasting 1000 ms.
 */
@Test
public void testSlidingWindowAssignment()
{
  final WindowedOperatorImpl<Long, MutableLong, Long> operator = createDefaultWindowedOperator();
  operator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000), Duration.millis(200)));
  operator.setup(testMeta.operatorContext);

  final Tuple.WindowedTuple<Long> assigned = operator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1600L, 2L));
  final Collection<? extends Window> assignedWindows = assigned.getWindows();
  final Window[] windowArray = assignedWindows.toArray(new Window[]{});

  final int[] expectedBeginOffsets = {800, 1000, 1200, 1400, 1600};
  Assert.assertEquals(5, windowArray.length);
  for (int index = 0; index < expectedBeginOffsets.length; index++) {
    Assert.assertEquals(BASE + expectedBeginOffsets[index], windowArray[index].getBeginTimestamp());
    Assert.assertEquals(1000, windowArray[index].getDurationMillis());
  }

  operator.teardown();
}
/**
 * Wires a three-operator DAG: a {@code WordGenerator} feeds a keyed windowed operator
 * that applies a {@code SumAccumulation} over one-minute time windows, whose output goes
 * to a {@code ConsoleOutputOperator}.
 *
 * @param dag the DAG to populate
 * @param configuration not used by this method
 */
@Override
public void populateDAG(DAG dag, Configuration configuration)
{
  final WordGenerator source = new WordGenerator();
  final KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> summing = new KeyedWindowedOperatorImpl<>();

  // Accumulation and the storages that back it.
  final Accumulation<Long, MutableLong, Long> sumAccumulation = new SumAccumulation();
  summing.setAccumulation(sumAccumulation);
  summing.setDataStorage(new InMemoryWindowedKeyedStorage<String, MutableLong>());
  summing.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, Long>());
  summing.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());

  // Windowing and triggering.
  summing.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(1)));
  summing.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.millis(1000)).accumulatingAndRetractingFiredPanes());
  // Allowed lateness is not configured; the previous setting is kept for reference:
  //summing.setAllowedLateness(Duration.millis(14000));

  final ConsoleOutputOperator sink = new ConsoleOutputOperator();

  dag.addOperator("inputOperator", source);
  dag.addOperator("windowedOperator", summing);
  dag.addOperator("outputOperator", sink);

  dag.addStream("input_windowed", source.output, summing.input);
  dag.addStream("windowed_output", summing.output, sink.input);
}
/**
 * Wires a three-operator DAG: a {@code RandomNumberPairGenerator} feeds a windowed
 * operator that applies a {@code PiAccumulation} over a global window, whose output goes
 * to a {@code ConsoleOutputOperator}.
 *
 * @param dag the DAG to populate
 * @param configuration not used by this method
 */
@Override
public void populateDAG(DAG dag, Configuration configuration)
{
  final RandomNumberPairGenerator source = new RandomNumberPairGenerator();
  final WindowedOperatorImpl<MutablePair<Double, Double>, MutablePair<MutableLong, MutableLong>, Double> piOperator = new WindowedOperatorImpl<>();

  // Accumulation and the storages that back it.
  final Accumulation<MutablePair<Double, Double>, MutablePair<MutableLong, MutableLong>, Double> accumulation = new PiAccumulation();
  piOperator.setAccumulation(accumulation);
  piOperator.setDataStorage(new InMemoryWindowedStorage<MutablePair<MutableLong, MutableLong>>());
  piOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());

  // Windowing and triggering.
  piOperator.setWindowOption(new WindowOption.GlobalWindow());
  piOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.millis(1000)).accumulatingFiredPanes());

  final ConsoleOutputOperator sink = new ConsoleOutputOperator();

  dag.addOperator("inputOperator", source);
  dag.addOperator("windowedOperator", piOperator);
  dag.addOperator("outputOperator", sink);

  dag.addStream("input_windowed", source.output, piOperator.input);
  dag.addStream("windowed_output", piOperator.output, sink.input);
}
/**
 * For each of the int, long, float and double sum accumulations, checks
 * {@code accumulate} starting from the default accumulated value, {@code accumulate}
 * starting from an existing value, and {@code merge} of two accumulated values.
 */
@Test
public void SumTest()
{
  SumInt intSum = new SumInt();
  SumLong longSum = new SumLong();
  SumFloat floatSum = new SumFloat();
  SumDouble doubleSum = new SumDouble();

  // int
  Assert.assertEquals(new MutableInt(10), intSum.accumulate(intSum.defaultAccumulatedValue(), 10));
  Assert.assertEquals(new MutableInt(11), intSum.accumulate(new MutableInt(1), 10));
  Assert.assertEquals(new MutableInt(22), intSum.merge(new MutableInt(1), new MutableInt(21)));

  // long
  Assert.assertEquals(new MutableLong(10L), longSum.accumulate(longSum.defaultAccumulatedValue(), 10L));
  Assert.assertEquals(new MutableLong(22L), longSum.accumulate(new MutableLong(2L), 20L));
  Assert.assertEquals(new MutableLong(41L), longSum.merge(new MutableLong(32L), new MutableLong(9L)));

  // float
  Assert.assertEquals(new MutableFloat(9.0F), floatSum.accumulate(floatSum.defaultAccumulatedValue(), 9.0F));
  Assert.assertEquals(new MutableFloat(22.5F), floatSum.accumulate(new MutableFloat(2.5F), 20F));
  Assert.assertEquals(new MutableFloat(41.0F), floatSum.merge(new MutableFloat(33.1F), new MutableFloat(7.9F)));

  // double
  Assert.assertEquals(new MutableDouble(9.0), doubleSum.accumulate(doubleSum.defaultAccumulatedValue(), 9.0));
  Assert.assertEquals(new MutableDouble(22.5), doubleSum.accumulate(new MutableDouble(2.5), 20.0));
  Assert.assertEquals(new MutableDouble(41.0), doubleSum.merge(new MutableDouble(33.1), new MutableDouble(7.9)));
}
/**
 * Creates a session. The creation time and the last-accessed time are both set to the
 * current time; the expiration-time suggestion is then reset and the Terracotta cluster
 * property resolved.
 *
 * @param sessionManager stored as the session's manager
 * @param id the session id
 * @param threadLocalManager stored for later use
 * @param idManager stored for later use
 * @param sessionStore stored for later use
 * @param sessionListener stored for later use
 * @param inactiveInterval initial inactive interval
 * @param nonPortableSession stored for later use
 * @param expirationTimeSuggestion holder that {@code resetExpirationTimeSuggestion()} is
 *          presumably expected to update — confirm against that method
 * @param rebuildBreakdownService stored for later use; may be {@code null}, as callers
 *          visible in this file pass {@code null}
 */
public MySession(SessionManager sessionManager, String id, ThreadLocalManager threadLocalManager,
        IdManager idManager, SessionStore sessionStore, SessionAttributeListener sessionListener,
        int inactiveInterval, NonPortableSession nonPortableSession, MutableLong expirationTimeSuggestion,
        RebuildBreakdownService rebuildBreakdownService)
{
    // Collaborators.
    this.sessionManager = sessionManager;
    this.threadLocalManager = threadLocalManager;
    this.idManager = idManager;
    this.sessionStore = sessionStore;
    this.sessionListener = sessionListener;

    // Identity and timing state.
    m_id = id;
    m_inactiveInterval = inactiveInterval;
    m_nonPortalSession = nonPortableSession;
    m_created = System.currentTimeMillis();
    m_accessed = m_created;

    // The holder must be in place before it is reset.
    this.expirationTimeSuggestion = expirationTimeSuggestion;
    resetExpirationTimeSuggestion();

    // set the TERRACOTTA_CLUSTER flag
    resolveTerracottaClusterProperty();

    this.rebuildBreakdownService = rebuildBreakdownService;
}
/**
 * Computes a mask of {@code 2 * n + 1} values from differences of {@code phi0}, where
 * {@code n} is {@code LinesUtil.MASK_SIZE(LinesUtil.MAX_SIZE_MASK_0, sigma)} truncated to
 * an int. The first and last elements are computed separately from the interior ones.
 * <p>
 * {@code num} is an out-parameter: callers expect it to be modified (it stands in for a
 * pointer argument), which is why a Commons Lang {@code MutableLong} is passed. On
 * return it holds {@code n}.
 *
 * @param num
 *          receives {@code n}
 * @param sigma
 *          the sigma passed to {@code LinesUtil.MASK_SIZE} and {@code phi0}
 * @return the mask, of length {@code 2 * n + 1}
 */
public double[] compute_gauss_mask_0(MutableLong num, double sigma) {
  /* Error < 0.001 on each side */
  final double maskSize = LinesUtil.MASK_SIZE(LinesUtil.MAX_SIZE_MASK_0, sigma);
  final int half = (int) maskSize;
  final double[] mask = new double[2 * half + 1];

  // Interior elements: indices 1 .. 2 * half - 1.
  for (int offset = 1 - half; offset < half; offset++) {
    mask[half + offset] = phi0(-offset + 0.5, sigma) - phi0(-offset - 0.5, sigma);
  }

  // The two border elements are computed from a single phi0 term each.
  mask[0] = 1.0 - phi0(half - 0.5, sigma);
  mask[2 * half] = phi0(-half + 0.5, sigma);

  num.setValue(half);
  return mask;
}
/**
 * Builds a {@code MySession} whose expiration-time suggestion routes every
 * {@code longValue()} read through {@code execBlockableSessionOp}, passing it the
 * supplied latches.
 *
 * @param opStarted latch handed to {@code execBlockableSessionOp}
 * @param opBlocker latch handed to {@code execBlockableSessionOp}
 * @param opCompleted latch handed to {@code execBlockableSessionOp}
 * @return the newly constructed session
 */
protected Session newSessionWithBlockableMutableLong(final CountDownLatch opStarted,
        final CountDownLatch opBlocker, final CountDownLatch opCompleted) {
    // The Maintenance implementation forces the concrete MySession type here;
    // an interface would not be enough.
    final String sessionId = nextUuid();
    final MutableLong blockableExpiration = new MutableLong(System.currentTimeMillis()) {
        // Bridge to the real MutableLong.longValue(): inside the Callable below,
        // "super" would no longer refer to MutableLong.
        private long superLongValue() {
            return super.longValue();
        }

        @Override
        public long longValue() {
            final Callable<Long> readRealValue = new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    return superLongValue();
                }
            };
            final Long value =
                    execBlockableSessionOp(opStarted, opBlocker, opCompleted, readRealValue);
            return value;
        }
    };
    return new MySession(sessionComponent, sessionId, threadLocalManager, idManager,
            sessionComponent, sessionListener, sessionComponent.getInactiveInterval(),
            new MyNonPortableSession(), blockableExpiration, null);
}
/**
 * Builds a {@code MySession} whose {@code getLastAccessedTime()} routes every call
 * through {@code execBlockableSessionOp}, passing it the supplied latches.
 *
 * @param opStarted latch handed to {@code execBlockableSessionOp}
 * @param opBlocker latch handed to {@code execBlockableSessionOp}
 * @param opCompleted latch handed to {@code execBlockableSessionOp}
 * @return the newly constructed session
 */
protected Session newSessionWithBlockableGetLastAccessedTimeImpl(final CountDownLatch opStarted,
        final CountDownLatch opBlocker, final CountDownLatch opCompleted) {
    // The getActiveUserCount() implementation forces the concrete MySession type here;
    // an interface would not be enough.
    final String sessionId = nextUuid();
    return new MySession(sessionComponent, sessionId, threadLocalManager, idManager,
            sessionComponent, sessionListener, sessionComponent.getInactiveInterval(),
            new MyNonPortableSession(), new MutableLong(System.currentTimeMillis()), null) {

        @Override
        public long getLastAccessedTime() {
            final Callable<Long> readRealValue = new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    return superGetLastAccessedTime();
                }
            };
            final Long lastAccessed =
                    execBlockableSessionOp(opStarted, opBlocker, opCompleted, readRealValue);
            return lastAccessed;
        }

        // Bridge to MySession.getLastAccessedTime(): inside the Callable above,
        // "super" would no longer refer to MySession.
        private long superGetLastAccessedTime() {
            return super.getLastAccessedTime();
        }
    };
}
/**
 * Measures write and read throughput of a data interface: starts
 * {@code numberOfThreads} writer threads, waits for them, flushes, then starts the same
 * number of reader threads and waits for them, and finally logs items per second for
 * both phases.
 * <p>
 * The data interface is closed in a {@code finally} block so that it is released even if
 * a phase throws (for example when the waiting thread is interrupted).
 *
 * @param factory factory used to create the data interface; its simple class name is logged
 * @param cachingType caching type used to create the data interface
 * @param numberOfThreads number of threads started in each phase
 * @param numberOfItems passed to every worker thread
 * @throws FileNotFoundException declared by this method; SOURCE does not show which call raises it
 * @throws InterruptedException if interrupted while waiting for the worker threads
 */
private void testBatchWritingAndReading(DataInterfaceFactory factory, DatabaseCachingType cachingType, int numberOfThreads, final long numberOfItems) throws FileNotFoundException, InterruptedException {
    final BaseDataInterface dataInterface = createDataInterface(cachingType, factory);
    try {
        dataInterface.dropAllData();

        // Write phase. The counter is shared by all writer threads.
        // NOTE(review): MutableLong is not thread-safe by itself; presumably the worker
        // thread synchronises its updates — confirm in UniformDataTestsThread.
        MutableLong numberOfItemsWritten = new MutableLong(0);
        long startOfWrite = System.nanoTime();
        CountDownLatch countDownLatch = new CountDownLatch(numberOfThreads);
        for (int i = 0; i < numberOfThreads; i++) {
            new UniformDataTestsThread(numberOfItemsWritten, numberOfItems, dataInterface, countDownLatch, true).start();
        }
        countDownLatch.await();
        dataInterface.flush();
        long endOfWrite = System.nanoTime();
        double writesPerSecond = numberOfItemsWritten.longValue() * 1e9 / (endOfWrite - startOfWrite);

        // Read phase. The measured time includes optimizeForReading().
        countDownLatch = new CountDownLatch(numberOfThreads);
        long startOfRead = System.nanoTime();
        dataInterface.optimizeForReading();
        MutableLong numberOfItemsRead = new MutableLong(0);
        for (int i = 0; i < numberOfThreads; i++) {
            new UniformDataTestsThread(numberOfItemsRead, numberOfItems, dataInterface, countDownLatch, false).start();
        }
        countDownLatch.await();
        long endOfRead = System.nanoTime();
        double readsPerSecond = numberOfItemsRead.longValue() * 1e9 / (endOfRead - startOfRead);

        Log.i(factory.getClass().getSimpleName() + " threads " + numberOfThreads + " items " + numberOfItems + " write " + NumUtils.fmt(writesPerSecond) + " read " + NumUtils.fmt(readsPerSecond));
    } finally {
        dataInterface.close();
    }
}
/**
 * Sorts two equivalence sets in reverse natural order and checks which one comes first,
 * before and after a third term is added to the second set. Also checks that the first
 * value in the first set's sorted map is 20.
 */
@Test
public void testSort() throws Exception {
    final EquivalenceSet setOne = new EquivalenceSet();
    setOne.addTerm("a", 10);
    setOne.addTerm("b", 20);

    final EquivalenceSet setTwo = new EquivalenceSet();
    setTwo.addTerm("a", 5);
    setTwo.addTerm("b", 10);

    final List<EquivalenceSet> sets = new ArrayList<>();
    sets.add(setOne);
    sets.add(setTwo);

    // Shuffle first so the result cannot depend on insertion order.
    Collections.shuffle(sets);
    Collections.sort(sets, Collections.reverseOrder());
    assertEquals(new MutableLong(10), sets.get(0).getSortedMap().get("a"));

    // With a third term, the second set is expected to sort first.
    setTwo.addTerm("c", 1);
    Collections.shuffle(sets);
    Collections.sort(sets, Collections.reverseOrder());
    assertEquals(new MutableLong(5), sets.get(0).getSortedMap().get("a"));

    // Only the first entry of the sorted map matters here.
    long first = -1;
    for (Map.Entry<String, MutableLong> entry : setOne.getSortedMap().entrySet()) {
        first = entry.getValue().longValue();
        break;
    }
    assertEquals(20, first);
}
/**
 * Increments the counter kept for the given (block id, data) pair, creating it with a
 * value of 1 if there is none yet.
 *
 * @param blockId the block id part of the key
 * @param data the data part of the key
 */
public void incBlockId(final int blockId, final int data)
{
    final IdDataPair pair = new IdDataPair(blockId, data);
    final MutableLong existing = blockIdCounts.get(pair);
    if (existing == null) {
        blockIdCounts.put(pair, new MutableLong(1L));
        return;
    }
    existing.increment();
}
/**
 * Creates a worker thread named "UniformDataTestThread".
 *
 * @param numberOfItemsProcessed counter stored for this thread; callers visible in this
 *          file share one instance between several threads
 * @param numberOfItems stored for this thread
 * @param dataInterface the data interface this thread works on
 * @param countDownLatch latch stored for this thread
 * @param writeValues stored flag; callers visible in this file pass {@code true} for the
 *          write phase and {@code false} for the read phase
 */
public UniformDataTestsThread(MutableLong numberOfItemsProcessed, long numberOfItems, DataInterface dataInterface, CountDownLatch countDownLatch, boolean writeValues) {
    super("UniformDataTestThread", false);

    // What to work on.
    this.dataInterface = dataInterface;
    this.writeValues = writeValues;
    this.numberOfItems = numberOfItems;

    // How progress and completion are reported.
    this.numberOfItemsProcessed = numberOfItemsProcessed;
    this.countDownLatch = countDownLatch;
}
/**
 * Counts one occurrence of {@code value}: a new counter starting at 1 is stored for a
 * value not seen before, otherwise the existing counter is incremented.
 *
 * @param value the value to count
 */
@Override
public void add(String value) {
    if (!values.containsKey(value)) {
        values.put(value, new MutableLong(1));
        return;
    }
    values.get(value).increment();
}
/**
 * Adds the counts of {@code other} into this accumulator.
 * <p>
 * For a key this accumulator does not have yet, a <em>copy</em> of the other
 * accumulator's counter is stored. Storing the other accumulator's {@code MutableLong}
 * instance itself would make both accumulators share one mutable counter, so a later
 * {@code add} on either side would silently change the other.
 *
 * @param other the accumulator whose local value is merged into this one
 */
@Override
public void merge(Accumulator<String, ConcurrentSkipListMap<String, MutableLong>> other) {
    other.getLocalValue().forEach((key, otherCount) -> {
        MutableLong existing = values.get(key);
        if (existing != null) {
            existing.add(otherCount.longValue());
        } else {
            // Defensive copy: never alias the other accumulator's counter.
            values.put(key, new MutableLong(otherCount.longValue()));
        }
    });
}
/**
 * Adds {@code value} to the counter stored under {@code name}, creating the counter with
 * {@code value} as its initial value when there is none yet.
 *
 * @param collectives map from name to counter; modified in place
 * @param name key of the counter to update
 * @param value amount to add
 */
private void updateCollectives(Map<String, MutableLong> collectives, String name, int value){
    final MutableLong current = collectives.get(name);
    if (current != null) {
        current.add(value);
        return;
    }
    collectives.put(name, new MutableLong(value));
}
/**
 * Walks the query queue from its head and decrements the queue context of every queued
 * bundle by one.
 */
@Override
public void endWindow()
{
  QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, MutableLong>> node = queryQueue.getHead();
  while (node != null) {
    MutableLong remaining = node.getPayload().getQueueContext();
    remaining.decrement();
    node = node.getNext();
  }
}
/**
 * Enqueues a query with a fresh queue context. A non-null {@code context} first
 * overwrites the query's countdown. The context handed to the superclass is 1 for a
 * one-time query and the query's countdown otherwise.
 *
 * @param query the query to enqueue
 * @param metaQuery passed through to the superclass
 * @param context optional countdown to apply to the query; may be {@code null}
 * @return whatever the superclass {@code enqueue} returns
 */
@Override
public boolean enqueue(QUERY query, META_QUERY metaQuery, MutableLong context)
{
  if (context != null) {
    query.setCountdown(context.getValue());
  }

  // The countdown is only read for queries that are not one-time.
  final MutableLong queueContext = query.isOneTime()
      ? new MutableLong(1L)
      : new MutableLong(query.getCountdown());
  return super.enqueue(query, metaQuery, queueContext);
}
/**
 * Looks up the bundle's query id among the already-known nodes. If a node exists, its
 * payload is replaced with the given bundle and {@code false} is returned; otherwise
 * nothing is changed and {@code true} is returned.
 *
 * @param queryBundle the bundle about to be added
 * @return {@code true} if no node exists yet for the bundle's query id
 */
@Override
public boolean addingFilter(QueryBundle<QUERY, META_QUERY, MutableLong> queryBundle)
{
  final QueueListNode<QueryBundle<QUERY, META_QUERY, MutableLong>> existingNode =
      queryIDToNode.get(queryBundle.getQuery().getId());

  final boolean unknownQuery = existingNode == null;
  if (!unknownQuery) {
    existingNode.setPayload(queryBundle);
  }
  return unknownQuery;
}