下面列出了 java.util.List#wait() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Enqueues a node on the pending-write queue selected for its id.
 *
 * <p>The node is wrapped in a {@code SerializableNode} and appended to the queue;
 * the actual write is handled asynchronously by a BDBWriter thread. While the
 * queue holds more than {@code MAX_ELEMENTS_PER_QUEUE} elements the caller blocks
 * on the queue's monitor until it is notified.
 *
 * @param node the node to store
 * @throws Exception if the wait on the queue is interrupted or a helper fails
 */
@Override
public void storeNode(final Node node) throws Exception {
    final int queueIndex = getConnectionPositionForNode(node.getId());
    final List<SerializableNode> writeQueue = pendingWriteQueues.get(queueIndex);

    synchronized (writeQueue) {
        // Back-pressure: park the producer while the queue is over its limit.
        while (writeQueue.size() > MAX_ELEMENTS_PER_QUEUE) {
            writeQueue.wait();
        }

        writeQueue.add(new SerializableNode(node));

        // Wake every thread waiting on this queue's monitor.
        writeQueue.notifyAll();
    }
}
/**
 * Starts ten {@code PackageCreateTestThread} workers that all receive the same
 * random string, then blocks until the shared result list contains one entry
 * per worker.
 *
 * <p>NOTE(review): the untimed {@code wait()} presumably relies on each worker
 * adding to the list and notifying on it when done - confirm against
 * {@code PackageCreateTestThread}.
 *
 * @throws Exception if the wait on the result list is interrupted
 */
public void testPackageCreate() throws Exception {
    final String sharedName = TestUtils.randomString();
    final Thread[] workers = new Thread[10];
    final List<String> completed = new Vector<String>();

    int next = 0;
    while (next < workers.length) {
        final Runnable task = new PackageCreateTestThread(sharedName, completed);
        final Thread worker = new Thread(task);
        workers[next] = worker;
        worker.start();
        next++;
    }

    synchronized (completed) {
        // Loop guards against wake-ups that arrive before every worker has reported.
        while (completed.size() < workers.length) {
            completed.wait();
        }
    }
}
/**
 * Launches ten {@code PackageCreateTestThread} workers sharing one random string
 * and waits on the shared result list until its size reaches the worker count.
 *
 * <p>NOTE(review): the untimed {@code wait()} presumably depends on the workers
 * notifying on the list after adding to it - confirm against
 * {@code PackageCreateTestThread}.
 *
 * @throws Exception if the wait on the result list is interrupted
 */
public void testPackageCreate() throws Exception {
    final String sharedName = TestUtils.randomString();
    final Thread[] workers = new Thread[10];
    final List<String> completed = new Vector<String>();

    int next = 0;
    while (next < workers.length) {
        final Runnable task = new PackageCreateTestThread(sharedName, completed);
        final Thread worker = new Thread(task);
        workers[next] = worker;
        worker.start();
        next++;
    }

    synchronized (completed) {
        // Re-check the condition after every wake-up.
        while (completed.size() < workers.length) {
            completed.wait();
        }
    }
}
/**
 * Publishes 20 messages, consumes them, and kills/restarts the RabbitMQ broker
 * once {@code prefetchCount} messages have been seen. The messages received
 * before the restart are acked after the broker is back; later messages are
 * acked as they arrive. The test then asserts that all 20 distinct message ids
 * were observed and that the number of distinct delivery tags equals
 * {@code nrMessages + prefetchCount}.
 *
 * <p>Threading: {@code seenMessages}, {@code uniqueMessages} and
 * {@code deliveryTags} are plain (non-thread-safe) collections mutated on an
 * Rx IO scheduler thread. Every access, including the reads performed by the
 * test thread, therefore happens while holding the monitor of
 * {@code seenMessages}.
 *
 * @throws Exception if publishing fails or the waiting test thread is interrupted
 */
@Test
@RepeatRule.Repeat(times = 3)
public void ignores_acks_on_messages_delivered_before_connection_reset() throws Exception {
    int nrMessages = 20;
    sendNMessagesAsync(nrMessages, 0, publisher).toBlocking().last();
    final Observable<Message> consumer = createConsumer();
    final Set<Integer> uniqueMessages = new TreeSet<>();
    final Set<Long> deliveryTags = new HashSet<>();
    final List<Message> seenMessages = new ArrayList<>();
    final Subscription subscribe = consumer
        .observeOn(Schedulers.io())
        .doOnNext(message -> {
            log.traceWithParams("Got message", "basicProperties", message.basicProperties);
            synchronized (seenMessages) {
                seenMessages.add(message);
                uniqueMessages.add(Integer.valueOf(message.basicProperties.getMessageId()));
                deliveryTags.add(message.envelope.getDeliveryTag());
                if (seenMessages.size() == prefetchCount) {
                    log.infoWithParams("Restarting the rabbitMQ broker");
                    try {
                        dockerContainers.rabbit().kill();
                        dockerContainers.up();
                        int connectionSize = 0;
                        while (connectionSize == 0) {
                            try {
                                connectionSize = getConnectionNames().size();
                            } catch (Exception ignored) {
                                // The admin interface is not reachable yet; keep polling.
                                log.infoWithParams("Waiting for connection to be visible to rabbit admin interface.");
                            }
                            if (connectionSize == 0) {
                                // Back off both when the lookup failed and when it succeeded
                                // with no connections, so the poll never spins hot.
                                Thread.sleep(100);
                            }
                        }
                        log.infoWithParams("Acking messages received before broker was restarted");
                        for (Message m : seenMessages) {
                            m.acknowledger.ack();
                        }
                    } catch (Exception e) {
                        log.errorWithParams("TODO this should NEVER happen. (but it can :( )", e);
                    }
                } else if (seenMessages.size() > prefetchCount) {
                    message.acknowledger.ack();
                }
                seenMessages.notifyAll();
            }
        })
        .subscribeOn(Schedulers.io())
        .onErrorResumeNext(throwable -> {
            log.errorWithParams("Error", throwable);
            return Observable.empty();
        })
        .subscribe();

    // Check the condition while holding the monitor that guards uniqueMessages;
    // wait() releases it so the consumer thread can make progress.
    synchronized (seenMessages) {
        while (uniqueMessages.size() < nrMessages) {
            seenMessages.wait(100);
        }
    }

    // Unsubscribe outside the monitor so no foreign code runs while it is held.
    subscribe.unsubscribe();

    // A message may still be in flight on the IO thread, so read the shared
    // collections under the same monitor to get a consistent view.
    synchronized (seenMessages) {
        log.infoWithParams("delivery tags", "nr", deliveryTags.size(), "tags", deliveryTags);
        assertThat(uniqueMessages.size(), is(nrMessages));
        assertThat(deliveryTags.size(), is(nrMessages + prefetchCount));
    }
}
/**
 * Launches the Cassandra process for this entity.
 *
 * <p>Steps, in order:
 * <ol>
 * <li>Log the cluster name, hostnames and seeds being used.</li>
 * <li>If clustered, not the first seed, and {@code CassandraDatacenter.WAIT_FOR_FIRST} is set:
 *     block until the top-most ancestor publishes {@code FIRST_NODE_STARTED_TIME_UTC}, then
 *     sleep for whatever remains of {@code DELAY_AFTER_FIRST} since that time.</li>
 * <li>If {@code DELAY_BETWEEN_STARTS} is set and there are ancestors: append this entity to the
 *     {@code QUEUED_START_NODES} list on the top-most ancestor, wait until it is at the head
 *     of that list, then wait for {@code DELAY_BETWEEN_STARTS} to elapse.</li>
 * <li>Run the launch script; for a non-clustered node with a creation script, sleep 20 seconds
 *     and then execute the script asynchronously; for the first clustered node, publish the
 *     current time as {@code FIRST_NODE_STARTED_TIME_UTC} on every ancestor.</li>
 * <li>Finally, if this entity was queued, remove the head of the queue and notify waiters.</li>
 * </ol>
 */
@Override
public void launch() {
String subnetHostname = Machines.findSubnetOrPublicHostname(entity).get();
Set<Entity> seeds = getEntity().getConfig(CassandraNode.INITIAL_SEEDS);
List<Entity> ancestors = getCassandraAncestors();
log.info("Launching " + entity + ": " +
"cluster "+getClusterName()+", " +
"hostname (public) " + getEntity().getAttribute(Attributes.HOSTNAME) + ", " +
"hostname (subnet) " + subnetHostname + ", " +
"seeds "+((CassandraNode)entity).getSeeds()+" (from "+seeds+")");
// The "first" node is whichever entity the seed set's iterator yields first.
// NOTE(review): this throws if `seeds` is null or empty - confirm INITIAL_SEEDS is always populated here.
boolean isFirst = seeds.iterator().next().equals(entity);
if (isClustered() && !isFirst && CassandraDatacenter.WAIT_FOR_FIRST) {
// wait for the first node
// (blocks until the last element of `ancestors` reports FIRST_NODE_STARTED_TIME_UTC)
long firstStartTime = Entities.submit(entity, DependentConfiguration.attributeWhenReady(
ancestors.get(ancestors.size()-1), CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC)).getUnchecked();
// optionally force a delay before starting subsequent nodes; see comment at CassandraCluster.DELAY_AFTER_FIRST
// toWait = time still remaining until (firstStartTime + DELAY_AFTER_FIRST); non-positive means no pause
Duration toWait = Duration.millis(firstStartTime + CassandraDatacenter.DELAY_AFTER_FIRST.toMilliseconds() - System.currentTimeMillis());
if (toWait.toMilliseconds()>0) {
log.info("Launching " + entity + ": delaying launch of non-first node by "+toWait+" to prevent schema disagreements");
Tasks.setBlockingDetails("Pausing to ensure first node has time to start");
Time.sleep(toWait);
Tasks.resetBlockingDetails();
}
}
// Stays null unless this entity joins the start queue below; the finally block keys off that.
List<Entity> queuedStart = null;
if (CassandraDatacenter.DELAY_BETWEEN_STARTS!=null && !ancestors.isEmpty()) {
Entity root = ancestors.get(ancestors.size()-1);
// TODO currently use the class as a semaphore; messy, and obviously will not federate;
// should develop a brooklyn framework semaphore (similar to that done on SshMachineLocation)
// and use it - note however the synch block is very very short so relatively safe at least
synchronized (CassandraNode.class) {
// Lazily create the shared queue on the root, then append ourselves and re-publish it.
queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES);
if (queuedStart==null) {
queuedStart = new ArrayList<Entity>();
root.sensors().set(CassandraDatacenter.QUEUED_START_NODES, queuedStart);
}
queuedStart.add(getEntity());
root.sensors().set(CassandraDatacenter.QUEUED_START_NODES, queuedStart);
}
// Poll (at most 1s per wait) until this entity is at the head of the queue.
// NOTE(review): the head check below reads the list without holding any lock, while adds are
// guarded by CassandraNode.class and the remove in the finally block is unguarded - verify
// this is safe for the list implementation actually returned by getAttribute.
// NOTE(review): if this loop exits via an exception, the entity is not removed from the
// queue, because the try/finally that dequeues only starts after the loop - confirm intended.
do {
// get it again in case it is backed by something external
queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES);
if (queuedStart.get(0).equals(getEntity())) break;
synchronized (queuedStart) {
try {
queuedStart.wait(1000);
} catch (InterruptedException e) {
// presumably Exceptions.propagate rethrows as unchecked - TODO confirm
Exceptions.propagate(e);
}
}
} while (true);
// TODO should look at last start time... but instead we always wait
CassandraDatacenter.DELAY_BETWEEN_STARTS.countdownTimer().waitForExpiryUnchecked();
}
try {
// Relies on `bin/cassandra -p <pidfile>`, rather than us writing pid file ourselves.
newScript(MutableMap.of(USE_PID_FILE, false), LAUNCHING)
.body.append(
// log the date to attempt to debug occasional http://wiki.apache.org/cassandra/FAQ#schema_disagreement
// (can be caused by machines out of synch time-wise; but in our case it seems to be caused by other things!)
"echo date on cassandra server `hostname` when launching is `date`",
launchEssentialCommand(),
"echo after essential command")
.execute();
if (!isClustered()) {
// Singleton node: give the server a fixed 20s before running the database creation script.
InputStream creationScript = DatastoreMixins.getDatabaseCreationScript(entity);
if (creationScript!=null) {
Tasks.setBlockingDetails("Pausing to ensure Cassandra (singleton) has started before running creation script");
Time.sleep(Duration.seconds(20));
Tasks.resetBlockingDetails();
executeScriptAsync(Streams.readFullyStringAndClose(creationScript));
}
}
if (isClustered() && isFirst) {
// Publish the start time on every ancestor; non-first nodes wait on this sensor above.
for (Entity ancestor: getCassandraAncestors()) {
ancestor.sensors().set(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, System.currentTimeMillis());
}
}
} finally {
// Only entities that joined the start queue have a non-null queuedStart.
if (queuedStart!=null) {
// Dequeue the head (expected to be this entity) and wake the nodes waiting behind it.
Entity head = queuedStart.remove(0);
checkArgument(head.equals(getEntity()), "first queued node was "+head+" but we are "+getEntity());
synchronized (queuedStart) {
queuedStart.notifyAll();
}
}
}
}
/**
 * Moves the cached job-history and job-conf files of a job into {@code doneDir}.
 *
 * <p>The work first waits until the list returned by {@code getWriters(id)} is
 * empty. Each cached path that still exists on {@code logFs} is then renamed
 * (when both file systems report the same URI) or copied to {@code doneFs},
 * and its permission is set to {@code HISTORY_FILE_PERMISSION}. Any failure
 * during that phase is logged and does not stop the bookkeeping that follows:
 * recording the moved-file info, notifying the job tracker and purging the job
 * from the cache.
 *
 * @param id   the job whose files are moved
 * @param sync if {@code true} the work runs on the calling thread, otherwise it
 *             is handed to {@code executor}
 */
void moveToDone(final JobID id, boolean sync) {
    final List<Path> toMove = new ArrayList<Path>();

    final Path historyFile = getHistoryFile(id);
    if (historyFile != null) {
        toMove.add(historyFile);
    } else {
        LOG.info("No file for job-history with " + id + " found in cache!");
    }

    final Path jobConfFile = getConfFileWriters(id);
    if (jobConfFile != null) {
        toMove.add(jobConfFile);
    } else {
        LOG.info("No file for jobconf with " + id + " found in cache!");
    }

    final Runnable mover = new Runnable() {
        public void run() {
            // Relocate the files into the done directory.
            try {
                final List<PrintWriter> openWriters = getWriters(id);
                synchronized (openWriters) {
                    // Block until no writers remain in the list.
                    while (!openWriters.isEmpty()) {
                        openWriters.wait();
                    }
                }

                // Identical URIs mean a rename is enough; otherwise copy across file systems.
                final boolean sameFileSystem = logFs.getUri().compareTo(doneFs.getUri()) == 0;

                for (final Path source : toMove) {
                    // On a retry the file may already be gone.
                    if (!logFs.exists(source)) {
                        continue;
                    }
                    LOG.info("Moving " + source.toString() + " to " + doneDir.toString());
                    final Path target = new Path(doneDir, source.getName());
                    if (sameFileSystem) {
                        doneFs.rename(source, doneDir);
                    } else {
                        FileUtil.copy(logFs, source, doneFs, target, true, true, conf);
                    }
                    doneFs.setPermission(target, new FsPermission(HISTORY_FILE_PERMISSION));
                }
            } catch (Throwable e) {
                LOG.error("Unable to move history file to DONE folder:\n"
                    + StringUtils.stringifyException(e));
            }

            final String historyFileDonePath = (historyFile == null)
                ? null
                : new Path(doneDir, historyFile.getName()).toString();

            jobHistoryFileMap.put(id,
                new MovedFileInfo(historyFileDonePath, System.currentTimeMillis()));
            jobTracker.historyFileCopied(id, historyFileDonePath);

            // Drop the job from the cache.
            purgeJob(id);
        }
    };

    if (sync) {
        mover.run();
    } else {
        executor.execute(mover);
    }
}