下面列出了怎么用org.apache.commons.lang.mutable.MutableDouble的API类实例代码及写法,或者点击链接到github查看源代码。
/** Creates a new instance using a specific initial permutation and specified number of threads.
*
* <p>If <code>exact</code> is true, the final permutation is
* <em>exactly</em> the same as if you first permute the graph with <code>startPerm</code> and
* then apply LLP with an {@code null} starting permutation.
*
* @param symGraph a symmetric, loopless graph.
* @param startPerm an initial permutation of the graph, or {@code null} for no permutation.
* @param numberOfThreads the number of threads to be used (0 for automatic sizing).
* @param seed a random seed.
* @param exact a boolean flag that forces the algorithm to run exactly.
*/
public LayeredLabelPropagation(final ImmutableGraph symGraph, final int[] startPerm, final int numberOfThreads, final long seed, final boolean exact) throws IOException {
this.symGraph = symGraph;
this.n = symGraph.numNodes();
this.startPerm = startPerm;
this.seed = seed;
this.r = new XoRoShiRo128PlusRandom(seed);
this.exact = exact;
this.label = new AtomicIntegerArray(n);
this.volume = new AtomicIntegerArray(n);
cumulativeOutdegrees = new EliasFanoCumulativeOutdegreeList(symGraph, symGraph.numArcs(), 1);
this.gapCost = new MutableDouble();
this.updateList = Util.identity(n);
simpleUncaughtExceptionHandler = new SimpleUncaughtExceptionHandler();
labelling = File.createTempFile(this.getClass().getName(), "labelling");
labelling.deleteOnExit();
this.numberOfThreads = numberOfThreads != 0 ? numberOfThreads : Runtime.getRuntime().availableProcessors();
this.canChange = new boolean[n];
this.modified = new AtomicInteger(0);
this.objectiveFunction = new double[this.numberOfThreads];
}
private void processTuple(KeyValPair<MerchantKey, Long> tuple)
{
MerchantKey merchantKey = tuple.getKey();
MutableDouble lastSma = lastSMAMap.get(tuple.getKey());
long txValue = tuple.getValue();
if (lastSma != null && txValue > lastSma.doubleValue()) {
double lastSmaValue = lastSma.doubleValue();
double change = txValue - lastSmaValue;
if (change > threshold) { // generate an alert
AverageAlertData data = getOutputData(merchantKey, txValue, change, lastSmaValue);
alerts.add(data);
//if (userGenerated) { // if its user generated only the pass it to WebSocket
if (merchantKey.merchantType == MerchantTransaction.MerchantType.BRICK_AND_MORTAR) {
avgAlertNotificationPort.emit(getOutputData(data, String.format(brickMortarAlertMsg, txValue, change, lastSmaValue, merchantKey.merchantId, merchantKey.terminalId)));
} else { // its internet based
avgAlertNotificationPort.emit(getOutputData(data, String.format(internetAlertMsg, txValue, change, lastSmaValue, merchantKey.merchantId)));
}
//}
}
}
}
/**
* Generates tuples for each key and emits them. Only keys that are in the
* denominator are iterated on If the key is only in the numerator, it gets
* ignored (cannot do divide by 0) Clears internal data
*/
@Override
public void endWindow()
{
HashMap<K, Double> tuples = new HashMap<K, Double>();
for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
MutableDouble nval = numerators.get(e.getKey());
if (nval == null) {
tuples.put(e.getKey(), new Double(0.0));
} else {
tuples.put(e.getKey(), new Double((nval.doubleValue() / e.getValue()
.doubleValue()) * mult_by));
}
}
if (!tuples.isEmpty()) {
quotient.emit(tuples);
}
numerators.clear();
denominators.clear();
}
/**
* Emits average for each key in end window. Data is computed during process
* on input port Clears the internal data before return.
*/
@Override
public void endWindow()
{
for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
K key = e.getKey();
double d = e.getValue().doubleValue();
if (doubleAverage.isConnected()) {
doubleAverage.emit(new KeyValPair<K, Double>(key, d / counts.get(key).doubleValue()));
}
if (intAverage.isConnected()) {
intAverage.emit(new KeyValPair<K, Integer>(key, (int)d));
}
if (longAverage.isConnected()) {
longAverage.emit(new KeyValPair<K, Long>(key, (long)d));
}
}
sums.clear();
counts.clear();
}
/**
* For each tuple (a key value pair) Adds the values for each key.
*/
@Override
public void process(KeyValPair<K, V> tuple)
{
K key = tuple.getKey();
if (!doprocessKey(key)) {
return;
}
SumEntry val = sums.get(key);
if (val == null) {
val = new SumEntry(new MutableDouble(tuple.getValue().doubleValue()), true);
} else {
val.sum.add(tuple.getValue().doubleValue());
val.changed = true;
}
sums.put(cloneKey(key), val);
}
@Override
public void endWindow()
{
int totalWindowsOccupied = cacheOject.size();
for (Map.Entry<String, Map<String, KeyValPair<MutableDouble, Integer>>> e : outputMap.entrySet()) {
for (Map.Entry<String, KeyValPair<MutableDouble, Integer>> dimensionValObj : e.getValue().entrySet()) {
Map<String, DimensionObject<String>> outputData = new HashMap<String, DimensionObject<String>>();
KeyValPair<MutableDouble, Integer> keyVal = dimensionValObj.getValue();
if (operationType == AggregateOperation.SUM) {
outputData.put(e.getKey(), new DimensionObject<String>(keyVal.getKey(), dimensionValObj.getKey()));
} else if (operationType == AggregateOperation.AVERAGE) {
if (keyVal.getValue() != 0) {
double totalCount = ((double)(totalWindowsOccupied * applicationWindowSize)) / 1000;
outputData.put(e.getKey(), new DimensionObject<String>(new MutableDouble(keyVal.getKey().doubleValue() / totalCount), dimensionValObj.getKey()));
}
}
if (!outputData.isEmpty()) {
output.emit(outputData);
}
}
}
currentWindow = (currentWindow + 1) % windowSize;
}
@Override
public void process(Map<String, DimensionObject<String>> tuple)
{
for (Map.Entry<String, DimensionObject<String>> e : tuple.entrySet()) {
Map<String, MutableDouble> obj = dataMap.get(e.getKey());
DimensionObject<String> eObj = e.getValue();
if (obj == null) {
obj = new HashMap<String, MutableDouble>();
obj.put(eObj.getVal(), new MutableDouble(eObj.getCount()));
dataMap.put(e.getKey(), obj);
} else {
MutableDouble n = obj.get(eObj.getVal());
if (n == null) {
obj.put(eObj.getVal(), new MutableDouble(eObj.getCount()));
} else {
n.add(eObj.getCount());
}
}
}
}
@Override
public void process(String timeBucket, String key, String field, Number value)
{
String finalKey = timeBucket + "|" + key;
Map<String, Number> m = dataMap.get(finalKey);
if (value == null) {
return;
}
if (m == null) {
m = new HashMap<String, Number>();
m.put(field, new MutableDouble(value));
dataMap.put(finalKey, m);
} else {
Number n = m.get(field);
if (n == null) {
m.put(field, new MutableDouble(value));
} else {
((MutableDouble)n).add(value);
}
}
}
public ContaminationModel(List<PileupSummary> sites) {
errorRate = calculateErrorRate(sites);
// partition genome into minor allele fraction (MAF) segments to better distinguish hom alts from LoH hets.
segments = ContaminationSegmenter.findSegments(sites);
final int numSegments = segments.size();
final List<Double> minorAlleleFractionsGuess = new ArrayList<>(Collections.nCopies(segments.size(), 0.5));
final MutableDouble contaminationGuess = new MutableDouble(0);
for (int n = 0; n < NUM_ITERATIONS; n++) {
IntStream.range(0, numSegments).forEach(s -> minorAlleleFractionsGuess.set(s, calculateMinorAlleleFraction(contaminationGuess.doubleValue(), errorRate, segments.get(s))));
final Pair<List<List<PileupSummary>>, List<Double>> nonLOHSegmentsAndMafs = getNonLOHSegments(segments, minorAlleleFractionsGuess);
contaminationGuess.setValue(calculateContamination(errorRate, nonLOHSegmentsAndMafs.getLeft(), nonLOHSegmentsAndMafs.getRight()));
}
minorAlleleFractions = minorAlleleFractionsGuess;
contamination = contaminationGuess.doubleValue();
}
@Override
public void process(KeyValPair<MerchantKey, Double> tuple)
{
MutableDouble currentSma = currentSMAMap.get(tuple.getKey());
if (currentSma == null) { // first sma for the given key
double sma = tuple.getValue();
currentSMAMap.put(tuple.getKey(), new MutableDouble(sma));
//lastSMAMap.put(tuple.getKey(), new MutableDouble(sma));
} else { // move the current SMA value to the last SMA Map
//lastSMAMap.get(tuple.getKey()).setValue(currentSma.getValue());
currentSma.setValue(tuple.getValue()); // update the current SMA value
}
}
protected Number convertToNumber(Object o)
{
if (o == null) {
return null;
} else if (o instanceof MutableDouble || o instanceof MutableLong) {
return (Number)o;
} else if (o instanceof Double || o instanceof Float) {
return new MutableDouble((Number)o);
} else if (o instanceof Number) {
return new MutableLong((Number)o);
} else {
return new MutableDouble(o.toString());
}
}
/**
* Process each key, compute change or percent, and emit it.
*/
@Override
public void process(KeyValPair<K, V> tuple)
{
K key = tuple.getKey();
if (!doprocessKey(key)) {
return;
}
MutableDouble bval = basemap.get(key);
if (bval != null) { // Only process keys that are in the basemap
double cval = tuple.getValue().doubleValue() - bval.doubleValue();
change.emit(new KeyValPair<K, V>(cloneKey(key), getValue(cval)));
percent.emit(new KeyValPair<K, Double>(cloneKey(key), (cval / bval.doubleValue()) * 100));
}
}
@Override
public void endWindow()
{
Map<String, DimensionObject<String>> outputAggregationsObject;
for (Entry<String, Map<String, Map<AggregateOperation, Number>>> keys : unifiedCache.entrySet()) {
String key = keys.getKey();
Map<String, Map<AggregateOperation, Number>> dimValues = keys.getValue();
for (Entry<String, Map<AggregateOperation, Number>> dimValue : dimValues.entrySet()) {
String dimValueName = dimValue.getKey();
Map<AggregateOperation, Number> operations = dimValue.getValue();
outputAggregationsObject = new HashMap<String, DimensionObject<String>>();
for (Entry<AggregateOperation, Number> operation : operations.entrySet()) {
AggregateOperation aggrOperationType = operation.getKey();
Number aggr = operation.getValue();
String outKey = key + "." + aggrOperationType.name();
DimensionObject<String> outDimObj = new DimensionObject<String>(new MutableDouble(aggr), dimValueName);
outputAggregationsObject.put(outKey, outDimObj);
}
aggregationsOutput.emit(outputAggregationsObject);
}
}
}
@Override
public void setup(OperatorContext arg0)
{
if (arg0 != null) {
applicationWindowSize = arg0.getValue(OperatorContext.APPLICATION_WINDOW_COUNT);
}
if (cacheOject == null) {
cacheOject = new HashMap<Integer, Map<String, Map<String, Number>>>(windowSize);
}
if (outputMap == null) {
outputMap = new HashMap<String, Map<String, KeyValPair<MutableDouble, Integer>>>();
}
setUpPatternList();
}
@Override
public void beginWindow(long windowId)
{
dataMap = new HashMap<String, Map<String, MutableDouble>>();
// TODO Auto-generated method stub
}
@Override
public void endWindow()
{
for (Map.Entry<String, Map<String, MutableDouble>> e : dataMap.entrySet()) {
for (Map.Entry<String, MutableDouble> dimensionValObj : e.getValue().entrySet()) {
Map<String, DimensionObject<String>> outputData = new HashMap<String, DimensionObject<String>>();
outputData.put(e.getKey(), new DimensionObject<String>(dimensionValObj.getValue(), dimensionValObj.getKey()));
output.emit(outputData);
}
}
dataMap.clear();
}
@SuppressWarnings({"rawtypes", "unchecked"})
public void testNodeProcessingSchema(TopNUnique oper)
{
CollectorTestSink sortSink = new CollectorTestSink();
oper.top.setSink(sortSink);
oper.setN(3);
oper.beginWindow(0);
HashMap<String, DimensionObject<String>> input = new HashMap<String, DimensionObject<String>>();
input.put("url", new DimensionObject<String>(new MutableDouble(10), "abc"));
oper.data.process(input);
input.clear();
input.put("url", new DimensionObject<String>(new MutableDouble(1), "def"));
input.put("url1", new DimensionObject<String>(new MutableDouble(1), "def"));
oper.data.process(input);
input.clear();
input.put("url", new DimensionObject<String>(new MutableDouble(101), "ghi"));
input.put("url1", new DimensionObject<String>(new MutableDouble(101), "ghi"));
oper.data.process(input);
input.clear();
input.put("url", new DimensionObject<String>(new MutableDouble(50), "jkl"));
oper.data.process(input);
input.clear();
input.put("url", new DimensionObject<String>(new MutableDouble(50), "jkl"));
input.put("url3", new DimensionObject<String>(new MutableDouble(50), "jkl"));
oper.data.process(input);
oper.endWindow();
Assert.assertEquals("number emitted tuples", 3, sortSink.collectedTuples.size());
for (Object o : sortSink.collectedTuples) {
log.debug(o.toString());
}
log.debug("Done testing round\n");
}
@Test
public void testBasicCounters() throws InstantiationException, IllegalAccessException
{
BasicCounters<MutableDouble> doubleBasicCounters = new BasicCounters<MutableDouble>(MutableDouble.class);
MutableDouble counterA = doubleBasicCounters.findCounter(CounterKeys.A);
counterA.increment();
MutableDouble counterAInCounters = doubleBasicCounters.getCounter(CounterKeys.A);
Assert.assertNotNull("null", doubleBasicCounters.getCounter(CounterKeys.A));
Assert.assertTrue("equality", counterAInCounters.equals(counterA));
Assert.assertEquals(counterA.doubleValue(), 1.0, 0);
}
/**
* Emits on all ports that are connected. Data is precomputed during process
* on input port endWindow just emits it for each key Clears the internal data
* before return
*/
@Override
public void endWindow()
{
// Should allow users to send each key as a separate tuple to load balance
// This is an aggregate node, so load balancing would most likely not be
// needed
HashMap<K, V> tuples = new HashMap<K, V>();
HashMap<K, Integer> ctuples = new HashMap<K, Integer>();
HashMap<K, Double> dtuples = new HashMap<K, Double>();
HashMap<K, Integer> ituples = new HashMap<K, Integer>();
HashMap<K, Float> ftuples = new HashMap<K, Float>();
HashMap<K, Long> ltuples = new HashMap<K, Long>();
HashMap<K, Short> stuples = new HashMap<K, Short>();
for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
K key = e.getKey();
MutableDouble val = e.getValue();
tuples.put(key, getValue(val.doubleValue()));
dtuples.put(key, val.doubleValue());
ituples.put(key, val.intValue());
ftuples.put(key, val.floatValue());
ltuples.put(key, val.longValue());
stuples.put(key, val.shortValue());
// ctuples.put(key, counts.get(e.getKey()).toInteger());
MutableInt c = counts.get(e.getKey());
if (c != null) {
ctuples.put(key, c.toInteger());
}
}
sum.emit(tuples);
sumDouble.emit(dtuples);
sumInteger.emit(ituples);
sumLong.emit(ltuples);
sumShort.emit(stuples);
sumFloat.emit(ftuples);
count.emit(ctuples);
clearCache();
}
@Test
@SuppressWarnings("unchecked")
public void testOperator()
{
DimensionOperatorUnifier unifier = new DimensionOperatorUnifier();
CollectorTestSink sink = new CollectorTestSink();
unifier.aggregationsOutput.setSink(sink);
unifier.beginWindow(1);
Map<String, DimensionObject<String>> tuple1 = new HashMap<String, DimensionObject<String>>();
tuple1.put("m|201402121900|0|65537|131074|bytes.AVERAGE", new DimensionObject<String>(new MutableDouble(75), "a"));
tuple1.put("m|201402121900|0|65537|131074|bytes.COUNT", new DimensionObject<String>(new MutableDouble(3.0), "a"));
tuple1.put("m|201402121900|0|65537|131074|bytes.SUM", new DimensionObject<String>(new MutableDouble(225), "a"));
Map<String, DimensionObject<String>> tuple2 = new HashMap<String, DimensionObject<String>>();
tuple2.put("m|201402121900|0|65537|131074|bytes.AVERAGE", new DimensionObject<String>(new MutableDouble(50), "a"));
tuple2.put("m|201402121900|0|65537|131074|bytes.COUNT", new DimensionObject<String>(new MutableDouble(2.0), "a"));
tuple2.put("m|201402121900|0|65537|131074|bytes.SUM", new DimensionObject<String>(new MutableDouble(100), "a"));
Map<String, DimensionObject<String>> tuple3 = new HashMap<String, DimensionObject<String>>();
tuple3.put("m|201402121900|0|65537|131074|bytes.AVERAGE", new DimensionObject<String>(new MutableDouble(50), "z"));
tuple3.put("m|201402121900|0|65537|131074|bytes.COUNT", new DimensionObject<String>(new MutableDouble(2.0), "z"));
tuple3.put("m|201402121900|0|65537|131074|bytes.SUM", new DimensionObject<String>(new MutableDouble(100), "z"));
Map<String, DimensionObject<String>> tuple4 = new HashMap<String, DimensionObject<String>>();
tuple4.put("m|201402121900|0|65537|131075|bytes.AVERAGE", new DimensionObject<String>(new MutableDouble(14290.5), "b"));
tuple4.put("m|201402121900|0|65537|131075|bytes.COUNT", new DimensionObject<String>(new MutableDouble(2.0), "b"));
tuple4.put("m|201402121900|0|65537|131075|bytes.SUM", new DimensionObject<String>(new MutableDouble(28581.0), "b"));
Map<String, DimensionObject<String>> tuple5 = new HashMap<String, DimensionObject<String>>();
tuple5.put("m|201402121900|0|65537|131076|bytes.AVERAGE", new DimensionObject<String>(new MutableDouble(290.75), "c"));
tuple5.put("m|201402121900|0|65537|131076|bytes.COUNT", new DimensionObject<String>(new MutableDouble(10.0), "c"));
tuple5.put("m|201402121900|0|65537|131076|bytes.SUM", new DimensionObject<String>(new MutableDouble(8581.0), "c"));
unifier.process(tuple1);
unifier.process(tuple2);
unifier.process(tuple3);
unifier.process(tuple4);
unifier.process(tuple5);
unifier.endWindow();
@SuppressWarnings("unchecked")
List<Map<String, DimensionObject<String>>> tuples = sink.collectedTuples;
Assert.assertEquals("Tuple Count", 4, tuples.size());
for (Map<String, DimensionObject<String>> map : tuples) {
for (Entry<String, DimensionObject<String>> entry : map.entrySet()) {
String key = entry.getKey();
DimensionObject<String> dimObj = entry.getValue();
if (key.equals("m|201402121900|0|65537|131074|bytes.AVERAGE") && dimObj.getVal().equals("a")) {
Assert.assertEquals("average for key " + key + " and dimension key " + "a", new MutableDouble(65), dimObj.getCount());
}
if (key.equals("m|201402121900|0|65537|131074|bytes.SUM") && dimObj.getVal().equals("z")) {
Assert.assertEquals("sum for key " + key + " and dimension key " + "z", new MutableDouble(100), dimObj.getCount());
}
if (key.equals("m|201402121900|0|65537|131076|bytes.COUNT") && dimObj.getVal().equals("c")) {
Assert.assertEquals("count for key " + key + " and dimension key " + "c", new MutableDouble(10), dimObj.getCount());
}
}
}
}
@Test
@SuppressWarnings("unchecked")
public void testOperator()
{
LogstreamTopN oper = new LogstreamTopN();
LogstreamPropertyRegistry registry = new LogstreamPropertyRegistry();
registry.bind(LogstreamUtil.LOG_TYPE, "apache");
registry.bind(LogstreamUtil.FILTER, "default");
oper.setRegistry(registry);
oper.setN(5);
CollectorTestSink mapSink = new CollectorTestSink();
oper.top.setSink(mapSink);
oper.beginWindow(0);
Map<String, DimensionObject<String>> tuple1 = new HashMap<String, DimensionObject<String>>();
DimensionObject<String> dimObja = new DimensionObject<String>(new MutableDouble(10), "a");
tuple1.put("m|201402121900|0|65535|131075|val.COUNT", dimObja);
oper.data.process(tuple1);
DimensionObject<String> dimObjb = new DimensionObject<String>(new MutableDouble(1), "b");
tuple1.put("m|201402121900|0|65535|131075|val.COUNT", dimObjb);
oper.data.process(tuple1);
DimensionObject<String> dimObjc = new DimensionObject<String>(new MutableDouble(5), "c");
tuple1.put("m|201402121900|0|65535|131075|val.COUNT", dimObjc);
oper.data.process(tuple1);
DimensionObject<String> dimObjd = new DimensionObject<String>(new MutableDouble(2), "d");
tuple1.put("m|201402121900|0|65535|131075|val.COUNT", dimObjd);
oper.data.process(tuple1);
DimensionObject<String> dimObje = new DimensionObject<String>(new MutableDouble(15), "e");
tuple1.put("m|201402121900|0|65535|131075|val.COUNT", dimObje);
oper.data.process(tuple1);
DimensionObject<String> dimObjf = new DimensionObject<String>(new MutableDouble(4), "f");
tuple1.put("m|201402121900|0|65535|131075|val.COUNT", dimObjf);
oper.data.process(tuple1);
oper.endWindow();
@SuppressWarnings("unchecked")
Map<String, List<DimensionObject<String>>> tuples = (Map<String, List<DimensionObject<String>>>)mapSink.collectedTuples.get(0);
List<DimensionObject<String>> outList = tuples.get("m|201402121900|0|65535|131075|val.COUNT");
List<DimensionObject<String>> expectedList = new ArrayList<DimensionObject<String>>();
expectedList.add(dimObje);
expectedList.add(dimObja);
expectedList.add(dimObjc);
expectedList.add(dimObjf);
expectedList.add(dimObjd);
Assert.assertEquals("Size", expectedList.size(), outList.size());
Assert.assertEquals("compare list", expectedList, outList);
}
SumEntry(MutableDouble sum, boolean changed)
{
this.sum = sum;
this.changed = changed;
}
@Override
public void beginWindow(long arg0)
{
Map<String, Map<String, Number>> currentWindowMap = cacheOject.get(currentWindow);
if (currentWindowMap == null) {
currentWindowMap = new HashMap<String, Map<String, Number>>();
} else {
for (Map.Entry<String, Map<String, Number>> tupleEntry : currentWindowMap.entrySet()) {
String tupleKey = tupleEntry.getKey();
Map<String, Number> tupleValue = tupleEntry.getValue();
int currentPattern = 0;
for (Pattern pattern : patternList) {
Matcher matcher = pattern.matcher(tupleKey);
if (matcher.matches()) {
String currentPatternString = dimensionArrayString.get(currentPattern);
Map<String, KeyValPair<MutableDouble, Integer>> currentPatternMap = outputMap.get(currentPatternString);
if (currentPatternMap != null) {
StringBuilder builder = new StringBuilder(matcher.group(2));
for (int i = 1; i < dimensionArray.get(currentPattern).length; i++) {
builder.append("," + matcher.group(i + 2));
}
KeyValPair<MutableDouble, Integer> currentDimensionKeyValPair = currentPatternMap.get(builder.toString());
if (currentDimensionKeyValPair != null) {
currentDimensionKeyValPair.getKey().add(0 - tupleValue.get(dimensionKeyVal).doubleValue());
currentDimensionKeyValPair.setValue(currentDimensionKeyValPair.getValue() - 1);
if (currentDimensionKeyValPair.getKey().doubleValue() == 0.0) {
currentPatternMap.remove(builder.toString());
}
}
}
break;
}
currentPattern++;
}
}
}
currentWindowMap.clear();
if (patternList == null || patternList.isEmpty()) {
setUpPatternList();
}
}
public DimensionObject(MutableDouble count, T s)
{
this.count = count;
val = s;
}
public MutableDouble getCount()
{
return count;
}
public void setCount(MutableDouble count)
{
this.count = count;
}
public SimpleMovingAverageObject()
{
sum = new MutableDouble(0);
count = new MutableInt(0);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public void testNodeProcessingSchema(MultiWindowDimensionAggregation oper)
{
oper.setWindowSize(3);
List<int[]> dimensionArrayList = new ArrayList<int[]>();
int[] dimensionArray = {0, 1};
int[] dimensionArray_2 = {0};
dimensionArrayList.add(dimensionArray);
dimensionArrayList.add(dimensionArray_2);
oper.setDimensionArray(dimensionArrayList);
oper.setTimeBucket("m");
oper.setDimensionKeyVal("0");
oper.setOperationType(AggregateOperation.AVERAGE);
oper.setup(null);
CollectorTestSink sortSink = new CollectorTestSink();
oper.output.setSink(sortSink);
oper.beginWindow(0);
Map<String, Map<String, Number>> data_0 = new HashMap<String, Map<String, Number>>();
Map<String, Number> input_0 = new HashMap<String, Number>();
input_0.put("0", new MutableDouble(9));
input_0.put("1", new MutableDouble(9));
input_0.put("2", new MutableDouble(9));
data_0.put("m|20130823131512|0:abc|1:ff", input_0);
data_0.put("m|20130823131512|0:abc", input_0);
data_0.put("m|20130823131512|0:abc|1:ie", input_0);
Map<String, Number> input_new = new HashMap<String, Number>();
input_new.put("0", new MutableDouble(19));
input_new.put("1", new MutableDouble(19));
input_new.put("2", new MutableDouble(19));
data_0.put("m|20130823131512|0:def|1:ie", input_new);
oper.data.process(data_0);
oper.endWindow();
Map<String, Map<String, Number>> data_1 = new HashMap<String, Map<String, Number>>();
Map<String, Number> input_1 = new HashMap<String, Number>();
oper.beginWindow(1);
input_1.put("0", new MutableDouble(9));
input_1.put("1", new MutableDouble(9));
input_1.put("2", new MutableDouble(9));
data_1.put("m|20130823131513|0:def|1:ff", input_1);
data_1.put("m|20130823131513|0:abc|1:ie", input_1);
oper.data.process(data_1);
oper.endWindow();
Map<String, Map<String, Number>> data_2 = new HashMap<String, Map<String, Number>>();
Map<String, Number> input_2 = new HashMap<String, Number>();
oper.beginWindow(2);
input_2.put("0", new MutableDouble(19));
input_2.put("1", new MutableDouble(19));
input_2.put("2", new MutableDouble(19));
data_2.put("m|20130823131514|0:def|1:ff", input_2);
data_2.put("m|20130823131514|0:abc|1:ie", input_2);
oper.data.process(data_2);
oper.endWindow();
Map<String, Map<String, Number>> data_3 = new HashMap<String, Map<String, Number>>();
Map<String, Number> input_3 = new HashMap<String, Number>();
oper.beginWindow(3);
input_3.put("0", new MutableDouble(19));
input_3.put("1", new MutableDouble(19));
input_3.put("2", new MutableDouble(19));
data_3.put("m|20130823131514|0:def|1:ff", input_3);
data_3.put("m|20130823131514|0:abc|1:ie", input_3);
oper.data.process(data_3);
oper.endWindow();
Assert.assertEquals("number emitted tuples", 16, sortSink.collectedTuples.size());
for (Object o : sortSink.collectedTuples) {
logger.debug(o.toString());
}
logger.debug("Done testing round\n");
}
@Test
public void testDimensionTimeBucket() throws InterruptedException
{
DimensionTimeBucketSumOperator oper = new DimensionTimeBucketSumOperator();
CollectorTestSink sortSink = new CollectorTestSink();
oper.out.setSink(sortSink);
oper.addDimensionKeyName("ipAddr");
oper.addDimensionKeyName("url");
oper.addDimensionKeyName("status");
oper.addDimensionKeyName("agent");
oper.addValueKeyName("bytes");
Set<String> dimensionKey = new HashSet<String>();
dimensionKey.add("ipAddr");
dimensionKey.add("url");
try {
oper.addCombination(dimensionKey);
} catch (NoSuchFieldException e) {
//ignored
}
oper.setTimeBucketFlags(AbstractDimensionTimeBucketOperator.TIMEBUCKET_MINUTE);
oper.setup(null);
oper.beginWindow(0);
oper.in.process(getMap("10.10.1.1", "/movies", "200", "FF", 20));
oper.in.process(getMap("10.10.1.2", "/movies", "200", "FF", 20));
oper.in.process(getMap("10.10.1.2", "/movies", "200", "FF", 20));
oper.endWindow();
Map<String, Map<String, Number>> outputMap = Maps.newHashMap();
Map<String, Number> key1 = Maps.newHashMap();
key1.put("1", new MutableDouble(40.0));
key1.put("0", new MutableDouble(2.0));
outputMap.put("m|197001010000|0:10.10.1.2|1:/movies", key1);
Map<String, Number> key2 = Maps.newHashMap();
key2.put("0", new MutableDouble(1.0));
key2.put("1", new MutableDouble(20.0));
outputMap.put("m|197001010000|0:10.10.1.1|1:/movies", key2);
Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
for (Object o : sortSink.collectedTuples) {
Assert.assertEquals("content of tuple ", outputMap, o);
logger.debug(o.toString());
}
logger.debug("Done testing round\n");
}
/**
* Add tuple to nval/dval map.
*
* @param tuple
* key/value map on input port.
* @param map
* key/summed value map.
*/
public void addTuple(Map<K, V> tuple, Map<K, MutableDouble> map)
{
for (Map.Entry<K, V> e : tuple.entrySet()) {
addEntry(e.getKey(), e.getValue(), map);
}
}