下面列出了 java.util.PriorityQueue#iterator() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * iterator.remove removes the element most recently returned by next().
 *
 * The queue is filled with 2, 1 and 3; the first element handed out by an
 * iterator is removed, and a fresh iterator is then expected to yield 2,
 * then 3, and nothing more.
 */
public void testIteratorRemove() {
    final PriorityQueue<Integer> q = new PriorityQueue<Integer>(3);
    q.add(Integer.valueOf(2));
    q.add(Integer.valueOf(1));
    q.add(Integer.valueOf(3));

    Iterator<Integer> it = q.iterator();
    it.next();
    it.remove();

    it = q.iterator();
    // Expected value first, so a failure reports expected/actual the right way round.
    assertEquals(Integer.valueOf(2), it.next());
    assertEquals(Integer.valueOf(3), it.next());
    assertFalse(it.hasNext());
}
/**
 * java.util.PriorityQueue#iterator()
 *
 * Offers a fixed set of integers to a queue, drains its iterator into a list,
 * and checks that the visited elements equal the offered ones once both
 * sides are sorted.
 */
public void test_iterator() {
    Integer[] array = { 2, 45, 7, -12, 9 };
    PriorityQueue<Integer> integerQueue = new PriorityQueue<Integer>();
    for (Integer value : array) {
        integerQueue.offer(value);
    }

    Iterator<Integer> iter = integerQueue.iterator();
    assertNotNull(iter);

    ArrayList<Integer> visited = new ArrayList<Integer>();
    while (iter.hasNext()) {
        Integer current = iter.next();
        visited.add(current);
    }

    // Sort both sides before comparing, so the check does not depend on the order of iteration.
    Object[] visitedSorted = visited.toArray();
    Arrays.sort(array);
    Arrays.sort(visitedSorted);
    assertTrue(Arrays.equals(array, visitedSorted));
}
/**
 * iterator.remove removes the element most recently returned by next().
 *
 * The queue is filled with 2, 1 and 3; the first element handed out by an
 * iterator is removed, and a fresh iterator is then expected to yield 2,
 * then 3, and nothing more.
 */
public void testIteratorRemove() {
    final PriorityQueue<Integer> q = new PriorityQueue<Integer>(3);
    q.add(Integer.valueOf(2));
    q.add(Integer.valueOf(1));
    q.add(Integer.valueOf(3));

    Iterator<Integer> it = q.iterator();
    it.next();
    it.remove();

    it = q.iterator();
    // Expected value first, so a failure reports expected/actual the right way round.
    assertEquals(Integer.valueOf(2), it.next());
    assertEquals(Integer.valueOf(3), it.next());
    assertFalse(it.hasNext());
}
/**
 * First-phase sampling over one partition: every input element is paired with a
 * freshly drawn random weight and at most {@code numSamples} weighted elements
 * are retained.
 *
 * @param input The elements of the current partition.
 * @return An iterator over the retained weighted elements, or the shared empty
 *         iterator when {@code numSamples} is 0.
 */
@Override
public Iterator<IntermediateSampleData<T>> sampleInPartition(Iterator<T> input) {
    if (numSamples == 0) {
        return emptyIntermediateIterable;
    }

    // Bounded heap of the retained candidates; its head is treated as the one with the
    // smallest weight (presumably IntermediateSampleData orders by weight - confirm).
    PriorityQueue<IntermediateSampleData<T>> heap = new PriorityQueue<IntermediateSampleData<T>>(numSamples);
    IntermediateSampleData<T> lightest = null;

    for (int seen = 0; input.hasNext(); seen++) {
        T candidate = input.next();
        // Exactly one random draw per input element, whether or not it is kept.
        double weight = random.nextDouble();

        if (seen >= numSamples) {
            // Heap is full: the newcomer only gets in by beating the current head.
            if (!(weight > lightest.getWeight())) {
                continue;
            }
            heap.remove();
        }

        heap.add(new IntermediateSampleData<T>(weight, candidate));
        lightest = heap.peek();
    }

    return heap.iterator();
}
/**
 * First-phase sampling over one partition: every input element is paired with a
 * freshly drawn random weight and at most {@code numSamples} weighted elements
 * are retained.
 *
 * @param input The elements of the current partition.
 * @return An iterator over the retained weighted elements, or the shared empty
 *         iterator when {@code numSamples} is 0.
 */
@Override
public Iterator<IntermediateSampleData<T>> sampleInPartition(Iterator<T> input) {
    if (numSamples == 0) {
        return emptyIntermediateIterable;
    }

    // Bounded heap of the retained candidates; its head is treated as the one with the
    // smallest weight (presumably IntermediateSampleData orders by weight - confirm).
    PriorityQueue<IntermediateSampleData<T>> heap = new PriorityQueue<IntermediateSampleData<T>>(numSamples);
    IntermediateSampleData<T> lightest = null;

    for (int seen = 0; input.hasNext(); seen++) {
        T candidate = input.next();
        // Exactly one random draw per input element, whether or not it is kept.
        double weight = random.nextDouble();

        if (seen >= numSamples) {
            // Heap is full: the newcomer only gets in by beating the current head.
            if (!(weight > lightest.getWeight())) {
                continue;
            }
            heap.remove();
        }

        heap.add(new IntermediateSampleData<T>(weight, candidate));
        lightest = heap.peek();
    }

    return heap.iterator();
}
/**
 * The iterator visits every element of the queue: each element it returns is
 * contained in the queue, and the number of elements visited equals SIZE.
 */
public void testIterator() {
    PriorityQueue queue = populatedQueue(SIZE);
    Iterator cursor = queue.iterator();

    int visited = 0;
    while (cursor.hasNext()) {
        assertTrue(queue.contains(cursor.next()));
        visited++;
    }

    assertEquals(visited, SIZE);
    assertIteratorExhausted(cursor);
}
/**
 * Debug helper: dumps the given vertex queue to standard output, one numbered
 * line per vertex, in the order the queue's iterator yields them.
 *
 * @param vertexQueue the queue to print
 */
@SuppressWarnings("unused")
private static void printVertexQueue(PriorityQueue<Vertex> vertexQueue) {
    System.out.println("%%%%%%%%%% vertex queue %%%%%%%%%%%");

    int position = 0;
    for (Vertex vertex : vertexQueue) {
        String line = position + " (" + vertex.componentID + " <- " + vertex.getMostConnectedNeighbor().componentID + ")" + " best net gain: " + vertex.getBestNetGain();
        System.out.println(line);
        position++;
    }

    System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%");
}
/**
 * Ends the current window: resets the per-window tuple buffer and, for every tuple
 * of the last expired window, emits and removes the leading elements of that
 * tuple's per-key queue for as long as they are not greater than the expired tuple.
 *
 * The head of each queue is read with {@code peek()} and removed with {@code poll()}.
 * {@code PriorityQueue.iterator()} is documented not to traverse in any particular
 * order, so "stop at the first larger element" is only well-defined against the head.
 *
 * @throws IllegalArgumentException if no comparator is configured and an expired
 *         tuple is not {@link Comparable}; the tuple is emitted on the error port first.
 */
@SuppressWarnings("unchecked")
@Override
public void endWindow()
{
  super.endWindow();
  tuplesInCurrentStreamWindow = new LinkedList<T>();
  if (lastExpiredWindowState == null) {
    // not ready to emit value or empty in a certain window
    return;
  }
  // Assumption: the expiring tuple and any tuple before are already sorted. So it's safe to
  // emit tuples from sortedListInSlidingWin till the expiring tuple
  for (T expiredTuple : lastExpiredWindowState) {
    // Find sorted list for the given key
    PriorityQueue<T> sortedListForE = sortedListInSlidingWin.get(function.apply(expiredTuple));
    T minElemInSortedList;
    // Always look at the current head; an empty queue ends the drain.
    while ((minElemInSortedList = sortedListForE.peek()) != null) {
      int k;
      if (comparator == null) {
        if (expiredTuple instanceof Comparable) {
          k = ((Comparable<T>)expiredTuple).compareTo(minElemInSortedList);
        } else {
          errorOutput.emit(expiredTuple);
          throw new IllegalArgumentException("Operator \"" + ClassUtils.getShortClassName(this.getClass()) + "\" encounters an invalid tuple " + expiredTuple + "\nNeither the tuple is comparable Nor Comparator is specified!");
        }
      } else {
        k = comparator.compare(expiredTuple, minElemInSortedList);
      }
      if (k < 0) {
        // The expiring tuple is less than the head of the queue: no more tuples to emit
        break;
      }
      // Head is less than or equal to the expiring tuple: emit it, then drop it from the queue
      outputPort.emit(minElemInSortedList);
      sortedListForE.poll();
    }
  }
}
/**
 * java.util.PriorityQueue#iterator() - Iterator.remove()
 *
 * Removes the third element returned by the iterator, then checks that the queue
 * shrank by one and that polling it yields exactly the remaining elements in
 * ascending order.
 */
public void test_iterator_remove() {
    PriorityQueue<Integer> integerQueue = new PriorityQueue<Integer>();
    Integer[] array = { 2, 45, 7, -12, 9 };
    for (int i = 0; i < array.length; i++) {
        integerQueue.offer(array[i]);
    }

    Iterator<Integer> iter = integerQueue.iterator();
    assertNotNull(iter);
    for (int i = 0; i < array.length; i++) {
        iter.next();
        if (2 == i) {
            iter.remove();
        }
    }
    assertEquals(array.length - 1, integerQueue.size());

    iter = integerQueue.iterator();
    Integer[] newArray = new Integer[array.length - 1];
    for (int i = 0; i < newArray.length; i++) {
        newArray[i] = iter.next();
    }
    Arrays.sort(newArray);

    // Bound the loop by the snapshot length: poll() shrinks the queue, so using
    // integerQueue.size() as the bound would stop after checking only half the elements.
    for (int i = 0; i < newArray.length; i++) {
        assertEquals(newArray[i], integerQueue.poll());
    }
    assertEquals(0, integerQueue.size());
}
/**
 * The iterator visits every element of the queue: each element it returns is
 * contained in the queue, and the number of elements visited equals SIZE.
 */
public void testIterator() {
    PriorityQueue queue = populatedQueue(SIZE);
    Iterator cursor = queue.iterator();

    int visited = 0;
    while (cursor.hasNext()) {
        assertTrue(queue.contains(cursor.next()));
        visited++;
    }

    assertEquals(visited, SIZE);
    assertIteratorExhausted(cursor);
}
/**
 * First-phase sampling over one partition: every input element is paired with a
 * freshly drawn random weight and at most {@code numSamples} weighted elements
 * are retained.
 *
 * @param input The elements of the current partition.
 * @return An iterator over the retained weighted elements, or the shared empty
 *         iterator when {@code numSamples} is 0.
 */
@Override
public Iterator<IntermediateSampleData<T>> sampleInPartition(Iterator<T> input) {
    if (numSamples == 0) {
        return emptyIntermediateIterable;
    }

    // Bounded heap of the retained candidates; its head is treated as the one with the
    // smallest weight (presumably IntermediateSampleData orders by weight - confirm).
    PriorityQueue<IntermediateSampleData<T>> heap = new PriorityQueue<IntermediateSampleData<T>>(numSamples);
    IntermediateSampleData<T> lightest = null;

    for (int seen = 0; input.hasNext(); seen++) {
        T candidate = input.next();
        // Exactly one random draw per input element, whether or not it is kept.
        double weight = random.nextDouble();

        if (seen >= numSamples) {
            // Heap is full: the newcomer only gets in by beating the current head.
            if (!(weight > lightest.getWeight())) {
                continue;
            }
            heap.remove();
        }

        heap.add(new IntermediateSampleData<T>(weight, candidate));
        lightest = heap.peek();
    }

    return heap.iterator();
}
/**
 * Walks every quota-based workflow queue and, for each workflow that has a matching
 * entry in {@code resourceMap}, updates its status and assigns it when the
 * workflow's quota type still has global capacity.
 *
 * Handled workflows are removed from {@code restOfResources}; a workflow whose
 * processing throws is logged and its id appended to {@code failureResources}.
 */
private void scheduleWorkflows(Map<String, Resource> resourceMap, WorkflowControllerDataProvider cache,
    Map<String, Resource> restOfResources, List<String> failureResources,
    CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleOutput) {
  AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();

  for (PriorityQueue<WorkflowObject> workflowQueue : _quotaBasedWorkflowPQs.values()) {
    for (WorkflowObject workflow : workflowQueue) {
      String workflowId = workflow._workflowId;

      // TODO : Resource is null could be workflow just created without any IdealState.
      // Let's remove this check when Helix is independent from IdealState
      if (resourceMap.get(workflowId) == null) {
        continue;
      }

      try {
        WorkflowContext context = _workflowDispatcher
            .getOrInitializeWorkflowContext(workflowId, cache.getTaskDataCache());
        _workflowDispatcher.updateWorkflowStatus(workflowId, cache.getWorkflowConfig(workflowId),
            context, currentStateOutput, bestPossibleOutput);

        String quotaType = getQuotaType(cache.getWorkflowConfig(workflowId));
        restOfResources.remove(workflowId);

        if (!assignableInstanceManager.hasGlobalCapacity(quotaType)) {
          String message = String.format(
              "Fail to schedule new jobs assignment for Workflow %s due to quota %s is full",
              workflowId, quotaType);
          LogUtil.logInfo(logger, _eventId, message);
        } else {
          _workflowDispatcher.assignWorkflow(workflowId, cache.getWorkflowConfig(workflowId),
              context, currentStateOutput, bestPossibleOutput);
        }
      } catch (Exception e) {
        // One failing workflow must not stop the remaining ones from being scheduled.
        LogUtil.logError(logger, _eventId,
            "Error computing assignment for Workflow " + workflowId + ". Skipping.", e);
        failureResources.add(workflowId);
      }
    }
  }
}
/**
 * Second-phase sampling. This operation should be executed as the UDF of an all
 * reduce operation: it keeps at most {@code numSamples} of the already weighted
 * elements and exposes their payloads.
 *
 * @param input The intermediate sample output generated in the first phase.
 * @return The sampled output.
 */
public Iterator<T> sampleInCoordinator(Iterator<IntermediateSampleData<T>> input) {
    if (numSamples == 0) {
        return emptyIterable;
    }

    // Bounded heap of the retained candidates; its head is treated as the one with the
    // smallest weight (presumably IntermediateSampleData orders by weight - confirm).
    PriorityQueue<IntermediateSampleData<T>> heap = new PriorityQueue<IntermediateSampleData<T>>(numSamples);
    IntermediateSampleData<T> lightest = null;

    for (int seen = 0; input.hasNext(); seen++) {
        IntermediateSampleData<T> candidate = input.next();

        if (seen >= numSamples) {
            // Heap is full: the newcomer only gets in by out-weighing the current head.
            if (!(candidate.getWeight() > lightest.getWeight())) {
                continue;
            }
            heap.remove();
        }

        heap.add(candidate);
        lightest = heap.peek();
    }

    // Adapter that unwraps each retained entry to its element; remove() is forwarded.
    final Iterator<IntermediateSampleData<T>> weighted = heap.iterator();
    return new Iterator<T>() {
        @Override
        public boolean hasNext() {
            return weighted.hasNext();
        }

        @Override
        public T next() {
            IntermediateSampleData<T> entry = weighted.next();
            return entry.getElement();
        }

        @Override
        public void remove() {
            weighted.remove();
        }
    };
}
/**
 * Second-phase sampling. This operation should be executed as the UDF of an all
 * reduce operation: it keeps at most {@code numSamples} of the already weighted
 * elements and exposes their payloads.
 *
 * @param input The intermediate sample output generated in the first phase.
 * @return The sampled output.
 */
public Iterator<T> sampleInCoordinator(Iterator<IntermediateSampleData<T>> input) {
    if (numSamples == 0) {
        return emptyIterable;
    }

    // Bounded heap of the retained candidates; its head is treated as the one with the
    // smallest weight (presumably IntermediateSampleData orders by weight - confirm).
    PriorityQueue<IntermediateSampleData<T>> heap = new PriorityQueue<IntermediateSampleData<T>>(numSamples);
    IntermediateSampleData<T> lightest = null;

    for (int seen = 0; input.hasNext(); seen++) {
        IntermediateSampleData<T> candidate = input.next();

        if (seen >= numSamples) {
            // Heap is full: the newcomer only gets in by out-weighing the current head.
            if (!(candidate.getWeight() > lightest.getWeight())) {
                continue;
            }
            heap.remove();
        }

        heap.add(candidate);
        lightest = heap.peek();
    }

    // Adapter that unwraps each retained entry to its element; remove() is forwarded.
    final Iterator<IntermediateSampleData<T>> weighted = heap.iterator();
    return new Iterator<T>() {
        @Override
        public boolean hasNext() {
            return weighted.hasNext();
        }

        @Override
        public T next() {
            IntermediateSampleData<T> entry = weighted.next();
            return entry.getElement();
        }

        @Override
        public void remove() {
            weighted.remove();
        }
    };
}
/**
 * Drives a PriorityQueue iterator through three scenarios - an empty queue, a
 * single-element queue, and a seven-element queue - interleaving next()/hasNext()
 * calls with removals and checking the queue contents after each step.
 *
 * The checking helpers (checkQ, check, equal, remove, removeIsCurrentlyIllegal,
 * noMoreElements) are defined outside this block; their names suggest what they
 * verify, but their exact behavior is not visible here.
 *
 * @param args not read by this method
 * @throws Throwable declared by the signature; presumably whatever the helpers let escape
 */
private static void realMain(String[] args) throws Throwable {
final PriorityQueue<Integer> q = new PriorityQueue<>();
Iterator<Integer> it;
//----------------------------------------------------------------
// Empty
//----------------------------------------------------------------
checkQ(q);
check(q.isEmpty());
check(! q.contains(1));
it = q.iterator();
// Nothing has been returned by next() yet, so the helper is applied before any traversal.
removeIsCurrentlyIllegal(it);
noMoreElements(it);
q.clear();
check(q.isEmpty());
//----------------------------------------------------------------
// Singleton
//----------------------------------------------------------------
q.add(1);
checkQ(q, 1);
check(! q.isEmpty());
check(q.contains(1));
it = q.iterator();
removeIsCurrentlyIllegal(it);
check(it.hasNext());
equal(it.next(), 1);
noMoreElements(it);
// Remove the only element through the iterator; the queue must end up empty.
remove(it, q);
check(q.isEmpty());
noMoreElements(it);
checkQ(q);
q.clear();
//----------------------------------------------------------------
// @see PriorityQueue.forgetMeNot
//----------------------------------------------------------------
final Integer[] a = {0, 4, 1, 6, 7, 2, 3}; // Carefully chosen!
q.addAll(Arrays.asList(a));
checkQ(q, a);
it = q.iterator();
// The checkQ calls between iterator operations pass the same contents each time,
// i.e. hasNext() and the illegal-remove probe are expected to leave the queue untouched.
checkQ(q, a);
removeIsCurrentlyIllegal(it);
checkQ(q, a);
check(it.hasNext());
removeIsCurrentlyIllegal(it);
checkQ(q, a);
check(it.hasNext());
// The first four elements are expected in exactly this order: 0, 4, 1, 6.
equal(it.next(), 0);
equal(it.next(), 4);
equal(it.next(), 1);
equal(it.next(), 6);
check(it.hasNext());
checkQ(q, a);
// Remove 6 (the element last returned); the expected queue contents follow.
remove(it, q);
checkQ(q, 0, 3, 1, 4, 7, 2);
check(it.hasNext());
removeIsCurrentlyIllegal(it);
equal(it.next(), 7);
// Remove 7; the expected queue contents follow.
remove(it, q);
checkQ(q, 0, 2, 1, 4, 3);
check(it.hasNext());
removeIsCurrentlyIllegal(it);
check(it.hasNext());
// 3 and 2 are still expected from this iterator even though the removals rearranged
// the queue - presumably the forgetMeNot mechanism referenced in the section header.
equal(it.next(), 3);
equal(it.next(), 2);
check(! it.hasNext());
// Remove 2, the final element returned; the iterator stays exhausted afterwards.
remove(it, q);
checkQ(q, 0, 3, 1, 4);
check(! it.hasNext());
noMoreElements(it);
removeIsCurrentlyIllegal(it);
}
/**
 * Handles GET: builds a JSON document listing, per serving instance, its topic
 * partitions with their rounded workload plus a total line, together with the
 * blacklisted instances and all live instance names.
 *
 * When the request carries an "instanceName" attribute only that instance is
 * listed under "instances"; otherwise every serving instance is.
 *
 * @return the JSON document as a string representation, or an error message
 *         (with status SERVER_ERROR_INTERNAL set on the response) if anything throws
 */
@Override
@Get
public Representation get() {
  final String instanceName = (String) getRequest().getAttributes().get("instanceName");
  try {
    JSONObject responseJson = new JSONObject();
    PriorityQueue<InstanceTopicPartitionHolder> currentServingInstance =
        _helixMirrorMakerManager.getCurrentServingInstance();
    WorkloadInfoRetriever workloadRetriever = _helixMirrorMakerManager.getWorkloadInfoRetriever();
    JSONObject instanceMapJson = new JSONObject();

    for (InstanceTopicPartitionHolder instance : currentServingInstance) {
      String name = instance.getInstanceName();
      // Skip instances that do not match the requested name (no name means "all").
      if (instanceName != null && !instanceName.equals(name)) {
        continue;
      }
      if (!instanceMapJson.containsKey(name)) {
        instanceMapJson.put(name, new JSONArray());
      }

      double totalWorkload = 0;
      for (TopicPartition tp : instance.getServingTopicPartitionSet()) {
        double tpw = workloadRetriever.topicWorkload(tp.getTopic()).getBytesPerSecondPerPartition();
        totalWorkload += tpw;
        String partitionEntry = tp.getTopic() + "." + tp.getPartition() + ":" + Math.round(tpw);
        instanceMapJson.getJSONArray(name).add(partitionEntry);
      }

      // Summary entry: number of served partitions and the rounded sum of their workloads.
      String totalEntry = "TOTALWORKLOAD." + instance.getServingTopicPartitionSet().size()
          + ":" + Math.round(totalWorkload);
      instanceMapJson.getJSONArray(name).add(totalEntry);
    }
    responseJson.put("instances", instanceMapJson);

    JSONArray blacklistedArray = new JSONArray();
    blacklistedArray.addAll(_helixMirrorMakerManager.getBlacklistedInstances());
    responseJson.put("blacklisted", blacklistedArray);

    JSONArray allInstances = new JSONArray();
    allInstances.addAll(_helixMirrorMakerManager.getCurrentLiveInstanceNames());
    responseJson.put("allInstances", allInstances);

    return new StringRepresentation(responseJson.toJSONString());
  } catch (Exception e) {
    LOGGER.error("Got error during processing Get request", e);
    getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
    String target = instanceName == null ? "all instances" : instanceName;
    return new StringRepresentation(String
        .format("Failed to get serving topics for %s, with exception: %s", target, e));
  }
}
/**
 * Second-phase sampling. This operation should be executed as the UDF of an all
 * reduce operation: it keeps at most {@code numSamples} of the already weighted
 * elements and exposes their payloads.
 *
 * @param input The intermediate sample output generated in the first phase.
 * @return The sampled output.
 */
public Iterator<T> sampleInCoordinator(Iterator<IntermediateSampleData<T>> input) {
    if (numSamples == 0) {
        return emptyIterable;
    }

    // Bounded heap of the retained candidates; its head is treated as the one with the
    // smallest weight (presumably IntermediateSampleData orders by weight - confirm).
    PriorityQueue<IntermediateSampleData<T>> heap = new PriorityQueue<IntermediateSampleData<T>>(numSamples);
    IntermediateSampleData<T> lightest = null;

    for (int seen = 0; input.hasNext(); seen++) {
        IntermediateSampleData<T> candidate = input.next();

        if (seen >= numSamples) {
            // Heap is full: the newcomer only gets in by out-weighing the current head.
            if (!(candidate.getWeight() > lightest.getWeight())) {
                continue;
            }
            heap.remove();
        }

        heap.add(candidate);
        lightest = heap.peek();
    }

    // Adapter that unwraps each retained entry to its element; remove() is forwarded.
    final Iterator<IntermediateSampleData<T>> weighted = heap.iterator();
    return new Iterator<T>() {
        @Override
        public boolean hasNext() {
            return weighted.hasNext();
        }

        @Override
        public T next() {
            IntermediateSampleData<T> entry = weighted.next();
            return entry.getElement();
        }

        @Override
        public void remove() {
            weighted.remove();
        }
    };
}
/**
 * Deals the pending partitions in {@code toBeReassigned} out to instances, one
 * partition per pass, until nothing is pending or no instance can take any more.
 *
 * Instances are held in a queue ordered by (1) current number of assigned
 * partitions, ascending; (2) in EVENNESS mode, target partition count, descending;
 * (3) the hash of the node name concatenated with {@code randomSeed}, ascending.
 *
 * @param instances             candidate instances
 * @param toBeReassigned        pending partition mapped to its remaining replica count;
 *                              decremented, and removed when the count reaches zero
 * @param faultZonePartitionMap partitions already placed in each fault zone; updated in place
 * @param faultZoneMap          instance mapped to its fault zone
 * @param assignmentMap         instance mapped to its assigned partitions; updated in place
 * @param targetPartitionCount  target number of partitions per instance
 * @param randomSeed            seed mixed into the name-hash tie-breaker
 * @param targetAdjustment      added to the floored target when computing free space
 */
private void partitionDealing(Collection<Node> instances,
    TreeMap<String, Integer> toBeReassigned, Map<Node, Set<String>> faultZonePartitionMap,
    Map<Node, Node> faultZoneMap, final Map<Node, List<String>> assignmentMap,
    final Map<Node, Float> targetPartitionCount, final int randomSeed, int targetAdjustment) {
  // PriorityQueue rejects an initial capacity below 1, so an empty instance
  // collection must not be passed through as-is.
  PriorityQueue<Node> instanceQueue =
      new PriorityQueue<>(Math.max(1, instances.size()), new Comparator<Node>() {
        @Override
        public int compare(Node node1, Node node2) {
          int node1Load = assignmentMap.containsKey(node1) ? assignmentMap.get(node1).size() : 0;
          int node2Load = assignmentMap.containsKey(node2) ? assignmentMap.get(node2).size() : 0;
          if (node1Load != node2Load) {
            return Integer.compare(node1Load, node2Load);
          }
          if (_mode.equals(Mode.EVENNESS)) {
            // Also consider node target load if mode is evenness
            Float node1Target = targetPartitionCount.get(node1);
            Float node2Target = targetPartitionCount.get(node2);
            // The reference check only short-circuits the same-object / both-null case.
            // Equality is decided by value: equal targets held in distinct Float
            // objects must fall through to the seeded tie-breaker below.
            if (node1Target != node2Target) {
              int targetOrder = node2Target.compareTo(node1Target);
              if (targetOrder != 0) {
                return targetOrder;
              }
            }
          }
          return Integer.compare((node1.getName() + randomSeed).hashCode(),
              (node2.getName() + randomSeed).hashCode());
        }
      });
  instanceQueue.addAll(instances);

  while (!toBeReassigned.isEmpty()) {
    boolean anyPartitionAssigned = false;
    // NOTE(review): PriorityQueue.iterator() does not guarantee priority order - confirm
    // that visiting candidates in iterator order is intended here.
    Iterator<Node> instanceIter = instanceQueue.iterator();
    while (instanceIter.hasNext()) {
      Node instance = instanceIter.next();
      // Take the node out of the queue. It is added back only if a partition is
      // assigned to it, which re-evaluates its priority.
      instanceIter.remove();
      boolean partitionAssignedToInstance = false;
      Node faultZone = faultZoneMap.get(instance);
      List<String> partitions = assignmentMap.containsKey(instance) ?
          assignmentMap.get(instance) :
          new ArrayList<String>();
      // Instances with non-positive weight never get any space.
      int space = instance.getWeight() <= 0 ? 0
          : (int) (Math.floor(targetPartitionCount.get(instance))) + targetAdjustment
              - partitions.size();
      if (space > 0) {
        // Find a pending partition that is not yet present in this instance's fault zone
        for (String pendingPartition : toBeReassigned.navigableKeySet()) {
          if (!faultZonePartitionMap.get(faultZone).contains(pendingPartition)) {
            if (!assignmentMap.containsKey(instance)) {
              assignmentMap.put(instance, partitions);
            }
            partitions.add(pendingPartition);
            faultZonePartitionMap.get(faultZone).add(pendingPartition);
            if (toBeReassigned.get(pendingPartition) == 1) {
              toBeReassigned.remove(pendingPartition);
            } else {
              toBeReassigned.put(pendingPartition, toBeReassigned.get(pendingPartition) - 1);
            }
            // if any assignment is made:
            // this instance can hold more partitions in the future
            partitionAssignedToInstance = true;
            // The key set was just modified; leave the loop before iterating further.
            break;
          }
        }
      }
      if (partitionAssignedToInstance) {
        // Reset priority in the queue
        instanceQueue.add(instance);
        anyPartitionAssigned = true;
        break;
      }
    }
    if (!anyPartitionAssigned) {
      // if no pending partition is assigned to any instances in this loop, new assignment is not possible
      break;
    }
  }
}