下面列出了 java.nio.channels.spi.AbstractSelectableChannel 类的 API 用法示例代码，您也可以点击链接到 GitHub 查看完整源代码。
/**
 * Forwards a select failure to the listener, casting both the listener and
 * the channel to the accept or the non-accept variant depending on whether
 * this selector's op is {@code OP_ACCEPT}.
 *
 * @param listener   listener to notify
 * @param sc         channel the failure relates to
 * @param attachment attachment handed through to the listener
 * @param msg        the failure handed through to the listener
 */
public void selectFailure(
    VirtualAbstractSelectorListener listener,
    AbstractSelectableChannel sc,
    Object attachment,
    Throwable msg) {
  if (op != OP_ACCEPT) {
    VirtualSelectorListener target = (VirtualSelectorListener) listener;
    target.selectFailure(VirtualChannelSelector.this, (SocketChannel) sc, attachment, msg);
    return;
  }
  VirtualAcceptSelectorListener target = (VirtualAcceptSelectorListener) listener;
  target.selectFailure(VirtualChannelSelector.this, (ServerSocketChannel) sc, attachment, msg);
}
/**
 * Reports whether selection is paused for the given channel.
 *
 * <p>When the channel has a valid key on this selector, the answer is taken
 * from the key's interest ops ({@code INTEREST_OP} bit cleared means paused).
 * Otherwise the flag stored in {@code paused_states} is consulted under
 * {@code register_cancel_list_mon}.
 *
 * @param channel channel to query
 * @return {@code true} if the channel is considered paused
 */
public boolean isPaused(AbstractSelectableChannel channel) {
  SelectionKey key = channel.keyFor(selector);
  boolean hasValidKey = key != null && key.isValid();
  if (hasValidKey) {
    int interested = key.interestOps() & INTEREST_OP;
    return interested == 0;
  }

  // No usable key: fall back to the flag recorded in paused_states.
  try {
    register_cancel_list_mon.enter();
    Boolean flag = paused_states.get(channel);
    return flag != null && flag.booleanValue();
  } finally {
    register_cancel_list_mon.exit();
  }
}
/**
 * Pauses selection for the given channel.
 *
 * <p>A {@code null} channel is ignored. If the channel has a valid key on
 * this selector, the {@code INTEREST_OP} bit is cleared from its interest
 * ops. Otherwise, provided the channel is still open, a paused flag is
 * stored in {@code paused_states} under {@code register_cancel_list_mon}.
 *
 * @param channel channel to pause; may be {@code null}
 */
public void pauseSelects(AbstractSelectableChannel channel) {
  if (channel == null) {
    return;
  }

  SelectionKey key = channel.keyFor(selector);
  if (key != null && key.isValid()) {
    int withoutOp = key.interestOps() & ~INTEREST_OP;
    key.interestOps(withoutOp);
    return;
  }

  // Channel not (yet?) registered; nothing to record once it has been closed.
  if (!channel.isOpen()) {
    return;
  }
  try {
    register_cancel_list_mon.enter();
    paused_states.put(channel, Boolean.TRUE);
  } finally {
    register_cancel_list_mon.exit();
  }
}
/**
 * Reports whether the channel is known to this selector: either it already
 * has a key on {@code selector}, or it is present as a key in
 * {@code register_cancel_list} (checked under
 * {@code register_cancel_list_mon}).
 *
 * @param channel channel to look up
 * @return {@code true} if a key exists or the channel is in the list
 */
public boolean isRegistered(AbstractSelectableChannel channel) {
  if (channel.keyFor(selector) != null) {
    return true;
  }

  // No key on the selector: look for an entry in register_cancel_list.
  try {
    register_cancel_list_mon.enter();
    return register_cancel_list.containsKey(channel);
  } finally {
    register_cancel_list_mon.exit();
  }
}
/**
 * Pauses selection for the given channel.
 *
 * <p>A {@code null} channel is ignored. If the channel has a valid key on
 * this selector, the {@code INTEREST_OP} bit is cleared from its interest
 * ops. Otherwise, provided the channel is still open, a paused flag is
 * stored in {@code paused_states} under {@code register_cancel_list_mon}.
 *
 * @param channel channel to pause; may be {@code null}
 */
public void pauseSelects( AbstractSelectableChannel channel ) {
  //System.out.println( "pauseSelects: " + channel + " - " + Debug.getCompressedStackTrace() );
  if( channel == null ) {
    return;
  }

  SelectionKey key = channel.keyFor( selector );

  if( key != null && key.isValid() ) {
    key.interestOps( key.interestOps() & ~INTEREST_OP );
  }
  else {  //channel not (yet?) registered
    if( channel.isOpen() ) {  //only bother if channel has not already been closed
      try{  register_cancel_list_mon.enter();
        // Boolean.TRUE replaces the deprecated Boolean(boolean) constructor:
        // same value, no needless allocation.
        paused_states.put( channel, Boolean.TRUE );
      }
      finally{  register_cancel_list_mon.exit();  }
    }
  }
}
/**
 * Pause selection operations for the given channel.
 *
 * <p>Outside safe-selector mode the call goes straight to
 * {@code selector_impl}. In safe-selector mode the {@code selectors} map is
 * scanned under {@code selectors_mon} for the selector whose channel list
 * contains the channel; if none does, a debug message is emitted.
 *
 * @param channel to pause
 */
public void pauseSelects(AbstractSelectableChannel channel) {
  if (!SAFE_SELECTOR_MODE_ENABLED) {
    selector_impl.pauseSelects(channel);
    return;
  }

  try {
    selectors_mon.enter();
    for (Map.Entry<VirtualChannelSelectorImpl, ArrayList<AbstractSelectableChannel>> candidate
        : selectors.entrySet()) {
      if (candidate.getValue().contains(channel)) {
        candidate.getKey().pauseSelects(channel);
        return;
      }
    }
    Debug.out("pauseSelects():: channel not found!");
  } finally {
    selectors_mon.exit();
  }
}
/**
 * Resume selection operations for the given channel.
 *
 * <p>Outside safe-selector mode the call goes straight to
 * {@code selector_impl}. In safe-selector mode the {@code selectors} map is
 * scanned under {@code selectors_mon} for the selector whose channel list
 * contains the channel; if none does, a debug message is emitted.
 *
 * @param channel to resume
 */
public void resumeSelects(AbstractSelectableChannel channel) {
  if (!SAFE_SELECTOR_MODE_ENABLED) {
    selector_impl.resumeSelects(channel);
    return;
  }

  try {
    selectors_mon.enter();
    for (Map.Entry<VirtualChannelSelectorImpl, ArrayList<AbstractSelectableChannel>> candidate
        : selectors.entrySet()) {
      if (candidate.getValue().contains(channel)) {
        candidate.getKey().resumeSelects(channel);
        return;
      }
    }
    Debug.out("resumeSelects():: channel not found!");
  } finally {
    selectors_mon.exit();
  }
}
/**
 * Cancel the selection operations for the given channel.
 *
 * <p>Outside safe-selector mode the call is forwarded to
 * {@code selector_impl} when that is non-null. In safe-selector mode the
 * channel is removed from the first channel list that contains it (under
 * {@code selectors_mon}) and the owning selector is told to cancel it; if no
 * list contains the channel nothing happens.
 *
 * @param channel channel originally registered
 */
public void cancel(AbstractSelectableChannel channel) {
  if (!SAFE_SELECTOR_MODE_ENABLED) {
    if (selector_impl != null) {
      selector_impl.cancel(channel);
    }
    return;
  }

  try {
    selectors_mon.enter();
    for (Map.Entry<VirtualChannelSelectorImpl, ArrayList<AbstractSelectableChannel>> candidate
        : selectors.entrySet()) {
      boolean wasListed = candidate.getValue().remove(channel);
      if (wasListed) {
        candidate.getKey().cancel(channel);
        return;
      }
    }
  } finally {
    selectors_mon.exit();
  }
}
/**
 * Forwards a select failure to the listener, casting both the listener and
 * the channel to the accept or the non-accept variant depending on whether
 * this selector's op is {@code OP_ACCEPT}.
 *
 * @param listener   listener to notify
 * @param sc         channel the failure relates to
 * @param attachment attachment handed through to the listener
 * @param msg        the failure handed through to the listener
 */
public void selectFailure(
    VirtualAbstractSelectorListener listener,
    AbstractSelectableChannel sc,
    Object attachment,
    Throwable msg) {
  if (op != OP_ACCEPT) {
    VirtualSelectorListener target = (VirtualSelectorListener) listener;
    target.selectFailure(VirtualChannelSelector.this, (SocketChannel) sc, attachment, msg);
    return;
  }
  VirtualAcceptSelectorListener target = (VirtualAcceptSelectorListener) listener;
  target.selectFailure(VirtualChannelSelector.this, (ServerSocketChannel) sc, attachment, msg);
}
/**
 * Opens a datagram channel, binds it to the configured local address/port and
 * connects it to the configured remote host/port.
 *
 * <p>When {@code isWaitResponse()} is true a {@code DatagramChannelWithTimeouts}
 * is opened and its read timeout is set from {@code getTimeoutAsInt()};
 * otherwise a plain {@link DatagramChannel} is used. An empty bind address is
 * replaced by {@code "0.0.0.0"}.
 *
 * <p>If any step after opening fails, the channel is closed before the
 * exception propagates so the underlying socket is not leaked.
 *
 * @return the bound and connected channel
 * @throws IOException if opening, binding or connecting the channel fails
 * @throws NumberFormatException if {@code getPort()} is not a valid integer
 */
@Override
protected AbstractSelectableChannel getChannel() throws IOException {
    DatagramChannel c;
    if (isWaitResponse()) {
        c = DatagramChannelWithTimeouts.open();
    } else {
        c = DatagramChannel.open();
    }

    try {
        if (c instanceof DatagramChannelWithTimeouts) {
            ((DatagramChannelWithTimeouts) c).setReadTimeout(getTimeoutAsInt());
        }

        String bindAddress = getBindAddress();
        if (bindAddress.isEmpty()) {
            bindAddress = "0.0.0.0";
        }
        int adr = getBindPortAsInt();
        c.bind(new InetSocketAddress(bindAddress, adr));

        int port = Integer.parseInt(getPort());
        c.connect(new InetSocketAddress(getHostName(), port));
        return c;
    } catch (IOException | RuntimeException e) {
        // Release the socket; keep the original failure as the primary exception.
        try {
            c.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Schedules {@code sc.close()} on {@code AdaptorCloseAndInterrupt.pool} after
 * a random delay of 0 to 999 milliseconds.
 *
 * @param sc channel to close asynchronously
 */
void doAsyncClose(final AbstractSelectableChannel sc) {
    final Callable<Void> closeTask = new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            sc.close();
            return null;
        }
    };
    final int delayMillis = new Random().nextInt(1000);
    AdaptorCloseAndInterrupt.pool.schedule(closeTask, delayMillis, TimeUnit.MILLISECONDS);
}
/**
 * Schedules {@code sc.close()} on {@code AdaptorCloseAndInterrupt.pool} after
 * a random delay of 0 to 999 milliseconds.
 *
 * @param sc channel to close asynchronously
 */
void doAsyncClose(final AbstractSelectableChannel sc) {
    final Callable<Void> closeTask = new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            sc.close();
            return null;
        }
    };
    final int delayMillis = new Random().nextInt(1000);
    AdaptorCloseAndInterrupt.pool.schedule(closeTask, delayMillis, TimeUnit.MILLISECONDS);
}
/**
 * Schedules {@code sc.close()} on {@code AdaptorCloseAndInterrupt.pool} after
 * a random delay of 0 to 999 milliseconds.
 *
 * @param sc channel to close asynchronously
 */
void doAsyncClose(final AbstractSelectableChannel sc) {
    final Callable<Void> closeTask = new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            sc.close();
            return null;
        }
    };
    final int delayMillis = new Random().nextInt(1000);
    AdaptorCloseAndInterrupt.pool.schedule(closeTask, delayMillis, TimeUnit.MILLISECONDS);
}
/**
 * Schedules {@code sc.close()} on {@code AdaptorCloseAndInterrupt.pool} after
 * a random delay of 0 to 999 milliseconds.
 *
 * @param sc channel to close asynchronously
 */
void doAsyncClose(final AbstractSelectableChannel sc) {
    final Callable<Void> closeTask = new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            sc.close();
            return null;
        }
    };
    final int delayMillis = new Random().nextInt(1000);
    AdaptorCloseAndInterrupt.pool.schedule(closeTask, delayMillis, TimeUnit.MILLISECONDS);
}
/**
 * Schedules {@code sc.close()} on {@code AdaptorCloseAndInterrupt.pool} after
 * a random delay of 0 to 999 milliseconds.
 *
 * @param sc channel to close asynchronously
 */
void doAsyncClose(final AbstractSelectableChannel sc) {
    final Callable<Void> closeTask = new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            sc.close();
            return null;
        }
    };
    final int delayMillis = new Random().nextInt(1000);
    AdaptorCloseAndInterrupt.pool.schedule(closeTask, delayMillis, TimeUnit.MILLISECONDS);
}
/**
 * Creates a selection key for the given channel on this selector, adds it to
 * the selector's key set and sets its initial interest ops.
 *
 * @param ch         channel to register; must be a {@code SelChImpl}
 * @param ops        interest ops to set on the new key
 * @param attachment object to attach to the key; skipped when {@code null}
 * @return the newly created key
 * @throws IllegalSelectorException if {@code ch} is not a {@code SelChImpl}
 * @throws ClosedSelectorException  rethrown from setting the interest ops,
 *         after the key has been removed from the key set and cancelled
 */
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
// only channels implementing SelChImpl can be used with this selector
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
if (attachment != null)
k.attach(attachment);
// register (if needed) before adding to key set
implRegister(k);
// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
// undo the key-set insertion and cancel the key before propagating
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
/**
 * Invokes {@code FOREACH} with the given channel and action, rethrowing any
 * {@link Exception} it raises wrapped in an {@link InternalError}.
 *
 * @param ch     channel passed as the first argument to {@code FOREACH}
 * @param action consumer passed as the second argument to {@code FOREACH}
 */
static void forEach(AbstractSelectableChannel ch, Consumer<SelectionKeyImpl> action) {
    try {
        FOREACH.invoke(ch, action);
    }
    catch (Exception failure) {
        // Surface any invocation failure as an InternalError carrying the cause.
        throw new InternalError(failure);
    }
}
/**
 * Schedules a task on {@code AdaptorCloseAndInterrupt.pool}, after a random
 * delay of 0 to 999 milliseconds, that closes {@code sc} and then sets
 * {@code isClosed} to {@code true}.
 *
 * @param sc channel to close asynchronously
 */
void doAsyncClose(final AbstractSelectableChannel sc) {
    final Callable<Void> closeTask = new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            sc.close();
            // Only reached when close() returned without throwing.
            isClosed.set(true);
            return null;
        }
    };
    final int delayMillis = new Random().nextInt(1000);
    AdaptorCloseAndInterrupt.pool.schedule(closeTask, delayMillis, TimeUnit.MILLISECONDS);
}
/**
 * Schedules {@code sc.close()} on {@code AdaptorCloseAndInterrupt.pool} after
 * a random delay of 0 to 999 milliseconds.
 *
 * @param sc channel to close asynchronously
 */
void doAsyncClose(final AbstractSelectableChannel sc) {
    final Callable<Void> closeTask = new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            sc.close();
            return null;
        }
    };
    final int delayMillis = new Random().nextInt(1000);
    AdaptorCloseAndInterrupt.pool.schedule(closeTask, delayMillis, TimeUnit.MILLISECONDS);
}
/**
 * Forwards a select success to the listener, casting both the listener and
 * the channel to the accept or the non-accept variant depending on whether
 * this selector's op is {@code OP_ACCEPT}.
 *
 * @param listener   listener to notify
 * @param sc         channel that was selected
 * @param attachment attachment handed through to the listener
 * @return whatever the listener's {@code selectSuccess} returns
 */
public boolean selectSuccess(
    VirtualAbstractSelectorListener listener,
    AbstractSelectableChannel sc,
    Object attachment) {
  if (op != OP_ACCEPT) {
    VirtualSelectorListener target = (VirtualSelectorListener) listener;
    return target.selectSuccess(VirtualChannelSelector.this, (SocketChannel) sc, attachment);
  }
  VirtualAcceptSelectorListener target = (VirtualAcceptSelectorListener) listener;
  return target.selectSuccess(VirtualChannelSelector.this, (ServerSocketChannel) sc, attachment);
}
/**
 * Resumes selection for the given channel.
 *
 * <p>A {@code null} channel is reported via {@code Debug.printStackTrace} and
 * otherwise ignored. If the channel has a valid key on this selector, the
 * {@code INTEREST_OP} bit is added to its interest ops; when that bit was
 * previously clear, the key's {@code RegistrationData} metrics are reset
 * first. Without a valid key, any entry for the channel is removed from
 * {@code paused_states} under {@code register_cancel_list_mon}.
 *
 * @param channel channel to resume; may be {@code null}
 */
public void resumeSelects(AbstractSelectableChannel channel) {
  if (channel == null) {
    Debug.printStackTrace(new Exception("resumeSelects():: channel == null"));
    return;
  }

  SelectionKey key = channel.keyFor(selector);
  boolean hasValidKey = key != null && key.isValid();

  if (!hasValidKey) {
    // Channel not (yet?) registered: drop any paused flag recorded for it.
    try {
      register_cancel_list_mon.enter();
      paused_states.remove(channel);
    } finally {
      register_cancel_list_mon.exit();
    }
    return;
  }

  boolean wasNotInterested = (key.interestOps() & INTEREST_OP) == 0;
  if (wasNotInterested) {
    // Resuming a non-interested key: reset the progress metrics.
    RegistrationData data = (RegistrationData) key.attachment();
    data.last_select_success_time = SystemTime.getCurrentTime();
    data.non_progress_count = 0;
  }
  key.interestOps(key.interestOps() | INTEREST_OP);
  // Note: the selector is intentionally not woken up here.
}
/**
 * Queues a registration request for the given channel.
 *
 * <p>A debug message is emitted if the selector is already destroyed, but the
 * request is still processed. A {@code null} channel is reported and ignored.
 * Under {@code register_cancel_list_mon}, any existing entry for the channel
 * is removed from {@code register_cancel_list} and {@code paused_states}
 * before a fresh {@code RegistrationData} is stored, so only the latest
 * requested operation stays outstanding for a channel.
 *
 * @param channel    channel to register; may be {@code null}
 * @param listener   listener stored with the registration
 * @param attachment attachment stored with the registration
 */
public void register(
    AbstractSelectableChannel channel,
    VirtualChannelSelector.VirtualAbstractSelectorListener listener,
    Object attachment) {
  if (destroyed) {
    // Logged only; processing continues as before.
    Debug.out("register called after selector destroyed");
  }
  if (channel == null) {
    Debug.out("Attempt to register selects for null channel");
    return;
  }

  try {
    register_cancel_list_mon.enter();
    // Discard whatever was pending for this channel, then queue the new request.
    register_cancel_list.remove(channel);
    paused_states.remove(channel);
    RegistrationData pending = new RegistrationData(channel, listener, attachment);
    register_cancel_list.put(channel, pending);
  } finally {
    register_cancel_list_mon.exit();
  }
}
/**
 * Stores the channel, listener and attachment of a registration and stamps
 * {@code last_select_success_time} with {@code SystemTime.getCurrentTime()}.
 *
 * @param _channel    channel being registered
 * @param _listener   listener associated with the channel
 * @param _attachment attachment associated with the registration
 */
RegistrationData(
    AbstractSelectableChannel _channel,
    VirtualChannelSelector.VirtualAbstractSelectorListener _listener,
    Object _attachment) {
  this.channel = _channel;
  this.listener = _listener;
  this.attachment = _attachment;
  this.last_select_success_time = SystemTime.getCurrentTime();
}
/**
 * Schedules {@code sc.close()} on {@code AdaptorCloseAndInterrupt.pool} after
 * a random delay of 0 to 999 milliseconds.
 *
 * @param sc channel to close asynchronously
 */
void doAsyncClose(final AbstractSelectableChannel sc) {
    final Callable<Void> closeTask = new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            sc.close();
            return null;
        }
    };
    final int delayMillis = new Random().nextInt(1000);
    AdaptorCloseAndInterrupt.pool.schedule(closeTask, delayMillis, TimeUnit.MILLISECONDS);
}
/**
 * Schedules {@code sc.close()} on {@code AdaptorCloseAndInterrupt.pool} after
 * a random delay of 0 to 999 milliseconds.
 *
 * @param sc channel to close asynchronously
 */
void doAsyncClose(final AbstractSelectableChannel sc) {
    final Callable<Void> closeTask = new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            sc.close();
            return null;
        }
    };
    final int delayMillis = new Random().nextInt(1000);
    AdaptorCloseAndInterrupt.pool.schedule(closeTask, delayMillis, TimeUnit.MILLISECONDS);
}
/**
 * Closes the channel if it is non-null and still open. An {@link IOException}
 * raised by the close is logged through {@code LOGGER} and not propagated.
 *
 * @param channel channel to close; may be {@code null}
 */
public static void close(AbstractSelectableChannel channel) {
    if (channel == null || !channel.isOpen()) {
        return;
    }
    try {
        channel.close();
    } catch (IOException e) {
        LOGGER.error("Fail to close channel", e);
    }
}
/**
 * Resumes selection for the given channel.
 *
 * <p>A {@code null} channel is reported via {@code Debug.printStackTrace} and
 * otherwise ignored. If the channel has a valid key on this selector, the
 * {@code INTEREST_OP} bit is added to its interest ops; when that bit was
 * previously clear, the key's {@code RegistrationData} metrics are reset
 * first. Without a valid key, any entry for the channel is removed from
 * {@code paused_states} under {@code register_cancel_list_mon}.
 *
 * @param channel channel to resume; may be {@code null}
 */
public void resumeSelects(AbstractSelectableChannel channel) {
  if (channel == null) {
    Debug.printStackTrace(new Exception("resumeSelects():: channel == null"));
    return;
  }

  SelectionKey key = channel.keyFor(selector);
  boolean hasValidKey = key != null && key.isValid();

  if (!hasValidKey) {
    // Channel not (yet?) registered: drop any paused flag recorded for it.
    try {
      register_cancel_list_mon.enter();
      paused_states.remove(channel);
    } finally {
      register_cancel_list_mon.exit();
    }
    return;
  }

  boolean wasNotInterested = (key.interestOps() & INTEREST_OP) == 0;
  if (wasNotInterested) {
    // Resuming a non-interested key: reset the progress metrics.
    RegistrationData data = (RegistrationData) key.attachment();
    data.last_select_success_time = SystemTime.getCurrentTime();
    data.non_progress_count = 0;
  }
  key.interestOps(key.interestOps() | INTEREST_OP);
  // Note: the selector is intentionally not woken up here.
}
/**
 * Stores the channel, listener and attachment of a registration and stamps
 * {@code last_select_success_time} with {@code SystemTime.getCurrentTime()}.
 *
 * @param _channel    channel being registered
 * @param _listener   listener associated with the channel
 * @param _attachment attachment associated with the registration
 */
private RegistrationData(
    AbstractSelectableChannel _channel,
    VirtualChannelSelector.VirtualAbstractSelectorListener _listener,
    Object _attachment) {
  this.channel = _channel;
  this.listener = _listener;
  this.attachment = _attachment;
  this.last_select_success_time = SystemTime.getCurrentTime();
}
/**
 * Switches this selector to safe-selector mode: logs a banner when logging is
 * enabled, clears {@code selector_impl}, creates the {@code selectors} map and
 * its monitor, seeds the map with one {@code VirtualChannelSelectorImpl}
 * paired with an empty channel list, and snapshots the map's key set into
 * {@code selectors_keyset_cow}.
 */
private void initSafeMode() {
  if (Logger.isEnabled()) {
    LogEvent banner =
        new LogEvent(LOGID, "***************** SAFE SOCKET SELECTOR MODE ENABLED *****************");
    Logger.log(banner);
  }

  selector_impl = null;
  selectors = new HashMap<VirtualChannelSelectorImpl, ArrayList<AbstractSelectableChannel>>();
  selectors_mon = new AEMonitor("VirtualChannelSelector:FM");

  // Seed the map with a first selector that owns no channels yet.
  VirtualChannelSelectorImpl firstSelector =
      new VirtualChannelSelectorImpl(this, op, pause, randomise_keys);
  ArrayList<AbstractSelectableChannel> noChannels = new ArrayList<AbstractSelectableChannel>();
  selectors.put(firstSelector, noChannels);

  // Separate copy of the current key set.
  selectors_keyset_cow = new HashSet<VirtualChannelSelectorImpl>(selectors.keySet());
}
/**
 * Forwards a select success to the listener, casting both the listener and
 * the channel to the accept or the non-accept variant depending on whether
 * this selector's op is {@code OP_ACCEPT}.
 *
 * @param listener   listener to notify
 * @param sc         channel that was selected
 * @param attachment attachment handed through to the listener
 * @return whatever the listener's {@code selectSuccess} returns
 */
public boolean selectSuccess(
    VirtualAbstractSelectorListener listener,
    AbstractSelectableChannel sc,
    Object attachment) {
  if (op != OP_ACCEPT) {
    VirtualSelectorListener target = (VirtualSelectorListener) listener;
    return target.selectSuccess(VirtualChannelSelector.this, (SocketChannel) sc, attachment);
  }
  VirtualAcceptSelectorListener target = (VirtualAcceptSelectorListener) listener;
  return target.selectSuccess(VirtualChannelSelector.this, (ServerSocketChannel) sc, attachment);
}