下面列出了 com.google.common.collect.Multiset#remove() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds the execution order for pending task groups by letting the groups take turns, each
 * turn emitting at most {@code batchSize} occurrences of a group. For example:
 * {G1, G1, G1, G2, G2} with a batch size of 2 is converted into {G1, G1, G2, G2, G1}.
 *
 * @param groups Multiset of task groups; it is copied and never modified here.
 * @param batchSize Maximum number of occurrences of one group emitted per turn; must be positive.
 * @return A task group execution sequence.
 */
@VisibleForTesting
static List<TaskGroupKey> getPreemptionSequence(
        Multiset<TaskGroupKey> groups,
        int batchSize) {
    Preconditions.checkArgument(batchSize > 0, "batchSize should be positive.");

    // Work on a private copy so the caller's multiset stays untouched.
    Multiset<TaskGroupKey> remaining = HashMultiset.create(groups);
    List<TaskGroupKey> sequence = Lists.newLinkedList();
    // Snapshot of the distinct groups; every round visits them in this same order.
    Set<TaskGroupKey> distinctGroups = ImmutableSet.copyOf(groups.elementSet());

    while (!remaining.isEmpty()) {
        for (TaskGroupKey group : distinctGroups) {
            if (!remaining.contains(group)) {
                continue;
            }
            // remove(element, n) reports the count held BEFORE the removal, which may be
            // smaller than the requested batch.
            int countBefore = remaining.remove(group, batchSize);
            int emitted = countBefore < batchSize ? countBefore : batchSize;
            for (int i = 0; i < emitted; i++) {
                sequence.add(group);
            }
        }
    }
    return sequence;
}
/**
 * For each given key/source pair, removes one occurrence of the source from the key's entry in
 * {@code backingMap} and deletes the entry once it holds no more sources. Several child
 * injectors may blacklist the same key, so only the relevant source is removed rather than the
 * whole entry.
 */
private void cleanUpForCollectedState(Set<KeyAndSource> keysAndSources) {
    synchronized (lock) {
        for (KeyAndSource collected : keysAndSources) {
            Multiset<Object> sources = backingMap.get(collected.key);
            if (sources == null) {
                // Nothing recorded for this key.
                continue;
            }
            sources.remove(collected.source);
            if (sources.isEmpty()) {
                backingMap.remove(collected.key);
            }
        }
    }
}
// Negative fixture: each of the four calls below passes an int literal to a Multiset<String>,
// so the argument can never match the declared element type. The annotation declares that four
// "GC" warnings are expected, matching the four calls.
// NOTE(review): "GC" is presumably the analyzer's generic-call type-mismatch category — confirm.
// Do not "fix" the argument types; the mismatches are the point of this method.
@ExpectWarning(value="GC", num=4)
public static void testMultiset(Multiset<String> ms) {
ms.contains(1);
ms.count(1);
ms.remove(1);
ms.remove(1, 2);
}
// Positive fixture: the same four Multiset calls, this time with String arguments that match
// the declared element type of Multiset<String>. The annotation declares that no "GC" warning
// is expected for this method.
@NoWarning("GC")
public static void testMultisetOK(Multiset<String> ms) {
ms.contains("x");
ms.count("x");
ms.remove("x");
ms.remove("x", 2);
}
/**
 * Verify that mapreduce output (across all files) is as expected.
 *
 * <p>The comparison is order-insensitive but multiplicity-sensitive: every line read from the
 * output files must consume one occurrence of an expected line, and no expected line may be
 * left over. On mismatch both the missing and the unexpected lines are logged before failing.
 *
 * @param fs File system the output files are opened on
 * @param directory Mapreduce output directory
 * @param lines Expected lines
 * @throws IOException if reading the output files fails
 */
public static void assertMapreduceOutput(FileSystem fs, String directory, String... lines) throws IOException {
    // Expected lines that have not been matched yet; duplicates are counted.
    Multiset<String> setLines = HashMultiset.create(Arrays.asList(lines));
    // Lines found in the files that were not (or no longer) expected.
    List<String> notFound = new LinkedList<String>();
    Path[] files = HdfsUtils.getOutputMapreduceFiles(fs, directory);
    for (Path file : files) {
        // try-with-resources closes the reader (and the underlying stream) even when
        // readLine() throws, which the previous explicit close() did not guarantee.
        // NOTE(review): the reader uses the platform default charset — confirm that is intended.
        try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(file)))) {
            String line;
            while ((line = br.readLine()) != null) {
                if (!setLines.remove(line)) {
                    notFound.add(line);
                }
            }
        }
    }
    if (!setLines.isEmpty() || !notFound.isEmpty()) {
        LOG.error("Output do not match expectations.");
        LOG.error("Expected lines that weren't present in the files:");
        LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'");
        LOG.error("Extra lines in files that weren't expected:");
        LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'");
        fail("Output do not match expectations.");
    }
}
/**
 * Reads one event from {@code member} per expected type and checks that the received events
 * match the expected ones as a multiset: order does not matter, multiplicity does.
 *
 * @throws AssertionError if a received event is not among the still-expected events
 */
private void checkEvents(Member member, GroupMembershipEvent... types) throws InterruptedException {
    Multiset<GroupMembershipEvent> stillExpected = HashMultiset.create(Arrays.asList(types));
    int toReceive = types.length;
    while (toReceive-- > 0) {
        GroupMembershipEvent received = nextEvent(member);
        boolean wasExpected = stillExpected.remove(received);
        if (!wasExpected) {
            throw new AssertionError();
        }
    }
}
/**
 * Tells whether two document lists contain the same documents with the same multiplicities,
 * regardless of order. Documents are compared through {@code FlamdexDocumentWrapper} instances
 * held in a multiset.
 */
static boolean unorderedEquals(List<FlamdexDocument> l1, List<FlamdexDocument> l2) {
    if (l1.size() != l2.size()) {
        return false;
    }

    // Count every document of the first list...
    Multiset<FlamdexDocumentWrapper> unmatched = HashMultiset.create();
    for (FlamdexDocument doc : l1) {
        unmatched.add(new FlamdexDocumentWrapper(doc));
    }

    // ...then cancel them out one by one against the second list.
    for (FlamdexDocument doc : l2) {
        boolean matched = unmatched.remove(new FlamdexDocumentWrapper(doc));
        if (!matched) {
            return false;
        }
    }
    return unmatched.isEmpty();
}
/**
 * Decides whether a task that is being cleaned up as part of an incremental bounce may be
 * killed now. The task is killed only when the number of healthy replacement tasks plus the
 * number of occurrences of {@code key} in {@code incrementalCleaningTasks} exceeds the
 * request's instance count; in that case one occurrence of {@code key} is removed from
 * {@code incrementalCleaningTasks}.
 *
 * @return {@code true} if the task should be killed, {@code false} if it should be kept for now
 */
private boolean shouldKillIncrementalBounceTask(
    SingularityRequest request,
    SingularityTaskCleanup taskCleanup,
    String matchingTasksDeployId,
    List<SingularityTaskId> matchingTasks,
    SingularityDeployKey key,
    Multiset<SingularityDeployKey> incrementalCleaningTasks
) {
    int healthyReplacementTasks = getNumHealthyTasks(
        request,
        matchingTasksDeployId,
        matchingTasks
    );

    boolean enoughReplacements =
        healthyReplacementTasks + incrementalCleaningTasks.count(key) >
        request.getInstancesSafe();

    if (enoughReplacements) {
        LOG.debug(
            "Killing a task {}, {} replacement tasks are healthy",
            taskCleanup,
            healthyReplacementTasks
        );
        // This cleanup is being acted on, so it no longer counts as pending.
        incrementalCleaningTasks.remove(key);
        return true;
    }

    LOG.trace(
        "Not killing a task {} yet, only {} matching out of a required {}",
        taskCleanup,
        matchingTasks.size(),
        request.getInstancesSafe() - incrementalCleaningTasks.count(key)
    );
    return false;
}
/**
 * Decides whether a task that is being cleaned up as part of an incremental deploy may be
 * killed now. The task is killed only when the number of healthy tasks reported for
 * {@code matchingTasksDeployId} has reached the request's instance count; in that case one
 * occurrence of {@code key} is removed from {@code incrementalCleaningTasks}.
 *
 * @return {@code true} if the task should be killed, {@code false} if it should be kept for now
 */
private boolean shouldKillIncrementalDeployCleanupTask(
    SingularityRequest request,
    SingularityTaskCleanup taskCleanup,
    String matchingTasksDeployId,
    List<SingularityTaskId> matchingTasks,
    SingularityDeployKey key,
    Multiset<SingularityDeployKey> incrementalCleaningTasks
) {
    int healthyActiveDeployTasks = getNumHealthyTasks(
        request,
        matchingTasksDeployId,
        matchingTasks
    );

    if (healthyActiveDeployTasks >= request.getInstancesSafe()) {
        LOG.debug(
            "Killing a task {}, {} active deploy tasks are healthy",
            taskCleanup,
            healthyActiveDeployTasks
        );
        // This cleanup is being acted on, so it no longer counts as pending.
        incrementalCleaningTasks.remove(key);
        return true;
    }

    LOG.trace(
        "Not killing a task {} yet, only {} matching out of a required {}",
        taskCleanup,
        matchingTasks.size(),
        request.getInstancesSafe() - incrementalCleaningTasks.count(key)
    );
    return false;
}
/**
 * Removing an element from a multiset drops a single occurrence only: with two copies added
 * and one removed, the element is still contained and its count is one.
 */
@Test
public void givenMultiSet_whenRemovingValues_shouldReturnCorrectCount() {
    Multiset<String> inventory = HashMultiset.create();
    // Stock two copies in one call.
    inventory.add("Potter", 2);

    inventory.remove("Potter");

    assertThat(inventory.contains("Potter")).isTrue();
    assertThat(inventory.count("Potter")).isEqualTo(1);
}
/**
 * One depth-first step of a topological sort, adapted from
 * https://en.wikipedia.org/wiki/Topological_sorting#Depth-first_search
 *
 * <pre>
 * function visit(node n, list L)
 *   mark n as being "part of the current chain"
 *   for each node m with an edge from n to m do
 *     if m is already "part of the current chain"
 *       report the cycle
 *     else if m has not been visited yet then
 *       visit(m)
 *   mark n as being visited
 *   n is no longer "part of the current chain"
 *   add n to tail of L
 * </pre>
 *
 * <p>Because a node is appended only after the nodes it references have been handled, every
 * referenced node that gets visited ends up before the node referencing it in {@code result}.
 *
 * @param current the node to visit in this step
 * @param result the sorted result; visited nodes are appended to its tail
 * @param currentChain the nodes on the path currently being explored, used to detect cycles
 * @param lookup a lookup table to link URI's to their tripleMap objects
 * @param unvisited the nodes that have not been visited yet; {@code current} is removed from it
 * @param onCycle code to handle a cycle if it occurs
 * @param errorConsumer code to handle errors reported by this method (i.e. log it, throw it, do what you want)
 */
private static void sortStep(TriplesMapBuilder current, LinkedList<TriplesMapBuilder> result,
        Multiset<TriplesMapBuilder> currentChain, Map<String, TriplesMapBuilder> lookup,
        Set<TriplesMapBuilder> unvisited, TopologicalSorter.CycleConsumer onCycle,
        Consumer<String> errorConsumer) {
    // Enter: current is now on the path being explored.
    currentChain.add(current);

    for (String referencedUri : current.getReferencedTriplesMaps()) {
        TriplesMapBuilder referenced = lookup.get(referencedUri);
        if (referenced == null) {
            // Dangling reference: report it and move on to the next one.
            errorConsumer.accept("No triplesMap with uri " + referencedUri + " was found");
        } else if (currentChain.contains(referenced)) {
            // The referenced node is already on the current path, so this edge closes a cycle.
            onCycle.handleCycle(currentChain, current, referenced);
        } else if (unvisited.contains(referenced)) {
            // Not seen before: descend into it first.
            sortStep(referenced, result, currentChain, lookup, unvisited, onCycle, errorConsumer);
        }
    }

    // Leave: current is visited, off the path, and appended to the tail of the result.
    unvisited.remove(current);
    currentChain.remove(current);
    result.addLast(current);
}
/**
 * Plans bucket moves for every distribution in the cluster state.
 *
 * <p>For each distribution, buckets are counted per node and each node that currently holds
 * buckets is treated as a source. Buckets are moved off a source, one existing assignment at a
 * time, to an active node whose bucket count is below the per-node target maximum, until one of
 * the stop conditions in the loop is met. Planned moves update local copies of the per-node
 * bucket counts and assigned bytes so that later decisions see earlier ones; the cluster state
 * itself is only read here.
 *
 * @param clusterState source of active nodes, assigned bytes, distribution assignments and
 *     per-distribution bucket sizes
 * @return a multimap from the node identifier of an existing assignment to the new assignments
 *     (same distribution id and bucket number, different node) planned for its buckets
 * @throws VerifyException if a move is needed but no active node qualifies as a target
 */
private static Multimap<String, BucketAssignment> computeAssignmentChanges(ClusterState clusterState)
{
Multimap<String, BucketAssignment> sourceToAllocationChanges = HashMultimap.create();
// mutable copy: adjusted below as moves are planned, shared across all distributions
Map<String, Long> allocationBytes = new HashMap<>(clusterState.getAssignedBytes());
Set<String> activeNodes = clusterState.getActiveNodes();
for (Distribution distribution : clusterState.getDistributionAssignments().keySet()) {
// number of buckets in this distribution assigned to a node
Multiset<String> allocationCounts = HashMultiset.create();
Collection<BucketAssignment> distributionAssignments = clusterState.getDistributionAssignments().get(distribution);
distributionAssignments.stream()
.map(BucketAssignment::getNodeIdentifier)
.forEach(allocationCounts::add);
// current skew is measured over the nodes present in allocationBytes (nodes without buckets count as 0);
// getAsInt() throws if allocationBytes has no keys
int currentMin = allocationBytes.keySet().stream()
.mapToInt(allocationCounts::count)
.min()
.getAsInt();
int currentMax = allocationBytes.keySet().stream()
.mapToInt(allocationCounts::count)
.max()
.getAsInt();
int numBuckets = distributionAssignments.size();
// target per-node bucket count range: floor and ceiling of buckets / active nodes
int targetMin = (int) Math.floor((numBuckets * 1.0) / clusterState.getActiveNodes().size());
int targetMax = (int) Math.ceil((numBuckets * 1.0) / clusterState.getActiveNodes().size());
log.info("Distribution %s: Current bucket skew: min %s, max %s. Target bucket skew: min %s, max %s", distribution.getId(), currentMin, currentMax, targetMin, targetMax);
// iterate over a snapshot of the distinct nodes because allocationCounts is modified inside the loop
for (String source : ImmutableSet.copyOf(allocationCounts)) {
List<BucketAssignment> existingAssignments = distributionAssignments.stream()
.filter(assignment -> assignment.getNodeIdentifier().equals(source))
.collect(toList());
for (BucketAssignment existingAssignment : existingAssignments) {
// an active source that is already at or below the target minimum gives up no more buckets;
// a source that is not active never stops here, so all of its buckets are considered for moving
if (activeNodes.contains(source) && allocationCounts.count(source) <= targetMin) {
break;
}
// identify nodes with bucket counts lower than the computed target, and greedily select from this set based on projected disk utilization.
// greediness means that this may produce decidedly non-optimal results if one looks at the global distribution of buckets->nodes.
// also, this assumes that nodes in a cluster have identical storage capacity
// NOTE(review): the sort by bucket count before min() presumably acts as a tie-breaker among nodes with equal bytes — confirm
String target = activeNodes.stream()
.filter(candidate -> !candidate.equals(source) && allocationCounts.count(candidate) < targetMax)
.sorted(comparingInt(allocationCounts::count))
.min(Comparator.comparingDouble(allocationBytes::get))
.orElseThrow(() -> new VerifyException("unable to find target for rebalancing"));
long bucketSize = clusterState.getDistributionBucketSize().get(distribution);
// only move bucket if it reduces imbalance
if (activeNodes.contains(source) && (allocationCounts.count(source) == targetMax && allocationCounts.count(target) == targetMin)) {
break;
}
// record the planned move in the local bookkeeping so subsequent choices account for it
allocationCounts.remove(source);
allocationCounts.add(target);
allocationBytes.compute(source, (k, v) -> v - bucketSize);
allocationBytes.compute(target, (k, v) -> v + bucketSize);
// keyed by the node the bucket currently lives on; the value points the same bucket at the target
sourceToAllocationChanges.put(
existingAssignment.getNodeIdentifier(),
new BucketAssignment(existingAssignment.getDistributionId(), existingAssignment.getBucketNumber(), target));
}
}
}
return sourceToAllocationChanges;
}
/**
 * Removes a single occurrence of {@code element} from the multiset supplied by {@code get()},
 * delegating to {@code EMPTY_MULTISET} when {@code get()} yields {@code null}.
 *
 * @return the result of the delegate's {@code remove(Object)} call
 */
@Override
public boolean remove(Object element) {
    final Multiset<E> delegate = get();
    if (delegate == null) {
        return EMPTY_MULTISET.remove(element);
    }
    return delegate.remove(element);
}
/**
 * Removes up to {@code occurrences} occurrences of {@code element} from the multiset supplied
 * by {@code get()}, delegating to {@code EMPTY_MULTISET} when {@code get()} yields
 * {@code null}.
 *
 * @return the result of the delegate's {@code remove(Object, int)} call
 */
@Override
public int remove(Object element, int occurrences) {
    final Multiset<E> delegate = get();
    if (delegate == null) {
        return EMPTY_MULTISET.remove(element, occurrences);
    }
    return delegate.remove(element, occurrences);
}