下面列出了java.nio.channels.ClosedSelectorException#org.apache.mina.util.ExceptionMonitor 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Calls {@link IoServiceListener#serviceActivated(IoService)}
* for all registered listeners.
*/
public void fireServiceActivated() {
if (!activated.compareAndSet(false, true)) {
// The instance is already active
return;
}
activationTime = System.currentTimeMillis();
// Activate all the listeners now
for (IoServiceListener listener : listeners) {
try {
listener.serviceActivated(service);
} catch (Throwable e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
}
/**
* Calls {@link IoServiceListener#serviceDeactivated(IoService)}
* for all registered listeners.
*/
public void fireServiceDeactivated() {
if (!activated.compareAndSet(true, false)) {
// The instance is already desactivated
return;
}
// Desactivate all the listeners
try {
for (IoServiceListener listener : listeners) {
try {
listener.serviceDeactivated(service);
} catch (Throwable e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
} finally {
disconnectSessions();
}
}
private int registerNew() {
int nHandles = 0;
for (;;) {
ConnectionRequest req = connectQueue.poll();
if (req == null) {
break;
}
H handle = req.handle;
try {
register(handle, req);
nHandles++;
} catch (Exception e) {
req.setException(e);
try {
close(handle);
} catch (Exception e2) {
ExceptionMonitor.getInstance().exceptionCaught(e2);
}
}
}
return nHandles;
}
private void createSessions() {
NioSession session;
while ((session = creatingSessions.poll()) != null) {
try {
session.setSelectionKey(session.getChannel().configureBlocking(false).register(selector, SelectionKey.OP_READ, session));
AbstractIoService service = session.getService();
service.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
service.fireSessionCreated(session);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
session.destroy();
} catch (Exception e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
} finally {
session.setScheduledForRemove();
}
}
}
}
@Override
public void accept(SelectionKey key) {
try {
SocketChannel newChannel = ((ServerSocketChannel)key.channel()).accept();
if (newChannel != null)
processor.add(new NioSession(NioSocketAcceptor.this, newChannel, null));
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
// Sleep 50 ms, so that the select does not spin like crazy doing nothing but eating CPU
// This is typically what will happen if we don't have any more File handle on the server
// Check the ulimit parameter
// NOTE : this is a workaround, there is no way we can handle this exception in any smarter way...
Thread.sleep(50);
} catch (InterruptedException e1) {
}
}
}
@Override
public final void dispose(boolean awaitTermination) {
synchronized (sessionConfig) {
if (disposing)
return;
disposing = true;
try {
dispose0();
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
executor.shutdownNow();
if (awaitTermination) {
try {
executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
ExceptionMonitor.getInstance().warn("awaitTermination on [" + this + "] was interrupted");
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
disposed = true;
}
}
private boolean addNow(MulticastSocketSession session) {
boolean registered = false;
boolean notified = false;
try {
registered = true;
// Build the filter chain of this session.
session.getService().getFilterChainBuilder().buildFilterChain(
session.getFilterChain());
// DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
// in AbstractIoFilterChain.fireSessionOpened().
((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
notified = true;
bufSize = session.getConfig().getReadBufferSize();
} catch (Exception e) {
if (notified)
{
// Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
// and call ConnectFuture.setException().
remove(session);
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(e);
wakeup();
} else {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
destroy(session);
} catch (Exception e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
} finally {
registered = false;
}
}
}
return registered;
}
/**
* Constructor for {@link AbstractIoService}. You need to provide a default
* session configuration and an {@link Executor} for handling I/O events. If
* a null {@link Executor} is provided, a default one will be created using
* {@link Executors#newCachedThreadPool()}.
*
* @param sessionConfig
* the default configuration for the managed {@link IoSession}
* @param executor
* the {@link Executor} used for handling execution of I/O
* events. Can be <code>null</code>.
*/
protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
if (sessionConfig == null) {
throw new IllegalArgumentException("sessionConfig");
}
if (getTransportMetadata() == null) {
throw new IllegalArgumentException("TransportMetadata");
}
if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) {
throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: "
+ getTransportMetadata().getSessionConfigType() + ")");
}
// Create the listeners, and add a first listener : a activation listener
// for this service, which will give information on the service state.
listeners = new IoServiceListenerSupport(this);
listeners.add(serviceActivationListener);
// Stores the given session configuration
this.sessionConfig = sessionConfig;
// Make JVM load the exception monitor before some transports
// change the thread context class loader.
ExceptionMonitor.getInstance();
if (executor == null) {
this.executor = Executors.newCachedThreadPool();
createdExecutor = true;
} else {
this.executor = executor;
createdExecutor = false;
}
threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
}
/**
* Calls {@link IoServiceListener#sessionDestroyed(IoSession)} for all registered listeners.
*
* @param session The session which has been destroyed
*/
public void fireSessionDestroyed(IoSession session) {
// Try to remove the remaining empty session set after removal.
if (managedSessions.remove(session.getId()) == null) {
return;
}
// Fire session events.
session.getFilterChain().fireSessionClosed();
// Fire listener events.
try {
for (IoServiceListener l : listeners) {
try {
l.sessionDestroyed(session);
} catch (Throwable e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
} finally {
// Fire a virtual service deactivation event for the last session of the connector.
if (session.getService() instanceof IoConnector) {
boolean lastSession = false;
synchronized (managedSessions) {
lastSession = managedSessions.isEmpty();
}
if (lastSession) {
fireServiceDeactivated();
}
}
}
}
@SuppressWarnings("unchecked")
private void notifyListener(IoFutureListener l) {
try {
l.operationComplete(this);
} catch (Throwable t) {
ExceptionMonitor.getInstance().exceptionCaught(t);
}
}
private IoSession newSessionWithoutLock( SocketAddress remoteAddress, SocketAddress localAddress ) throws Exception
{
H handle = boundHandles.get( localAddress );
if ( handle == null )
{
throw new IllegalArgumentException( "Unknown local address: " + localAddress );
}
IoSession session;
synchronized ( sessionRecycler )
{
session = sessionRecycler.recycle( remoteAddress );
if ( session != null )
{
return session;
}
// If a new session needs to be created.
S newSession = newSession( this, handle, remoteAddress );
getSessionRecycler().put( newSession );
session = newSession;
}
initSession( session, null, null );
try
{
this.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
getListeners().fireSessionCreated( session );
}
catch ( Throwable t )
{
ExceptionMonitor.getInstance().exceptionCaught( t );
}
return session;
}
@SuppressWarnings("unchecked")
private void processReadySessions( Set<SelectionKey> handles )
{
Iterator<SelectionKey> iterator = handles.iterator();
while ( iterator.hasNext() )
{
SelectionKey key = iterator.next();
H handle = ( H ) key.channel();
iterator.remove();
try
{
if ( ( key != null ) && key.isValid() && key.isReadable() )
{
readHandle( handle );
}
if ( ( key != null ) && key.isValid() && key.isWritable() )
{
for ( IoSession session : getManagedSessions().values() )
{
scheduleFlush( ( S ) session );
}
}
}
catch ( Throwable t )
{
ExceptionMonitor.getInstance().exceptionCaught( t );
}
}
}
/**
* Process a new session :
* - initialize it
* - create its chain
* - fire the CREATED listeners if any
*
* @param session The session to create
* @return true if the session has been registered
*/
private boolean addNow(S session) {
boolean registered = false;
try {
init(session);
registered = true;
// Build the filter chain of this session.
IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
chainBuilder.buildFilterChain(session.getFilterChain());
// DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
// in AbstractIoFilterChain.fireSessionOpened().
// Propagate the SESSION_CREATED event up to the chain
IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
listeners.fireSessionCreated(session);
} catch (Throwable e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
destroy(session);
} catch (Exception e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
} finally {
registered = false;
}
}
return registered;
}
private int cancelKeys() {
int nHandles = 0;
for (;;) {
ConnectionRequest req = cancelQueue.poll();
if (req == null) {
break;
}
H handle = req.handle;
try {
close(handle);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
nHandles++;
}
}
if (nHandles > 0) {
wakeup();
}
return nHandles;
}
/**
* This method just checks to see if anything has been placed into the
* cancellation queue. The only thing that should be in the cancelQueue
* is CancellationRequest objects and the only place this happens is in
* the doUnbind() method.
*/
private int unregisterHandles() {
int cancelledHandles = 0;
for (;;) {
AcceptorOperationFuture future = cancelQueue.poll();
if (future == null) {
break;
}
// close the channels
for (SocketAddress a : future.getLocalAddresses()) {
H handle = boundHandles.remove(a);
if (handle == null) {
continue;
}
try {
close(handle);
wakeup(); // wake up again to trigger thread death
} catch (Throwable e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
cancelledHandles++;
}
}
future.setDone();
}
return cancelledHandles;
}
@Override
public void run() {
for (;;) {
try {
if (!register() || selector.keys().isEmpty()) {
connectorRef.set(null);
if (registerQueue.isEmpty() || !connectorRef.compareAndSet(null, this))
break;
}
// the timeout for select shall be smaller of the connect timeout or 1 second...
selector.select(this, Math.min(getConnectTimeoutMillis(), 1000));
processTimedOutSessions();
} catch (ClosedSelectorException cse) {
ExceptionMonitor.getInstance().exceptionCaught(cse);
break;
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
connectorRef.compareAndSet(this, null);
ExceptionMonitor.getInstance().exceptionCaught(e1);
break;
}
}
}
}
@Override
public WriteFuture write(Object message) {
if (message == null)
throw new IllegalArgumentException("trying to write a null message: not allowed");
// If the session has been closed or is closing, we can't either send a message to the remote side.
// We generate a future containing an exception.
if (isClosing() || !isConnected())
return DefaultWriteFuture.newNotWrittenFuture(this, new WriteToClosedSessionException(null));
try {
if ((message instanceof IoBuffer) && !((IoBuffer)message).hasRemaining()) {
// Nothing to write: probably an error in the user code
throw new IllegalArgumentException("message is empty, forgot to call flip()?");
} else if (message instanceof FileChannel) {
FileChannel fileChannel = (FileChannel)message;
message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
}
} catch (IOException e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
return DefaultWriteFuture.newNotWrittenFuture(this, e);
}
// Now, we can write the message.
WriteFuture writeFuture = new DefaultWriteFuture(this);
WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture);
filterChain.fireFilterWrite(writeRequest);
return writeFuture;
}
protected static void close(SelectionKey key) {
if (key == null)
return;
try {
key.cancel();
key.channel().close();
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
protected void close(AbstractSelectableChannel channel) {
if (channel == null)
return;
try {
SelectionKey key = channel.keyFor(selector);
if (key != null)
key.cancel();
channel.close();
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
@SuppressWarnings("unchecked")
private void notifyListener(@SuppressWarnings("rawtypes") IoFutureListener listener) {
try {
listener.operationComplete(this);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
@Override
public void fireExceptionCaught(Throwable cause) {
ConnectFuture future = (ConnectFuture)session.removeAttribute(SESSION_CREATED_FUTURE);
if (future == null) {
try {
callNextExceptionCaught(head, cause);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
} else {
// Please note that this place is not the only place that calls ConnectFuture.setException().
session.closeNow();
future.setException(cause);
}
}
/**
* Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners.
*
* @param session The session which has been created
*/
public void fireSessionCreated(IoSession session) {
boolean firstSession = false;
if (session.getService() instanceof IoConnector) {
synchronized (managedSessions) {
firstSession = managedSessions.isEmpty();
}
}
// If already registered, ignore.
if (managedSessions.putIfAbsent(session.getId(), session) != null) {
return;
}
// If the first connector session, fire a virtual service activation event.
if (firstSession) {
fireServiceActivated();
}
// Fire session events.
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireSessionCreated();
filterChain.fireSessionOpened();
int managedSessionCount = managedSessions.size();
if (managedSessionCount > largestManagedSessionCount) {
largestManagedSessionCount = managedSessionCount;
}
cumulativeManagedSessionCount++;
// Fire listener events.
for (IoServiceListener l : listeners) {
try {
l.sessionCreated(session);
} catch (Throwable e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
}
private int unregisterHandles()
{
int nHandles = 0;
for ( ;; )
{
AcceptorOperationFuture request = cancelQueue.poll();
if ( request == null )
{
break;
}
// close the channels
for ( SocketAddress socketAddress : request.getLocalAddresses() )
{
H handle = boundHandles.remove( socketAddress );
if ( handle == null )
{
continue;
}
try
{
close( handle );
wakeup(); // wake up again to trigger thread death
}
catch ( Throwable e )
{
ExceptionMonitor.getInstance().exceptionCaught( e );
}
finally
{
nHandles++;
}
}
request.setDone();
}
return nHandles;
}
/**
* Perform any handshaking processing.
*/
void handshake(NextFilter nextFilter) throws Exception {
for (;;) {
switch (handshakeStatus) {
case FINISHED:
// LOGGER.debug("{} processing the FINISHED state", SslFilter.getSessionInfo(session));
handshakeComplete = true;
// Send the SECURE message only if it's the first SSL handshake
if (firstSSLNegociation) {
firstSSLNegociation = false;
if (session.containsAttribute(SslFilter.USE_NOTIFICATION))
scheduleMessageReceived(nextFilter, SslFilter.SESSION_SECURED);
}
// if (!isOutboundDone()) {
// LOGGER.debug("{} is now secured", SslFilter.getSessionInfo(session));
// } else {
// LOGGER.debug("{} is not secured yet", SslFilter.getSessionInfo(session));
// }
return;
case NEED_TASK:
// LOGGER.debug("{} processing the NEED_TASK state", SslFilter.getSessionInfo(session));
handshakeStatus = doTasks();
break;
case NEED_UNWRAP:
// LOGGER.debug("{} processing the NEED_UNWRAP state", SslFilter.getSessionInfo(session));
// we need more data read
if (unwrapHandshake(nextFilter) == Status.BUFFER_UNDERFLOW && handshakeStatus != HandshakeStatus.FINISHED || isInboundDone())
return; // We need more data or the session is closed
break;
case NEED_WRAP:
case NOT_HANDSHAKING:
// LOGGER.debug("{} processing the NEED_WRAP state", SslFilter.getSessionInfo(session));
// First make sure that the out buffer is completely empty.
// Since we cannot call wrap with data left on the buffer
if (outNetBuffer != null && outNetBuffer.hasRemaining())
return;
createOutNetBuffer(0);
for (;;) { //NOSONAR
SSLEngineResult result = sslEngine.wrap(SimpleBufferAllocator.emptyBuffer.buf(), outNetBuffer.buf());
if (result.getStatus() != Status.BUFFER_OVERFLOW) {
handshakeStatus = result.getHandshakeStatus();
break;
}
outNetBuffer = IoBuffer.reallocate(outNetBuffer, outNetBuffer.capacity() << 1);
outNetBuffer.limit(outNetBuffer.capacity());
}
outNetBuffer.flip();
writeNetBuffer(nextFilter, false);
break;
default:
String msg = "invalid handshaking state" + handshakeStatus + " while processing the handshake for session " + session.getId();
ExceptionMonitor.getInstance().error(msg);
throw new IllegalStateException(msg);
}
}
}