Listed below are Java example usages of org.apache.cassandra.dht.Range (the server-side counterpart of the driver's com.datastax.driver.core.TokenRange), taken from the Apache Cassandra sources and related open-source projects on GitHub.
@Override
public synchronized ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
List<SSTableReader> repairedSSTables = new ArrayList<>();
List<SSTableReader> unrepairedSSTables = new ArrayList<>();
for (SSTableReader sstable : sstables)
if (sstable.isRepaired())
repairedSSTables.add(sstable);
else
unrepairedSSTables.add(sstable);
ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
List<ISSTableScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size());
scanners.addAll(repairedScanners.scanners);
scanners.addAll(unrepairedScanners.scanners);
return new ScannerList(scanners);
}
/** @return the number of nodes bootstrapping into source's primary range */
public int pendingRangeChanges(InetAddress source)
{
int n = 0;
Collection<Range<Token>> sourceRanges = getPrimaryRangesFor(getTokens(source));
lock.readLock().lock();
try
{
for (Token token : bootstrapTokens.keySet())
for (Range<Token> range : sourceRanges)
if (range.contains(token))
n++;
}
finally
{
lock.readLock().unlock();
}
return n;
}
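pendingRangeChanges leans on Range.contains, which has to cope with ranges that wrap around the token ring. A minimal standalone sketch of that wrap-aware membership test, using plain long tokens instead of Cassandra's Token type (the class and its names are hypothetical):

// Standalone sketch of wrap-aware range membership over a token ring.
// Cassandra ranges are (left, right]: left-exclusive, right-inclusive; a
// range whose left >= right wraps around the ring's minimum token.
public class RingContainsSketch
{
    static boolean contains(long left, long right, long token)
    {
        if (left < right)                         // ordinary, non-wrapping range
            return token > left && token <= right;
        return token > left || token <= right;    // wrapping range
    }

    public static void main(String[] args)
    {
        System.out.println(contains(10, 50, 30)); // true
        System.out.println(contains(50, 10, 30)); // false
        System.out.println(contains(50, 10, 5));  // true: inside the wrapped segment
    }
}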
public LeveledScanner(Collection<SSTableReader> sstables, Range<Token> range)
{
this.range = range;
// add only sstables that intersect our range, and estimate how much data that involves
this.sstables = new ArrayList<SSTableReader>(sstables.size());
long length = 0;
for (SSTableReader sstable : sstables)
{
this.sstables.add(sstable);
long estimatedKeys = sstable.estimatedKeys();
double estKeysInRangeRatio = 1.0;
if (estimatedKeys > 0 && range != null)
estKeysInRangeRatio = ((double) sstable.estimatedKeysForRanges(Collections.singleton(range))) / estimatedKeys;
length += sstable.uncompressedLength() * estKeysInRangeRatio;
}
totalLength = length;
Collections.sort(this.sstables, SSTableReader.sstableComparator);
sstableIterator = this.sstables.iterator();
assert sstableIterator.hasNext(); // caller should check intersecting first
currentScanner = sstableIterator.next().getScanner(range, CompactionManager.instance.getRateLimiter());
}
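Note that the constructor itself adds every sstable it is handed; the intersection filtering happens in the static intersecting helper shown further down, which callers are expected to apply first. The loop then weights each sstable's uncompressed length by the estimated fraction of its keys that fall inside the range. A worked example of that arithmetic with made-up numbers:

// Hypothetical figures: an sstable with 1,000,000 estimated keys, 200,000 of
// them inside the requested range, and 4 GiB of uncompressed data contributes
// roughly 0.2 * 4 GiB to totalLength.
public class RangeLengthEstimateSketch
{
    public static void main(String[] args)
    {
        long estimatedKeys = 1_000_000L;
        long keysInRange   =   200_000L;
        long uncompressed  = 4L << 30;                        // 4 GiB
        double ratio = (double) keysInRange / estimatedKeys;  // 0.2
        System.out.println((long) (uncompressed * ratio));    // 858993459 bytes, ~0.8 GiB
    }
}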
public MerkleTree deserialize(DataInput in, int version) throws IOException
{
byte hashdepth = in.readByte();
long maxsize = in.readLong();
long size = in.readLong();
IPartitioner partitioner;
try
{
partitioner = FBUtilities.newPartitioner(in.readUTF());
}
catch (ConfigurationException e)
{
throw new IOException(e);
}
// full range
Token left = Token.serializer.deserialize(in);
Token right = Token.serializer.deserialize(in);
Range<Token> fullRange = new Range<>(left, right, partitioner);
MerkleTree mt = new MerkleTree(partitioner, fullRange, hashdepth, maxsize);
mt.size = size;
mt.root = Hashable.serializer.deserialize(in, version);
return mt;
}
TreeRange getHelper(Hashable hashable, Token pleft, Token pright, byte depth, Token t)
{
if (hashable instanceof Leaf)
{
// we've reached a hash: wrap it up and deliver it
return new TreeRange(this, pleft, pright, depth, hashable);
}
// else: node.
Inner node = (Inner)hashable;
if (Range.contains(pleft, node.token, t))
// left child contains token
return getHelper(node.lchild, pleft, node.token, inc(depth), t);
// else: right child contains token
return getHelper(node.rchild, node.token, pright, inc(depth), t);
}
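getHelper is a binary-search descent: at each Inner node the lookup goes left when the token sorts into (pleft, node.token], otherwise right, until it bottoms out at a Leaf. A toy version of the descent with long tokens (types are hypothetical stand-ins, and ring wrap-around, which Range.contains handles in the real code, is ignored):

// Toy descent over a Merkle-style binary token tree (hypothetical types).
public class MerkleDescentSketch
{
    static abstract class Node {}
    static final class Leaf extends Node {}
    static final class Inner extends Node
    {
        final long token;
        final Node lchild, rchild;
        Inner(long token, Node lchild, Node rchild) { this.token = token; this.lchild = lchild; this.rchild = rchild; }
    }

    // Go left while the token falls in (parentLeft, node.token], else go right.
    static Leaf find(Node node, long parentLeft, long token)
    {
        if (node instanceof Leaf)
            return (Leaf) node;
        Inner inner = (Inner) node;
        return (token > parentLeft && token <= inner.token)
             ? find(inner.lchild, parentLeft, token)
             : find(inner.rchild, inner.token, token);
    }
}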
@Test
public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, ExecutionException, IOException
{
ColumnFamilyStore store = prepareColumnFamilyStore();
Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
assertEquals(store.getSSTables().size(), sstables.size());
Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes()));
List<Range<Token>> ranges = Arrays.asList(range);
Refs<SSTableReader> refs = Refs.tryRef(sstables);
if (refs == null)
throw new IllegalStateException();
CompactionManager.instance.performAnticompaction(store, ranges, refs, 1);
assertThat(store.getSSTables().size(), is(1));
assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
assertThat(Iterables.get(store.getSSTables(), 0).selfRef().globalCount(), is(1));
assertThat(store.getDataTracker().getCompacting().size(), is(0));
}
public int forceRepairAsync(String keyspace, int parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies)
{
if (parallelismDegree < 0 || parallelismDegree > RepairParallelism.values().length - 1)
{
throw new IllegalArgumentException("Invalid parallelism degree specified: " + parallelismDegree);
}
Collection<Range<Token>> ranges;
if (primaryRange)
{
// when repairing only primary range, neither dataCenters nor hosts can be set
if (dataCenters == null && hosts == null)
ranges = getPrimaryRanges(keyspace);
// except when dataCenters contains only the local DC (i.e. -local)
else if (dataCenters != null && dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
ranges = getPrimaryRangesWithinDC(keyspace);
else
throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
}
else
{
ranges = getLocalRanges(keyspace);
}
return forceRepairAsync(keyspace, RepairParallelism.values()[parallelismDegree], dataCenters, hosts, ranges, fullRepair, columnFamilies);
}
@Test
public void antiCompactionSizeTest() throws ExecutionException, InterruptedException, IOException
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
cfs.disableAutoCompaction();
SSTableReader s = writeFile(cfs, 1000);
cfs.addSSTable(s);
long origSize = s.bytesOnDisk();
Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
Collection<SSTableReader> sstables = cfs.getSSTables();
CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), Refs.tryRef(sstables), 12345);
long sum = 0;
for (SSTableReader x : cfs.getSSTables())
sum += x.bytesOnDisk();
assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count());
assertEquals(origSize, cfs.metric.liveDiskSpaceUsed.count(), 100000);
}
/**
* @return list of Token ranges (_not_ keys!) together with estimated key count,
* breaking up the data this node is responsible for into pieces of roughly keysPerSplit
*/
public List<Pair<Range<Token>, Long>> getSplits(String keyspaceName, String cfName, Range<Token> range, int keysPerSplit)
{
Keyspace t = Keyspace.open(keyspaceName);
ColumnFamilyStore cfs = t.getColumnFamilyStore(cfName);
List<DecoratedKey> keys = keySamples(Collections.singleton(cfs), range);
long totalRowCountEstimate = cfs.estimatedKeysForRange(range);
// splitCount should be much smaller than number of key samples, to avoid huge sampling error
int minSamplesPerSplit = 4;
int maxSplitCount = keys.size() / minSamplesPerSplit + 1;
int splitCount = Math.max(1, Math.min(maxSplitCount, (int)(totalRowCountEstimate / keysPerSplit)));
List<Token> tokens = keysToTokens(range, keys);
return getSplits(tokens, splitCount, cfs);
}
private List<Pair<Range<Token>, Long>> getSplits(List<Token> tokens, int splitCount, ColumnFamilyStore cfs)
{
double step = (double) (tokens.size() - 1) / splitCount;
Token prevToken = tokens.get(0);
List<Pair<Range<Token>, Long>> splits = Lists.newArrayListWithExpectedSize(splitCount);
for (int i = 1; i <= splitCount; i++)
{
int index = (int) Math.round(i * step);
Token token = tokens.get(index);
Range<Token> range = new Range<>(prevToken, token);
// always return an estimate > 0 (see CASSANDRA-7322)
splits.add(Pair.create(range, Math.max(cfs.metadata.getMinIndexInterval(), cfs.estimatedKeysForRange(range))));
prevToken = token;
}
return splits;
}
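The boundaries are picked by striding through the sampled tokens at a fixed step, so each split covers roughly the same number of sampled keys. A worked example of the stride arithmetic with hypothetical numbers:

// With 101 sampled tokens and splitCount = 4, step = 25.0 and the boundaries
// land at sample indexes 25, 50, 75 and 100.
public class SplitStrideSketch
{
    public static void main(String[] args)
    {
        int sampledTokens = 101, splitCount = 4;
        double step = (double) (sampledTokens - 1) / splitCount;  // 25.0
        for (int i = 1; i <= splitCount; i++)
            System.out.println("boundary at sample index " + (int) Math.round(i * step));
    }
}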
/**
* @param sstable SSTable to scan; must not be null
* @param tokenRanges A set of token ranges to scan
* @param limiter background i/o RateLimiter; may be null
*/
private SSTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
{
assert sstable != null;
this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
this.ifile = sstable.openIndexReader();
this.sstable = sstable;
this.dataRange = null;
List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(tokenRanges.size());
for (Range<Token> range : Range.normalize(tokenRanges))
addRange(range.toRowBounds(), boundsList);
this.rangeIterator = boundsList.iterator();
}
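Range.normalize is what lets the scanner treat the request as a sorted list of disjoint, non-wrapping bounds: it unwraps any range that crosses the ring's minimum token and merges overlapping ones. A standalone sketch of just the unwrapping half, again with hypothetical long tokens:

// Sketch: split a wrapping (left, right] range at the ring minimum into two
// non-wrapping pieces; MIN plays the role of the partitioner's minimum token.
public class UnwrapSketch
{
    static final long MIN = Long.MIN_VALUE;

    static long[][] unwrap(long left, long right)
    {
        if (left < right)
            return new long[][] { { left, right } };           // already non-wrapping
        return new long[][] { { left, MIN }, { MIN, right } }; // wraps: split in two
    }
}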
@Test
public void testGetNeighborsTimesTwoInSpecifiedHosts() throws Throwable
{
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
// generate rf*2 nodes, and ensure that only neighbors specified by the hosts are returned
addTokens(2 * Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor());
AbstractReplicationStrategy ars = Keyspace.open(keyspaceName).getReplicationStrategy();
List<InetAddress> expected = new ArrayList<>();
for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
{
expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
}
expected.remove(FBUtilities.getBroadcastAddress());
Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(), expected.get(0).getCanonicalHostName());
assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts).iterator().next());
}
private void testRangeSliceCommandWrite() throws IOException
{
IPartitioner part = StorageService.getPartitioner();
AbstractBounds<RowPosition> bounds = new Range<Token>(part.getRandomToken(), part.getRandomToken()).toRowBounds();
RangeSliceCommand namesCmd = new RangeSliceCommand(statics.KS, "Standard1", statics.readTs, namesPred, bounds, 100);
MessageOut<RangeSliceCommand> namesCmdMsg = namesCmd.createMessage();
RangeSliceCommand emptyRangeCmd = new RangeSliceCommand(statics.KS, "Standard1", statics.readTs, emptyRangePred, bounds, 100);
MessageOut<RangeSliceCommand> emptyRangeCmdMsg = emptyRangeCmd.createMessage();
RangeSliceCommand regRangeCmd = new RangeSliceCommand(statics.KS, "Standard1", statics.readTs, nonEmptyRangePred, bounds, 100);
MessageOut<RangeSliceCommand> regRangeCmdMsg = regRangeCmd.createMessage();
RangeSliceCommand namesCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, namesSCPred, bounds, 100);
MessageOut<RangeSliceCommand> namesCmdSupMsg = namesCmdSup.createMessage();
RangeSliceCommand emptyRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, emptyRangePred, bounds, 100);
MessageOut<RangeSliceCommand> emptyRangeCmdSupMsg = emptyRangeCmdSup.createMessage();
RangeSliceCommand regRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, nonEmptyRangeSCPred, bounds, 100);
MessageOut<RangeSliceCommand> regRangeCmdSupMsg = regRangeCmdSup.createMessage();
DataOutputStreamAndChannel out = getOutput("db.RangeSliceCommand.bin");
namesCmdMsg.serialize(out, getVersion());
emptyRangeCmdMsg.serialize(out, getVersion());
regRangeCmdMsg.serialize(out, getVersion());
namesCmdSupMsg.serialize(out, getVersion());
emptyRangeCmdSupMsg.serialize(out, getVersion());
regRangeCmdSupMsg.serialize(out, getVersion());
out.close();
// test serializedSize
testSerializedSize(namesCmd, RangeSliceCommand.serializer);
testSerializedSize(emptyRangeCmd, RangeSliceCommand.serializer);
testSerializedSize(regRangeCmd, RangeSliceCommand.serializer);
testSerializedSize(namesCmdSup, RangeSliceCommand.serializer);
testSerializedSize(emptyRangeCmdSup, RangeSliceCommand.serializer);
testSerializedSize(regRangeCmdSup, RangeSliceCommand.serializer);
}
public List<KeyRange> getLocalKeyPartition() throws BackendException {
ensureKeyspaceExists(keySpaceName);
@SuppressWarnings("rawtypes")
Collection<Range<Token>> ranges = StorageService.instance.getPrimaryRanges(keySpaceName);
List<KeyRange> keyRanges = new ArrayList<KeyRange>(ranges.size());
for (@SuppressWarnings("rawtypes") Range<Token> range : ranges) {
keyRanges.add(CassandraHelper.transformRange(range));
}
return keyRanges;
}
/**
* Direct I/O SSTableScanner over a defined range of tokens.
*
* @param range the range of tokens to cover, or null to cover the entire sstable
* @return A Scanner for seeking over the rows of the SSTable.
*/
public ISSTableScanner getScanner(Range<Token> range, RateLimiter limiter)
{
if (range == null)
return getScanner(limiter);
return getScanner(Collections.singletonList(range), limiter);
}
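A hedged usage sketch for this entry point, assuming an already-open SSTableReader from the same (2.1-era) codebase as the snippets above; per the overload it delegates to, a null limiter means unthrottled reads:

// Scan every partition an sstable holds for one token range (null range =>
// scan the whole sstable). Sketch only; assumes Cassandra 2.1-era internals.
static void scanRange(SSTableReader sstable, Range<Token> range)
{
    ISSTableScanner scanner = sstable.getScanner(range, null);
    try
    {
        while (scanner.hasNext())
            scanner.next(); // each element is one partition's on-disk data
    }
    finally
    {
        scanner.close();
    }
}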
private Multimap<Range<Token>, InetAddress> getPendingRangesMM(String keyspaceName)
{
Multimap<Range<Token>, InetAddress> map = pendingRanges.get(keyspaceName);
if (map == null)
{
map = HashMultimap.create();
Multimap<Range<Token>, InetAddress> priorMap = pendingRanges.putIfAbsent(keyspaceName, map);
if (priorMap != null)
map = priorMap;
}
return map;
}
public long serializedSize(StreamRequest request, int version)
{
int size = TypeSizes.NATIVE.sizeof(request.keyspace);
size += TypeSizes.NATIVE.sizeof(request.repairedAt);
size += TypeSizes.NATIVE.sizeof(request.ranges.size());
for (Range<Token> range : request.ranges)
{
size += Token.serializer.serializedSize(range.left, TypeSizes.NATIVE);
size += Token.serializer.serializedSize(range.right, TypeSizes.NATIVE);
}
size += TypeSizes.NATIVE.sizeof(request.columnFamilies.size());
for (String cf : request.columnFamilies)
size += TypeSizes.NATIVE.sizeof(cf);
return size;
}
public Collection<InetAddress> pendingEndpointsFor(Token token, String keyspaceName)
{
Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(keyspaceName);
if (ranges.isEmpty())
return Collections.emptyList();
Set<InetAddress> endpoints = new HashSet<InetAddress>();
for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : ranges.entrySet())
{
if (entry.getKey().contains(token))
endpoints.addAll(entry.getValue());
}
return endpoints;
}
private void invalidateHelper(Hashable hashable, Token pleft, Token t)
{
hashable.hash(null);
if (hashable instanceof Leaf)
return;
// else: node.
Inner node = (Inner)hashable;
if (Range.contains(pleft, node.token, t))
// left child contains token
invalidateHelper(node.lchild, pleft, t);
else
// right child contains token
invalidateHelper(node.rchild, node.token, t);
}
public Future<?> submitAntiCompaction(final ColumnFamilyStore cfs,
final Collection<Range<Token>> ranges,
final Refs<SSTableReader> sstables,
final long repairedAt)
{
Runnable runnable = new WrappedRunnable() {
@Override
public void runMayThrow() throws Exception
{
boolean success = false;
while (!success)
{
for (SSTableReader compactingSSTable : cfs.getDataTracker().getCompacting())
sstables.releaseIfHolds(compactingSSTable);
Set<SSTableReader> compactedSSTables = new HashSet<>();
for (SSTableReader sstable : sstables)
if (sstable.isMarkedCompacted())
compactedSSTables.add(sstable);
sstables.release(compactedSSTables);
success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables);
}
performAnticompaction(cfs, ranges, sstables, repairedAt);
}
};
if (executor.isShutdown())
{
logger.info("Compaction executor has shut down, not submitting anticompaction");
return Futures.immediateCancelledFuture();
}
return executor.submit(runnable);
}
public static List<SSTableReader> intersecting(Collection<SSTableReader> sstables, Range<Token> range)
{
ArrayList<SSTableReader> filtered = new ArrayList<SSTableReader>();
for (SSTableReader sstable : sstables)
{
Range<Token> sstableRange = new Range<Token>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner);
if (range == null || sstableRange.intersects(range))
filtered.add(sstable);
}
return filtered;
}
/**
 * @param ranges the token ranges to estimate over
 * @return An estimate of the number of keys for given ranges in this SSTable.
 */
public long estimatedKeysForRanges(Collection<Range<Token>> ranges)
{
long sampleKeyCount = 0;
List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
// adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling
long estimatedKeys = sampleKeyCount * ((long) Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel();
return Math.max(1, estimatedKeys);
}
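The multiplier rescales the raw count of summary entries by how sparsely the index summary samples keys at its current downsampling level. A worked example with hypothetical parameters (BASE_SAMPLING_LEVEL is 128 here):

// 64 summary entries in range, min index interval 128, summary downsampled
// to level 64 (half of full sampling): 64 * (128 * 128) / 64 = 16384 keys.
public class KeyEstimateSketch
{
    public static void main(String[] args)
    {
        long sampleKeyCount   = 64;
        long baseSamplingLvl  = 128;  // Downsampling.BASE_SAMPLING_LEVEL
        long minIndexInterval = 128;
        long samplingLevel    = 64;
        System.out.println(sampleKeyCount * (baseSamplingLvl * minIndexInterval) / samplingLevel);
    }
}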
public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt)
{
for (ColumnFamilyStore cfs : columnFamilyStores)
this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
this.ranges = ranges;
this.sstableMap = new HashMap<>();
this.repairedAt = repairedAt;
}
private void recordSizeEstimates(ColumnFamilyStore table, Collection<Range<Token>> localRanges)
{
// for each local primary range, estimate (crudely) mean partition size and partitions count.
Map<Range<Token>, Pair<Long, Long>> estimates = new HashMap<>(localRanges.size());
for (Range<Token> range : localRanges)
{
// filter sstables that have partitions in this range.
Refs<SSTableReader> refs = null;
while (refs == null)
{
ColumnFamilyStore.ViewFragment view = table.select(table.viewFilter(range.toRowBounds()));
refs = Refs.tryRef(view.sstables);
}
long partitionsCount, meanPartitionSize;
try
{
// calculate the estimates.
partitionsCount = estimatePartitionsCount(refs, range);
meanPartitionSize = estimateMeanPartitionSize(refs);
}
finally
{
refs.release();
}
estimates.put(range, Pair.create(partitionsCount, meanPartitionSize));
}
// atomically update the estimates.
SystemKeyspace.updateSizeEstimates(table.metadata.ksName, table.metadata.cfName, estimates);
}
private void transferRanges(ColumnFamilyStore cfs) throws Exception
{
IPartitioner p = StorageService.getPartitioner();
List<Range<Token>> ranges = new ArrayList<>();
// wrapped range
ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName()).execute().get();
}
public Map<List<String>, List<String>> getPendingRangeToEndpointMap(String keyspace)
{
// some people just want to get a visual representation of things. Allow null and set it to the first
// non-system keyspace.
if (keyspace == null)
keyspace = Schema.instance.getNonSystemKeyspaces().get(0);
Map<List<String>, List<String>> map = new HashMap<>();
for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : tokenMetadata.getPendingRanges(keyspace).entrySet())
{
List<InetAddress> l = new ArrayList<>(entry.getValue());
map.put(entry.getKey().asList(), stringify(l));
}
return map;
}
private Map<Range<Token>, List<InetAddress>> getRangeToAddressMap(String keyspace, List<Token> sortedTokens)
{
// some people just want to get a visual representation of things. Allow null and set it to the first
// non-system keyspace.
if (keyspace == null)
keyspace = Schema.instance.getNonSystemKeyspaces().get(0);
List<Range<Token>> ranges = getAllRanges(sortedTokens);
return constructRangeToEndpointMap(keyspace, ranges);
}
@Test
public void testMoveAfterNextNeighbors() throws UnknownHostException
{
// moves after its next neighbor in the ring
int movingNodeIdx = 1;
int movingNodeIdxAfterMove = 2;
BigIntegerToken newToken = new BigIntegerToken("52535295865117307932921825928971026432");
BigIntegerToken[] tokens = initTokens();
BigIntegerToken[] tokensAfterMove = initTokensAfterMove(tokens, movingNodeIdx, newToken);
Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = calculateStreamAndFetchRanges(tokens, tokensAfterMove, movingNodeIdx);
// sort the results, so they can be compared
Range[] toStream = ranges.left.toArray(new Range[0]);
Range[] toFetch = ranges.right.toArray(new Range[0]);
Arrays.sort(toStream);
Arrays.sort(toFetch);
// build expected ranges
Range[] toStreamExpected = new Range[1];
toStreamExpected[0] = new Range(getToken(movingNodeIdx - 2, tokens), getToken(movingNodeIdx - 1, tokens));
Arrays.sort(toStreamExpected);
Range[] toFetchExpected = new Range[2];
toFetchExpected[0] = new Range(getToken(movingNodeIdxAfterMove - 1, tokens), getToken(movingNodeIdxAfterMove, tokens));
toFetchExpected[1] = new Range(getToken(movingNodeIdxAfterMove, tokensAfterMove), getToken(movingNodeIdx, tokensAfterMove));
Arrays.sort(toFetchExpected);
assertArrayEquals(toStreamExpected, toStream);
assertArrayEquals(toFetchExpected, toFetch);
}
/**
* Finds living endpoints responsible for the given ranges
*
* @param keyspaceName the keyspace ranges belong to
* @param ranges the ranges to find sources for
* @return multimap of addresses to ranges the address is responsible for
*/
private Multimap<InetAddress, Range<Token>> getNewSourceRanges(String keyspaceName, Set<Range<Token>> ranges)
{
InetAddress myAddress = FBUtilities.getBroadcastAddress();
Multimap<Range<Token>, InetAddress> rangeAddresses = Keyspace.open(keyspaceName).getReplicationStrategy().getRangeAddresses(tokenMetadata.cloneOnlyTokenMap());
Multimap<InetAddress, Range<Token>> sourceRanges = HashMultimap.create();
IFailureDetector failureDetector = FailureDetector.instance;
// find alive sources for our new ranges
for (Range<Token> range : ranges)
{
Collection<InetAddress> possibleRanges = rangeAddresses.get(range);
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
List<InetAddress> sources = snitch.getSortedListByProximity(myAddress, possibleRanges);
assert (!sources.contains(myAddress));
for (InetAddress source : sources)
{
if (failureDetector.isAlive(source))
{
sourceRanges.put(source, range);
break;
}
}
}
return sourceRanges;
}
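The inner loop encodes a simple policy: candidates arrive sorted by proximity, and the first one the failure detector reports alive wins for that range. The same policy isolated into a standalone sketch with generic stand-ins for Range<Token> and InetAddress:

// Pick the first alive candidate per range; candidate lists are assumed to
// be pre-sorted by proximity. Generic stand-in types, not Cassandra's.
import java.util.*;
import java.util.function.Predicate;

public class SourceSelectionSketch
{
    static <R, A> Map<R, A> pickSources(Map<R, List<A>> candidatesByRange, Predicate<A> isAlive)
    {
        Map<R, A> chosen = new HashMap<>();
        for (Map.Entry<R, List<A>> e : candidatesByRange.entrySet())
        {
            for (A candidate : e.getValue())
            {
                if (isAlive.test(candidate))
                {
                    chosen.put(e.getKey(), candidate);
                    break;
                }
            }
        }
        return chosen;
    }
}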
public StreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, long repairedAt)
{
this.keyspace = keyspace;
this.ranges = ranges;
this.columnFamilies.addAll(columnFamilies);
this.repairedAt = repairedAt;
}