java.util.List#wait() Source Code Examples

Listed below are code examples of java.util.List#wait() drawn from open-source projects; follow the links to GitHub to view the full source. Note that List declares no wait() of its own: every call below is the Object#wait() inherited by the list instance, so it must be made while holding the list's monitor, inside a loop that re-checks the condition.
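Before the project examples, here is a minimal self-contained sketch of the underlying pattern (class and method names are illustrative only, not taken from any project below): a List used as both the shared data structure and the monitor object.

import java.util.ArrayList;
import java.util.List;

public class ListWaitDemo {

    private final List<String> queue = new ArrayList<>();

    // Blocks until an element is available. wait() must be called while
    // holding the list's monitor, and inside a loop that re-checks the
    // condition to guard against spurious wakeups.
    public String take() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {
                queue.wait(); // releases the monitor while waiting
            }
            return queue.remove(0);
        }
    }

    // Adds an element and wakes every thread blocked in take().
    public void put(String item) {
        synchronized (queue) {
            queue.add(item);
            queue.notifyAll();
        }
    }
}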

Example 1 Project: bboxdb   File: OSMBDBNodeStore.java
@Override
public void storeNode(final Node node) throws Exception {

	final int connectionNumber = getConnectionPositionForNode(node.getId());
	final List<SerializableNode> queue = pendingWriteQueues.get(connectionNumber);

	// Submit write request to queue (write will be handled async in a BDBWriter thread)
	synchronized (queue) {
		while(queue.size() > MAX_ELEMENTS_PER_QUEUE) {
			queue.wait();
		}
		
		final SerializableNode serializableNode = new SerializableNode(node);
		queue.add(serializableNode);
		queue.notifyAll();
	}
}
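The draining side of this queue lives in a BDBWriter thread that is not part of the snippet. A minimal sketch of what that consumer loop plausibly looks like, assuming a hypothetical writeToBDB persistence helper (names and structure are assumptions, not bboxdb source):

// Hypothetical consumer loop (not from bboxdb): drains the queue that
// storeNode() fills, and unblocks producers waiting on a full queue.
void drainQueue(final List<SerializableNode> queue) throws InterruptedException {
    while (true) {
        final SerializableNode node;
        synchronized (queue) {
            while (queue.isEmpty()) {
                queue.wait(); // woken by notifyAll() in storeNode()
            }
            node = queue.remove(0);
            queue.notifyAll(); // wake producers blocked on MAX_ELEMENTS_PER_QUEUE
        }
        writeToBDB(node); // assumed persistence helper, called outside the lock
    }
}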
 
Example 2 Project: uyuni   File: PackageCreateTest.java
public void testPackageCreate() throws Exception {
    String randomString = TestUtils.randomString();
    Thread[] threads = new Thread[10];
    List<String> finishedList = new Vector<String>();
    for (int i = 0; i < threads.length; i++) {
        Runnable r = new PackageCreateTestThread(randomString, finishedList);
        threads[i] = new Thread(r);
        threads[i].start();
    }
    // Block until every worker thread has added its marker to finishedList.
    synchronized (finishedList) {
        while (finishedList.size() < threads.length) {
            finishedList.wait();
        }
    }
}
 
Example 3 Project: spacewalk   File: PackageCreateTest.java
public void testPackageCreate() throws Exception {
    String randomString = TestUtils.randomString();
    Thread[] threads = new Thread[10];
    List<String> finishedList = new Vector<String>();
    for (int i = 0; i < threads.length; i++) {
        Runnable r = new PackageCreateTestThread(randomString, finishedList);
        threads[i] = new Thread(r);
        threads[i].start();
    }
    // Block until every worker thread has added its marker to finishedList.
    synchronized (finishedList) {
        while (finishedList.size() < threads.length) {
            finishedList.wait();
        }
    }
}
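Both tests (uyuni and spacewalk share this code verbatim) depend on each worker signalling completion through finishedList. The worker's run() is not shown; below is a hypothetical reconstruction of the handshake it must perform, inferred from the wait loop above (not the actual project source):

// Inferred sketch of PackageCreateTestThread (not the uyuni/spacewalk source).
class PackageCreateTestThread implements Runnable {
    private final String randomString;
    private final List<String> finishedList;

    PackageCreateTestThread(String randomString, List<String> finishedList) {
        this.randomString = randomString;
        this.finishedList = finishedList;
    }

    public void run() {
        createPackage(randomString); // assumed helper doing the actual work
        synchronized (finishedList) {
            finishedList.add(Thread.currentThread().getName());
            finishedList.notifyAll(); // lets the test's wait() loop re-check the count
        }
    }
}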
 
Example 4 Project: rxrabbit   File: RxRabbitTests.java
@Test
@RepeatRule.Repeat(times = 3)
public void ignores_acks_on_messages_delivered_before_connection_reset() throws Exception {
    int nrMessages = 20;
    sendNMessagesAsync(nrMessages, 0, publisher).toBlocking().last();

    final Observable<Message> consumer = createConsumer();
    final Set<Integer> uniqueMessages = new TreeSet<>();
    final Set<Long> deliveryTags = new HashSet<>();
    final List<Message> seenMessages = new ArrayList<>();

    final Subscription subscribe = consumer
            .observeOn(Schedulers.io())
            .doOnNext(message -> {
                log.traceWithParams("Got message", "basicProperties", message.basicProperties);
                synchronized (seenMessages) {
                    seenMessages.add(message);
                    uniqueMessages.add(Integer.valueOf(message.basicProperties.getMessageId()));
                    deliveryTags.add(message.envelope.getDeliveryTag());
                    if (seenMessages.size() == prefetchCount) {
                        log.infoWithParams("Restarting the rabbitMQ broker");
                        try {
                            dockerContainers.rabbit().kill();
                            dockerContainers.up();
                            int connectionSize = 0;
                            while (connectionSize == 0) {
                                try {
                                    connectionSize = getConnectionNames().size();
                                } catch (Exception ignored) {
                                    log.infoWithParams("Waiting for connection to be visible to rabbit admin interface.");
                                    Thread.sleep(100);
                                }
                            }
                            log.infoWithParams("Acking messages received before broker was restarted");
                            for (Message m : seenMessages) {
                                m.acknowledger.ack();
                            }
                        } catch (Exception e) {
                            log.errorWithParams("TODO this should NEVER happen. (but it can :( )", e);
                        }
                    } else if (seenMessages.size() > prefetchCount) {
                        message.acknowledger.ack();
                    }
                    seenMessages.notifyAll();
                }
            })
            .subscribeOn(Schedulers.io())
            .onErrorResumeNext(throwable -> {
                log.errorWithParams("Error", throwable);
                return Observable.empty();
            })
            .subscribe();


    while (uniqueMessages.size() < nrMessages) {
        synchronized (seenMessages) {
            seenMessages.wait(100);
        }
    }

    subscribe.unsubscribe();
    log.infoWithParams("delivery tags", "nr", deliveryTags.size(), "tags", deliveryTags);
    assertThat(uniqueMessages.size(), is(nrMessages));
    assertThat(deliveryTags.size(), is(nrMessages + prefetchCount));
}
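Note the timed overload wait(100) here: the notifyAll() in doOnNext runs on an io thread and can fire before the main thread re-enters the synchronized block, so an untimed wait() could block forever on a missed notification. The 100 ms timeout makes the outer loop effectively poll uniqueMessages.size(), guaranteeing progress either way.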
 
Example 5 Project: brooklyn-library   File: CassandraNodeSshDriver.java
@Override
public void launch() {
    String subnetHostname = Machines.findSubnetOrPublicHostname(entity).get();
    Set<Entity> seeds = getEntity().getConfig(CassandraNode.INITIAL_SEEDS);
    List<Entity> ancestors = getCassandraAncestors();
    log.info("Launching " + entity + ": " +
            "cluster "+getClusterName()+", " +
            "hostname (public) " + getEntity().getAttribute(Attributes.HOSTNAME) + ", " +
            "hostname (subnet) " + subnetHostname + ", " +
            "seeds "+((CassandraNode)entity).getSeeds()+" (from "+seeds+")");

    boolean isFirst = seeds.iterator().next().equals(entity);
    if (isClustered() && !isFirst && CassandraDatacenter.WAIT_FOR_FIRST) {
        // wait for the first node
        long firstStartTime = Entities.submit(entity, DependentConfiguration.attributeWhenReady(
            ancestors.get(ancestors.size()-1), CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC)).getUnchecked();
        // optionally force a delay before starting subsequent nodes; see comment at CassandraCluster.DELAY_AFTER_FIRST
        Duration toWait = Duration.millis(firstStartTime + CassandraDatacenter.DELAY_AFTER_FIRST.toMilliseconds() -  System.currentTimeMillis());
        if (toWait.toMilliseconds()>0) {
            log.info("Launching " + entity + ": delaying launch of non-first node by "+toWait+" to prevent schema disagreements");
            Tasks.setBlockingDetails("Pausing to ensure first node has time to start");
            Time.sleep(toWait);
            Tasks.resetBlockingDetails();
        }
    }

    List<Entity> queuedStart = null;
    if (CassandraDatacenter.DELAY_BETWEEN_STARTS!=null && !ancestors.isEmpty()) {
        Entity root = ancestors.get(ancestors.size()-1);
        // TODO currently use the class as a semaphore; messy, and obviously will not federate;
        // should develop a brooklyn framework semaphore (similar to that done on SshMachineLocation)
        // and use it - note however the synch block is very very short so relatively safe at least
        synchronized (CassandraNode.class) {
            queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES);
            if (queuedStart==null) {
                queuedStart = new ArrayList<Entity>();
                root.sensors().set(CassandraDatacenter.QUEUED_START_NODES, queuedStart);
            }
            queuedStart.add(getEntity());
            root.sensors().set(CassandraDatacenter.QUEUED_START_NODES, queuedStart);
        }
        do {
            // get it again in case it is backed by something external
            queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES);
            if (queuedStart.get(0).equals(getEntity())) break;
            synchronized (queuedStart) {
                try {
                    queuedStart.wait(1000);
                } catch (InterruptedException e) {
                    Exceptions.propagate(e);
                }
            }
        } while (true);

        // TODO should look at last start time... but instead we always wait
        CassandraDatacenter.DELAY_BETWEEN_STARTS.countdownTimer().waitForExpiryUnchecked();
    }

    try {
        // Relies on `bin/cassandra -p <pidfile>`, rather than us writing pid file ourselves.
        newScript(MutableMap.of(USE_PID_FILE, false), LAUNCHING)
                .body.append(
                        // log the date to attempt to debug occasional http://wiki.apache.org/cassandra/FAQ#schema_disagreement
                        // (can be caused by machines out of synch time-wise; but in our case it seems to be caused by other things!)
                        "echo date on cassandra server `hostname` when launching is `date`",
                        launchEssentialCommand(),
                        "echo after essential command")
                .execute();
        if (!isClustered()) {
            InputStream creationScript = DatastoreMixins.getDatabaseCreationScript(entity);
            if (creationScript!=null) {
                Tasks.setBlockingDetails("Pausing to ensure Cassandra (singleton) has started before running creation script");
                Time.sleep(Duration.seconds(20));
                Tasks.resetBlockingDetails();
                executeScriptAsync(Streams.readFullyStringAndClose(creationScript));
            }
        }
        if (isClustered() && isFirst) {
            for (Entity ancestor: getCassandraAncestors()) {
                ancestor.sensors().set(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, System.currentTimeMillis());
            }
        }
    } finally {
        if (queuedStart!=null) {
            Entity head = queuedStart.remove(0);
            checkArgument(head.equals(getEntity()), "first queued node was "+head+" but we are "+getEntity());
            synchronized (queuedStart) {
                queuedStart.notifyAll();
            }
        }
    }
}
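Two details make this queueing safe: the timed wait(1000) bounds how long a queued node blocks before re-reading QUEUED_START_NODES (which may be backed by an external store, as the comment in the loop notes), and the finally block removes this node from the head of the queue and calls notifyAll() even when the launch fails, so the remaining queued nodes cannot wait forever.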
 
Example 6 Project: RDFS   File: JobHistory.java
void moveToDone(final JobID id, boolean sync) {
  final List<Path> paths = new ArrayList<Path>();
  final Path historyFile = getHistoryFile(id);
  if (historyFile == null) {
    LOG.info("No file for job-history with " + id + " found in cache!");
  } else {
    paths.add(historyFile);
  }

  final Path confPath = getConfFileWriters(id);
  if (confPath == null) {
    LOG.info("No file for jobconf with " + id + " found in cache!");
  } else {
    paths.add(confPath);
  }

  Runnable r = new Runnable() {

    public void run() {
      //move the files to doneDir folder
      try {
        List<PrintWriter> writers = getWriters(id);
        // Wait until every history writer for this job has been closed
        // and removed from the list before moving the files.
        synchronized (writers) {
          while (writers.size() > 0) {
            writers.wait();
          }
        }

        URI srcURI = logFs.getUri();
        URI doneURI = doneFs.getUri();
        boolean useRename = (srcURI.compareTo(doneURI) == 0);

        for (Path path : paths) {
          //check if path exists, in case of retries it may not exist
          if (logFs.exists(path)) {
            LOG.info("Moving " + path.toString() + " to " + 
                doneDir.toString()); 

            Path dstPath = new Path(doneDir, path.getName());
            if (useRename) {
              doneFs.rename(path, doneDir);
            } else {
              FileUtil.copy(logFs, path, doneFs, dstPath, true, true, conf);
            }

            doneFs.setPermission(dstPath,
                                     new FsPermission(HISTORY_FILE_PERMISSION));

          }
        }
      } catch (Throwable e) {
        LOG.error("Unable to move history file to DONE folder:\n"
                  + StringUtils.stringifyException(e));
      }

      String historyFileDonePath = null;
      if (historyFile != null) {
        historyFileDonePath = new Path(doneDir, 
            historyFile.getName()).toString();
      }

      jobHistoryFileMap.put(id, new MovedFileInfo(historyFileDonePath,
          System.currentTimeMillis()));
      jobTracker.historyFileCopied(id, historyFileDonePath);
      
      //purge the job from the cache
      purgeJob(id);
    }

  };

  if (sync) {
    r.run();
  } else {
    executor.execute(r);
  }
}
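Here wait() acts as a drain barrier: the history files are not moved until every PrintWriter registered for the job has been removed from the writers list, with the closing side presumably calling notifyAll() on the same list as each writer finishes.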