下面列出了 java.util.concurrent.locks.Condition 类 API 的使用示例代码及写法；也可以点击链接到 GitHub 查看源代码。
/**
 * Handles an incoming MQTT message.
 *
 * <p>A message on {@code MqttProviderConstants.RESPONSE_TOPIC} is parsed as a
 * {@code DiozeroProtos.Response} and stored in {@code responses} under its correlation id.
 * The condition registered for that id is then removed from {@code conditions} and all of
 * its waiters are signalled while holding {@code lock}. A message on any other topic is
 * only logged.
 *
 * @param topic the topic the message arrived on
 * @param message the received message whose payload is parsed
 * @throws Exception propagated from parsing the payload
 */
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    if (!topic.equals(MqttProviderConstants.RESPONSE_TOPIC)) {
        Logger.warn("Unrecognised topic {}", topic);
        return;
    }

    // TODO How to handle different response types?
    DiozeroProtos.Response parsed = DiozeroProtos.Response.parseFrom(message.getPayload());
    Logger.info("Got response message: {}", parsed);

    String correlationId = parsed.getCorrelationId();
    // The response is stored before the waiters are signalled.
    responses.put(correlationId, parsed);

    Condition waiters = conditions.remove(correlationId);
    if (waiters == null) {
        Logger.error("No condition for correlation id {}", correlationId);
        return;
    }

    lock.lock();
    try {
        waiters.signalAll();
    } finally {
        lock.unlock();
    }
}
/**
 * Checks that the watcher thread keeps retrying while the connection manager cannot
 * connect: after sleeping 500 ms (retry wait time of 50) the thread must still be alive,
 * must have tried to connect at least twice and must be in {@code TIMED_WAITING}.
 */
@Test
void testEstablishConnectionNotPossible() throws InterruptedException {
    ReentrantLock connectionLock = new ReentrantLock();
    Condition noConnection = connectionLock.newCondition();

    // A connection manager mock that never manages to connect.
    ConnectionManager manager = mock(ConnectionManager.class);
    when(manager.tryToEstablishConnection()).thenReturn(false);
    when(manager.getState()).thenReturn(ConnectionState.NEVER_CONNECTED);

    ConnectionManagerWatcherThread sut =
        new ConnectionManagerWatcherThread(connectionLock, noConnection, manager, 50);
    sut.start();
    Thread.sleep(500);

    assertTrue(sut.isAlive());
    verify(manager, atLeast(2)).tryToEstablishConnection();
    assertEquals(State.TIMED_WAITING, sut.getState());
    killThreadAndVerifyState(sut);
}
/**
 * Removes all items from every per-priority queue and resets the total size to zero.
 *
 * <p>Runs while holding {@code mLock}. Nothing is done when {@code mSize} is already zero.
 * After a queue has been emptied, all threads waiting on the "not full" condition of
 * that priority are woken.
 */
public void clear() {
    final ReentrantLock lock = this.mLock;
    lock.lock();
    try {
        if (this.mSize > 0) {
            final SparseArray<IList<T>> queues = this.mQueues;
            final int queueCount = queues.size();
            for (int i = 0; i < queueCount; i++) {
                final int priority = queues.keyAt(i);
                queues.valueAt(i).clear();
                // signalAll() instead of signal(): clearing empties the whole queue, so
                // more than one blocked waiter may be able to proceed. A single signal()
                // would wake only one of them. Waiters have to re-check their condition
                // after waking anyway, since Condition permits spurious wakeups.
                final Condition notFullCondition = this.mNotFullConditions.get(priority);
                notFullCondition.signalAll();
            }
            this.mSize = 0;
        }
    } finally {
        lock.unlock();
    }
}
/**
 * Runs one producer and five consumers for five seconds against a shared queue that is
 * guarded by a single lock/condition pair.
 *
 * <p>The workers loop until their thread is interrupted, and the pool is stopped with
 * {@code shutdownNow()} when the test ends, so that no worker thread is left spinning
 * after the test. NOTE(review): this relies on {@code produce()}/{@code consume()} not
 * swallowing the interrupt - confirm against Producer2/Consumer2.
 *
 * @throws Exception if the test thread is interrupted while sleeping
 */
@Test
public void test7() throws Exception {
    Queue<String> cachePoolQueue = new LinkedList<>();
    ReentrantLock reentrantLock = new ReentrantLock();
    Condition condition = reentrantLock.newCondition();
    ExecutorService executorService = Executors.newCachedThreadPool();
    try {
        executorService.submit(() -> {
            Producer2 producer = new Producer2(cachePoolQueue, reentrantLock, condition);
            while (!Thread.currentThread().isInterrupted()) {
                producer.produce();
            }
            // Returning a value keeps this lambda a Callable, so checked exceptions
            // thrown by produce() are still allowed in the body.
            return null;
        });
        for (int i = 0; i < 5; i++) {
            executorService.submit(() -> {
                Consumer2 consumer = new Consumer2(cachePoolQueue, reentrantLock, condition);
                while (!Thread.currentThread().isInterrupted()) {
                    consumer.consume();
                }
                return null;
            });
        }
        TimeUnit.SECONDS.sleep(5);
    } finally {
        // Interrupt the workers; a plain shutdown() would leave the loops running.
        executorService.shutdownNow();
    }
}
/**
 * Verifies that {@code signalAll} wakes every thread waiting on a condition.
 *
 * @param fair fairness setting used to create the lock
 * @param awaitMethod the await flavour the waiting threads use
 */
public void testSignalAll(boolean fair, final AwaitMethod awaitMethod) {
    final PublicReentrantLock lock = new PublicReentrantLock(fair);
    final Condition cond = lock.newCondition();
    final CountDownLatch bothLocked = new CountDownLatch(2);

    // Each waiter takes the lock, counts down the latch, then waits on the condition.
    class Waiter extends CheckedRunnable {
        public void realRun() throws InterruptedException {
            lock.lock();
            bothLocked.countDown();
            await(cond, awaitMethod);
            lock.unlock();
        }
    }

    Thread first = newStartedThread(new Waiter());
    Thread second = newStartedThread(new Waiter());

    await(bothLocked);
    lock.lock();
    assertHasWaiters(lock, cond, first, second);
    cond.signalAll();
    assertHasNoWaiters(lock, cond);
    lock.unlock();

    awaitTermination(first);
    awaitTermination(second);
}
public EventQueue() {
for (int i = 0; i < NUM_PRIORITIES; i++) {
queues[i] = new Queue();
}
/*
* NOTE: if you ever have to start the associated event dispatch
* thread at this point, be aware of the following problem:
* If this EventQueue instance is created in
* SunToolkit.createNewAppContext() the started dispatch thread
* may call AppContext.getAppContext() before createNewAppContext()
* completes thus causing mess in thread group to appcontext mapping.
*/
appContext = AppContext.getAppContext();
pushPopLock = (Lock)appContext.get(AppContext.EVENT_QUEUE_LOCK_KEY);
pushPopCond = (Condition)appContext.get(AppContext.EVENT_QUEUE_COND_KEY);
}
/**
 * Waits on the given condition "indefinitely" using the requested await flavour.
 *
 * <p>The timed flavours use a timeout of twice {@code LONG_DELAY_MS} and assert that the
 * wait ended before that timeout expired. For {@code awaitNanos} more than half of the
 * timeout must still remain. An unknown flavour raises {@code AssertionError}.
 *
 * @param c the condition to wait on
 * @param awaitMethod which {@code Condition} await variant to call
 * @throws InterruptedException if the wait is interrupted
 */
void await(Condition c, AwaitMethod awaitMethod)
        throws InterruptedException {
    final long timeoutMillis = 2 * LONG_DELAY_MS;
    switch (awaitMethod) {
        case await: {
            c.await();
            break;
        }
        case awaitTimed: {
            boolean wokenInTime = c.await(timeoutMillis, MILLISECONDS);
            assertTrue(wokenInTime);
            break;
        }
        case awaitNanos: {
            final long budgetNanos = MILLISECONDS.toNanos(timeoutMillis);
            final long leftNanos = c.awaitNanos(budgetNanos);
            assertTrue(leftNanos > budgetNanos / 2);
            assertTrue(leftNanos <= budgetNanos);
            break;
        }
        case awaitUntil: {
            boolean beforeDeadline = c.awaitUntil(delayedDate(timeoutMillis));
            assertTrue(beforeDeadline);
            break;
        }
        default:
            throw new AssertionError();
    }
}
/**
 * Verifies that {@code getWaitQueueLength} reports 0 before a thread waits on the
 * condition, 1 while it waits, and 0 again after it has been signalled.
 *
 * @param fair fairness setting used to create the lock
 */
public void testGetWaitQueueLength(boolean fair) {
    final PublicReentrantReadWriteLock rwLock = new PublicReentrantReadWriteLock(fair);
    final Condition cond = rwLock.writeLock().newCondition();
    final CountDownLatch waiterLocked = new CountDownLatch(1);

    Thread waiter = newStartedThread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            rwLock.writeLock().lock();
            assertEquals(0, rwLock.getWaitQueueLength(cond));
            waiterLocked.countDown();
            cond.await();
            rwLock.writeLock().unlock();
        }});

    await(waiterLocked);
    rwLock.writeLock().lock();
    assertHasWaiters(rwLock, cond, waiter);
    assertEquals(1, rwLock.getWaitQueueLength(cond));
    cond.signal();
    assertHasNoWaiters(rwLock, cond);
    assertEquals(0, rwLock.getWaitQueueLength(cond));
    rwLock.writeLock().unlock();

    awaitTermination(waiter);
}
/**
 * Constructor for AppContext. This method is <i>not</i> public,
 * nor should it ever be used as such. The proper way to construct
 * an AppContext is through the use of SunToolkit.createNewAppContext.
 * A ThreadGroup is created for the new AppContext, a Thread is
 * created within that ThreadGroup, and that Thread calls
 * SunToolkit.createNewAppContext before calling anything else.
 * That creates both the new AppContext and its EventQueue.
 *
 * <p>The constructor increments the context counter, registers this
 * instance under its ThreadGroup, records the constructing thread's
 * context class loader, and stores a new lock and its condition under
 * EVENT_QUEUE_LOCK_KEY and EVENT_QUEUE_COND_KEY.
 *
 * @param threadGroup The ThreadGroup for the new AppContext
 * @see sun.awt.SunToolkit
 * @since 1.2
 */
AppContext(ThreadGroup threadGroup) {
// Count the newly created context.
numAppContexts.incrementAndGet();
this.threadGroup = threadGroup;
// NOTE(review): this instance becomes reachable through threadGroup2appContext
// here, before the fields assigned below have been initialized.
threadGroup2appContext.put(threadGroup, this);
// Read the current thread's context class loader inside a privileged action.
this.contextClassLoader =
AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
return Thread.currentThread().getContextClassLoader();
}
});
// Initialize push/pop lock and its condition to be used by all the
// EventQueues within this AppContext
Lock eventQueuePushPopLock = new ReentrantLock();
put(EVENT_QUEUE_LOCK_KEY, eventQueuePushPopLock);
// The condition is created from the lock stored above.
Condition eventQueuePushPopCond = eventQueuePushPopLock.newCondition();
put(EVENT_QUEUE_COND_KEY, eventQueuePushPopCond);
}
/**
 * Constructor for AppContext. This method is <i>not</i> public,
 * nor should it ever be used as such. The proper way to construct
 * an AppContext is through the use of SunToolkit.createNewAppContext.
 * A ThreadGroup is created for the new AppContext, a Thread is
 * created within that ThreadGroup, and that Thread calls
 * SunToolkit.createNewAppContext before calling anything else.
 * That creates both the new AppContext and its EventQueue.
 *
 * <p>The constructor increments the context counter, registers this
 * instance under its ThreadGroup, records the constructing thread's
 * context class loader, and stores a new lock and its condition under
 * EVENT_QUEUE_LOCK_KEY and EVENT_QUEUE_COND_KEY.
 *
 * @param threadGroup The ThreadGroup for the new AppContext
 * @see sun.awt.SunToolkit
 * @since 1.2
 */
AppContext(ThreadGroup threadGroup) {
// Count the newly created context.
numAppContexts.incrementAndGet();
this.threadGroup = threadGroup;
// NOTE(review): this instance becomes reachable through threadGroup2appContext
// here, before the fields assigned below have been initialized.
threadGroup2appContext.put(threadGroup, this);
// Read the current thread's context class loader inside a privileged action.
this.contextClassLoader =
AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
return Thread.currentThread().getContextClassLoader();
}
});
// Initialize push/pop lock and its condition to be used by all the
// EventQueues within this AppContext
Lock eventQueuePushPopLock = new ReentrantLock();
put(EVENT_QUEUE_LOCK_KEY, eventQueuePushPopLock);
// The condition is created from the lock stored above.
Condition eventQueuePushPopCond = eventQueuePushPopLock.newCondition();
put(EVENT_QUEUE_COND_KEY, eventQueuePushPopCond);
}
/**
 * Queues an incoming I2C event for its register and wakes all threads waiting on the
 * condition associated with that register.
 *
 * <p>Events for a register that has no entry in {@code conditions} are logged and
 * dropped. The per-register queue in {@code eventQueues} is created on first use. All
 * map and queue access happens while holding {@code lock}.
 *
 * @param event the received I2C event
 */
@Override
public void onReceive(I2CEvent event) {
    Logger.debug(event);
    Integer register = Integer.valueOf(event.getRegister());
    lock.lock();
    try {
        Condition condition = conditions.get(register);
        if (condition == null) {
            Logger.warn("Got an I2C event for a register ({}) not being monitored", register);
        } else {
            LinkedList<I2CEvent> event_queue = eventQueues.get(register);
            if (event_queue == null) {
                event_queue = new LinkedList<>();
                eventQueues.put(register, event_queue);
            }
            event_queue.addLast(event);
            condition.signalAll();
        }
    } finally {
        lock.unlock();
    }
    // The original ended with a bare event.getRegister() call whose result was
    // discarded; that dead statement has been removed.
}
public EventQueue() {
for (int i = 0; i < NUM_PRIORITIES; i++) {
queues[i] = new Queue();
}
/*
* NOTE: if you ever have to start the associated event dispatch
* thread at this point, be aware of the following problem:
* If this EventQueue instance is created in
* SunToolkit.createNewAppContext() the started dispatch thread
* may call AppContext.getAppContext() before createNewAppContext()
* completes thus causing mess in thread group to appcontext mapping.
*/
appContext = AppContext.getAppContext();
pushPopLock = (Lock)appContext.get(AppContext.EVENT_QUEUE_LOCK_KEY);
pushPopCond = (Condition)appContext.get(AppContext.EVENT_QUEUE_COND_KEY);
}
public EventQueue() {
for (int i = 0; i < NUM_PRIORITIES; i++) {
queues[i] = new Queue();
}
/*
* NOTE: if you ever have to start the associated event dispatch
* thread at this point, be aware of the following problem:
* If this EventQueue instance is created in
* SunToolkit.createNewAppContext() the started dispatch thread
* may call AppContext.getAppContext() before createNewAppContext()
* completes thus causing mess in thread group to appcontext mapping.
*/
appContext = AppContext.getAppContext();
pushPopLock = (Lock)appContext.get(AppContext.EVENT_QUEUE_LOCK_KEY);
pushPopCond = (Condition)appContext.get(AppContext.EVENT_QUEUE_COND_KEY);
}
/**
 * Locks the given ids, waiting uninterruptibly for as long as {@code hasLock(ids)}
 * reports a condition that blocks them.
 *
 * <p>Once nothing blocks the ids any more, a new condition is created from
 * {@code iLock} and registered in {@code iIndividualLocks} for every id.
 *
 * @param ids ids to lock; when {@code null} or empty nothing is locked
 * @return a new {@code Unlock} created for {@code ids}
 */
public Unlock lock(Collection<Long> ids) {
    iLock.lock();
    try {
        final boolean nothingToLock = (ids == null || ids.isEmpty());
        if (nothingToLock) {
            return new Unlock(ids);
        }
        iLog.debug("Locking " + ids + " ...");

        // Wait until no conflicting lock is reported, re-checking after every wake-up.
        Condition blocker = hasLock(ids);
        while (blocker != null) {
            blocker.awaitUninterruptibly();
            blocker = hasLock(ids);
        }

        final Condition mine = iLock.newCondition();
        for (Long id : ids) {
            iIndividualLocks.put(id, mine);
        }
        iLog.debug("Locked: " + ids);
        return new Unlock(ids);
    } finally {
        iLock.unlock();
    }
}
/**
 * Verifies that a timed {@code await} on a condition nobody signals returns
 * {@code false} and does so no earlier than the requested timeout.
 *
 * @param fair fairness setting used to create the lock
 */
public void testAwait_Timeout(boolean fair) {
    final ReentrantLock lock = new ReentrantLock(fair);
    final Condition cond = lock.newCondition();
    final long waitMillis = timeoutMillis();
    lock.lock();
    final long begin = System.nanoTime();
    try {
        boolean wokenInTime = cond.await(waitMillis, MILLISECONDS);
        assertFalse(wokenInTime);
    } catch (InterruptedException fail) {
        threadUnexpectedException(fail);
    }
    assertTrue(millisElapsedSince(begin) >= waitMillis);
    lock.unlock();
}
/**
 * Processes an event, first waiting on {@code fNextEventSignal} when that condition is
 * set.
 *
 * <p>The event is always handed to {@code super.processEvent}, even if the wait is
 * interrupted. Unlike the original, an interruption is no longer swallowed: the thread's
 * interrupt status is restored once processing is done. {@code fLock} is released even
 * when {@code super.processEvent} throws.
 *
 * @param event the event to process
 */
@Override
public void processEvent(@NonNull ITmfEvent event) {
    boolean interrupted = false;
    fLock.lock();
    try {
        Condition cond = fNextEventSignal;
        if (cond != null) {
            cond.await();
        }
    } catch (InterruptedException e) {
        // Remember the interruption; it is re-asserted after the event was processed so
        // that super.processEvent itself still runs with the flag cleared, as before.
        interrupted = true;
    } finally {
        try {
            super.processEvent(event);
        } finally {
            // Release the lock even if super.processEvent failed.
            fLock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
/**
 * Creates the watcher thread as a daemon thread named "rabbitmq-cdi connect thread".
 *
 * @param connectionManagerLock lock stored for later use by this thread
 * @param noConnectionCondition condition stored for later use by this thread
 * @param connectionManager the connection manager this thread works with
 * @param connectRetryWaitTime wait time between connect retries (unit not visible here)
 */
ConnectionManagerWatcherThread(ReentrantLock connectionManagerLock,
        Condition noConnectionCondition, ConnectionManager connectionManager,
        long connectRetryWaitTime) {
    // Thread configuration.
    setName("rabbitmq-cdi connect thread");
    setDaemon(true);

    // Collaborators and settings handed in by the caller.
    this.connectionManager = connectionManager;
    this.connectionManagerLock = connectionManagerLock;
    this.noConnectionCondition = noConnectionCondition;
    this.connectRetryWaitTime = connectRetryWaitTime;

    this.threadStopper = new ThreadStopper();
}
/**
 * Stores the values this object expects to observe later.
 *
 * @param waitingLock the lock expected to be waited on
 * @param numOwnedMonitors expected number of owned monitors
 * @param ownedMonitors expected owned monitors, keyed by name
 * @param waitingSync the condition expected to be waited on
 * @param numOwnedSyncs expected number of owned synchronizers
 * @param ownedSyncs expected owned synchronizers, keyed by name
 */
protected void setExpectedResult(Lock waitingLock,
        int numOwnedMonitors,
        Map<String, Lock[]> ownedMonitors,
        Condition waitingSync,
        int numOwnedSyncs,
        Map<String, ReentrantLock[]> ownedSyncs) {
    // What the thread is expected to wait on.
    this.waitingLock = waitingLock;
    this.waitingSync = waitingSync;

    // Expected monitors.
    this.ownedMonitors = ownedMonitors;
    this.numOwnedMonitors = numOwnedMonitors;

    // Expected synchronizers.
    this.ownedSyncs = ownedSyncs;
    this.numOwnedSyncs = numOwnedSyncs;
}
/**
 * Finds a condition that currently blocks the given ids.
 *
 * @param ids the ids to check
 * @return {@code iAllLocked} when it is set, otherwise the condition registered in
 *     {@code iIndividualLocks} for the first id that has one, or {@code null} if none has
 */
private Condition hasLock(Collection<Long> ids) {
    Condition found = iAllLocked;
    if (found != null) {
        return found;
    }
    for (Long id : ids) {
        found = iIndividualLocks.get(id);
        if (found != null) {
            break;
        }
    }
    return found;
}
/**
 * Verifies that {@code getWaitingThreads} throws {@code IllegalMonitorStateException}
 * when called without holding the write lock.
 *
 * @param fair fairness setting used to create the lock
 */
public void testGetWaitingThreadsIMSE(boolean fair) {
    final PublicReentrantReadWriteLock rwLock = new PublicReentrantReadWriteLock(fair);
    final Condition cond = rwLock.writeLock().newCondition();
    try {
        // The write lock is deliberately not held here.
        rwLock.getWaitingThreads(cond);
        shouldThrow();
    } catch (IllegalMonitorStateException success) {
        // expected
    }
}
/**
 * Verifies that waiting on a condition preserves the write lock hold count: one waiter
 * holds the lock once, the other twice, and each checks its hold count before and after
 * the wait.
 *
 * @param fair fairness setting used to create the lock
 */
public void testAwaitLockCount(boolean fair) {
    final PublicReentrantReadWriteLock rwLock = new PublicReentrantReadWriteLock(fair);
    final Condition cond = rwLock.writeLock().newCondition();
    final CountDownLatch bothLocked = new CountDownLatch(2);

    // Waiter holding the write lock once.
    Thread singleHold = newStartedThread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            rwLock.writeLock().lock();
            assertWriteLockedByMoi(rwLock);
            assertEquals(1, rwLock.writeLock().getHoldCount());
            bothLocked.countDown();
            cond.await();
            assertWriteLockedByMoi(rwLock);
            assertEquals(1, rwLock.writeLock().getHoldCount());
            rwLock.writeLock().unlock();
        }});

    // Waiter holding the write lock twice (reentrantly).
    Thread doubleHold = newStartedThread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            rwLock.writeLock().lock();
            rwLock.writeLock().lock();
            assertWriteLockedByMoi(rwLock);
            assertEquals(2, rwLock.writeLock().getHoldCount());
            bothLocked.countDown();
            cond.await();
            assertWriteLockedByMoi(rwLock);
            assertEquals(2, rwLock.writeLock().getHoldCount());
            rwLock.writeLock().unlock();
            rwLock.writeLock().unlock();
        }});

    await(bothLocked);
    rwLock.writeLock().lock();
    assertHasWaiters(rwLock, cond, singleHold, doubleHold);
    cond.signalAll();
    assertHasNoWaiters(rwLock, cond);
    rwLock.writeLock().unlock();

    awaitTermination(singleHold);
    awaitTermination(doubleHold);
}
/**
 * Returns the condition registered under the given name, creating and registering it in
 * {@code conditionMap} on first request.
 *
 * @param conditionName name identifying the condition
 * @return the existing or newly created condition for {@code conditionName}
 */
public synchronized Condition newCondition(String conditionName) {
    ConditionImpl existing = conditionMap.get(conditionName);
    if (existing != null) {
        return existing;
    }
    ConditionImpl created = new ConditionImpl(this, conditionName);
    conditionMap.put(conditionName, created);
    return created;
}
/**
 * Waits on the given condition without propagating {@link InterruptedException}.
 *
 * @param condition the condition to wait on; may be {@code null}
 * @return {@code true} if the wait returned normally; {@code false} if
 *     {@code condition} is {@code null} or the wait was interrupted, in which case the
 *     thread's interrupt status is set again
 */
public static boolean awaitQuiet(final Condition condition) {
    boolean completed = false;
    if (condition != null) {
        try {
            condition.await();
            completed = true;
        } catch (InterruptedException e) {
            // Restore the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
    }
    return completed;
}
/**
 * Verifies that {@code awaitNanos} on a condition nobody signals returns a non-positive
 * remaining time and does so no earlier than the 10 ms timeout.
 *
 * @param fair fairness setting used to create the lock
 */
public void testAwaitNanos_Timeout(boolean fair) {
    try {
        final ReentrantLock lock = new ReentrantLock(fair);
        final Condition cond = lock.newCondition();
        lock.lock();
        final long begin = System.nanoTime();
        final long waitMillis = 10;
        final long waitNanos = MILLISECONDS.toNanos(waitMillis);
        final long leftNanos = cond.awaitNanos(waitNanos);
        assertTrue(leftNanos <= 0);
        assertTrue(millisElapsedSince(begin) >= waitMillis);
        lock.unlock();
    } catch (InterruptedException fail) {
        threadUnexpectedException(fail);
    }
}
/**
 * Verifies that a timed {@code await} on a write-lock condition nobody signals returns
 * {@code false} and does so no earlier than the 10 ms timeout.
 *
 * @param fair fairness setting used to create the lock
 */
public void testAwait_Timeout(boolean fair) {
    try {
        final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(fair);
        final Condition cond = rwLock.writeLock().newCondition();
        rwLock.writeLock().lock();
        final long begin = System.nanoTime();
        final long waitMillis = 10;
        boolean wokenInTime = cond.await(waitMillis, MILLISECONDS);
        assertFalse(wokenInTime);
        assertTrue(millisElapsedSince(begin) >= waitMillis);
        rwLock.writeLock().unlock();
    } catch (InterruptedException fail) {
        threadUnexpectedException(fail);
    }
}
/**
 * Creates a listener that keeps references to the given collaborators.
 *
 * @param dataNode name of the data node this listener is created for
 * @param lock lock stored together with {@code cond}
 * @param cond condition stored for later use
 * @param results map of integer results keyed by string, stored for later use
 * @param succeed flag stored for later use
 */
ShowDataDistributionListener(String dataNode, ReentrantLock lock, Condition cond, Map<String, Integer> results, AtomicBoolean succeed) {
    // Synchronisation objects.
    this.lock = lock;
    this.cond = cond;

    // Target node and shared outcome holders.
    this.dataNode = dataNode;
    this.results = results;
    this.succeed = succeed;
}
/**
 * Signals that the next event may be processed by waking every thread waiting on
 * {@code fNextEventSignal}. Calling this method makes sense only if
 * {@link #setThrottling(boolean)} has been set to true; when no condition is set the
 * call does nothing.
 */
public void signalNextEvent() {
    fLock.lock();
    try {
        final Condition nextEvent = fNextEventSignal;
        if (nextEvent == null) {
            return;
        }
        nextEvent.signalAll();
    } finally {
        fLock.unlock();
    }
}
/**
 * Verifies that {@code getWaitingThreads} throws {@code IllegalArgumentException} when
 * given a condition that was created by a different lock.
 *
 * @param fair fairness setting used to create both locks
 */
public void testGetWaitingThreadsIAE(boolean fair) {
    final PublicReentrantLock owner = new PublicReentrantLock(fair);
    final Condition foreignCondition = owner.newCondition();
    final PublicReentrantLock other = new PublicReentrantLock(fair);
    try {
        // The condition belongs to "owner", not to "other".
        other.getWaitingThreads(foreignCondition);
        shouldThrow();
    } catch (IllegalArgumentException success) {
        // expected
    }
}
/**
 * Interrupt every thread on this node waiting on this lock.
 *
 * <p>All registered conditions are signalled first. To do that, the owner node and owner
 * thread id are temporarily replaced by this node and the current thread. The previous
 * values are restored in a {@code finally} block so that a failure while signalling
 * cannot leave the temporary owner in place. Afterwards {@code interruptAll} is set and
 * every queued thread is interrupted.
 */
private synchronized void interruptAll() {
    // First release all threads waiting on associated condition queues.
    if (!conditionMap.isEmpty()) {
        // Temporarily obtain ownership of the lock,
        // in order to signal all conditions.
        UUID tempUUID = getOwnerNode();
        long tempThreadID = currentOwnerThreadId;
        setCurrentOwnerNode(thisNode);
        currentOwnerThreadId = Thread.currentThread().getId();
        try {
            for (Condition c : conditionMap.values()) {
                c.signalAll();
            }
        } finally {
            // Restore owner node and owner thread, even if signalling failed.
            setCurrentOwnerNode(tempUUID);
            currentOwnerThreadId = tempThreadID;
        }
    }
    // Interrupt any future call to acquire/release on this sync object.
    interruptAll = true;
    // Interrupt any ongoing transactions.
    for (Thread t : getQueuedThreads()) {
        t.interrupt();
    }
}
/**
 * Handles a message received on the given channel.
 *
 * <p>Only {@code TaskMessage.BACK_PRESSURE_REQUEST} messages are acted on. Their payload
 * is read as one flag byte (1 = start flow control) followed by an {@code int} target
 * task id. Flow control is then added for, or removed from, that task. Any other message
 * type is logged as unexpected. A {@code null} message is ignored.
 *
 * @param channel the channel the message arrived on
 * @param msg the received message, expected to be a {@code TaskMessage}; may be
 *     {@code null}
 */
@Override
public void handleResponse(Channel channel, Object msg) {
    if (msg == null) {
        return;
    }
    TaskMessage message = (TaskMessage) msg;
    short type = message.get_type();
    if (type == TaskMessage.BACK_PRESSURE_REQUEST) {
        byte[] messageData = message.message();
        // Read the payload in place. The original copied it into a buffer of
        // Integer.SIZE + 1 bytes, but Integer.SIZE is a bit count (32), so the buffer
        // was 33 bytes and the copy overflowed on any longer payload.
        ByteBuffer buffer = ByteBuffer.wrap(messageData);
        boolean startFlowCtrl = buffer.get() == 1;
        int targetTaskId = buffer.getInt();
        //LOG.info("Received flow ctrl ({}) for target task-{}", startFlowCtrl, targetTaskId);
        if (startFlowCtrl) {
            addFlowControl(channel, targetTaskId);
        } else {
            Pair<Lock, Condition> pair = removeFlowControl(targetTaskId);
            // Signalling the removed condition is disabled here, as in the original:
            /*if (pair != null) {
                try {
                    pair.getFirst().lock();
                    pair.getSecond().signalAll();
                } finally {
                    pair.getFirst().unlock();
                }
            }*/
        }
    } else {
        LOG.warn("Unexpected message (type={}) was received from task {}", type, message.task());
    }
}