下面列出了java.nio.channels.ClosedSelectorException#org.apache.catalina.tribes.io.ObjectReader 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Flags the reader attached to {@code key} (when there is one) as cancelled
 * and finished, then hands the receiver an event that passes the key to
 * {@code NioReceiver.cancelledKey}.
 *
 * @param key the selection key to cancel
 */
private void cancelKey(final SelectionKey key) {
    if (log.isTraceEnabled()) {
        log.trace("Adding key for cancel event:"+key);
    }
    final ObjectReader attached = (ObjectReader) key.attachment();
    if (attached != null) {
        attached.setCancelled(true);
        attached.finish();
    }
    // The cancellation itself is not run here; it is wrapped in a Runnable
    // and submitted through receiver.addEvent.
    receiver.addEvent(new Runnable() {
        @Override
        public void run() {
            if (log.isTraceEnabled()) {
                log.trace("Cancelling key:"+key);
            }
            NioReceiver.cancelledKey(key);
        }
    });
}
/**
 * Cancels work on {@code key}: the attached reader, if present, is marked
 * cancelled and finished, and an event calling
 * {@code NioReceiver.cancelledKey} for the key is added to the receiver.
 *
 * @param key the selection key to cancel
 */
private void cancelKey(final SelectionKey key) {
    if (log.isTraceEnabled()) {
        log.trace("Adding key for cancel event:"+key);
    }
    final ObjectReader attached = (ObjectReader) key.attachment();
    if (attached != null) {
        attached.setCancelled(true);
        attached.finish();
    }
    // Submitted as an event rather than executed inline.
    receiver.addEvent(new Runnable() {
        @Override
        public void run() {
            if (log.isTraceEnabled()) {
                log.trace("Cancelling key:"+key);
            }
            NioReceiver.cancelledKey(key);
        }
    });
}
/**
 * Marks the reader attached to {@code key} as cancelled and finished (when
 * an attachment exists) and queues a receiver event that invokes
 * {@code NioReceiver.cancelledKey} on the key.
 *
 * @param key the selection key to cancel
 */
private void cancelKey(final SelectionKey key) {
    if (log.isTraceEnabled()) {
        log.trace("Adding key for cancel event:"+key);
    }
    final ObjectReader attached = (ObjectReader) key.attachment();
    if (attached != null) {
        attached.setCancelled(true);
        attached.finish();
    }
    // The key is cancelled from inside the event, not from this call.
    receiver.addEvent(new Runnable() {
        @Override
        public void run() {
            if (log.isTraceEnabled()) {
                log.trace("Cancelling key:"+key);
            }
            NioReceiver.cancelledKey(key);
        }
    });
}
/**
 * Finishes {@code reader} and queues a receiver event that adds
 * {@code OP_READ} back to the interest set of {@code key}.
 *
 * @param key    the selection key whose read interest should be resumed
 * @param reader the reader to finish before the event is queued
 */
protected void registerForRead(final SelectionKey key, ObjectReader reader) {
    if (log.isTraceEnabled()) {
        log.trace("Adding key for read event:"+key);
    }
    reader.finish();
    final Runnable readInterest = new Runnable() {
        @Override
        public void run() {
            try {
                if (!key.isValid()) {
                    return;
                }
                // Wake the selector first, then OR OP_READ into the ops
                // that are already set on the key.
                key.selector().wakeup();
                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                if (log.isTraceEnabled()) {
                    log.trace("Registering key for read:"+key);
                }
            } catch (CancelledKeyException cancelled) {
                NioReceiver.cancelledKey(key);
                if (log.isTraceEnabled()) {
                    log.trace("CKX Cancelling key:"+key);
                }
            } catch (Exception failure) {
                log.error(sm.getString("nioReplicationTask.error.register.key", key),failure);
            }
        }
    };
    receiver.addEvent(readInterest);
}
/**
 * Accept loop: while {@code doListen()} is true, takes a task from the task
 * pool, accepts one socket, applies the configured socket options and hands
 * the socket plus a new {@code ObjectReader} to the task for execution.
 * Returns immediately with a warning if {@code doListen()} is already true
 * on entry.
 *
 * @throws Exception if accepting fails while still listening, or if
 *                   configuring the accepted socket fails
 */
public void listen() throws Exception {
    if (doListen()) {
        log.warn(sm.getString("bioReceiver.already.started"));
        return;
    }
    setListen(true);
    while (doListen()) {
        final boolean poolDrained = getTaskPool().available() < 1;
        if (poolDrained && log.isWarnEnabled()) {
            log.warn(sm.getString("bioReceiver.threads.busy"));
        }
        final BioReplicationTask worker = (BioReplicationTask) getTaskPool().getRxTask();
        if (worker == null) {
            continue; // should never happen
        }
        Socket accepted = null;
        try {
            accepted = serverSocket.accept();
        } catch (Exception acceptFailure) {
            // A failure after listening was switched off is not an error.
            if (doListen()) {
                throw acceptFailure;
            }
        }
        if (!doListen()) {
            // Regular shutdown: run the task once with nothing to service.
            worker.setDoRun(false);
            worker.serviceSocket(null, null);
            getExecutor().execute(worker);
            break;
        }
        if (accepted == null) {
            continue;
        }
        accepted.setReceiveBufferSize(getRxBufSize());
        accepted.setSendBufferSize(getTxBufSize());
        accepted.setTcpNoDelay(getTcpNoDelay());
        accepted.setKeepAlive(getSoKeepAlive());
        accepted.setOOBInline(getOoBInline());
        accepted.setReuseAddress(getSoReuseAddress());
        accepted.setSoLinger(getSoLingerOn(), getSoLingerTime());
        accepted.setSoTimeout(getTimeout());
        worker.serviceSocket(accepted, new ObjectReader(accepted));
        getExecutor().execute(worker);
    }
}
/**
 * Delivers every message currently held by {@code reader} to the callback,
 * sending acknowledgements according to each message's options.
 *
 * @param reader the reader whose pending messages are processed
 * @throws Exception if the reader or an acknowledgement outside the
 *                   callback's try block fails
 */
protected void execute(ObjectReader reader) throws Exception {
    if (reader.count() <= 0) {
        return;
    }
    for (ChannelMessage msg : reader.execute()) {
        // Asynchronous request: acknowledge before the message is processed.
        if (ChannelData.sendAckAsync(msg.getOptions())) {
            sendAck(Constants.ACK_COMMAND);
        }
        try {
            getCallback().messageDataReceived(msg);
            // Synchronized request: acknowledge only after processing
            // completed on this side.
            if (ChannelData.sendAckSync(msg.getOptions())) {
                sendAck(Constants.ACK_COMMAND);
            }
        } catch (Exception failure) {
            if (ChannelData.sendAckSync(msg.getOptions())) {
                sendAck(Constants.FAIL_ACK_COMMAND);
            }
            log.error(sm.getString("bioReplicationTask.messageDataReceived.error"),failure);
        }
        if (getUseBufferPool()) {
            // Hand the message buffer back to the pool and detach it.
            BufferPool.getBufferPool().returnBuffer(msg.getMessage());
            msg.setMessage(null);
        }
    }
}
/**
 * Finishes {@code reader} and queues a receiver event that adds
 * {@code OP_READ} back to the interest set of {@code key}.
 *
 * @param key    the selection key whose read interest should be resumed
 * @param reader the reader to finish before the event is queued
 */
protected void registerForRead(final SelectionKey key, ObjectReader reader) {
    if (log.isTraceEnabled()) {
        log.trace("Adding key for read event:"+key);
    }
    reader.finish();
    final Runnable readInterest = new Runnable() {
        @Override
        public void run() {
            try {
                if (!key.isValid()) {
                    return;
                }
                // Wake the selector first, then OR OP_READ into the ops
                // that are already set on the key.
                key.selector().wakeup();
                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                if (log.isTraceEnabled()) {
                    log.trace("Registering key for read:"+key);
                }
            } catch (CancelledKeyException cancelled) {
                NioReceiver.cancelledKey(key);
                if (log.isTraceEnabled()) {
                    log.trace("CKX Cancelling key:"+key);
                }
            } catch (Exception failure) {
                log.error("Error registering key for read:"+key,failure);
            }
        }
    };
    receiver.addEvent(readInterest);
}
/**
 * Periodic sweep over the selector's keys. Runs at most once per
 * {@code getSelectorTimeout()} interval. Only keys whose interest set is 0
 * are examined: a key without an attachment is cancelled; a key whose
 * reader has been idle longer than {@code getTimeout()} and is not flagged
 * as accessed gets a warning logged and its last-access time reset.
 */
protected void socketTimeouts() {
    final long now = System.currentTimeMillis();
    if ((now - lastCheck) < getSelectorTimeout()) {
        return;
    }
    final Selector current = this.selector.get();
    final Set<SelectionKey> registered = (isListening() && current != null) ? current.keys() : null;
    if (registered == null) {
        return;
    }
    for (SelectionKey key : registered) {
        try {
            if (key.interestOps() != 0) {
                continue;
            }
            // Keys with an empty interest set: check for ones that didn't make it in.
            final ObjectReader attached = (ObjectReader) key.attachment();
            if (attached == null) {
                cancelledKey(key);
                continue;
            }
            final long idleMillis = now - attached.getLastAccess();
            if (idleMillis > getTimeout() && !attached.isAccessed()) {
                if (log.isWarnEnabled()) {
                    log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+attached.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(attached.getLastAccess())+" Possible cause: all threads used, perform thread dump");
                }
                attached.setLastAccess(now);
            }
        } catch (CancelledKeyException cancelled) {
            cancelledKey(key);
        }
    }
    lastCheck = System.currentTimeMillis();
}
/**
 * Accept loop: while {@code doListen()} is true, takes a task from the task
 * pool, accepts one socket, applies the configured socket options and hands
 * the socket plus a new {@code ObjectReader} to the task for execution.
 * Returns immediately with a warning if {@code doListen()} is already true
 * on entry.
 *
 * @throws Exception if accepting fails while still listening, or if
 *                   configuring the accepted socket fails
 */
public void listen() throws Exception {
    if (doListen()) {
        log.warn("ServerSocket already started");
        return;
    }
    setListen(true);
    while (doListen()) {
        final boolean poolDrained = getTaskPool().available() < 1;
        if (poolDrained && log.isWarnEnabled()) {
            log.warn("All BIO server replication threads are busy, unable to handle more requests until a thread is freed up.");
        }
        final BioReplicationTask worker = (BioReplicationTask) getTaskPool().getRxTask();
        if (worker == null) {
            continue; // should never happen
        }
        Socket accepted = null;
        try {
            accepted = serverSocket.accept();
        } catch (Exception acceptFailure) {
            // A failure after listening was switched off is not an error.
            if (doListen()) {
                throw acceptFailure;
            }
        }
        if (!doListen()) {
            // Regular shutdown: run the task once with nothing to service.
            worker.setDoRun(false);
            worker.serviceSocket(null, null);
            getExecutor().execute(worker);
            break;
        }
        if (accepted == null) {
            continue;
        }
        accepted.setReceiveBufferSize(getRxBufSize());
        accepted.setSendBufferSize(getTxBufSize());
        accepted.setTcpNoDelay(getTcpNoDelay());
        accepted.setKeepAlive(getSoKeepAlive());
        accepted.setOOBInline(getOoBInline());
        accepted.setReuseAddress(getSoReuseAddress());
        accepted.setSoLinger(getSoLingerOn(), getSoLingerTime());
        accepted.setSoTimeout(getTimeout());
        worker.serviceSocket(accepted, new ObjectReader(accepted));
        getExecutor().execute(worker);
    }
}
/**
 * Delivers every message currently held by {@code reader} to the callback,
 * sending acknowledgements according to each message's options.
 *
 * @param reader the reader whose pending messages are processed
 * @throws Exception if the reader or an acknowledgement outside the
 *                   callback's try block fails
 */
protected void execute(ObjectReader reader) throws Exception {
    if (reader.count() <= 0) {
        return;
    }
    for (ChannelMessage msg : reader.execute()) {
        // Asynchronous request: acknowledge before the message is processed.
        if (ChannelData.sendAckAsync(msg.getOptions())) {
            sendAck(Constants.ACK_COMMAND);
        }
        try {
            getCallback().messageDataReceived(msg);
            // Synchronized request: acknowledge only after processing
            // completed on this side.
            if (ChannelData.sendAckSync(msg.getOptions())) {
                sendAck(Constants.ACK_COMMAND);
            }
        } catch (Exception failure) {
            if (ChannelData.sendAckSync(msg.getOptions())) {
                sendAck(Constants.FAIL_ACK_COMMAND);
            }
            log.error("Error thrown from messageDataReceived.",failure);
        }
        if (getUseBufferPool()) {
            // Hand the message buffer back to the pool and detach it.
            BufferPool.getBufferPool().returnBuffer(msg.getMessage());
            msg.setMessage(null);
        }
    }
}
/**
 * Finishes {@code reader} and queues a receiver event that adds
 * {@code OP_READ} back to the interest set of {@code key}.
 *
 * @param key    the selection key whose read interest should be resumed
 * @param reader the reader to finish before the event is queued
 */
protected void registerForRead(final SelectionKey key, ObjectReader reader) {
    if (log.isTraceEnabled()) {
        log.trace("Adding key for read event:"+key);
    }
    reader.finish();
    final Runnable readInterest = new Runnable() {
        @Override
        public void run() {
            try {
                if (!key.isValid()) {
                    return;
                }
                // Wake the selector first, then OR OP_READ into the ops
                // that are already set on the key.
                key.selector().wakeup();
                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                if (log.isTraceEnabled()) {
                    log.trace("Registering key for read:"+key);
                }
            } catch (CancelledKeyException cancelled) {
                NioReceiver.cancelledKey(key);
                if (log.isTraceEnabled()) {
                    log.trace("CKX Cancelling key:"+key);
                }
            } catch (Exception failure) {
                log.error("Error registering key for read:"+key,failure);
            }
        }
    };
    receiver.addEvent(readInterest);
}
/**
 * Periodic sweep over the selector's keys. Runs at most once per
 * {@code getSelectorTimeout()} interval. Only keys whose interest set is 0
 * are examined: a key without an attachment is cancelled; a key whose
 * reader has been idle longer than {@code getTimeout()} and is not flagged
 * as accessed gets a warning logged and its last-access time reset.
 */
protected void socketTimeouts() {
    final long now = System.currentTimeMillis();
    if ((now - lastCheck) < getSelectorTimeout()) {
        return;
    }
    final Selector current = this.selector.get();
    final Set<SelectionKey> registered = (isListening() && current != null) ? current.keys() : null;
    if (registered == null) {
        return;
    }
    for (SelectionKey key : registered) {
        try {
            if (key.interestOps() != 0) {
                continue;
            }
            // Keys with an empty interest set: check for ones that didn't make it in.
            final ObjectReader attached = (ObjectReader) key.attachment();
            if (attached == null) {
                cancelledKey(key);
                continue;
            }
            final long idleMillis = now - attached.getLastAccess();
            if (idleMillis > getTimeout() && !attached.isAccessed()) {
                if (log.isWarnEnabled()) {
                    log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+attached.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(attached.getLastAccess())+" Possible cause: all threads used, perform thread dump");
                }
                attached.setLastAccess(now);
            }
        } catch (CancelledKeyException cancelled) {
            cancelledKey(key);
        }
    }
    lastCheck = System.currentTimeMillis();
}
/**
 * Accept loop: while {@code doListen()} is true, takes a task from the task
 * pool, accepts one socket, applies the configured socket options and hands
 * the socket plus a new {@code ObjectReader} to the task for execution.
 * Returns immediately with a warning if {@code doListen()} is already true
 * on entry.
 *
 * @throws Exception if accepting fails while still listening, or if
 *                   configuring the accepted socket fails
 */
public void listen() throws Exception {
    if (doListen()) {
        log.warn("ServerSocket already started");
        return;
    }
    setListen(true);
    while (doListen()) {
        final boolean poolDrained = getTaskPool().available() < 1;
        if (poolDrained && log.isWarnEnabled()) {
            log.warn("All BIO server replication threads are busy, unable to handle more requests until a thread is freed up.");
        }
        final BioReplicationTask worker = (BioReplicationTask) getTaskPool().getRxTask();
        if (worker == null) {
            continue; // should never happen
        }
        Socket accepted = null;
        try {
            accepted = serverSocket.accept();
        } catch (Exception acceptFailure) {
            // A failure after listening was switched off is not an error.
            if (doListen()) {
                throw acceptFailure;
            }
        }
        if (!doListen()) {
            // Regular shutdown: run the task once with nothing to service.
            worker.setDoRun(false);
            worker.serviceSocket(null, null);
            getExecutor().execute(worker);
            break;
        }
        if (accepted == null) {
            continue;
        }
        accepted.setReceiveBufferSize(getRxBufSize());
        accepted.setSendBufferSize(getTxBufSize());
        accepted.setTcpNoDelay(getTcpNoDelay());
        accepted.setKeepAlive(getSoKeepAlive());
        accepted.setOOBInline(getOoBInline());
        accepted.setReuseAddress(getSoReuseAddress());
        accepted.setSoLinger(getSoLingerOn(), getSoLingerTime());
        accepted.setSoTimeout(getTimeout());
        worker.serviceSocket(accepted, new ObjectReader(accepted));
        getExecutor().execute(worker);
    }
}
/**
 * Delivers every message currently held by {@code reader} to the callback,
 * sending acknowledgements according to each message's options.
 *
 * @param reader the reader whose pending messages are processed
 * @throws Exception if the reader or an acknowledgement outside the
 *                   callback's try block fails
 */
protected void execute(ObjectReader reader) throws Exception {
    if (reader.count() <= 0) {
        return;
    }
    for (ChannelMessage msg : reader.execute()) {
        // Asynchronous request: acknowledge before the message is processed.
        if (ChannelData.sendAckAsync(msg.getOptions())) {
            sendAck(Constants.ACK_COMMAND);
        }
        try {
            getCallback().messageDataReceived(msg);
            // Synchronized request: acknowledge only after processing
            // completed on this side.
            if (ChannelData.sendAckSync(msg.getOptions())) {
                sendAck(Constants.ACK_COMMAND);
            }
        } catch (Exception failure) {
            if (ChannelData.sendAckSync(msg.getOptions())) {
                sendAck(Constants.FAIL_ACK_COMMAND);
            }
            log.error("Error thrown from messageDataReceived.",failure);
        }
        if (getUseBufferPool()) {
            // Hand the message buffer back to the pool and detach it.
            BufferPool.getBufferPool().returnBuffer(msg.getMessage());
            msg.setMessage(null);
        }
    }
}
/**
 * Services the key currently stored in this task: drains its channel through
 * the attached {@code ObjectReader}, or cancels the key when no reader is
 * attached or draining fails. Afterwards the key field is cleared and this
 * task is returned to the task pool.
 */
@Override
public synchronized void run() {
    // The null guard must come before the buffer setup, because sizing the
    // buffer dereferences key; checked afterwards it could never prevent
    // the NullPointerException it is meant to guard against.
    if (key == null) {
        return; // just in case
    }
    if (buffer == null) {
        int size = getRxBufSize();
        if (key.channel() instanceof DatagramChannel) {
            size = ChannelReceiver.MAX_UDP_SIZE;
        }
        if ((getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) {
            buffer = ByteBuffer.allocateDirect(size);
        } else {
            buffer = ByteBuffer.allocate(size);
        }
    } else {
        buffer.clear();
    }
    if (log.isTraceEnabled()) {
        log.trace("Servicing key:"+key);
    }
    try {
        ObjectReader reader = (ObjectReader) key.attachment();
        if (reader == null) {
            if (log.isTraceEnabled()) {
                log.trace("No object reader, cancelling:"+key);
            }
            cancelKey(key);
        } else {
            if (log.isTraceEnabled()) {
                log.trace("Draining channel:"+key);
            }
            drainChannel(key, reader);
        }
    } catch (Exception e) {
        // This is common, since the sockets on the other end expire after
        // a certain time.
        if (e instanceof CancelledKeyException) {
            // nothing to log; the key is cancelled below
        } else if (e instanceof IOException) {
            // Don't spew out stack traces for IO exceptions unless debug is enabled.
            if (log.isDebugEnabled()) {
                log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
            } else {
                log.warn (sm.getString("nioReplicationTask.unable.drainChannel.ioe", e.getMessage()));
            }
        } else if (log.isErrorEnabled()) {
            // This is a real error, log it.
            log.error(sm.getString("nioReplicationTask.exception.drainChannel"),e);
        }
        cancelKey(key);
    }
    key = null;
    // done, ready for more, return to pool
    getTaskPool().returnWorker (this);
}
/**
 * Periodic sweep over the selector's keys. Runs at most once per
 * {@code getSelectorTimeout()} interval. Only keys whose interest set is 0
 * are examined: a key without an attachment is cancelled; a key whose
 * reader has been idle longer than {@code getTimeout()} and is not flagged
 * as accessed gets a warning logged and its last-access time reset.
 */
protected void socketTimeouts() {
    final long now = System.currentTimeMillis();
    if ((now - lastCheck) < getSelectorTimeout()) {
        return;
    }
    final Selector current = this.selector.get();
    final Set<SelectionKey> registered = (isListening() && current != null) ? current.keys() : null;
    if (registered == null) {
        return;
    }
    for (SelectionKey key : registered) {
        try {
            if (key.interestOps() != 0) {
                continue;
            }
            // Keys with an empty interest set: check for ones that didn't make it in.
            final ObjectReader attached = (ObjectReader) key.attachment();
            if (attached == null) {
                cancelledKey(key);
                continue;
            }
            final long idleMillis = now - attached.getLastAccess();
            if (idleMillis > getTimeout() && !attached.isAccessed()) {
                if (log.isWarnEnabled()) {
                    log.warn(sm.getString(
                            "nioReceiver.threadsExhausted",
                            Integer.valueOf(getTimeout()),
                            Boolean.valueOf(attached.isCancelled()),
                            key,
                            new java.sql.Timestamp(attached.getLastAccess())));
                }
                attached.setLastAccess(now);
            }
        } catch (CancelledKeyException cancelled) {
            cancelledKey(key);
        }
    }
    lastCheck = System.currentTimeMillis();
}
/**
 * Stores the socket and reader that this task should work with.
 *
 * @param socket the socket to store; may be {@code null}
 * @param reader the reader to store; may be {@code null}
 */
public synchronized void serviceSocket(Socket socket, ObjectReader reader) {
    this.reader = reader;
    this.socket = socket;
}
/**
 * Services the key currently stored in this task: drains its channel through
 * the attached {@code ObjectReader}, or cancels the key when no reader is
 * attached or draining fails. Afterwards the key field is cleared and this
 * task is returned to the task pool.
 */
@Override
public synchronized void run() {
    // The null guard must come before the buffer setup, because sizing the
    // buffer dereferences key; checked afterwards it could never prevent
    // the NullPointerException it is meant to guard against.
    if (key == null) {
        return; // just in case
    }
    if (buffer == null) {
        int size = getRxBufSize();
        if (key.channel() instanceof DatagramChannel) {
            size = ChannelReceiver.MAX_UDP_SIZE;
        }
        if ((getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) {
            buffer = ByteBuffer.allocateDirect(size);
        } else {
            buffer = ByteBuffer.allocate(size);
        }
    } else {
        buffer.clear();
    }
    if (log.isTraceEnabled()) {
        log.trace("Servicing key:"+key);
    }
    try {
        ObjectReader reader = (ObjectReader) key.attachment();
        if (reader == null) {
            if (log.isTraceEnabled()) {
                log.trace("No object reader, cancelling:"+key);
            }
            cancelKey(key);
        } else {
            if (log.isTraceEnabled()) {
                log.trace("Draining channel:"+key);
            }
            drainChannel(key, reader);
        }
    } catch (Exception e) {
        // This is common, since the sockets on the other end expire after
        // a certain time.
        if (e instanceof CancelledKeyException) {
            // nothing to log; the key is cancelled below
        } else if (e instanceof IOException) {
            // Don't spew out stack traces for IO exceptions unless debug is enabled.
            if (log.isDebugEnabled()) {
                log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
            } else {
                log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
            }
        } else if (log.isErrorEnabled()) {
            // This is a real error, log it.
            log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
        }
        cancelKey(key);
    }
    key = null;
    // done, ready for more, return to pool
    getTaskPool().returnWorker (this);
}
/**
 * Records the socket/reader pair for this task to service.
 *
 * @param socket the socket to store; may be {@code null}
 * @param reader the reader to store; may be {@code null}
 */
public synchronized void serviceSocket(Socket socket, ObjectReader reader) {
    this.reader = reader;
    this.socket = socket;
}
/**
 * Services the key currently stored in this task: drains its channel through
 * the attached {@code ObjectReader}, or cancels the key when no reader is
 * attached or draining fails. Afterwards the key field is cleared and this
 * task is returned to the task pool.
 */
@Override
public synchronized void run() {
    // The null guard must come before the buffer setup, because sizing the
    // buffer dereferences key; checked afterwards it could never prevent
    // the NullPointerException it is meant to guard against.
    if (key == null) {
        return; // just in case
    }
    if (buffer == null) {
        int size = getRxBufSize();
        if (key.channel() instanceof DatagramChannel) {
            size = ChannelReceiver.MAX_UDP_SIZE;
        }
        if ((getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) {
            buffer = ByteBuffer.allocateDirect(size);
        } else {
            buffer = ByteBuffer.allocate(size);
        }
    } else {
        buffer.clear();
    }
    if (log.isTraceEnabled()) {
        log.trace("Servicing key:"+key);
    }
    try {
        ObjectReader reader = (ObjectReader) key.attachment();
        if (reader == null) {
            if (log.isTraceEnabled()) {
                log.trace("No object reader, cancelling:"+key);
            }
            cancelKey(key);
        } else {
            if (log.isTraceEnabled()) {
                log.trace("Draining channel:"+key);
            }
            drainChannel(key, reader);
        }
    } catch (Exception e) {
        // This is common, since the sockets on the other end expire after
        // a certain time.
        if (e instanceof CancelledKeyException) {
            // nothing to log; the key is cancelled below
        } else if (e instanceof IOException) {
            // Don't spew out stack traces for IO exceptions unless debug is enabled.
            if (log.isDebugEnabled()) {
                log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
            } else {
                log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
            }
        } else if (log.isErrorEnabled()) {
            // This is a real error, log it.
            log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
        }
        cancelKey(key);
    }
    key = null;
    // done, ready for more, return to pool
    getTaskPool().returnWorker (this);
}
/**
 * Assigns the socket and its reader to this task.
 *
 * @param socket the socket to store; may be {@code null}
 * @param reader the reader to store; may be {@code null}
 */
public synchronized void serviceSocket(Socket socket, ObjectReader reader) {
    this.reader = reader;
    this.socket = socket;
}
/**
 * Assigns a unit of work to this worker: the given key is stored as the key
 * to service. This method is synchronized.
 * If a reader is attached to the key, its last-access time is set to the
 * current time. Before returning, {@code OP_READ} and then {@code OP_WRITE}
 * are removed from the key's interest set, so the selector no longer reports
 * readiness for those operations on this channel until interest is restored.
 *
 * @param key The key to process
 */
public synchronized void serviceChannel (SelectionKey key) {
    if (log.isTraceEnabled()) {
        log.trace("About to service key:"+key);
    }
    final ObjectReader attached = (ObjectReader) key.attachment();
    if (attached != null) {
        attached.setLastAccess(System.currentTimeMillis());
    }
    this.key = key;
    // Two separate updates: first drop OP_READ, then drop OP_WRITE.
    final int withoutRead = key.interestOps() & ~SelectionKey.OP_READ;
    key.interestOps(withoutRead);
    final int withoutWrite = key.interestOps() & ~SelectionKey.OP_WRITE;
    key.interestOps(withoutWrite);
}
/**
 * Assigns a unit of work to this worker: the given key is stored as the key
 * to service. This method is synchronized.
 * If a reader is attached to the key, its last-access time is set to the
 * current time. Before returning, {@code OP_READ} and then {@code OP_WRITE}
 * are removed from the key's interest set, so the selector no longer reports
 * readiness for those operations on this channel until interest is restored.
 *
 * @param key the key to process
 */
public synchronized void serviceChannel (SelectionKey key) {
    if (log.isTraceEnabled()) {
        log.trace("About to service key:"+key);
    }
    final ObjectReader attached = (ObjectReader) key.attachment();
    if (attached != null) {
        attached.setLastAccess(System.currentTimeMillis());
    }
    this.key = key;
    // Two separate updates: first drop OP_READ, then drop OP_WRITE.
    final int withoutRead = key.interestOps() & ~SelectionKey.OP_READ;
    key.interestOps(withoutRead);
    final int withoutWrite = key.interestOps() & ~SelectionKey.OP_WRITE;
    key.interestOps(withoutWrite);
}
/**
 * Assigns a unit of work to this worker: the given key is stored as the key
 * to service. This method is synchronized.
 * If a reader is attached to the key, its last-access time is set to the
 * current time. Before returning, {@code OP_READ} and then {@code OP_WRITE}
 * are removed from the key's interest set, so the selector no longer reports
 * readiness for those operations on this channel until interest is restored.
 *
 * @param key the key to process
 */
public synchronized void serviceChannel (SelectionKey key) {
    if (log.isTraceEnabled()) {
        log.trace("About to service key:"+key);
    }
    final ObjectReader attached = (ObjectReader) key.attachment();
    if (attached != null) {
        attached.setLastAccess(System.currentTimeMillis());
    }
    this.key = key;
    // Two separate updates: first drop OP_READ, then drop OP_WRITE.
    final int withoutRead = key.interestOps() & ~SelectionKey.OP_READ;
    key.interestOps(withoutRead);
    final int withoutWrite = key.interestOps() & ~SelectionKey.OP_WRITE;
    key.interestOps(withoutWrite);
}