下面列出了 org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer#org.apache.accumulo.core.data.Value 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Delegates partitioning to the partitioner registered for the key's table, then shifts the
 * delegate's result by the table's configured offset (if one exists), wrapping modulo
 * {@code numPartitions}.
 *
 * @param key the bulk ingest key; its table name selects both the delegate and the offset
 * @param value the value being partitioned, passed through to the delegate
 * @param numPartitions total number of partitions
 * @return the (optionally offset) partition index reduced modulo {@code numPartitions}
 */
@Override
public int getPartition(BulkIngestKey key, Value value, int numPartitions) {
    final Text table = key.getTableName();
    final Partitioner<BulkIngestKey,Value> delegate = partitionerCache.getPartitioner(table);
    final int delegated = delegate.getPartition(key, value, numPartitions);
    final Integer shift = this.tableOffsets.get(table);
    // Tables without a configured offset use the delegate's partition as-is.
    final int unwrapped = (shift == null) ? delegated : shift + delegated;
    return unwrapped % numPartitions;
}
/**
 * Initializes this iterator from its options.
 *
 * <p>{@code PRIMARY_INDEX_ID} is mandatory and is stored in {@code primaryIndexId}. When the
 * {@code FILTERS} option is present, its string value is converted to bytes and deserialized
 * into the {@code filter} field.
 *
 * @param source the source iterator, handed to the superclass
 * @param options iterator options; must contain {@code PRIMARY_INDEX_ID}
 * @param env the iterator environment, handed to the superclass
 * @throws IllegalArgumentException if {@code options} is null or lacks {@code PRIMARY_INDEX_ID}
 * @throws IOException if the superclass initialization fails
 */
@Override
public void init(
    final SortedKeyValueIterator<Key, Value> source,
    final Map<String, String> options,
    final IteratorEnvironment env) throws IOException {
    super.init(source, options, env);
    final boolean hasPrimaryIndex = options != null && options.containsKey(PRIMARY_INDEX_ID);
    if (!hasPrimaryIndex) {
        throw new IllegalArgumentException(
            "Arguments must be set for " + SecondaryIndexQueryFilterIterator.class.getName());
    }
    if (options.containsKey(FILTERS)) {
        final byte[] serializedFilter = ByteArrayUtils.byteArrayFromString(options.get(FILTERS));
        filter = (QueryFilter) URLClassloaderUtils.fromBinary(serializedFilter);
    }
    primaryIndexId = options.get(PRIMARY_INDEX_ID);
}
/**
 * Reports whether another key is available.
 *
 * <p>If no lookahead has been performed yet ({@code nextSeek} is still {@code UNINITIALIZED_KEY}),
 * the first seek is done here so the answer reflects whether at least one applicable key exists.
 *
 * @return {@code true} unless the lookahead has reached {@code ITERATOR_COMPLETE_KEY}
 * @throws DatawaveFatalQueryException if the initial seek fails with an IOException
 */
@Override
public boolean hasNext() {
    if (this.nextSeek == UNINITIALIZED_KEY) {
        // Lazily perform the first iteration.
        try {
            this.seekNext(true);
        } catch (IOException e) {
            throw new DatawaveFatalQueryException(new QueryException(DatawaveErrorCode.HAS_NEXT_ELEMENT_ERROR, e));
        }
    }
    return this.nextSeek != ITERATOR_COMPLETE_KEY;
}
/**
 * Applies the table's scan-time iterator configuration, if a table configuration is present.
 *
 * <p>When {@code acuTableConf} is set, {@code topIter} is first wrapped in a visibility filter
 * (using {@code auths} and the table's default scan-time visibility), and the configured
 * scan-scope iterators are then loaded on top of it.
 *
 * @param topIter the iterator to wrap
 * @param conf not used by this implementation
 * @return the wrapped iterator stack, or {@code topIter} unchanged when there is no table config
 * @throws IOException if loading the iterators fails
 */
protected SortedKeyValueIterator<Key,Value> applyTableIterators(SortedKeyValueIterator<Key,Value> topIter, Configuration conf) throws IOException {
    if (acuTableConf == null) {
        return topIter;
    }
    final RFileEnvironment iterEnv = new RFileEnvironment();
    // Server-side iterator lists/options are intentionally empty; they are populated later.
    final List<IterInfo> noServerSideIterators = Collections.emptyList();
    final Map<String,Map<String,String>> noServerSideOptions = Collections.emptyMap();
    final ColumnVisibility defaultVisibility = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
    final SortedKeyValueIterator<Key,Value> visibilityFiltered = VisibilityFilter.wrap(topIter, auths, defaultVisibility.getExpression());
    return IteratorUtil.loadIterators(IteratorScope.scan, visibilityFiltered, null, acuTableConf, noServerSideIterators, noServerSideOptions,
                    iterEnv, false);
}
/**
 * Builds a date index entry.
 *
 * <ul>
 * <li>row: {@code dateValue}</li>
 * <li>column family: {@code type} (e.g. LOAD or ACTIVITY)</li>
 * <li>column qualifier: {@code shardDate \0 dataType \0 dateField}</li>
 * <li>timestamp: the begin date derived from {@code dateValue}</li>
 * <li>value: a bitset with the bits of every shard index in {@code shardIndicies} merged</li>
 * </ul>
 *
 * @param shardIndicies shard indices to encode; must contain at least one element
 * @throws ParseException presumably raised by {@code DateIndexUtil.getBeginDate} when
 *         {@code dateValue} cannot be parsed — TODO confirm
 */
public static KeyValue getDateIndexEntry(String shardDate, int[] shardIndicies, String dataType, String type, String dateField, String dateValue,
                ColumnVisibility visibility) throws ParseException {
    final String qualifier = shardDate + '\0' + dataType + '\0' + dateField;

    BitSet shardBits = DateIndexUtil.getBits(shardIndicies[0]);
    for (int idx = 1; idx < shardIndicies.length; idx++) {
        final BitSet next = DateIndexUtil.getBits(shardIndicies[idx]);
        shardBits = DateIndexUtil.merge(shardBits, next);
    }

    final long timestamp = DateIndexUtil.getBeginDate(dateValue).getTime();
    final Key indexKey = new Key(dateValue, type, qualifier, visibility, timestamp);
    return new KeyValue(indexKey, new Value(shardBits.toByteArray()));
}
/**
 * Creates 500 deep copies from a SourceManager with initial size 10, seeks and advances each once,
 * and checks that all 500 next() calls reached the counter while only 10 underlying sources were
 * created.
 */
@Test
public void assertNexts() throws IOException {
    final SourceManager manager = new SourceManager(counter);
    manager.setInitialSize(10);

    // Phase 1: create all copies up front.
    final Collection<SortedKeyValueIterator<Key,Value>> copies = Lists.newArrayList();
    int created = 0;
    while (created < 500) {
        copies.add(manager.deepCopy(null));
        created++;
    }

    // Phase 2: seek and advance every copy once.
    for (SortedKeyValueIterator<Key,Value> copy : copies) {
        copy.seek(new Range(), Collections.emptyList(), false);
        copy.next();
    }

    assertEquals(500, counter.nextCalls);
    assertEquals(10, counter.counter);
}
/**
 * Converts a TripleRow into a single-cell Mutation.
 *
 * <p>Missing (null) parts fall back to shared empty constants: {@code EMPTY_CV} for the
 * visibility, {@code EMPTY_VALUE} for the value and {@code EMPTY_TEXT} for the column family and
 * qualifier. When the row has no timestamp, the put omits it.
 *
 * @param tripleRow the row to convert
 * @return a mutation containing one put for the row
 */
public static Mutation createMutation(TripleRow tripleRow) {
    final Mutation mutation = new Mutation(new Text(tripleRow.getRow()));

    final byte[] visBytes = tripleRow.getColumnVisibility();
    final ColumnVisibility visibility = (visBytes != null) ? new ColumnVisibility(visBytes) : EMPTY_CV;

    final Long ts = tripleRow.getTimestamp();

    final byte[] valueBytes = tripleRow.getValue();
    final Value cellValue = (valueBytes != null) ? new Value(valueBytes) : EMPTY_VALUE;

    final byte[] qualBytes = tripleRow.getColumnQualifier();
    final Text qualifier = (qualBytes != null) ? new Text(qualBytes) : EMPTY_TEXT;

    final byte[] famBytes = tripleRow.getColumnFamily();
    final Text family = (famBytes != null) ? new Text(famBytes) : EMPTY_TEXT;

    if (ts == null) {
        mutation.put(family, qualifier, visibility, cellValue);
    } else {
        mutation.put(family, qualifier, visibility, ts.longValue(), cellValue);
    }
    return mutation;
}
/**
 * Runs a ChunkCombiner (deep-copied after init) over {@code source} and compares everything it
 * emits against {@code result}.
 *
 * <p>Each emitted key must be unique. When {@code reseek} is true, the iterator is re-seeked
 * just past the current key (ROW_COLFAM_COLQUAL) instead of calling {@code next()}.
 */
private void runTest(boolean reseek, TreeMap<Key,Value> source, TreeMap<Key,Value> result,
                Collection<ByteSequence> cols) throws IOException {
    final MapIterator mapSource = new MapIterator(source);
    SortedKeyValueIterator<Key,Value> combiner = new ChunkCombiner();
    combiner.init(mapSource, null, null);
    combiner = combiner.deepCopy(null);
    combiner.seek(new Range(), cols, true);

    final TreeMap<Key,Value> observed = new TreeMap<>();
    while (combiner.hasTop()) {
        final Key top = combiner.getTopKey();
        assertFalse("already contains " + top, observed.containsKey(top));
        observed.put(new Key(top), new Value(combiner.getTopValue()));
        if (!reseek) {
            combiner.next();
            continue;
        }
        final Range remainder = new Range(top.followingKey(PartialKey.ROW_COLFAM_COLQUAL), true, null, true);
        combiner.seek(remainder, cols, true);
    }
    assertEquals(result, observed);
}
/**
 * Reduces a single value that is not a serialized Uid.List (a random UUID string) and checks that
 * the result parses as an empty list with IGNORE false and COUNT 0.
 *
 * <p>The aggregator's logger is temporarily raised to FATAL, presumably to silence the error
 * logging triggered by the invalid value. The original level is restored in a {@code finally}
 * block so a failing assertion cannot leak the changed level into other tests.
 */
@Test
public void testInvalidValueType() throws Exception {
    Logger log = Logger.getLogger(GlobalIndexUidAggregator.class);
    Level origLevel = log.getLevel();
    log.setLevel(Level.FATAL);
    try {
        Collection<Value> values = Lists.newArrayList();
        agg.reset();
        Value val = new Value(UUID.randomUUID().toString().getBytes());
        values.add(val);
        Value result = agg.reduce(new Key("key"), values.iterator());
        Uid.List resultList = Uid.List.parseFrom(result.get());
        assertEquals(false, resultList.getIGNORE());
        assertEquals(0, resultList.getUIDCount());
        assertEquals(0, resultList.getCOUNT());
    } finally {
        log.setLevel(origLevel);
    }
}
/**
 * Gets a source copy from the ivarator source pool.
 *
 * @return a source borrowed from {@code ivaratorSourcePool}
 * @throws IterationInterruptedException if the pool cannot supply a source; the pool's exception
 *         is attached as the cause so its stack trace is not lost
 */
protected SortedKeyValueIterator<Key,Value> takePoolSource() {
    try {
        return ivaratorSourcePool.borrowObject();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
        IterationInterruptedException failure = new IterationInterruptedException(
                        "Unable to borrow object from ivarator source pool. " + e.getMessage());
        // Attach the underlying exception instead of keeping only its message.
        failure.initCause(e);
        throw failure;
    }
}
/**
 * Two deep copies of a single-source SourceManager are each seeked twice. They are then advanced
 * in lock-step: both must expose the same key at every step, be exhausted together, and see 26
 * keys.
 */
@Test
public void dataIntegrity_alternatingTest() throws IOException {
    final SourceManager manager = new SourceManager(dataIterator);
    manager.setInitialSize(1);

    // Seek each copy once as it is created...
    final SortedKeyValueIterator<Key,Value> first = manager.deepCopy(null);
    first.seek(new Range(), Collections.emptyList(), false);
    final SortedKeyValueIterator<Key,Value> second = manager.deepCopy(null);
    second.seek(new Range(), Collections.emptyList(), false);

    // ...then seek both again.
    first.seek(new Range(), Collections.emptyList(), false);
    second.seek(new Range(), Collections.emptyList(), false);

    int steps;
    for (steps = 0; first.hasTop() && second.hasTop(); steps++) {
        assertTrue(first.getTopKey().equals(second.getTopKey()));
        first.next();
        second.next();
    }

    assertFalse(first.hasTop());
    assertFalse(second.hasTop());
    assertEquals(26, steps);
}
/**
 * Exceeded-value-threshold marker with a leading-wildcard regex ({@code FOO =~ '.*foo'}).
 * Among the FOO term-frequency entries, only "ddfoo" and "dzzzzfoo" end in "foo", so the
 * document must contain exactly those FOO values.
 */
@Test
public void visitAnd_ExceededValueThresholdMarker_RegexLeadingWildcardTest() throws Exception {
    ASTJexlScript script = JexlASTHelper.parseJexlQuery("BAZ == 'woot' && ((ExceededValueThresholdMarkerJexlNode = true) && (FOO =~ '.*foo'))");
    Key hit = new Key("row", "dataType" + Constants.NULL + "123.345.456");
    List<Map.Entry<Key,Value>> source = new ArrayList<>();
    // field index entry for BAZ == 'woot'
    source.add(new AbstractMap.SimpleEntry<>(new Key("row", "fi" + Constants.NULL + "BAZ", "woot" + Constants.NULL + "dataType" + Constants.NULL
                    + "123.345.456"), new Value()));
    // term frequency entries for FOO
    source.add(new AbstractMap.SimpleEntry<>(new Key("row", "tf", "dataType" + Constants.NULL + "123.345.456" + Constants.NULL + "cd" + Constants.NULL
                    + "FOO"), new Value()));
    source.add(new AbstractMap.SimpleEntry<>(new Key("row", "tf", "dataType" + Constants.NULL + "123.345.456" + Constants.NULL + "de" + Constants.NULL
                    + "FOO"), new Value()));
    source.add(new AbstractMap.SimpleEntry<>(new Key("row", "tf", "dataType" + Constants.NULL + "123.345.456" + Constants.NULL + "ddfoo" + Constants.NULL
                    + "FOO"), new Value()));
    source.add(new AbstractMap.SimpleEntry<>(new Key("row", "tf", "dataType" + Constants.NULL + "123.345.456" + Constants.NULL + "dzzzzfoo" + Constants.NULL
                    + "FOO"), new Value()));
    source.add(new AbstractMap.SimpleEntry<>(
                    new Key("row", "tf", "dataType" + Constants.NULL + "123.345.456" + Constants.NULL + "e" + Constants.NULL + "FOO"), new Value()));
    Map<String,List<String>> expectedDocValues = new HashMap<>();
    List<String> expectedValues = new ArrayList<>();
    expectedValues.add("ddfoo");
    expectedValues.add("dzzzzfoo");
    expectedDocValues.put("FOO", expectedValues);
    // the leading-wildcard matches must appear as FOO values in the resulting document
    vistAnd_ExceededValueThesholdMarkerJexlNode_termFrequencyTest(script, hit, source, true, expectedDocValues, false);
}
/**
 * Writes sample data into an in-memory "index" table and scans it through DiscoveryIterator.
 * Expects exactly one aggregated entry (term / field / "20130101\uffff") whose value decodes to
 * three DiscoveredThing records, one for each of types t1, t2 and t3.
 */
@Test
public void testHappyPath() throws Throwable {
    final Connector con = new InMemoryInstance("DiscoveryIteratorTest").getConnector("root", new PasswordToken(""));
    con.tableOperations().create("index");
    final BatchWriterConfig writerConfig = new BatchWriterConfig().setMaxLatency(0, TimeUnit.SECONDS).setMaxMemory(0).setMaxWriteThreads(1);
    writeSample(con.createBatchWriter("index", writerConfig));

    final Scanner scanner = con.createScanner("index", new Authorizations("FOO"));
    scanner.addScanIterator(new IteratorSetting(50, DiscoveryIterator.class));
    scanner.setRange(new Range());

    // Exactly one aggregated entry is expected.
    final Iterator<Map.Entry<Key,Value>> results = scanner.iterator();
    assertTrue(results.hasNext());
    final Map.Entry<Key,Value> entry = results.next();
    assertFalse(results.hasNext());

    final Key key = entry.getKey();
    assertEquals("term", key.getRow().toString());
    assertEquals("field", key.getColumnFamily().toString());
    // see DiscoveryIterator for why this has a max unsigned char tacked on the end
    assertEquals("20130101\uffff", key.getColumnQualifier().toString());

    final Value value = entry.getValue();
    assertTrue(value.getSize() > 0);
    final DataInputBuffer in = new DataInputBuffer();
    in.reset(value.get(), value.getSize());
    final ArrayWritable wrapper = new ArrayWritable(DiscoveredThing.class);
    wrapper.readFields(in);
    final Writable[] decoded = wrapper.get();
    assertEquals(3, decoded.length);

    final Set<String> remainingTypes = Sets.newHashSet("t1", "t2", "t3");
    for (Writable writable : decoded) {
        final DiscoveredThing thing = (DiscoveredThing) writable;
        assertEquals("term", thing.getTerm());
        assertEquals("field", thing.getField());
        assertTrue(remainingTypes.remove(thing.getType()));
        assertEquals("20130101", thing.getDate());
        assertEquals("FOO", thing.getColumnVisibility());
        assertEquals(240L, thing.getCount());
    }
}
/**
 * A single field-index hit for "a.1" is expanded by AncestorChildExpansionIterator. Expects the
 * keys "a.1", "a.1.1", "a.1.2" and "a.1.2.1" in that order, each with a non-null value, and
 * nothing after them.
 */
@Test
public void testFamilyMatch() throws IOException {
    baseValues.add(new AbstractMap.SimpleImmutableEntry<>(generateFiKey("a.1"), new Value()));
    baseIterator = new IteratorToSortedKeyValueIterator(baseValues.iterator());
    iterator = new AncestorChildExpansionIterator(baseIterator, children, equality);
    iterator.seek(new Range(), Collections.EMPTY_LIST, false);

    final String[] expectedInOrder = {"a.1", "a.1.1", "a.1.2", "a.1.2.1"};
    for (String expected : expectedInOrder) {
        Assert.assertTrue(iterator.hasTop());
        assertKey(iterator.getTopKey(), expected);
        Assert.assertNotNull(iterator.getTopValue());
        iterator.next();
    }
    Assert.assertFalse(iterator.hasTop());
}
/**
 * Builds a serialized Uid.List holding exactly the given UIDs, with COUNT equal to the number of
 * UIDs and IGNORE false. This allows a document-specific range.
 *
 * @param in the UIDs to include
 * @return a Value wrapping the serialized Uid.List
 */
private static Value getValueForBuilderFor(String... in) {
    final Uid.List.Builder uidList = Uid.List.newBuilder();
    for (int idx = 0; idx < in.length; idx++) {
        uidList.addUID(in[idx]);
    }
    uidList.setCOUNT(in.length);
    uidList.setIGNORE(false);
    final byte[] serialized = uidList.build().toByteArray();
    return new Value(serialized);
}
/**
 * Only FIELD2 is kept and the filter query references only FIELD2, so the FIELD1 term-frequency
 * entry is filtered out. apply() must return null, leave the document empty, and leave the
 * iterator on the next document's key.
 */
@Test
public void apply_buildDocKeepFilteredOut() throws IOException, ParseException {
    final Document doc = new Document();
    final AttributeFactory attributeFactory = new AttributeFactory(new TypeMetadata());

    // One tf entry for the current document and one for the following document.
    final TreeMap<Key,Value> data = Maps.newTreeMap();
    data.put(getTF("123", "FIELD1", "VALUE1", "dataType1", "123.345.456", 10), new Value());
    data.put(getTF("123", "NEXT_DOC_FIELD", "VALUE1", "dataType1", "124.345.456", 10), new Value());
    final SortedKeyValueIterator<Key,Value> itr = new SortedMapIterator(data);
    itr.seek(new Range(), null, true);

    final Set<String> keepFields = new HashSet<>();
    keepFields.add("FIELD2");
    final EventDataQueryFilter filter = new EventDataQueryFieldFilter(JexlASTHelper.parseJexlQuery("FIELD2 == 'VALUE1'"), Collections.EMPTY_SET);
    aggregator = new TermFrequencyAggregator(keepFields, filter, -1);

    final Key result = aggregator.apply(itr, doc, attributeFactory);

    assertTrue(null == result);
    assertTrue(0 == doc.size());
    // The iterator must now sit on the next document's entry.
    assertTrue(itr.hasTop());
    final Key expectedTop = getTF("123", "NEXT_DOC_FIELD", "VALUE1", "dataType1", "124.345.456", 10);
    assertTrue(itr.getTopKey().equals(expectedTop));
}
/**
 * Returns the number of entries remaining in {@code source}, encoded as a fixed-length long.
 *
 * <p>NOTE(review): this advances {@code source} to exhaustion, so a second call returns a count
 * of 0 — confirm callers invoke it at most once per top key.
 *
 * @throws VertexiumAccumuloIteratorException if advancing the source fails
 */
@Override
public Value getTopValue() {
    long remaining = 0L;
    try {
        for (; source.hasTop(); source.next()) {
            remaining++;
        }
    } catch (IOException e) {
        throw new VertexiumAccumuloIteratorException("could not iterate", e);
    }
    return new Value(LongCombiner.FIXED_LEN_ENCODER.encode(remaining));
}
/**
 * Advances {@code iterator} and publishes its next entry as {@code topKey}/{@code topValue}.
 * When the iterator is exhausted, {@code topKey} becomes null and {@code topValue} keeps its
 * previous value.
 */
private void nextIterator() {
    if (!iterator.hasNext()) {
        topKey = null;
        return;
    }
    final Entry<Key,Value> entry = iterator.next();
    topValue = entry.getValue();
    topKey = entry.getKey();
}
/**
 * Combines every value for a standard edge key into one encoded EdgeValue.
 *
 * <p>Each value is parsed as an {@code EdgeData.EdgeValue} protobuf. Counts are summed, hour
 * bitmasks are combined, and the load date, source/sink, UUID and bad-activity date are merged
 * through the helper methods. Values that fail protobuf parsing are decoded as legacy
 * variable-length longs and contribute only to the count.
 *
 * @param key the key whose values are combined (passed to the earliest-load-date merge)
 * @param iter the values to combine
 * @return the encoded combined edge value
 */
private Value combineStandardKey(Key key, Iterator<Value> iter) {
    EdgeValueBuilder builder = EdgeValue.newBuilder();
    int combineCount = 0;
    while (iter.hasNext()) {
        Value value = iter.next();
        try {
            EdgeData.EdgeValue protoEdgeValue = EdgeData.EdgeValue.parseFrom(value.get());
            if (protoEdgeValue.hasCount()) {
                builder.setCount(protoEdgeValue.getCount() + builder.getCount());
            }
            if (protoEdgeValue.hasHourBitmask()) {
                builder.combineBitmask(protoEdgeValue.getHourBitmask());
            }
            useEarliestLoadDate(key, builder, protoEdgeValue);
            combineSourceAndSink(builder, protoEdgeValue);
            useEarliestUuid(builder, protoEdgeValue);
            combineBadActivityDate(builder, protoEdgeValue);
        } catch (InvalidProtocolBufferException e) {
            // Not a protobuf: decode it as an old varint count value.
            long count = new VarLenEncoder().decode(value.get());
            builder.setCount(builder.getCount() + count);
        }
        combineCount++;
    }
    // Log at the same level the guard checks, so the message appears when trace is enabled.
    if (log.isTraceEnabled()) {
        log.trace("Combined " + combineCount + " values.");
    }
    return builder.build().encode();
}
/**
 * Some RecordReaders null out the raw data entirely because they pass data to their handlers in
 * other ways. This checks that EventMapper still handles such a record and writes the expected
 * entries.
 */
@Test
public void shouldHandleNullRawData() throws IOException, InterruptedException {
    record.setRawData(null);

    eventMapper.setup(mapContext);
    eventMapper.map(new LongWritable(1), record, mapContext);
    eventMapper.cleanup(mapContext);

    // two field mutations + LOAD_DATE + ORIG_FILE + RAW_FILE
    final int expectedEntries = 5;
    final Multimap<BulkIngestKey,Value> written = TestContextWriter.getWritten();
    assertEquals(expectedEntries, written.size());
}
/**
 * Creates the transform for {@code fields} and, when the logic has a query model, captures that
 * model's reverse field mapping.
 *
 * <p>{@code modelMapping} is built by inverting {@code getReverseQueryMapping()}. It therefore maps
 * each final (mapped) field name to the original field name(s) it came from. If the model is
 * null, {@code modelMapping} is not assigned here.
 *
 * @param logic the query logic; it is cast to {@code ShardQueryLogic} unconditionally
 * @param fields the fields passed on to the {@code UniqueTransform(Set)} constructor
 */
public UniqueTransform(BaseQueryLogic<Entry<Key,Value>> logic, Set<String> fields) {
    this(fields);
    final QueryModel queryModel = ((ShardQueryLogic) logic).getQueryModel();
    if (queryModel == null) {
        return;
    }
    modelMapping = HashMultimap.create();
    for (Map.Entry<String,String> reverse : queryModel.getReverseQueryMapping().entrySet()) {
        final String finalName = reverse.getValue();
        final String originalName = reverse.getKey();
        modelMapping.put(finalName, originalName);
    }
}
/**
 * Builds a small iterator over the index-only entry matching an EQ node.
 *
 * <p>If the node has no usable literal (likely a null literal), a deep copy of the full source is
 * returned. Otherwise the (field, literal) pair is looked up in {@code limitedMap}. On a miss, a
 * source is seeked to the single row/colfam/colqual of the node's key, and its top key, if any, is
 * paired with {@code Constants.NULL_VALUE}.
 *
 * @param node the equality node
 * @return an iterator over zero or one entries, or a deep copy of {@code source}
 * @throws IOException if seeking the source fails
 */
protected SortedKeyValueIterator<Key,Value> createIndexOnlyKey(ASTEQNode node) throws IOException {
    final Key startKey = getKey(node);
    final IdentifierOpLiteral op = JexlASTHelper.getIdentifierOpLiteral(node);
    if (op == null || op.getLiteralValue() == null) {
        // deep copy since this is likely a null literal
        return source.deepCopy(env);
    }
    final String fieldName = op.deconstructIdentifier();
    final String literal = String.valueOf(op.getLiteralValue());
    if (log.isTraceEnabled()) {
        log.trace("createIndexOnlyKey for " + fieldName + " " + literal + " " + startKey);
    }

    final List<Entry<Key,Value>> kv = Lists.newArrayList();
    final Entry<Key,Value> cached = limitedMap.get(Maps.immutableEntry(fieldName, literal));
    if (cached != null) {
        kv.add(cached);
    } else {
        // A positive source size means this query may take its own deep copy; otherwise share limitedSource.
        final SortedKeyValueIterator<Key,Value> lookup = (source.getSourceSize() > 0) ? source.deepCopy(env) : limitedSource;
        final Range singleColumn = new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL), false);
        lookup.seek(singleColumn, Collections.emptyList(), false);
        if (lookup.hasTop()) {
            kv.add(Maps.immutableEntry(lookup.getTopKey(), Constants.NULL_VALUE));
        }
    }
    return new IteratorToSortedKeyValueIterator(kv.iterator());
}
/**
 * Deserializes a query of the named implementation class from {@code value} and stamps it with
 * the given column visibility.
 *
 * <p>The bytes are merged into a fresh message using a runtime (reflection-derived) schema for
 * the class, via {@code ProtobufIOUtil.mergeFrom}.
 *
 * @param queryImplClassName fully qualified name of the Query implementation
 * @param columnVisibility visibility stored on the returned query (as a string)
 * @param value the serialized query bytes
 * @return the deserialized query
 * @throws ClassNotFoundException if {@code queryImplClassName} cannot be loaded
 */
public static <T extends Query> T deserialize(String queryImplClassName, Text columnVisibility, Value value) throws InvalidProtocolBufferException,
                ClassNotFoundException {
    // NOTE(review): the class is loaded reflectively by name; if the name can come from untrusted
    // input, consider restricting it to known Query implementations.
    @SuppressWarnings("unchecked")
    final Class<T> implClass = (Class<T>) Class.forName(queryImplClassName);
    final byte[] serialized = value.get();
    final Schema<T> schema = RuntimeSchema.getSchema(implClass);
    final T query = schema.newMessage();
    ProtobufIOUtil.mergeFrom(serialized, query, schema);
    query.setColumnVisibility(columnVisibility.toString());
    return query;
}
/**
 * Writes {@code numRows} rows into the "test" table. Row {@code j} gets one empty-valued cell for
 * each column family {@code 0..entriesPerRow-1} and each qualifier {@code 0..numColQs-1}.
 *
 * <p>The writer is closed in a {@code finally} block so its resources are released even if a
 * mutation is rejected.
 *
 * @throws TableNotFoundException if the "test" table does not exist
 * @throws MutationsRejectedException if the writer rejects mutations on add, flush or close
 */
private void persistTestMutations(int numRows, int entriesPerRow, int numColQs) throws TableNotFoundException, MutationsRejectedException {
    BatchWriter writer = connector.createBatchWriter("test", 1000, 1000, 1);
    try {
        for (int j = 0; j < numRows; j++) {
            Mutation m = new Mutation(Integer.toString(j));
            for (int i = 0; i < entriesPerRow; i++) {
                for (int q = 0; q < numColQs; q++) {
                    m.put(new Text(Integer.toString(i)), new Text(Integer.toString(q)), new Value("".getBytes()));
                }
            }
            writer.addMutation(m);
        }
        writer.flush();
    } finally {
        writer.close();
    }
}
/**
 * Exceeded-value-threshold marker over the bounded range {@code FOO >= 'e' && FOO <= 'm'}, with a
 * term-frequency value exactly on the upper bound ("m").
 */
@Test
public void visitAnd_ExceededValueThresholdMarkerJexlNode_RangeUpperBoundaryTest() throws Exception {
    ASTJexlScript script = JexlASTHelper.parseJexlQuery("BAZ == 'woot' && ((ExceededValueThresholdMarkerJexlNode = true) && (FOO >= 'e' && FOO <= 'm'))");
    Key hit = new Key("row", "dataType" + Constants.NULL + "123.345.456");
    List<Map.Entry<Key,Value>> source = new ArrayList<>();
    source.add(new AbstractMap.SimpleEntry<>(new Key("row", "fi" + Constants.NULL + "BAZ", "woot" + Constants.NULL + "dataType" + Constants.NULL
                    + "123.345.456"), new Value()));
    source.add(new AbstractMap.SimpleEntry<>(
                    new Key("row", "tf", "dataType" + Constants.NULL + "123.345.456" + Constants.NULL + "m" + Constants.NULL + "FOO"), new Value()));
    Set<String> termFrequencyFields = new HashSet<>();
    termFrequencyFields.add("FOO");
    // A document is needed to check that the tf field values fall within the bounds, but with no
    // aggregation fields set, no document is created.
    vistAnd_ExceededValueThesholdMarkerJexlNode_termFrequencyTest(script, hit, source, false, null, termFrequencyFields, Collections.emptySet(),
                    Collections.emptySet());
    List<String> expected = new ArrayList<>();
    expected.add("m");
    Map<String,List<String>> fooMap = new HashMap<>();
    fooMap.put("FOO", expected);
    // turn on aggregation and see the document, which contains the upper-bound value "m"
    vistAnd_ExceededValueThesholdMarkerJexlNode_termFrequencyTest(script, hit, source, true, fooMap, false);
}
/**
 * Creates a pool factory that produces a new {@code SortedListKeyValueIterator} over
 * {@code sourceList} each time the pool needs an object.
 *
 * @return the poolable object factory
 */
public static BasePoolableObjectFactory<SortedKeyValueIterator<Key,Value>> createIvaratorSourceFactory() {
    final BasePoolableObjectFactory<SortedKeyValueIterator<Key,Value>> factory = new BasePoolableObjectFactory<SortedKeyValueIterator<Key,Value>>() {
        @Override
        public SortedKeyValueIterator<Key,Value> makeObject() throws Exception {
            final SortedKeyValueIterator<Key,Value> fresh = new SortedListKeyValueIterator(sourceList);
            return fresh;
        }
    };
    return factory;
}
/**
 * Writes the index entries for an event under {@code group}, as one mutation whose row is
 * {@code group}.
 *
 * <p>The mutation contains:
 * <ul>
 * <li>a main index cell with column family {@code NULL_BYTE + "INDEX"}, whose value is the
 * event's type and id joined by {@code ONE_BYTE};</li>
 * <li>one empty-valued cell per attribute, with column family {@code END_BYTE type ONE_BYTE id}
 * and qualifier {@code key NULL_BYTE encodedValue NULL_BYTE typeAlias}.</li>
 * </ul>
 * All cells carry the event's timestamp.
 *
 * @param group the index group (row id)
 * @param entry the event to index
 * @throws RuntimeException if an attribute cannot be encoded or the mutation is rejected
 */
@Override
public void put(String group, Event entry) {
    checkNotNull(group);
    checkNotNull(entry);
    // First put the main index pointing to the event. The column family is prefixed with
    // NULL_BYTE so that it shows up first.
    Mutation indexMutation = new Mutation(group);
    // NOTE(review): getBytes() uses the platform default charset — confirm readers decode the same way.
    indexMutation.put(NULL_BYTE + "INDEX", "", new ColumnVisibility(), entry.getTimestamp(), new Value((entry.getType() + ONE_BYTE + entry.getId()).getBytes()));
    // The attribute column family depends only on the event, so build it once, not once per attribute.
    String fam = String.format("%s%s%s%s", END_BYTE, entry.getType(), ONE_BYTE, entry.getId());
    for (Attribute attribute : entry.getAttributes()) {
        Object value = attribute.getValue();
        try {
            String serialize = typeRegistry.encode(value);
            String aliasForType = typeRegistry.getAlias(value);
            String qual = String.format("%s%s%s%s%s", attribute.getKey(), NULL_BYTE, serialize, NULL_BYTE, aliasForType);
            indexMutation.put(fam, qual, new ColumnVisibility(getVisibility(attribute, "")), entry.getTimestamp(),
                            new Value("".getBytes()));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    try {
        writer.addMutation(indexMutation);
    } catch (MutationsRejectedException ex) {
        throw new RuntimeException("There was an error writing the mutation for [index=" + group + ",entryId=" + entry.getId() + "]", ex);
    }
}
/**
 * Sets splits on an index table so that its entries are divided into roughly {@code quantile}
 * equally sized groups, then compacts the table.
 *
 * <p>The split interval is {@code ceil(count / quantile)}, where {@code count} comes from
 * {@code getEntries}. The row of every interval-th scanned entry becomes a split point.
 *
 * @param quantile number of groups to divide the table into; must be positive
 * @throws IllegalArgumentException if {@code quantile} is not positive. Without this check, a
 *         zero or negative quantile yields a NaN or negative interval, which splits on every entry.
 * @throws IOException if no iterator could be obtained for the table
 */
public static void setSplitsByQuantile(
    final BaseDataStore dataStore,
    final Connector connector,
    final String namespace,
    final Index index,
    final int quantile)
    throws AccumuloException, AccumuloSecurityException, IOException, TableNotFoundException {
    if (quantile <= 0) {
        throw new IllegalArgumentException("quantile must be positive, got " + quantile);
    }
    final long count = getEntries(dataStore, connector, namespace, index);
    try (final CloseableIterator<Entry<Key, Value>> iterator =
        getIterator(connector, namespace, index)) {
        if (iterator == null) {
            LOGGER.error("Could not get iterator instance, getIterator returned null");
            throw new IOException("Could not get iterator instance, getIterator returned null");
        }
        long ii = 0;
        final long splitInterval = (long) Math.ceil((double) count / (double) quantile);
        final SortedSet<Text> splits = new TreeSet<>();
        while (iterator.hasNext()) {
            final Entry<Key, Value> entry = iterator.next();
            ii++;
            if (ii >= splitInterval) {
                ii = 0;
                splits.add(entry.getKey().getRow());
            }
        }
        final String tableName = AccumuloUtils.getQualifiedTableName(namespace, index.getName());
        connector.tableOperations().addSplits(tableName, splits);
        connector.tableOperations().compact(tableName, null, null, true, true);
    }
}
/**
 * Validates the options, keeps a reference to {@code source}, creates a bounded
 * {@code ArrayBlockingQueue} of capacity {@code queueSize}, and starts a background thread
 * named "ReadAheadIterator-SourceThread" that runs a {@code ProducerThread} over the source.
 *
 * <p>NOTE(review): each call starts a new thread and replaces {@code queue}, {@code thread} and
 * {@code t} without stopping any previously started thread. {@code env} is not used.
 */
public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
    validateOptions(options);
    this.source = source;
    this.queue = new ArrayBlockingQueue<QueueElement>(queueSize);
    this.thread = new ProducerThread(this.source);
    this.t = new Thread(this.thread, "ReadAheadIterator-SourceThread");
    this.t.start();
}
/**
 * Builds a serialized Uid.List with no UIDs, COUNT 50 and IGNORE true. This forces a shard range
 * instead of document-specific ranges.
 *
 * @return a Value wrapping the serialized Uid.List
 */
private static Value getValueForNuthinAndYourHitsForFree() {
    final Uid.List.Builder uidList = Uid.List.newBuilder();
    uidList.setCOUNT(50);     // must not be zero
    uidList.setIGNORE(true);  // must be true
    final byte[] serialized = uidList.build().toByteArray();
    return new Value(serialized);
}