下面列出了 java.util.concurrent.locks.ReentrantLock#isHeldByCurrentThread() 的实例代码；您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Runs {@code task} while holding {@code lock}, acquiring the lock interruptibly.
 *
 * <p>The lock is acquired before the guarded section starts, so a failed acquisition never
 * releases a hold that the calling thread already owned reentrantly. If the thread is interrupted
 * while acquiring, its interrupt status is restored before the failure is reported.
 *
 * @param lock the lock to hold while the task runs
 * @param task the action to execute under the lock
 * @throws IllegalStateException if the current thread is interrupted while acquiring the lock;
 *     the {@link InterruptedException} is attached as the cause
 */
public static void locked(ReentrantLock lock, Runnable task)
{
    try
    {
        lock.lockInterruptibly();
    }
    catch (InterruptedException e)
    {
        // Re-assert the interrupt so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    }
    try
    {
        task.run();
    }
    finally
    {
        // Guarded release kept from the original so a task that already released the lock
        // does not turn into an IllegalMonitorStateException here.
        if (lock.isHeldByCurrentThread())
        {
            lock.unlock();
        }
    }
}
/**
 * Enters this monitor once {@code guard} is satisfied. Blocks indefinitely. On normal return the
 * monitor's lock is left held by the calling thread.
 *
 * @param guard the condition to wait for; its {@code monitor} field must be this monitor
 * @throws IllegalMonitorStateException if {@code guard} belongs to a different monitor
 */
public void enterWhenUninterruptibly(Guard guard) {
  if (this != guard.monitor) {
    throw new IllegalMonitorStateException();
  }
  final ReentrantLock monitorLock = this.lock;
  // Sampled before locking: true when this thread already holds the monitor's lock.
  final boolean alreadyInside = monitorLock.isHeldByCurrentThread();
  monitorLock.lock();
  boolean mustLeave = true;
  try {
    final boolean ready = guard.isSatisfied();
    if (!ready) {
      awaitUninterruptibly(guard, alreadyInside);
    }
    mustLeave = false;
  } finally {
    // Still set only when the guard check or the wait threw.
    if (mustLeave) {
      leave();
    }
  }
}
/**
 * Enters this monitor once {@code guard} is satisfied. Blocks indefinitely, but may be
 * interrupted. On normal return the monitor's lock is left held by the calling thread.
 *
 * @param guard the condition to wait for; its {@code monitor} field must be this monitor
 * @throws IllegalMonitorStateException if {@code guard} belongs to a different monitor
 * @throws InterruptedException if interrupted while waiting
 */
public void enterWhen(Guard guard) throws InterruptedException {
  if (this != guard.monitor) {
    throw new IllegalMonitorStateException();
  }
  final ReentrantLock monitorLock = this.lock;
  // Sampled before locking: true when this thread already holds the monitor's lock.
  final boolean alreadyInside = monitorLock.isHeldByCurrentThread();
  monitorLock.lockInterruptibly();
  boolean mustLeave = true;
  try {
    final boolean ready = guard.isSatisfied();
    if (!ready) {
      await(guard, alreadyInside);
    }
    mustLeave = false;
  } finally {
    // Still set only when the guard check or the wait threw (including interruption).
    if (mustLeave) {
      leave();
    }
  }
}
/**
 * Evaluates {@code task} while holding {@code lock}, acquiring the lock interruptibly, and
 * returns the value the task produced.
 *
 * <p>The lock is acquired before the guarded section starts, so a failed acquisition never
 * releases a hold that the calling thread already owned reentrantly. If the thread is interrupted
 * while acquiring, its interrupt status is restored before the failure is reported.
 *
 * @param <E> type of the value produced by the task
 * @param lock the lock to hold while the task runs
 * @param task the computation to execute under the lock
 * @return whatever {@code task.get()} returns
 * @throws IllegalStateException if the current thread is interrupted while acquiring the lock;
 *     the {@link InterruptedException} is attached as the cause
 */
public static <E> E locked(ReentrantLock lock, Supplier<E> task)
{
    try
    {
        lock.lockInterruptibly();
    }
    catch (InterruptedException e)
    {
        // Re-assert the interrupt so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    }
    try
    {
        return task.get();
    }
    finally
    {
        // Guarded release kept from the original so a task that already released the lock
        // does not turn into an IllegalMonitorStateException here.
        if (lock.isHeldByCurrentThread())
        {
            lock.unlock();
        }
    }
}
/**
 * Enters this monitor when the guard is satisfied, blocking indefinitely. If the guard check or
 * the wait throws, the monitor is left again before the exception propagates.
 *
 * @param guard the condition to wait for; its {@code monitor} field must be this monitor
 * @throws IllegalMonitorStateException if {@code guard} belongs to a different monitor
 */
public void enterWhenUninterruptibly(Guard guard) {
  if (guard.monitor == this) {
    final ReentrantLock heldLock = this.lock;
    // Captured before acquiring, so it reflects a pre-existing hold by this thread.
    final boolean reentrantEntry = heldLock.isHeldByCurrentThread();
    heldLock.lock();
    boolean entered = false;
    try {
      if (!guard.isSatisfied()) {
        awaitUninterruptibly(guard, reentrantEntry);
      }
      entered = true;
    } finally {
      if (!entered) {
        leave();
      }
    }
    return;
  }
  throw new IllegalMonitorStateException();
}
/**
 * Waits, for as long as necessary, until {@code guard} is satisfied and enters this monitor.
 * The monitor's lock stays held by the caller after a normal return.
 *
 * @param guard the condition to wait for; its {@code monitor} field must be this monitor
 * @throws IllegalMonitorStateException if {@code guard} belongs to a different monitor
 */
public void enterWhenUninterruptibly(Guard guard) {
  if (this != guard.monitor) {
    throw new IllegalMonitorStateException();
  }
  final ReentrantLock guardedLock = this.lock;
  // Must be read before lock(): afterwards it would always be true.
  final boolean heldOnEntry = guardedLock.isHeldByCurrentThread();
  guardedLock.lock();
  boolean failed = true;
  try {
    final boolean needsWait = !guard.isSatisfied();
    if (needsWait) {
      awaitUninterruptibly(guard, heldOnEntry);
    }
    failed = false;
  } finally {
    // Undo the entry when the guard check or the wait threw.
    if (failed) {
      leave();
    }
  }
}
/**
 * Pops a macro from the stack and executes it. When the stack attribute
 * {@code CEVAL.CONCURRENT_LOCK_ATTRIBUTE} holds a lock, that lock is acquired interruptibly
 * before the macro runs and released afterwards.
 *
 * <p>The lock is released only if this invocation actually acquired it, so an interrupted
 * acquisition cannot release a hold the thread owned before the call. The interrupt status is
 * restored before the interruption is reported.
 *
 * @param stack the stack to pop the macro from and to execute it on
 * @return the same {@code stack}
 * @throws WarpScriptException if the top of the stack is not a macro, if the macro execution
 *     fails, or if the thread is interrupted (the {@link InterruptedException} is the cause)
 */
@Override
public Object apply(WarpScriptStack stack) throws WarpScriptException {
  Object top = stack.pop();
  if (!(top instanceof Macro)) {
    throw new WarpScriptException(getName() + " expects a macro on top of the stack.");
  }
  ReentrantLock lock = (ReentrantLock) stack.getAttribute(CEVAL.CONCURRENT_LOCK_ATTRIBUTE);
  // Tracks whether THIS call obtained the lock; isHeldByCurrentThread() alone cannot tell an
  // outer reentrant hold apart from ours.
  boolean acquired = false;
  try {
    if (null != lock) {
      lock.lockInterruptibly();
      acquired = true;
    }
    stack.exec((Macro) top);
  } catch (InterruptedException ie) {
    // Preserve the interruption for callers that only see the wrapped exception.
    Thread.currentThread().interrupt();
    throw new WarpScriptException(ie);
  } finally {
    if (acquired && lock.isHeldByCurrentThread()) {
      lock.unlock();
    }
  }
  return stack;
}
/**
 * Fair retrieval of an object in the queue.
 * Objects are returned in the order the threads requested them.
 * {@inheritDoc}
 *
 * <p>If no item is immediately available and {@code timeout > 0}, the caller registers itself as
 * a waiter and blocks outside the lock. A waiter that times out or is interrupted is always
 * removed from the wait list again, so a later hand-off cannot go to an abandoned waiter.
 *
 * @param timeout how long to wait for an item; values {@code <= 0} mean "do not wait"
 * @param unit the unit of {@code timeout}
 * @return the retrieved item, or {@code null} if none became available
 * @throws InterruptedException if interrupted while waiting and no item had been handed over
 */
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    final int idx = getNextPoll();
    final ReentrantLock lock = this.locks[idx];
    final ExchangeCountDownLatch<E> waiter;
    //hold the lock only while inspecting the items and registering as a waiter
    lock.lock();
    try {
        E item = items[idx].poll();
        if (item != null || timeout <= 0) {
            //either we have an object, or the caller does not want to wait
            return item;
        }
        //the queue is empty, add ourselves to the bottom of the wait list
        waiter = new ExchangeCountDownLatch<>(1);
        waiters[idx].addLast(waiter);
    } finally {
        lock.unlock();
    }
    boolean handedOver = false;
    InterruptedException interruption = null;
    try {
        //wait for the specified timeout, without holding the lock
        handedOver = waiter.await(timeout, unit);
    } catch (InterruptedException ix) {
        //defer the exception until we have deregistered ourselves
        interruption = ix;
    }
    if (!handedOver) {
        //timed out or interrupted: remove ourselves from the wait list
        lock.lock();
        try {
            waiters[idx].remove(waiter);
        } finally {
            lock.unlock();
        }
    }
    //the item we received; can be null if we timed out
    E result = waiter.getItem();
    if (interruption != null) {
        if (result == null) {
            throw interruption;
        }
        //an item was handed to us while we were being interrupted: keep it and clear the
        //interrupt status rather than dropping an item already taken out of the queue
        Thread.interrupted();
    }
    return result;
}
/**
 * Fair retrieval of an object in the queue.
 * Objects are returned in the order the threads requested them.
 * {@inheritDoc}
 *
 * <p>If no item is immediately available and {@code timeout > 0}, the caller registers itself as
 * a waiter and blocks outside the lock. A waiter that times out or is interrupted is always
 * removed from the wait list again, and every lock acquisition is paired with its own release.
 *
 * @param timeout how long to wait for an item; values {@code <= 0} mean "do not wait"
 * @param unit the unit of {@code timeout}
 * @return the retrieved item, or {@code null} if none became available
 * @throws InterruptedException if interrupted while waiting and no item had been handed over
 */
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    final int idx = getNextPoll();
    final ReentrantLock lock = this.locks[idx];
    final ExchangeCountDownLatch<E> waiter;
    //hold the lock only while inspecting the items and registering as a waiter
    lock.lock();
    try {
        E item = items[idx].poll();
        if (item != null || timeout <= 0) {
            //either we have an object, or the caller does not want to wait
            return item;
        }
        //the queue is empty, add ourselves to the bottom of the wait list
        waiter = new ExchangeCountDownLatch<E>(1);
        waiters[idx].addLast(waiter);
    } finally {
        lock.unlock();
    }
    boolean handedOver = false;
    InterruptedException interruption = null;
    try {
        //wait for the specified timeout, without holding the lock
        handedOver = waiter.await(timeout, unit);
    } catch (InterruptedException ix) {
        //defer the exception until we have deregistered ourselves
        interruption = ix;
    }
    if (!handedOver) {
        //timed out or interrupted: remove ourselves from the wait list
        lock.lock();
        try {
            waiters[idx].remove(waiter);
        } finally {
            lock.unlock();
        }
    }
    //the item we received; can be null if we timed out
    E result = waiter.getItem();
    if (interruption != null) {
        if (result == null) {
            throw interruption;
        }
        //an item was handed to us while we were being interrupted: keep it and clear the
        //interrupt status rather than dropping an item already taken out of the queue
        Thread.interrupted();
    }
    return result;
}
/**
 * Fair retrieval of an object in the queue.
 * Objects are returned in the order the threads requested them.
 * {@inheritDoc}
 *
 * <p>When nothing is available and {@code timeout > 0}, the caller is appended to the wait list
 * and blocks without holding the lock. Whether the wait ends by timeout or by interruption, the
 * caller is removed from the wait list before this method returns or throws.
 *
 * @param timeout how long to wait for an item; values {@code <= 0} mean "do not wait"
 * @param unit the unit of {@code timeout}
 * @return the retrieved item, or {@code null} if none became available
 * @throws InterruptedException if interrupted while waiting and no item had been handed over
 */
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    final int idx = getNextPoll();
    final ReentrantLock lock = this.locks[idx];
    final ExchangeCountDownLatch<E> latch;
    //critical section 1: look for an item, otherwise enqueue ourselves as a waiter
    lock.lock();
    try {
        E immediate = items[idx].poll();
        if (immediate != null || timeout <= 0) {
            return immediate;
        }
        latch = new ExchangeCountDownLatch<E>(1);
        //add to the bottom of the wait list
        waiters[idx].addLast(latch);
    } finally {
        lock.unlock();
    }
    boolean received = false;
    InterruptedException pending = null;
    try {
        //wait for the specified timeout, lock released
        received = latch.await(timeout, unit);
    } catch (InterruptedException ix) {
        //remember the interruption; cleanup has to happen first
        pending = ix;
    }
    if (!received) {
        //critical section 2: timed out or interrupted, leave the wait list
        lock.lock();
        try {
            waiters[idx].remove(latch);
        } finally {
            lock.unlock();
        }
    }
    //can be null if we timed out
    E result = latch.getItem();
    if (pending != null) {
        if (result == null) {
            throw pending;
        }
        //we were handed an item despite the interruption: return it and clear the interrupt
        //status instead of losing an item that already left the queue
        Thread.interrupted();
    }
    return result;
}
/**
 * Request an item from the queue asynchronously.
 *
 * <p>Under the lock, an item is taken if one is available; otherwise a new waiter latch is
 * appended to the wait list. The future is constructed after the lock has been released, and the
 * lock acquired here is always released exactly once.
 *
 * @return a future pending the result from the queue poll request; it wraps the item directly
 *     when one was available, or the waiter latch otherwise
 */
public Future<E> pollAsync() {
    final ReentrantLock lock = this.lock;
    final E item;
    ExchangeCountDownLatch<E> waiter = null;
    //grab the global lock
    lock.lock();
    try {
        //check to see if we have objects in the queue
        item = items.poll();
        if (item == null) {
            //queue is empty, add ourselves as waiters
            waiter = new ExchangeCountDownLatch<E>(1);
            waiters.addLast(waiter);
        }
    } finally {
        lock.unlock();
    }
    if (item == null) {
        //return a future that will wait for the object
        return new ItemFuture<E>(waiter);
    }
    //return a future with the item
    return new ItemFuture<E>(item);
}
/**
 * Releases the lock associated with the given key.
 *
 * @param key the key whose lock, as returned by {@code getLock(key)}, is to be released
 * @throws IllegalStateException if the current thread does not hold that lock
 */
public void unlock(K key) {
  final ReentrantLock keyLock = getLock(key);
  if (keyLock.isHeldByCurrentThread()) {
    keyLock.unlock();
    return;
  }
  throw new IllegalStateException("Cannot release lock not held by current thread: " + key);
}
/**
 * Request an item from the queue asynchronously.
 *
 * <p>The queue state is examined and, if necessary, a waiter is registered inside a single
 * lock/unlock pair; the returned future is built once the lock is no longer held.
 *
 * @return a future pending the result from the queue poll request; it wraps the item directly
 *     when one was available, or the newly registered waiter latch otherwise
 */
public Future<E> pollAsync() {
    final ReentrantLock lock = this.lock;
    final E available;
    ExchangeCountDownLatch<E> latch = null;
    //grab the global lock
    lock.lock();
    try {
        //check to see if we have objects in the queue
        available = items.poll();
        if (available == null) {
            //queue is empty, add ourselves as waiters
            latch = new ExchangeCountDownLatch<E>(1);
            waiters.addLast(latch);
        }
    } finally {
        //single release point for the single acquisition above
        lock.unlock();
    }
    if (available != null) {
        //return a future with the item
        return new ItemFuture<E>(available);
    }
    //return a future that will wait for the object
    return new ItemFuture<E>(latch);
}
/**
 * Runs {@code code} under {@code lock}. If the current thread already holds the lock, the code is
 * run directly without acquiring it again; otherwise the lock is acquired for the duration of the
 * call and released afterwards, even when the code throws.
 *
 * @param <T> the type of exception the code may throw
 * @param lock the lock that must be held while the code runs
 * @param code the code to execute
 * @throws T whatever {@code code.run()} throws
 */
public static < T extends Throwable > void lockAndThen( ReentrantLock lock, ExceptionalRunnable< T > code )
		throws T {
	final boolean ownedOnEntry = lock.isHeldByCurrentThread();
	if( !ownedOnEntry ) {
		lock.lock();
	}
	try {
		code.run();
	} finally {
		// Release only a hold that this call took itself.
		if( !ownedOnEntry ) {
			lock.unlock();
		}
	}
}
/**
 * Fair retrieval of an object in the queue.
 * Objects are returned in the order the threads requested them.
 * {@inheritDoc}
 *
 * <p>If no item is immediately available and {@code timeout > 0}, the caller registers itself as
 * a waiter and blocks outside the lock; on timeout or interruption it removes itself from the
 * wait list. Each lock acquisition is released exactly once by its own {@code finally}.
 *
 * @param timeout how long to wait for an item; values {@code <= 0} mean "do not wait"
 * @param unit the unit of {@code timeout}
 * @return the retrieved item, or {@code null} if none became available
 * @throws InterruptedException if interrupted while waiting and no item had been handed over
 */
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    final ReentrantLock lock = this.lock;
    final ExchangeCountDownLatch<E> c;
    //acquire the global lock until we know what to do
    lock.lock();
    try {
        //check to see if we have objects
        E item = items.poll();
        if (item != null || timeout <= 0) {
            //we have an object, or the caller does not want to wait
            return item;
        }
        //the queue is empty we will wait for an object
        c = new ExchangeCountDownLatch<>(1);
        //add to the bottom of the wait list
        waiters.addLast(c);
    } finally {
        //unlock the global lock
        lock.unlock();
    }
    boolean didtimeout = true;
    InterruptedException interruptedException = null;
    try {
        //wait for the specified timeout
        didtimeout = !c.await(timeout, unit);
    } catch (InterruptedException ix) {
        interruptedException = ix;
    }
    if (didtimeout) {
        //if we timed out, or got interrupted
        // remove ourselves from the waitlist
        lock.lock();
        try {
            waiters.remove(c);
        } finally {
            lock.unlock();
        }
    }
    //return the item we received, can be null if we timed out
    E result = c.getItem();
    if (null != interruptedException) {
        //we got interrupted
        if (null == result) {
            throw interruptedException;
        }
        //we got a result - clear the interrupt status
        //don't propagate because the item has already been removed from the pool
        Thread.interrupted();
    }
    return result;
}
/**
 * Fair retrieval of an object in the queue.
 * Objects are returned in the order the threads requested them.
 * {@inheritDoc}
 *
 * <p>If no item is immediately available and {@code timeout > 0}, the caller registers itself as
 * a waiter and blocks outside the lock; on timeout or interruption it removes itself from the
 * wait list. Each lock acquisition is released exactly once by its own {@code finally}, so no
 * error path can release a hold this call does not own.
 *
 * @param timeout how long to wait for an item; values {@code <= 0} mean "do not wait"
 * @param unit the unit of {@code timeout}
 * @return the retrieved item, or {@code null} if none became available
 * @throws InterruptedException if interrupted while waiting and no item had been handed over
 */
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    final ReentrantLock lock = this.lock;
    final ExchangeCountDownLatch<E> c;
    //acquire the global lock until we know what to do
    lock.lock();
    try {
        //check to see if we have objects
        E item = items.poll();
        if (item != null || timeout <= 0) {
            //we have an object, or the caller does not want to wait
            return item;
        }
        //the queue is empty we will wait for an object
        c = new ExchangeCountDownLatch<E>(1);
        //add to the bottom of the wait list
        waiters.addLast(c);
    } finally {
        //unlock the global lock
        lock.unlock();
    }
    boolean didtimeout = true;
    InterruptedException interruptedException = null;
    try {
        //wait for the specified timeout
        didtimeout = !c.await(timeout, unit);
    } catch (InterruptedException ix) {
        interruptedException = ix;
    }
    if (didtimeout) {
        //if we timed out, or got interrupted
        // remove ourselves from the waitlist
        lock.lock();
        try {
            waiters.remove(c);
        } finally {
            lock.unlock();
        }
    }
    //return the item we received, can be null if we timed out
    E result = c.getItem();
    if (null != interruptedException) {
        //we got interrupted
        if (null == result) {
            throw interruptedException;
        }
        //we got a result - clear the interrupt status
        //don't propagate because the item has already been removed from the pool
        Thread.interrupted();
    }
    return result;
}
/**
 * Enters this monitor when the guard is satisfied. The given time bounds both acquiring the lock
 * and waiting for the guard, and the call may be interrupted.
 *
 * @param guard the condition to wait for; its {@code monitor} field must be this monitor
 * @param time the maximum time to block
 * @param unit the unit of {@code time}
 * @return whether the monitor was entered, which guarantees that the guard is now satisfied
 * @throws IllegalMonitorStateException if {@code guard} belongs to a different monitor
 * @throws InterruptedException if interrupted while waiting
 */
public boolean enterWhen(Guard guard, long time, TimeUnit unit) throws InterruptedException {
  final long timeoutNanos = toSafeNanos(time, unit);
  if (this != guard.monitor) {
    throw new IllegalMonitorStateException();
  }
  final ReentrantLock monitorLock = this.lock;
  final boolean reentrant = monitorLock.isHeldByCurrentThread();

  // Phase 1: acquire the lock, trying an untimed tryLock first for non-fair monitors.
  boolean acquired = false;
  if (!fair) {
    // Check interrupt status to get behavior consistent with fair case.
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    acquired = monitorLock.tryLock();
  }
  // Stays 0 when the fast path succeeded, i.e. no time has been spent yet.
  long startTime = 0L;
  if (!acquired) {
    startTime = initNanoTime(timeoutNanos);
    if (!monitorLock.tryLock(time, unit)) {
      return false;
    }
  }

  // Phase 2: with the lock held, check the guard and wait for it if necessary.
  boolean satisfied = false;
  boolean threw = true;
  try {
    if (guard.isSatisfied()) {
      satisfied = true;
    } else {
      final long waitNanos =
          (startTime == 0L) ? timeoutNanos : remainingNanos(startTime, timeoutNanos);
      satisfied = awaitNanos(guard, waitNanos, reentrant);
    }
    threw = false;
    return satisfied;
  } finally {
    if (!satisfied) {
      try {
        // Don't need to signal if timed out, but do if interrupted
        if (threw && !reentrant) {
          signalNextWaiter();
        }
      } finally {
        monitorLock.unlock();
      }
    }
  }
}
/**
 * Enters this monitor when the guard is satisfied, blocking for at most the given time. The
 * time limit covers both lock acquisition and the wait for the guard. May be interrupted.
 *
 * @param guard the condition to wait for; its {@code monitor} field must be this monitor
 * @param time the maximum time to block
 * @param unit the unit of {@code time}
 * @return whether the monitor was entered, which guarantees that the guard is now satisfied
 * @throws IllegalMonitorStateException if {@code guard} belongs to a different monitor
 * @throws InterruptedException if interrupted while waiting
 */
public boolean enterWhen(Guard guard, long time, TimeUnit unit) throws InterruptedException {
  final long timeoutNanos = toSafeNanos(time, unit);
  if (guard.monitor != this) {
    throw new IllegalMonitorStateException();
  }
  final ReentrantLock theLock = this.lock;
  final boolean heldOnEntry = theLock.isHeldByCurrentThread();

  boolean needTimedAcquire = true;
  if (!fair) {
    // Check interrupt status to get behavior consistent with fair case.
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    if (theLock.tryLock()) {
      needTimedAcquire = false;
    }
  }
  // Left at 0 when the lock was obtained without a timed acquire.
  long startTime = 0L;
  if (needTimedAcquire) {
    startTime = initNanoTime(timeoutNanos);
    final boolean gotLock = theLock.tryLock(time, unit);
    if (!gotLock) {
      return false;
    }
  }

  boolean satisfied = false;
  boolean threw = true;
  try {
    satisfied = guard.isSatisfied();
    if (!satisfied) {
      final long budgetNanos =
          (startTime == 0L) ? timeoutNanos : remainingNanos(startTime, timeoutNanos);
      satisfied = awaitNanos(guard, budgetNanos, heldOnEntry);
    }
    threw = false;
    return satisfied;
  } finally {
    // Not satisfied means either a timeout (threw == false) or an exception (threw == true).
    if (!satisfied) {
      try {
        // Don't need to signal if timed out, but do if interrupted
        if (threw && !heldOnEntry) {
          signalNextWaiter();
        }
      } finally {
        theLock.unlock();
      }
    }
  }
}
/**
 * Fair retrieval of an object in the queue.
 * Objects are returned in the order the threads requested them.
 * {@inheritDoc}
 *
 * <p>When nothing is available and {@code timeout > 0}, the caller is appended to the wait list
 * and blocks without holding the lock; after a timeout or an interruption it removes itself
 * from the wait list. Lock and unlock are paired in scoped {@code try/finally} sections, so no
 * error path can release a hold this call does not own.
 *
 * @param timeout how long to wait for an item; values {@code <= 0} mean "do not wait"
 * @param unit the unit of {@code timeout}
 * @return the retrieved item, or {@code null} if none became available
 * @throws InterruptedException if interrupted while waiting and no item had been handed over
 */
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    final ReentrantLock lock = this.lock;
    final ExchangeCountDownLatch<E> latch;
    //acquire the global lock until we know what to do
    lock.lock();
    try {
        //check to see if we have objects
        E immediate = items.poll();
        if (immediate != null || timeout <= 0) {
            //we have an object, or the caller does not want to wait
            return immediate;
        }
        //the queue is empty we will wait for an object
        latch = new ExchangeCountDownLatch<E>(1);
        //add to the bottom of the wait list
        waiters.addLast(latch);
    } finally {
        //unlock the global lock
        lock.unlock();
    }
    boolean received = false;
    InterruptedException pending = null;
    try {
        //wait for the specified timeout
        received = latch.await(timeout, unit);
    } catch (InterruptedException ix) {
        //postpone the exception until we have left the wait list
        pending = ix;
    }
    if (!received) {
        //if we timed out, or got interrupted
        // remove ourselves from the waitlist
        lock.lock();
        try {
            waiters.remove(latch);
        } finally {
            lock.unlock();
        }
    }
    //return the item we received, can be null if we timed out
    E result = latch.getItem();
    if (null != pending) {
        //we got interrupted
        if (null == result) {
            throw pending;
        }
        //we got a result - clear the interrupt status
        //don't propagate because the item has already been removed from the pool
        Thread.interrupted();
    }
    return result;
}
/**
 * Pops a symbol name and then a value from the stack and stores the value under that symbol via
 * {@code SharedMemoryWarpScriptExtension.store}, using the mutex named by the stack attribute
 * {@code MUTEX.MUTEX_ATTRIBUTE + stack.getUUID()}.
 *
 * @param stack the stack to read the symbol name and value from
 * @return the same {@code stack}
 * @throws WarpScriptException if the mutex attribute is not set, if the top of the stack is not a
 *     {@code String}, or if the current thread does not hold the lock returned for the mutex
 */
@Override
public Object apply(WarpScriptStack stack) throws WarpScriptException {
  final Object mutexAttribute = stack.getAttribute(MUTEX.MUTEX_ATTRIBUTE + stack.getUUID());
  if (null == mutexAttribute) {
    throw new WarpScriptException(getName() + " can only be called when in a MUTEX section.");
  }
  final String mutex = String.valueOf(mutexAttribute);

  final Object symbolCandidate = stack.pop();
  if (!(symbolCandidate instanceof String)) {
    throw new WarpScriptException(getName() + " expects a symbol name on top of the stack.");
  }
  final String symbol = (String) symbolCandidate;
  // The value is popped before the lock check, as in the original statement order.
  final Object value = stack.pop();

  final ReentrantLock mutexLock = SharedMemoryWarpScriptExtension.getLock(mutex);
  if (mutexLock.isHeldByCurrentThread()) {
    SharedMemoryWarpScriptExtension.store(symbol, mutex, value);
    return stack;
  }
  throw new WarpScriptException(getName() + " expects the mutex '" + mutex + "' to be held when calling " + getName());
}