下面列出了 java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#unlock() 的实例代码;您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Adds every entry of {@code otherMap} to the map returned by {@code getObj()},
 * holding the write lock for the duration of the copy. A {@code null} or empty
 * argument is ignored. Any {@link Throwable} raised while copying is logged and
 * not rethrown.
 *
 * @param otherMap the entries to add; may be {@code null} or empty
 * @author tanyaowu
 */
public void putAll(Map<K, V> otherMap) {
    final boolean hasEntries = !(otherMap == null || otherMap.isEmpty());
    if (hasEntries) {
        final WriteLock exclusive = this.writeLock();
        exclusive.lock();
        try {
            final Map<K, V> target = this.getObj();
            target.putAll(otherMap);
        } catch (Throwable failure) {
            log.error(failure.getMessage(), failure);
        } finally {
            exclusive.unlock();
        }
    }
}
/**
 * Removes {@code t} from the set returned by {@code getObj()} while holding the
 * write lock. Any {@link Throwable} raised by the removal is logged and not
 * rethrown.
 *
 * @param t the element to remove
 * @return the result of {@code Set.remove(t)}, or {@code false} when the
 *         removal threw
 * @author tanyaowu
 */
public boolean remove(T t) {
    boolean removed = false;
    final WriteLock exclusive = this.writeLock();
    exclusive.lock();
    try {
        final Set<T> elements = this.getObj();
        removed = elements.remove(t);
    } catch (Throwable failure) {
        log.error(failure.getMessage(), failure);
    } finally {
        exclusive.unlock();
    }
    return removed;
}
/**
 * {@inheritDoc}
 *
 * <p>The call is forwarded to {@code pool} while the write lock is held. An
 * {@link Exception} thrown by the delegate is deliberately discarded.
 */
@Override
public void returnObject(final T obj) {
    final WriteLock exclusive = readWriteLock.writeLock();
    exclusive.lock();
    try {
        pool.returnObject(obj);
    } catch (final Exception ignored) {
        // swallowed as of Pool 2
    } finally {
        exclusive.unlock();
    }
}
/**
 * {@inheritDoc}
 *
 * <p>The call is forwarded to {@code keyedPool} while the write lock is held;
 * exceptions from the delegate propagate to the caller after the lock has been
 * released.
 */
@Override
public void addObject(final K key)
        throws Exception, IllegalStateException, UnsupportedOperationException {
    final WriteLock exclusive = readWriteLock.writeLock();
    exclusive.lock();
    try {
        keyedPool.addObject(key);
    } finally {
        exclusive.unlock();
    }
}
/**
 * {@inheritDoc}
 *
 * <p>The call is forwarded to {@code keyedPool} while the write lock is held.
 * An {@link Exception} thrown by the delegate is deliberately discarded.
 */
@Override
public void invalidateObject(final K key, final V obj) {
    final WriteLock exclusive = readWriteLock.writeLock();
    exclusive.lock();
    try {
        keyedPool.invalidateObject(key, obj);
    } catch (final Exception ignored) {
        // swallowed as of Pool 2
    } finally {
        exclusive.unlock();
    }
}
/**
 * {@inheritDoc}
 *
 * <p>The call is forwarded to {@code pool} while the write lock is held;
 * exceptions from the delegate propagate to the caller after the lock has been
 * released.
 */
@Override
public void clear() throws Exception, UnsupportedOperationException {
    final WriteLock exclusive = readWriteLock.writeLock();
    exclusive.lock();
    try {
        pool.clear();
    } finally {
        exclusive.unlock();
    }
}
/** Adds a value to the counter associated with a given key.
 *
 * <p>The key is hashed once; the upper bits of the hash ({@code hash >>> shift})
 * select both the lock and the stripe that are used, and the update is performed
 * while that stripe's write lock is held.
 *
 * @param array a byte array.
 * @param offset the first valid byte in {@code array}.
 * @param length the number of valid elements in {@code array}.
 * @param delta a value to be added to the counter associated with the specified key.
 * @return the previous value of the counter associated with the specified key.
 */
public int addTo(final byte[] array, final int offset, final int length, final int delta) {
    final long hash = MurmurHash3.hash(array, offset, length);
    final int index = (int)(hash >>> shift);
    final WriteLock writeLock = lock[index].writeLock();
    // Acquire before entering the try block: if lock() itself fails, the finally
    // clause must not call unlock() on a lock this thread does not hold.
    writeLock.lock();
    try {
        return stripe[index].addTo(array, offset, length, hash, delta);
    }
    finally {
        writeLock.unlock();
    }
}
/**
 * {@inheritDoc}
 *
 * <p>The call is forwarded to {@code keyedPool} while the write lock is held;
 * exceptions from the delegate propagate to the caller after the lock has been
 * released.
 */
@Override
public V borrowObject(final K key)
        throws Exception, NoSuchElementException, IllegalStateException {
    final WriteLock exclusive = readWriteLock.writeLock();
    exclusive.lock();
    try {
        final V borrowed = keyedPool.borrowObject(key);
        return borrowed;
    } finally {
        exclusive.unlock();
    }
}
/**
 * {@inheritDoc}
 *
 * <p>The call is forwarded to {@code keyedPool} while the write lock is held.
 * An {@link Exception} thrown by the delegate is deliberately discarded.
 */
@Override
public void returnObject(final K key, final V obj) {
    final WriteLock exclusive = readWriteLock.writeLock();
    exclusive.lock();
    try {
        keyedPool.returnObject(key, obj);
    } catch (final Exception ignored) {
        // swallowed
    } finally {
        exclusive.unlock();
    }
}
/**
 * {@inheritDoc}
 *
 * <p>Runs the delegate's {@code invalidateObject} under the write lock and
 * discards any {@link Exception} it throws.
 */
@Override
public void invalidateObject(final K key, final V obj) {
    final WriteLock guard = readWriteLock.writeLock();
    guard.lock();
    try {
        keyedPool.invalidateObject(key, obj);
    } catch (final Exception ignored) {
        // swallowed as of Pool 2
    } finally {
        guard.unlock();
    }
}
/**
 * {@inheritDoc}
 *
 * <p>Runs the delegate's {@code returnObject} under the write lock and
 * discards any {@link Exception} it throws.
 */
@Override
public void returnObject(final T obj) {
    final WriteLock guard = readWriteLock.writeLock();
    guard.lock();
    try {
        pool.returnObject(obj);
    } catch (final Exception ignored) {
        // swallowed as of Pool 2
    } finally {
        guard.unlock();
    }
}
/**
 * Empties the set returned by {@code getObj()} while holding the write lock.
 * Any {@link Throwable} raised while clearing is logged and not rethrown.
 *
 * @author tanyaowu
 */
public void clear() {
    final WriteLock exclusive = this.writeLock();
    exclusive.lock();
    try {
        final Set<T> elements = this.getObj();
        elements.clear();
    } catch (Throwable failure) {
        log.error(failure.getMessage(), failure);
    } finally {
        exclusive.unlock();
    }
}
/**
 * Acquires the write lock and hands back an {@code IAutoCloseable} whose action
 * releases it again. A debug message is logged before the acquisition, after
 * the acquisition and after the release.
 *
 * @return a handle that unlocks the write lock when invoked
 */
IAutoCloseable lockForWriting()
{
    final WriteLock exclusive = readwriteLock.writeLock();

    logger.debug("Acquiring write lock for {}: {}", this, exclusive);
    exclusive.lock();
    logger.debug("Acquired write lock for {}: {}", this, exclusive);

    final IAutoCloseable releaser = () -> {
        exclusive.unlock();
        logger.debug("Released write lock for {}: {}", this, exclusive);
    };
    return releaser;
}
/**
 * Increments the reconnect counter and then, under the channel's close write
 * lock, asks {@code tioClient} to reconnect the channel if it is still marked
 * closed. The elapsed time of the attempt is always logged at error level (an
 * earlier info/error split on a 100 ms threshold is disabled). Any
 * {@link Throwable} raised during the attempt is logged and not rethrown.
 */
@Override
public void runTask() {
    channelContext.getReconnCount().incrementAndGet();

    final WriteLock closeWriteLock = channelContext.closeLock.writeLock();
    closeWriteLock.lock();
    try {
        // Only reconnect while the channel is closed; otherwise it is already connected.
        if (channelContext.isClosed) {
            final long startedAt = SystemTimer.currTime;
            tioClient.reconnect(channelContext, 2);
            final long elapsed = SystemTimer.currTime - startedAt;
            log.error("{}, 第{}次重连,重连耗时:{} ms", channelContext, channelContext.getReconnCount(), elapsed);
        }
    } catch (java.lang.Throwable failure) {
        log.error(failure.toString(), failure);
    } finally {
        closeWriteLock.unlock();
    }
}
/**
 * Empties the list returned by {@code getObj()} while holding the write lock.
 * Any {@link Throwable} raised while clearing is logged and not rethrown.
 *
 * @author tanyaowu
 */
public void clear() {
    final WriteLock exclusive = this.writeLock();
    exclusive.lock();
    try {
        final List<T> items = this.getObj();
        items.clear();
    } catch (Throwable failure) {
        log.error(failure.getMessage(), failure);
    } finally {
        exclusive.unlock();
    }
}
/**
 * Invokes the given handler on {@code obj} while holding the write lock.
 * Any {@link Throwable} raised by the handler is logged and not rethrown.
 *
 * @param writeLockHandler the callback that receives {@code obj}
 */
public void handle(WriteLockHandler<T> writeLockHandler) {
    final WriteLock exclusive = lock.writeLock();
    exclusive.lock();
    try {
        writeLockHandler.handler(obj);
    } catch (Throwable failure) {
        log.error(failure.getMessage(), failure);
    } finally {
        exclusive.unlock();
    }
}
/**
 * Discovers all resources, puts them in the {@link #resourceManager},
 * and triggers any listeners listening for new inventory.
 *
 * This will look for all root resources (resources whose types are that of the root resource types
 * as defined by {@link ResourceTypeManager#getRootResourceTypes()}) and then obtain all their
 * children (recursively down to all descendants). Effectively, this discovers the full
 * resource hierarchy.
 *
 * This method does not block - it submits the scan to {@code fullDiscoveryScanThreadPool}.
 * The scan itself runs while holding the discovery-scan write lock. If the pool rejects
 * the task, the request is dropped with a debug message.
 */
public void discoverAll() {
    status.assertRunning(getClass(), "discoverAll()");

    final Runnable fullScan = new Runnable() {
        @Override
        public void run() {
            final WriteLock scanLock = EndpointService.this.discoveryScanRWLock.writeLock();
            scanLock.lock();
            try {
                final DiscoveryResults results = new DiscoveryResults();
                LOG.infoDiscoveryRequested(getMonitoredEndpoint());

                // Stays at -1 when the scan fails before the timer is stopped.
                long durationMillis = -1;
                try (S session = openSession()) {
                    final Set<ResourceType<L>> roots = getResourceTypeManager().getRootResourceTypes();
                    final Context scanTimer = getDiagnostics().getFullDiscoveryScanTimer().time();
                    for (ResourceType<L> root : roots) {
                        discoverChildren(null, root, session, results);
                    }
                    final long elapsedNanos = scanTimer.stop();
                    durationMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
                } catch (Exception scanFailure) {
                    LOG.errorCouldNotAccess(EndpointService.this, scanFailure);
                    results.error(scanFailure);
                }

                getResourceManager().logTreeGraph(
                        "Discovered all resources for: " + getMonitoredEndpoint(), durationMillis);
                results.discoveryFinished();
            } finally {
                scanLock.unlock();
            }
        }
    };

    try {
        this.fullDiscoveryScanThreadPool.execute(fullScan);
    } catch (RejectedExecutionException rejected) {
        LOG.debugf("Redundant full discovery scan will be ignored for endpoint [%s]", getMonitoredEndpoint());
    }
}
/**
 * Reads from the socket into {@code to}, first switching the socket's blocking
 * status to {@code block} if it does not already match.
 *
 * @param block the blocking status requested for this read
 * @param to    the buffer that receives the data; its position is advanced by
 *              the number of bytes read
 * @return the number of bytes read, {@code 0} when nothing was read (including
 *         EAGAIN and a timeout with {@code block == false}), or {@code -1} on
 *         APR_EOF
 * @throws IOException if this wrapper is closed or the read reports any other
 *         error; a {@link SocketTimeoutException} on timeout when
 *         {@code block} is {@code true}; an {@link EOFException} for error
 *         10053 on Windows
 */
private int fillReadBuffer(boolean block, ByteBuffer to) throws IOException {
if (closed) {
throw new IOException(sm.getString("socket.apr.closed", getSocket()));
}
Lock readLock = getBlockingStatusReadLock();
WriteLock writeLock = getBlockingStatusWriteLock();
boolean readDone = false;
int result = 0;
// First attempt: the blocking status already matches, so the read lock suffices.
readLock.lock();
try {
if (getBlockingStatus() == block) {
if (block) {
Socket.timeoutSet(getSocket().longValue(), getReadTimeout() * 1000);
}
result = Socket.recvb(getSocket().longValue(), to, to.position(),
to.remaining());
readDone = true;
}
} finally {
readLock.unlock();
}
// Second attempt: the blocking status must be changed, which needs the write lock.
if (!readDone) {
writeLock.lock();
try {
// Set the current settings for this socket
setBlockingStatus(block);
if (block) {
Socket.timeoutSet(getSocket().longValue(), getReadTimeout() * 1000);
} else {
Socket.timeoutSet(getSocket().longValue(), 0);
}
// Downgrade the lock
// The read lock is taken before the write lock is released so the
// read below runs without a gap in which no lock is held.
readLock.lock();
try {
writeLock.unlock();
result = Socket.recvb(getSocket().longValue(), to, to.position(),
to.remaining());
} finally {
readLock.unlock();
}
} finally {
// Should have been released above but may not have been on some
// exception paths
if (writeLock.isHeldByCurrentThread()) {
writeLock.unlock();
}
}
}
// Translate the native result: positive is a byte count, negative is a negated status code.
if (result > 0) {
to.position(to.position() + result);
return result;
} else if (result == 0 || -result == Status.EAGAIN) {
return 0;
} else if ((-result) == Status.ETIMEDOUT || (-result) == Status.TIMEUP) {
if (block) {
throw new SocketTimeoutException(sm.getString("iib.readtimeout"));
} else {
// Attempting to read from the socket when the poller
// has not signalled that there is data to read appears
// to behave like a blocking read with a short timeout
// on OSX rather than like a non-blocking read. If no
// data is read, treat the resulting timeout like a
// non-blocking read that returned no data.
return 0;
}
} else if (-result == Status.APR_EOF) {
return -1;
} else if ((OS.IS_WIN32 || OS.IS_WIN64) &&
(-result == Status.APR_OS_START_SYSERR + 10053)) {
// 10053 on Windows is connection aborted
throw new EOFException(sm.getString("socket.apr.clientAbort"));
} else {
throw new IOException(sm.getString("socket.apr.read.error",
Integer.valueOf(-result), getSocket(), this));
}
}
/**
 * Writes {@code len} bytes of {@code b}, starting at {@code off}, via
 * {@code doWriteInternal}, first switching the socket's blocking status to
 * {@code block} if it does not already match.
 *
 * <p>Every lock is acquired before its {@code try} block is entered, so a
 * failed acquisition can never reach a {@code finally} clause that unlocks a
 * lock the current thread does not hold.
 *
 * @param block the blocking status requested for this write
 * @param b     the source array
 * @param off   the offset of the first byte to write
 * @param len   the number of bytes to write
 * @return the value returned by {@code doWriteInternal}
 * @throws IOException if this output is closed, or if thrown by
 *         {@code doWriteInternal}
 */
@Override
protected int doWrite(boolean block, byte[] b, int off, int len)
        throws IOException {
    if (closed) {
        throw new IOException(sm.getString("apr.closed", Long.valueOf(socket)));
    }

    Lock readLock = wrapper.getBlockingStatusReadLock();
    WriteLock writeLock = wrapper.getBlockingStatusWriteLock();

    // Fast path: the blocking status already matches, so the read lock suffices.
    readLock.lock();
    try {
        if (wrapper.getBlockingStatus() == block) {
            return doWriteInternal(b, off, len);
        }
    } finally {
        readLock.unlock();
    }

    // Slow path: changing the blocking status needs the write lock.
    writeLock.lock();
    try {
        // Set the current settings for this socket
        wrapper.setBlockingStatus(block);
        if (block) {
            Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000);
        } else {
            Socket.timeoutSet(socket, 0);
        }

        // Downgrade the lock: take the read lock before giving up the write lock.
        readLock.lock();
        try {
            writeLock.unlock();
            return doWriteInternal(b, off, len);
        } finally {
            readLock.unlock();
        }
    } finally {
        // Should have been released above but may not have been on some
        // exception paths
        if (writeLock.isHeldByCurrentThread()) {
            writeLock.unlock();
        }
    }
}
/**
 * Under the channel's close write lock, asks {@code aioClient} to reconnect the
 * channel if it is still closed, logs how long the attempt took (error level at
 * 100 ms or more, info level otherwise) and increments the reconnect count when
 * the channel is still closed afterwards. Any {@link Throwable} raised during
 * the attempt is logged and not rethrown.
 *
 * <p>The lock is acquired before the {@code try} block so that a failed
 * acquisition cannot lead to {@code unlock()} being called on a lock this
 * thread does not hold.
 *
 * <p>Overridden by: tanyaowu<br>
 * Overridden on: 2017-02-02 20:24:40
 *
 * @see java.lang.Runnable#run()
 */
@Override
public void run()
{
    ReentrantReadWriteLock closeLock = channelContext.getCloseLock();
    WriteLock writeLock = closeLock.writeLock();
    writeLock.lock();
    try
    {
        if (!channelContext.isClosed()) // already connected, no need to reconnect
        {
            return;
        }

        long start = SystemTimer.currentTimeMillis();
        aioClient.reconnect(channelContext, 2);
        long end = SystemTimer.currentTimeMillis();
        long iv = end - start;
        if (iv >= 100)
        {
            log.error("{},重连耗时:{} ms", channelContext, iv);
        } else
        {
            log.info("{},重连耗时:{} ms", channelContext, iv);
        }

        // Still closed after the attempt: count it as one more failed reconnect.
        if (channelContext.isClosed())
        {
            channelContext.setReconnCount(channelContext.getReconnCount() + 1);
        }
    } catch (java.lang.Throwable e)
    {
        log.error(e.toString(), e);
    } finally
    {
        writeLock.unlock();
    }
}