Listed below are example usages of the org.apache.commons.lang.mutable.MutableLong API class and how to write them; you can also follow the links to view the full source code on GitHub.
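A quick orientation before the examples: MutableLong is a mutable wrapper around a primitive long, typically used as a counter that can be updated in place inside a Map or a counters registry, avoiding the re-boxing an immutable Long would require. Below is a minimal standalone sketch of the core API, written for this page rather than taken from any of the projects that follow:

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.mutable.MutableLong;

public class MutableLongDemo
{
  public static void main(String[] args)
  {
    // Count occurrences without allocating a new Long on every update.
    Map<String, MutableLong> counts = new HashMap<String, MutableLong>();
    for (String word : new String[] {"a", "b", "a"}) {
      MutableLong count = counts.get(word);
      if (count == null) {
        counts.put(word, new MutableLong(1)); // first occurrence
      } else {
        count.increment(); // mutates in place; no new object is created
      }
    }
    System.out.println(counts.get("a").longValue()); // prints 2
  }
}

This get-or-create-then-increment idiom is exactly what several of the snippets below (for example addTermInfo and messageConsumed) rely on.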
protected AppDataSingleSchemaDimensionStoreHDHT createStore(DAG dag, Configuration conf, String eventSchema)
{
  AppDataSingleSchemaDimensionStoreHDHT store = dag.addOperator("Store", ProcessTimeAwareStore.class);
  store.setUpdateEnumValues(true);
  String basePath = Preconditions.checkNotNull(conf.get(PROP_STORE_PATH),
      "base path should be specified in the properties.xml");
  TFileImpl hdsFile = new TFileImpl.DTFileImpl();
  basePath += System.currentTimeMillis();
  hdsFile.setBasePath(basePath);
  store.setFileStore(hdsFile);
  dag.setAttribute(store, Context.OperatorContext.COUNTERS_AGGREGATOR,
      new BasicCounters.LongAggregator<MutableLong>());
  store.setConfigurationSchemaJSON(eventSchema);
  store.setPartitionCount(storePartitionCount);
  if (storePartitionCount > 1) {
    store.setQueryResultUnifier(new DimensionStoreHDHTNonEmptyQueryResultUnifier());
  }
  return store;
}
@Override
public void setup(OperatorContext context)
{
  try {
    fs = getHDFSInstance();
  } catch (IOException ex) {
    throw new RuntimeException(ex);
  }
  this.context = context;
  lastTimeStamp = System.currentTimeMillis();
  fileCounters.setCounter(Counters.TOTAL_BYTES_WRITTEN, new MutableLong());
  fileCounters.setCounter(Counters.TOTAL_TIME_ELAPSED, new MutableLong());
  super.setup(context);
}
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  String filePath = "HDFSOutputOperatorBenchmarkingApp/"
      + System.currentTimeMillis();
  dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000);
  RandomWordGenerator wordGenerator = dag.addOperator("wordGenerator", RandomWordGenerator.class);
  dag.getOperatorMeta("wordGenerator").getMeta(wordGenerator.output)
      .getAttributes().put(PortContext.QUEUE_CAPACITY, 10000);
  dag.getOperatorMeta("wordGenerator").getAttributes()
      .put(OperatorContext.APPLICATION_WINDOW_COUNT, 1);
  FSByteOutputOperator hdfsOutputOperator = dag.addOperator("hdfsOutputOperator", new FSByteOutputOperator());
  hdfsOutputOperator.setFilePath(filePath);
  dag.getOperatorMeta("hdfsOutputOperator").getAttributes()
      .put(OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
  dag.addStream("Generator2HDFSOutput", wordGenerator.output, hdfsOutputOperator.input);
}
@Override
@SuppressWarnings("unchecked")
public Response processStats(BatchedOperatorStats batchedOperatorStats)
{
  BasicCounters<MutableLong> fileCounters = null;
  for (OperatorStats operatorStats : batchedOperatorStats.getLastWindowedStats()) {
    if (operatorStats.counters != null) {
      fileCounters = (BasicCounters<MutableLong>)operatorStats.counters;
    }
  }
  Response response = new Response();
  if ((fileCounters != null && fileCounters.getCounter(FileCounters.PENDING_FILES).longValue() > 0L)
      || System.currentTimeMillis() - repartitionInterval <= lastRepartition) {
    response.repartitionRequired = false;
    return response;
  }
  response.repartitionRequired = true;
  return response;
}
private long updateTimestamps() {
  MutableLong ts = new MutableLong(Long.MAX_VALUE);
  for (Attribute<?> attribute : getAttributes()) {
    mergeTimestamps(attribute, ts);
  }
  return ts.longValue();
}
public ScanSessionStats() {
  timers = new EnumMap<>(TIMERS.class);
  mergedTimers = new EnumMap<>(TIMERS.class);
  for (TIMERS timer : TIMERS.values()) {
    timers.put(timer, new StopWatch());
    mergedTimers.put(timer, new MutableLong());
  }
  keysSeen = new MutableLong();
}
public void addTermInfo(TermInfo info/* , Set<ColumnVisibility> columnVisibilities */) throws IOException {
  if (!isCompatible(info)) {
    throw new IllegalArgumentException("Attempting to add term info for " + info.fieldName + "=" + info.fieldValue + ", " + info.date
        + " to the summary for " + fieldName + "=" + fieldValue + ", " + date);
  }
  // Merge the columnVisibilities.
  // Do not count the record if we can't parse its ColumnVisibility.
  Set<ColumnVisibility> columnVisibilities = columnVisibilitiesMap.get(info.datatype);
  if (columnVisibilities == null) {
    columnVisibilities = Sets.newHashSet();
  }
  try {
    if (info.vis.getExpression().length != 0) {
      columnVisibilities.add(info.vis);
    }
    MutableLong count = summary.get(info.datatype);
    if (count == null) {
      summary.put(info.datatype, new MutableLong(info.count));
      columnVisibilitiesMap.put(info.datatype, columnVisibilities);
    } else {
      count.add(info.count);
    }
  } catch (Exception e) {
    // We want to stop the scan when we cannot properly combine ColumnVisibility.
    String message = "Error parsing ColumnVisibility of key";
    log.error(message, e);
    throw new IOException(message, e);
  }
}
public Map<Key,Value> getKeyValues() throws IOException {
  Map<Key,Value> resultsMap = new HashMap<>();
  for (Entry<String,MutableLong> entry : summary.entrySet()) {
    // Key: row = fieldName, colf = datatype, colq = date
    // Value: count
    String datatype = entry.getKey();
    long count = entry.getValue().longValue();
    try {
      // Calculate the ColumnVisibility for this key from the combiner.
      Set<ColumnVisibility> columnVisibilities = this.columnVisibilitiesMap.get(datatype);
      // Note that the access controls found in the combined ColumnVisibility will be pulled out appropriately here.
      ColumnVisibility cv = markingFunctions.combine(columnVisibilities);
      // Create a new Key compatible with the shardIndex key format.
      Key k = new Key(this.fieldValue, this.fieldName, this.date + '\0' + datatype, new String(cv.getExpression()));
      // Create a UID object with just the count for the Value.
      Builder uidBuilder = Uid.List.newBuilder();
      uidBuilder.setIGNORE(false);
      uidBuilder.setCOUNT(count);
      Uid.List uidList = uidBuilder.build();
      org.apache.accumulo.core.data.Value v = new org.apache.accumulo.core.data.Value(uidList.toByteArray());
      resultsMap.put(k, v);
    } catch (Exception e) {
      // We want to stop the scan when we cannot properly combine ColumnVisibility.
      String message = "Could not create combined ColumnVisibility";
      log.error(message, e);
      throw new IOException(message, e);
    }
  }
  return resultsMap;
}
protected PubSubWebSocketAppDataResult createQueryResult(DAG dag, Configuration conf, AppDataSingleSchemaDimensionStoreHDHT store)
{
  PubSubWebSocketAppDataResult wsOut = new PubSubWebSocketAppDataResult();
  URI queryUri = getQueryUri(dag, conf);
  wsOut.setUri(queryUri);
  dag.addOperator("QueryResult", wsOut);
  // Set remaining dag options
  dag.setAttribute(store, Context.OperatorContext.COUNTERS_AGGREGATOR,
      new BasicCounters.LongAggregator<MutableLong>());
  return wsOut;
}
@Override
public void setup(OperatorContext context)
{
  this.context = context;
  commandCounters.setCounter(CommandCounters.ADD, new MutableLong());
  commandCounters.setCounter(CommandCounters.ADD_RANGE, new MutableLong());
  commandCounters.setCounter(CommandCounters.DELETE, new MutableLong());
  commandCounters.setCounter(CommandCounters.CLEAR, new MutableLong());
}
protected Number convertToNumber(Object o)
{
  if (o == null) {
    return null;
  } else if (o instanceof MutableDouble || o instanceof MutableLong) {
    return (Number)o;
  } else if (o instanceof Double || o instanceof Float) {
    return new MutableDouble((Number)o);
  } else if (o instanceof Number) {
    return new MutableLong((Number)o);
  } else {
    return new MutableDouble(o.toString());
  }
}
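To make the normalization above concrete, here is a hypothetical caller, assumed to live in the same class as convertToNumber (the method name demonstrateConversion is invented for illustration; run with assertions enabled, -ea):

void demonstrateConversion()
{
  assert convertToNumber(null) == null;
  assert convertToNumber(3.14d) instanceof MutableDouble;  // Double -> MutableDouble
  assert convertToNumber(42) instanceof MutableLong;       // any other Number -> MutableLong
  assert convertToNumber("7").doubleValue() == 7.0d;       // anything else is parsed via toString()
  MutableLong ml = new MutableLong(5);
  assert convertToNumber(ml) == ml;                        // mutable types pass through unchanged
}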
@Override
@SuppressWarnings("unchecked")
public Object aggregate(Collection<?> countersList)
{
  if (countersList.isEmpty()) {
    return null;
  }
  BasicCounters<MutableLong> tempFileCounters = (BasicCounters<MutableLong>)countersList.iterator().next();
  MutableLong globalProcessedFiles = tempFileCounters.getCounter(FileCounters.GLOBAL_PROCESSED_FILES);
  MutableLong globalNumberOfFailures = tempFileCounters.getCounter(FileCounters.GLOBAL_NUMBER_OF_FAILURES);
  MutableLong globalNumberOfRetries = tempFileCounters.getCounter(FileCounters.GLOBAL_NUMBER_OF_RETRIES);
  totalLocalProcessedFiles.setValue(0);
  pendingFiles.setValue(0);
  totalLocalNumberOfFailures.setValue(0);
  totalLocalNumberOfRetries.setValue(0);
  for (Object fileCounters : countersList) {
    BasicCounters<MutableLong> basicFileCounters = (BasicCounters<MutableLong>)fileCounters;
    totalLocalProcessedFiles.add(basicFileCounters.getCounter(FileCounters.LOCAL_PROCESSED_FILES));
    pendingFiles.add(basicFileCounters.getCounter(FileCounters.PENDING_FILES));
    totalLocalNumberOfFailures.add(basicFileCounters.getCounter(FileCounters.LOCAL_NUMBER_OF_FAILURES));
    totalLocalNumberOfRetries.add(basicFileCounters.getCounter(FileCounters.LOCAL_NUMBER_OF_RETRIES));
  }
  globalProcessedFiles.add(totalLocalProcessedFiles);
  globalProcessedFiles.subtract(pendingFiles);
  globalNumberOfFailures.add(totalLocalNumberOfFailures);
  globalNumberOfRetries.add(totalLocalNumberOfRetries);
  BasicCounters<MutableLong> aggregatedCounters = new BasicCounters<MutableLong>(MutableLong.class);
  aggregatedCounters.setCounter(AggregatedFileCounters.PROCESSED_FILES, globalProcessedFiles);
  aggregatedCounters.setCounter(AggregatedFileCounters.PENDING_FILES, pendingFiles);
  aggregatedCounters.setCounter(AggregatedFileCounters.NUMBER_OF_ERRORS, totalLocalNumberOfFailures);
  aggregatedCounters.setCounter(AggregatedFileCounters.NUMBER_OF_RETRIES, totalLocalNumberOfRetries);
  return aggregatedCounters;
}
public FileSplitter()
{
  currentWindowRecoveryState = Lists.newLinkedList();
  fileCounters = new BasicCounters<MutableLong>(MutableLong.class);
  windowDataManager = new WindowDataManager.NoopWindowDataManager();
  scanner = new TimeBasedDirectoryScanner();
  blocksThreshold = Integer.MAX_VALUE;
}
@Override
public void setup(Context.OperatorContext context)
{
  Preconditions.checkArgument(!scanner.files.isEmpty(), "empty files");
  Preconditions.checkArgument(blockSize == null || blockSize > 0, "invalid block size");
  operatorId = context.getId();
  this.context = context;
  fileCounters.setCounter(Counters.PROCESSED_FILES, new MutableLong());
  windowDataManager.setup(context);
  try {
    fs = scanner.getFSInstance();
  } catch (IOException e) {
    throw new RuntimeException("creating fs", e);
  }
  if (blockSize == null) {
    blockSize = fs.getDefaultBlockSize(new Path(scanner.files.iterator().next()));
  }
  if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) {
    blockMetadataIterator = null;
  } else {
    // don't set up the scanner during recovery
    scanner.setup(context);
  }
}
public AbstractBlockReader()
{
  maxReaders = 16;
  minReaders = 1;
  intervalMillis = 2 * 60 * 1000L;
  response = new StatsListener.Response();
  backlogPerOperator = Maps.newHashMap();
  partitionCount = 1;
  counters = new BasicCounters<>(MutableLong.class);
  collectStats = true;
  lastBlockOpenTime = -1;
}
@Override
public void setup(Context.OperatorContext context)
{
  operatorId = context.getId();
  LOG.debug("{}: partition keys {} mask {}", operatorId, partitionKeys, partitionMask);
  this.context = context;
  counters.setCounter(ReaderCounterKeys.BLOCKS, new MutableLong());
  counters.setCounter(ReaderCounterKeys.RECORDS, new MutableLong());
  counters.setCounter(ReaderCounterKeys.BYTES, new MutableLong());
  counters.setCounter(ReaderCounterKeys.TIME, new MutableLong());
  sleepTimeMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
}
public AbstractJMSInputOperator()
{
  counters = new BasicCounters<MutableLong>(MutableLong.class);
  throwable = new AtomicReference<Throwable>();
  pendingAck = Sets.newHashSet();
  windowDataManager = new FSWindowDataManager();
  lock = new Lock();
  // Recovery state is a linked hash map to maintain the order of tuples.
  currentWindowRecoveryState = Maps.newLinkedHashMap();
  holdingBuffer = new ArrayBlockingQueue<Message>(bufferSize)
  {
    private static final long serialVersionUID = 201411151139L;

    @SuppressWarnings("Contract")
    @Override
    public boolean add(Message message)
    {
      synchronized (lock) {
        try {
          return messageConsumed(message) && super.add(message);
        } catch (JMSException e) {
          LOG.error("message consumption", e);
          throwable.set(e);
          throw new RuntimeException(e);
        }
      }
    }
  };
}
@Override
public void setup(OperatorContext context)
{
  this.context = context;
  spinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
  counters.setCounter(CounterKeys.RECEIVED, new MutableLong());
  counters.setCounter(CounterKeys.REDELIVERED, new MutableLong());
  windowDataManager.setup(context);
}
/**
 * This method is called when a message is added to {@link #holdingBuffer} and can be overridden by subclasses
 * if required. Note that it is invoked by the JMS thread, not the operator thread.
 *
 * @param message the consumed message
 * @return true if the message is accepted.
 * @throws javax.jms.JMSException
 */
protected boolean messageConsumed(Message message) throws JMSException
{
  if (message.getJMSRedelivered() && pendingAck.contains(message.getJMSMessageID())) {
    counters.getCounter(CounterKeys.REDELIVERED).increment();
    LOG.warn("IGNORING: Redelivered Message {}", message.getJMSMessageID());
    return false;
  }
  pendingAck.add(message.getJMSMessageID());
  MutableLong receivedCt = counters.getCounter(CounterKeys.RECEIVED);
  receivedCt.increment();
  LOG.debug("message id: {} buffer size: {} received: {}", message.getJMSMessageID(), holdingBuffer.size(),
      receivedCt.longValue());
  return true;
}
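Since messageConsumed runs on the JMS delivery thread, a subclass override should stay cheap and delegate to super for the counter bookkeeping. A hedged sketch of such an override (the empty-message filter shown here is invented for illustration):

// A hypothetical subclass override that rejects empty text messages before buffering.
@Override
protected boolean messageConsumed(Message message) throws JMSException
{
  if (message instanceof TextMessage) {
    String text = ((TextMessage)message).getText();
    if (text == null || text.isEmpty()) {
      return false; // rejected: never enters holdingBuffer and is not tracked for ack
    }
  }
  return super.messageConsumed(message); // keeps RECEIVED/REDELIVERED counters accurate
}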
ReaderStats(int backlog, long readBlocks, long bytes, long time)
{
  BasicCounters<MutableLong> bc = new BasicCounters<>(MutableLong.class);
  bc.setCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS, new MutableLong(readBlocks));
  bc.setCounter(AbstractBlockReader.ReaderCounterKeys.BYTES, new MutableLong(bytes));
  bc.setCounter(AbstractBlockReader.ReaderCounterKeys.TIME, new MutableLong(time));
  counters = bc;
  PortStats portStats = new PortStats("blocks");
  portStats.queueSize = backlog;
  inputPorts = Lists.newArrayList(portStats);
}
@Override
public void onTraversalStart() {
  Set<VCFHeaderLine> defaultToolHeaderLines = getDefaultToolVCFHeaderLines();
  for (final ConcordanceState state : ConcordanceState.values()) {
    snpCounts.put(state, new MutableLong(0));
    indelCounts.put(state, new MutableLong(0));
  }
  final VCFHeader evalHeader = getEvalHeader();
  if (truePositivesAndFalseNegativesVcf != null) {
    truePositivesAndFalseNegativesVcfWriter = createVCFWriter(truePositivesAndFalseNegativesVcf);
    final VCFHeader truthHeader = getTruthHeader();
    truthHeader.addMetaDataLine(TRUTH_STATUS_HEADER_LINE);
    defaultToolHeaderLines.forEach(truthHeader::addMetaDataLine);
    truePositivesAndFalseNegativesVcfWriter.writeHeader(truthHeader);
  }
  if (truePositivesAndFalsePositivesVcf != null) {
    truePositivesAndFalsePositivesVcfWriter = createVCFWriter(truePositivesAndFalsePositivesVcf);
    defaultToolHeaderLines.forEach(evalHeader::addMetaDataLine);
    evalHeader.addMetaDataLine(TRUTH_STATUS_HEADER_LINE);
    truePositivesAndFalsePositivesVcfWriter.writeHeader(evalHeader);
  }
  if (filteredTrueNegativesAndFalseNegativesVcf != null) {
    filteredTrueNegativesAndFalseNegativesVcfWriter = createVCFWriter(filteredTrueNegativesAndFalseNegativesVcf);
    evalHeader.addMetaDataLine(TRUTH_STATUS_HEADER_LINE);
    defaultToolHeaderLines.forEach(evalHeader::addMetaDataLine);
    filteredTrueNegativesAndFalseNegativesVcfWriter.writeHeader(evalHeader);
  }
  final List<String> filtersInVcf = evalHeader.getFilterLines().stream().map(VCFFilterHeaderLine::getID).collect(Collectors.toList());
  filtersInVcf.forEach(filter -> filterAnalysisRecords.put(filter, new FilterAnalysisRecord(filter, 0, 0, 0, 0)));
}
private void mergeTimestamps(Attribute<?> other, MutableLong ts) {
  // If this is a set of attributes, then examine each one. Note we do not recurse on a Document, as it should have already applied the shard time.
  if (other instanceof AttributeBag) {
    // recurse on the sub attributes
    for (Attribute<?> attribute : ((AttributeBag<?>) other).getAttributes()) {
      mergeTimestamps(attribute, ts);
    }
  } else if (other.isMetadataSet()) {
    // if this is the first attribute being merged
    if (ts.longValue() == Long.MAX_VALUE) {
      // if we know the shard time
      if (shardTimestamp != Long.MAX_VALUE) {
        // if the timestamp is outside of the shard's date, then set it to the end of the day
        if (other.getTimestamp() < shardTimestamp) {
          log.error("Found an attribute of a document with a timestamp prior to the shardId date! " + shardTimestamp + " vs " + other);
          ts.setValue(shardTimestamp + ONE_DAY_MS - 1);
        } else if (other.getTimestamp() >= (shardTimestamp + ONE_DAY_MS)) {
          log.debug("Found an attribute of a document with a timestamp after the shardId date, ignoring: " + shardTimestamp + " vs " + other);
          ts.setValue(shardTimestamp + ONE_DAY_MS - 1);
        }
        // else simply use the new timestamp
        else {
          ts.setValue(other.getTimestamp());
        }
      } else {
        ts.setValue(other.getTimestamp());
      }
    }
    // else this is not the first attribute being merged
    else {
      // if we know the shard time
      if (shardTimestamp != Long.MAX_VALUE) {
        // if the new timestamp is before the shard's date, then ignore it
        if (other.getTimestamp() < shardTimestamp) {
          log.error("Found an attribute of a document with a timestamp prior to the shardId date! " + other);
        }
        // else update the timestamp with the min value
        else {
          ts.setValue(Math.min(ts.longValue(), other.getTimestamp()));
        }
      }
      // else update the timestamp with the min value
      else {
        ts.setValue(Math.min(ts.longValue(), other.getTimestamp()));
      }
    }
  }
}
@Override
protected MutableLong initialValue() {
  return new MutableLong();
}
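An initialValue override like the one above usually backs a ThreadLocal<MutableLong> used as a per-thread counter. A minimal sketch under that assumption (the field and method names here are hypothetical):

import org.apache.commons.lang.mutable.MutableLong;

// Each thread lazily receives its own MutableLong starting at 0,
// so the counter can be updated without synchronization.
private static final ThreadLocal<MutableLong> BYTES_READ = new ThreadLocal<MutableLong>()
{
  @Override
  protected MutableLong initialValue()
  {
    return new MutableLong();
  }
};

void recordRead(long n)
{
  BYTES_READ.get().add(n); // thread-confined state; no locking needed
}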
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  String eventSchema = SchemaUtils.jarResourceFileToString(EVENT_SCHEMA);
  if (inputGenerator == null) {
    JsonSalesGenerator input = dag.addOperator("InputGenerator", JsonSalesGenerator.class);
    input.setEventSchemaJSON(eventSchema);
    inputGenerator = input;
  } else {
    dag.addOperator("InputGenerator", inputGenerator);
  }
  JsonToMapConverter converter = dag.addOperator("Converter", JsonToMapConverter.class);
  EnrichmentOperator enrichmentOperator = dag.addOperator("Enrichment", EnrichmentOperator.class);
  DimensionsComputationFlexibleSingleSchemaMap dimensions = dag.addOperator("DimensionsComputation",
      DimensionsComputationFlexibleSingleSchemaMap.class);
  dag.getMeta(dimensions).getAttributes().put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, 4);
  AppDataSingleSchemaDimensionStoreHDHT store = dag.addOperator("Store", AppDataSingleSchemaDimensionStoreHDHT.class);
  String basePath = Preconditions.checkNotNull(conf.get(PROP_STORE_PATH),
      "base path should be specified in the properties.xml");
  TFileImpl hdsFile = new TFileImpl.DTFileImpl();
  basePath += System.currentTimeMillis();
  hdsFile.setBasePath(basePath);
  store.setFileStore(hdsFile);
  dag.setAttribute(store, Context.OperatorContext.COUNTERS_AGGREGATOR,
      new BasicCounters.LongAggregator<MutableLong>());
  String dimensionalSchema = SchemaUtils.jarResourceFileToString(DIMENSIONAL_SCHEMA);
  dimensions.setConfigurationSchemaJSON(eventSchema);
  Map<String, String> fieldToMapField = Maps.newHashMap();
  dimensions.setValueNameAliases(fieldToMapField);
  dimensions.setUnifier(new DimensionsComputationUnifierImpl<InputEvent, Aggregate>());
  dag.getMeta(dimensions).getMeta(dimensions.output).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB,
      8092);
  store.setConfigurationSchemaJSON(eventSchema);
  store.setDimensionalSchemaStubJSON(dimensionalSchema);
  PubSubWebSocketAppDataQuery wsIn = new PubSubWebSocketAppDataQuery();
  store.setEmbeddableQueryInfoProvider(wsIn);
  PubSubWebSocketAppDataResult wsOut = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
  dag.addStream("InputStream", inputGenerator.getOutputPort(), converter.input);
  dag.addStream("EnrichmentStream", converter.outputMap, enrichmentOperator.inputPort);
  dag.addStream("ConvertStream", enrichmentOperator.outputPort, dimensions.input);
  dag.addStream("DimensionalData", dimensions.output, store.input);
  dag.addStream("QueryResult", store.queryResult, wsOut.input).setLocality(Locality.CONTAINER_LOCAL);
}
protected void populateCdrGeoDAG(DAG dag, Configuration conf,
    List<DefaultInputPort<? super EnrichedCDR>> enrichedStreamSinks)
{
  // dimension
  DimensionsComputationFlexibleSingleSchemaPOJO dimensions = dag.addOperator("TagNetworkGeoLocations",
      DimensionsComputationFlexibleSingleSchemaPOJO.class);
  dag.getMeta(dimensions).getAttributes().put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, 4);
  dag.getMeta(dimensions).getAttributes().put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 4);
  enrichedStreamSinks.add(dimensions.input);
  // Set operator properties
  // key expression: Point( Lat, Lon )
  {
    Map<String, String> keyToExpression = Maps.newHashMap();
    keyToExpression.put("zipcode", "getZipCode()");
    keyToExpression.put("region", "getRegionZip2()");
    keyToExpression.put("time", "getTime()");
    dimensions.setKeyToExpression(keyToExpression);
  }
  // aggregate expression: disconnects and downloads
  {
    Map<String, String> aggregateToExpression = Maps.newHashMap();
    aggregateToExpression.put("disconnectCount", "getDisconnectCount()");
    aggregateToExpression.put("downloadBytes", "getBytes()");
    aggregateToExpression.put("lat", "getLat()");
    aggregateToExpression.put("lon", "getLon()");
    dimensions.setAggregateToExpression(aggregateToExpression);
  }
  // event schema
  String cdrGeoSchema = SchemaUtils.jarResourceFileToString(cdrGeoSchemaLocation);
  dimensions.setConfigurationSchemaJSON(cdrGeoSchema);
  dimensions.setUnifier(new DimensionsComputationUnifierImpl<InputEvent, Aggregate>());
  dag.getMeta(dimensions).getMeta(dimensions.output).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB,
      8092);
  // store
  GeoDimensionStore store = dag.addOperator("StoreNetworkTaggedGeoLocations", GeoDimensionStore.class);
  store.setUpdateEnumValues(true);
  String basePath = Preconditions.checkNotNull(conf.get(PROP_GEO_STORE_PATH),
      "GEO base path should be specified in the properties.xml");
  TFileImpl hdsFile = new TFileImpl.DTFileImpl();
  basePath += System.currentTimeMillis();
  hdsFile.setBasePath(basePath);
  store.setFileStore(hdsFile);
  store.setConfigurationSchemaJSON(cdrGeoSchema);
  dag.setAttribute(store, Context.OperatorContext.COUNTERS_AGGREGATOR,
      new BasicCounters.LongAggregator<MutableLong>());
  PubSubWebSocketAppDataQuery query = createAppDataQuery();
  URI queryUri = ConfigUtil.getAppDataQueryPubSubURI(dag, conf);
  query.setUri(queryUri);
  store.setEmbeddableQueryInfoProvider(query);
  if (cdrGeoStorePartitionCount > 1) {
    store.setPartitionCount(cdrGeoStorePartitionCount);
    store.setQueryResultUnifier(new DimensionStoreHDHTNonEmptyQueryResultUnifier());
  }
  // wsOut
  PubSubWebSocketAppDataResult wsOut = createAppDataResult();
  wsOut.setUri(queryUri);
  dag.addOperator("CDRGeoQueryResult", wsOut);
  dag.addStream("CDRGeoStream", dimensions.output, store.input);
  dag.addStream("CDRGeoQueryResult", store.queryResult, wsOut.input);
}
protected void populateCsGeoDAG(DAG dag, Configuration conf,
    List<DefaultInputPort<? super EnrichedCustomerService>> customerServiceStreamSinks)
{
  // dimension
  DimensionsComputationFlexibleSingleSchemaPOJO dimensions = dag.addOperator("TagServiceGeoLocations",
      DimensionsComputationFlexibleSingleSchemaPOJO.class);
  dag.getMeta(dimensions).getAttributes().put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, 4);
  dag.getMeta(dimensions).getAttributes().put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 4);
  customerServiceStreamSinks.add(dimensions.input);
  // Set operator properties
  // key expression: Point( Lat, Lon )
  {
    Map<String, String> keyToExpression = Maps.newHashMap();
    keyToExpression.put("zipcode", "getZipCode()");
    keyToExpression.put("region", "getRegionZip2()");
    keyToExpression.put("time", "getTime()");
    dimensions.setKeyToExpression(keyToExpression);
  }
  // aggregate expression: wait time and location
  {
    Map<String, String> aggregateToExpression = Maps.newHashMap();
    aggregateToExpression.put("wait", "getWait()");
    aggregateToExpression.put("lat", "getLat()");
    aggregateToExpression.put("lon", "getLon()");
    dimensions.setAggregateToExpression(aggregateToExpression);
  }
  // event schema
  String geoSchema = SchemaUtils.jarResourceFileToString(csGeoSchemaLocation);
  dimensions.setConfigurationSchemaJSON(geoSchema);
  dimensions.setUnifier(new DimensionsComputationUnifierImpl<InputEvent, Aggregate>());
  dag.getMeta(dimensions).getMeta(dimensions.output).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB,
      8092);
  // store
  GeoDimensionStore store = dag.addOperator("StoreTaggedServiceGeoLocations", GeoDimensionStore.class);
  store.setUpdateEnumValues(true);
  String basePath = Preconditions.checkNotNull(conf.get(PROP_GEO_STORE_PATH),
      "GEO base path should be specified in the properties.xml");
  TFileImpl hdsFile = new TFileImpl.DTFileImpl();
  basePath += System.currentTimeMillis();
  hdsFile.setBasePath(basePath);
  store.setFileStore(hdsFile);
  store.setConfigurationSchemaJSON(geoSchema);
  dag.setAttribute(store, Context.OperatorContext.COUNTERS_AGGREGATOR,
      new BasicCounters.LongAggregator<MutableLong>());
  PubSubWebSocketAppDataQuery query = createAppDataQuery();
  URI queryUri = ConfigUtil.getAppDataQueryPubSubURI(dag, conf);
  query.setUri(queryUri);
  store.setEmbeddableQueryInfoProvider(query);
  if (csGeoStorePartitionCount > 1) {
    store.setPartitionCount(csGeoStorePartitionCount);
    store.setQueryResultUnifier(new DimensionStoreHDHTNonEmptyQueryResultUnifier());
  }
  // wsOut
  PubSubWebSocketAppDataResult wsOut = createAppDataResult();
  wsOut.setUri(queryUri);
  dag.addOperator("CSGeoQueryResult", wsOut);
  dag.addStream("CSGeoStream", dimensions.output, store.input);
  dag.addStream("CSGeoQueryResult", store.queryResult, wsOut.input);
}
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  // Set input properties
  String eventSchema = SchemaUtils.jarResourceFileToString(eventSchemaLocation);
  // input
  if (inputOperator == null) {
    inputOperator = new EnrichedCDRHbaseInputOperator();
  }
  dag.addOperator("InputGenerator", inputOperator);
  // dimension
  DimensionsComputationFlexibleSingleSchemaPOJO dimensions = dag.addOperator("DimensionsComputation",
      DimensionsComputationFlexibleSingleSchemaPOJO.class);
  dag.getMeta(dimensions).getAttributes().put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, 4);
  dag.getMeta(dimensions).getAttributes().put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 4);
  // Set operator properties
  // key expression
  {
    Map<String, String> keyToExpression = Maps.newHashMap();
    keyToExpression.put("imsi", "getImsi()");
    keyToExpression.put("Carrier", "getOperatorCode()");
    keyToExpression.put("imei", "getImei()");
    dimensions.setKeyToExpression(keyToExpression);
  }
  // aggregate expression
  {
    Map<String, String> aggregateToExpression = Maps.newHashMap();
    aggregateToExpression.put("duration", "getDuration()");
    aggregateToExpression.put("terminatedAbnomally", "getTerminatedAbnomally()");
    aggregateToExpression.put("terminatedNomally", "getTerminatedNomally()");
    aggregateToExpression.put("called", "getCalled()");
    dimensions.setAggregateToExpression(aggregateToExpression);
  }
  // event schema
  dimensions.setConfigurationSchemaJSON(eventSchema);
  dimensions.setUnifier(new DimensionsComputationUnifierImpl<InputEvent, Aggregate>());
  dag.getMeta(dimensions).getMeta(dimensions.output).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB,
      8092);
  // store
  AppDataSingleSchemaDimensionStoreHDHT store = dag.addOperator("Store", AppDataSingleSchemaDimensionStoreHDHT.class);
  String basePath = Preconditions.checkNotNull(conf.get(PROP_STORE_PATH),
      "base path should be specified in the properties.xml");
  TFileImpl hdsFile = new TFileImpl.DTFileImpl();
  basePath += System.currentTimeMillis();
  hdsFile.setBasePath(basePath);
  store.setFileStore(hdsFile);
  dag.setAttribute(store, Context.OperatorContext.COUNTERS_AGGREGATOR,
      new BasicCounters.LongAggregator<MutableLong>());
  store.setConfigurationSchemaJSON(eventSchema);
  //store.setDimensionalSchemaStubJSON(eventSchema);
  PubSubWebSocketAppDataQuery query = createAppDataQuery();
  store.setEmbeddableQueryInfoProvider(query);
  // wsOut
  PubSubWebSocketAppDataResult wsOut = createAppDataResult();
  dag.addOperator("QueryResult", wsOut);
  dag.addStream("InputStream", inputOperator.outputPort, dimensions.input).setLocality(Locality.CONTAINER_LOCAL);
  dag.addStream("DimensionalData", dimensions.output, store.input);
  dag.addStream("QueryResult", store.queryResult, wsOut.input);
}
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  String propStorePath = "dt.application." + appName + ".operator.Store.fileStore.basePathPrefix";
  // Declare operators
  // Set input properties
  String eventSchema = SchemaUtils.jarResourceFileToString(eventSchemaLocation);
  if (inputOperator == null) {
    InputItemGenerator input = dag.addOperator("InputGenerator", InputItemGenerator.class);
    input.advertiserName = advertisers;
    input.setEventSchemaJSON(eventSchema);
    inputOperator = input;
  } else {
    dag.addOperator("InputGenerator", inputOperator);
  }
  DimensionsComputationFlexibleSingleSchemaPOJO dimensions = dag.addOperator("DimensionsComputation",
      DimensionsComputationFlexibleSingleSchemaPOJO.class);
  dag.getMeta(dimensions).getAttributes().put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, 4);
  dag.getMeta(dimensions).getAttributes().put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 4);
  AppDataSingleSchemaDimensionStoreHDHT store = dag.addOperator("Store", AppDataSingleSchemaDimensionStoreHDHT.class);
  // Set operator properties
  Map<String, String> keyToExpression = Maps.newHashMap();
  keyToExpression.put("publisher", "getPublisher()");
  keyToExpression.put("advertiser", "getAdvertiser()");
  keyToExpression.put("location", "getLocation()");
  keyToExpression.put("time", "getTime()");
  Map<String, String> aggregateToExpression = Maps.newHashMap();
  aggregateToExpression.put("cost", "getCost()");
  aggregateToExpression.put("revenue", "getRevenue()");
  aggregateToExpression.put("impressions", "getImpressions()");
  aggregateToExpression.put("clicks", "getClicks()");
  dimensions.setKeyToExpression(keyToExpression);
  dimensions.setAggregateToExpression(aggregateToExpression);
  dimensions.setConfigurationSchemaJSON(eventSchema);
  dimensions.setUnifier(new DimensionsComputationUnifierImpl<InputEvent, Aggregate>());
  dag.getMeta(dimensions).getMeta(dimensions.output).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB, 8092);
  // Set store properties
  String basePath = Preconditions.checkNotNull(conf.get(propStorePath),
      "a base path should be specified in the properties.xml");
  TFileImpl hdsFile = new TFileImpl.DTFileImpl();
  basePath += Path.SEPARATOR + System.currentTimeMillis();
  hdsFile.setBasePath(basePath);
  store.setFileStore(hdsFile);
  store.getResultFormatter().setContinuousFormatString("#.00");
  store.setConfigurationSchemaJSON(eventSchema);
  store.setEmbeddableQueryInfoProvider(new PubSubWebSocketAppDataQuery());
  PubSubWebSocketAppDataResult wsOut = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
  // Set remaining dag options
  dag.setAttribute(store, Context.OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
  dag.addStream("InputStream", inputOperator.getOutputPort(), dimensions.input).setLocality(Locality.CONTAINER_LOCAL);
  dag.addStream("DimensionalData", dimensions.output, store.input);
  dag.addStream("QueryResult", store.queryResult, wsOut.input);
}
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  // Declare operators
  InputItemGenerator input = dag.addOperator("InputGenerator", InputItemGenerator.class);
  DimensionsComputation<AdInfo, AdInfo.AdInfoAggregateEvent> dimensions = dag.addOperator("DimensionsComputation",
      new DimensionsComputation<AdInfo, AdInfo.AdInfoAggregateEvent>());
  DimensionsComputationUnifierImpl<AdInfo, AdInfo.AdInfoAggregateEvent> unifier =
      new DimensionsComputationUnifierImpl<AdInfo, AdInfo.AdInfoAggregateEvent>();
  dimensions.setUnifier(unifier);
  dag.getMeta(dimensions).getAttributes().put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, 10);
  AdsConverter adsConverter = dag.addOperator("AdsConverter", new AdsConverter());
  AppDataSingleSchemaDimensionStoreHDHT store = dag.addOperator("Store", AppDataSingleSchemaDimensionStoreHDHT.class);
  // Set input properties
  String eventSchema = SchemaUtils.jarResourceFileToString(EVENT_SCHEMA);
  input.setEventSchemaJSON(eventSchema);
  String[] dimensionSpecs = new String[] {
      "time=" + TimeUnit.MINUTES, "time=" + TimeUnit.MINUTES + ":location",
      "time=" + TimeUnit.MINUTES + ":advertiser", "time=" + TimeUnit.MINUTES + ":publisher",
      "time=" + TimeUnit.MINUTES + ":advertiser:location", "time=" + TimeUnit.MINUTES + ":publisher:location",
      "time=" + TimeUnit.MINUTES + ":publisher:advertiser",
      "time=" + TimeUnit.MINUTES + ":publisher:advertiser:location" };
  // Set operator properties
  AdInfoAggregator[] aggregators = new AdInfoAggregator[dimensionSpecs.length];
  for (int index = 0; index < dimensionSpecs.length; index++) {
    String dimensionSpec = dimensionSpecs[index];
    AdInfoAggregator aggregator = new AdInfoAggregator();
    aggregator.init(dimensionSpec, index);
    aggregators[index] = aggregator;
  }
  unifier.setAggregators(aggregators);
  dimensions.setAggregators(aggregators);
  dag.getMeta(dimensions).getMeta(dimensions.output).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB, 8092);
  // Configure the converter
  adsConverter.setEventSchemaJSON(eventSchema);
  adsConverter.setDimensionSpecs(dimensionSpecs);
  // Set store properties
  String basePath = Preconditions.checkNotNull(conf.get(PROP_STORE_PATH),
      "a base path should be specified in the properties.xml");
  TFileImpl hdsFile = new TFileImpl.DTFileImpl();
  basePath += Path.SEPARATOR + System.currentTimeMillis();
  hdsFile.setBasePath(basePath);
  store.setFileStore(hdsFile);
  store.getResultFormatter().setContinuousFormatString("#.00");
  store.setConfigurationSchemaJSON(eventSchema);
  PubSubWebSocketAppDataQuery wsIn = new PubSubWebSocketAppDataQuery();
  store.setEmbeddableQueryInfoProvider(wsIn);
  PubSubWebSocketAppDataResult wsOut = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
  // Set remaining dag options
  dag.setAttribute(store, Context.OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
  dag.addStream("InputStream", input.outputPort, dimensions.data).setLocality(Locality.CONTAINER_LOCAL);
  dag.addStream("DimensionalData", dimensions.output, adsConverter.inputPort);
  dag.addStream("Converter", adsConverter.outputPort, store.input);
  dag.addStream("QueryResult", store.queryResult, wsOut.input);
}