java.nio.channels.SelectableChannel#register()源码实例Demo

下面列出了java.nio.channels.SelectableChannel#register() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: incubator-tuweni   文件: SelectorTest.java
@Test
void selectorRemovesKeysOnCancelWhenSelecting() throws Exception {
  Pipe pipe = Pipe.open();

  Selector selector = Selector.open();
  SelectableChannel source = pipe.source();
  source.configureBlocking(false);

  SelectionKey key = source.register(selector, OP_READ);
  assertTrue(selector.keys().contains(key));

  key.cancel();
  assertTrue(selector.keys().contains(key));
  assertSame(key, source.keyFor(selector));

  selector.selectNow();
  assertFalse(selector.keys().contains(key));
  assertNull(source.keyFor(selector));
}
 
源代码2 项目: netty4.0.27Learn   文件: NioEventLoop.java
/**
 * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
 * of this event loop.  Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
 * be executed by this event loop when the {@link SelectableChannel} is ready.
 */
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
    if (ch == null) {
        throw new NullPointerException("ch");
    }
    if (interestOps == 0) {
        throw new IllegalArgumentException("interestOps must be non-zero.");
    }
    if ((interestOps & ~ch.validOps()) != 0) {
        throw new IllegalArgumentException(
                "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
    }
    if (task == null) {
        throw new NullPointerException("task");
    }

    if (isShutdown()) {
        throw new IllegalStateException("event loop shut down");
    }

    try {
        ch.register(selector, interestOps, task);
    } catch (Exception e) {
        throw new EventLoopException("failed to register a channel", e);
    }
}
 
源代码3 项目: cava   文件: SelectorTest.java
@Test
void selectorRemovesKeysOnChannelCloseWhenSelecting() throws Exception {
  Pipe pipe = Pipe.open();

  Selector selector = Selector.open();
  SelectableChannel source = pipe.source();
  source.configureBlocking(false);

  SelectionKey key = source.register(selector, OP_READ);
  assertTrue(selector.keys().contains(key));

  source.close();
  assertTrue(selector.keys().contains(key));

  selector.selectNow();
  assertFalse(selector.keys().contains(key));
}
 
源代码4 项目: cava   文件: SelectorTest.java
@Test
void selectorRemovesKeysOnCancelWhenSelecting() throws Exception {
  Pipe pipe = Pipe.open();

  Selector selector = Selector.open();
  SelectableChannel source = pipe.source();
  source.configureBlocking(false);

  SelectionKey key = source.register(selector, OP_READ);
  assertTrue(selector.keys().contains(key));

  key.cancel();
  assertTrue(selector.keys().contains(key));
  assertSame(key, source.keyFor(selector));

  selector.selectNow();
  assertFalse(selector.keys().contains(key));
  assertNull(source.keyFor(selector));
}
 
源代码5 项目: tomcatsrc   文件: NioReceiver.java
/**
 * Register the given channel with the given selector for
 * the given operations of interest
 */
protected void registerChannel(Selector selector,
                               SelectableChannel channel,
                               int ops,
                               Object attach) throws Exception {
    if (channel == null)return; // could happen
    // set the new channel non-blocking
    channel.configureBlocking(false);
    // register it with the selector
    channel.register(selector, ops, attach);
}
 
源代码6 项目: hottub   文件: SelectorImpl.java
private void handleDeferredRegistrations()
{
    synchronized (deferredRegistrations) {
        int deferredListSize = deferredRegistrations.size();
        for (int i = 0; i < deferredListSize; i++) {
            EventHandler eventHandler =
                (EventHandler)deferredRegistrations.get(i);
            if (orb.transportDebugFlag) {
                dprint(".handleDeferredRegistrations: " + eventHandler);
            }
            SelectableChannel channel = eventHandler.getChannel();
            SelectionKey selectionKey = null;
            try {
                selectionKey =
                    channel.register(selector,
                                     eventHandler.getInterestOps(),
                                     (Object)eventHandler);
            } catch (ClosedChannelException e) {
                if (orb.transportDebugFlag) {
                    dprint(".handleDeferredRegistrations: ", e);
                }
            }
            eventHandler.setSelectionKey(selectionKey);
        }
        deferredRegistrations.clear();
    }
}
 
源代码7 项目: incubator-tuweni   文件: SelectorTest.java
@Test
void selectorRemovesKeysOnChannelCloseWhileSelecting() throws Exception {
  Pipe pipe = Pipe.open();

  Selector selector = Selector.open();
  SelectableChannel source = pipe.source();
  source.configureBlocking(false);

  SelectionKey key = source.register(selector, OP_READ);
  assertTrue(selector.keys().contains(key));

  CountDownLatch latch = new CountDownLatch(1);
  Future<?> job = executor.submit(() -> {
    latch.countDown();
    try {
      selector.select();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  });

  latch.await();
  Thread.sleep(100);

  source.close();
  selector.wakeup();
  job.get();
  assertFalse(selector.keys().contains(key));
}
 
源代码8 项目: incubator-tuweni   文件: SelectorTest.java
@Test
void selectorRemovesKeysOnCancelWhileSelecting() throws Exception {
  Pipe pipe = Pipe.open();

  Selector selector = Selector.open();
  SelectableChannel source = pipe.source();
  source.configureBlocking(false);

  SelectionKey key = source.register(selector, OP_READ);
  assertTrue(selector.keys().contains(key));

  CountDownLatch latch = new CountDownLatch(1);
  Future<?> job = executor.submit(() -> {
    latch.countDown();
    try {
      selector.select();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  });

  latch.await();
  Thread.sleep(100);

  key.cancel();
  assertTrue(selector.keys().contains(key));
  assertSame(key, source.keyFor(selector));

  selector.wakeup();
  job.get();
  assertFalse(selector.keys().contains(key));
  assertNull(source.keyFor(selector));
}
 
源代码9 项目: new-bull   文件: ReactorEchoServerV1.java
public void register(SelectableChannel channel, int ops, Handler handler) {
    try {
        channel.register(selector, ops, handler);
    } catch (ClosedChannelException e) {
        e.printStackTrace();
    }
}
 
源代码10 项目: baratine   文件: PollTcpManagerNio.java
private void registerItem(PollController conn)
{
  try {
    SelectableChannel selChannel = conn.getSocket().selectableChannel();

    SelectionKey key = selChannel.register(_selector, 
                                           SelectionKey.OP_READ,
                                           conn);

    if (key == null) {
      log.warning(this + " selection failed for " + conn);

      return;
    }

    if (! _lifecycle.isActive()) {
      return;
    }

    if (_activeCount.decrementAndGet() == 0 && _lifecycle.isDestroyed()) {
      destroy();
    }

    if (log.isLoggable(Level.FINER)) {
      log.finer(conn + " add keepalive (select fd=" + key + 
                ",timeout=" + (conn.getIdleExpireTime() - CurrentTime.currentTime()) + "ms)");
    }

    _keepaliveAsyncMeter.start();
  } catch (Exception e) {
    _lifecycle.toError();
    log.log(Level.WARNING, e.toString(), e);
  }
}
 
源代码11 项目: TencentKona-8   文件: SelectorImpl.java
private void handleDeferredRegistrations()
{
    synchronized (deferredRegistrations) {
        int deferredListSize = deferredRegistrations.size();
        for (int i = 0; i < deferredListSize; i++) {
            EventHandler eventHandler =
                (EventHandler)deferredRegistrations.get(i);
            if (orb.transportDebugFlag) {
                dprint(".handleDeferredRegistrations: " + eventHandler);
            }
            SelectableChannel channel = eventHandler.getChannel();
            SelectionKey selectionKey = null;
            try {
                selectionKey =
                    channel.register(selector,
                                     eventHandler.getInterestOps(),
                                     (Object)eventHandler);
            } catch (ClosedChannelException e) {
                if (orb.transportDebugFlag) {
                    dprint(".handleDeferredRegistrations: ", e);
                }
            }
            eventHandler.setSelectionKey(selectionKey);
        }
        deferredRegistrations.clear();
    }
}
 
源代码12 项目: jdk8u60   文件: SelectorImpl.java
private void handleDeferredRegistrations()
{
    synchronized (deferredRegistrations) {
        int deferredListSize = deferredRegistrations.size();
        for (int i = 0; i < deferredListSize; i++) {
            EventHandler eventHandler =
                (EventHandler)deferredRegistrations.get(i);
            if (orb.transportDebugFlag) {
                dprint(".handleDeferredRegistrations: " + eventHandler);
            }
            SelectableChannel channel = eventHandler.getChannel();
            SelectionKey selectionKey = null;
            try {
                selectionKey =
                    channel.register(selector,
                                     eventHandler.getInterestOps(),
                                     (Object)eventHandler);
            } catch (ClosedChannelException e) {
                if (orb.transportDebugFlag) {
                    dprint(".handleDeferredRegistrations: " + e);
                }
            }
            eventHandler.setSelectionKey(selectionKey);
        }
        deferredRegistrations.clear();
    }
}
 
源代码13 项目: cava   文件: SelectorTest.java
@Test
void selectorRemovesKeysOnChannelCloseWhileSelecting() throws Exception {
  Pipe pipe = Pipe.open();

  Selector selector = Selector.open();
  SelectableChannel source = pipe.source();
  source.configureBlocking(false);

  SelectionKey key = source.register(selector, OP_READ);
  assertTrue(selector.keys().contains(key));

  CountDownLatch latch = new CountDownLatch(1);
  Future<?> job = executor.submit(() -> {
    latch.countDown();
    try {
      selector.select();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  });

  latch.await();
  Thread.sleep(100);

  source.close();
  selector.wakeup();
  job.get();
  assertFalse(selector.keys().contains(key));
}
 
源代码14 项目: openjdk-jdk8u   文件: SelectorImpl.java
private void handleDeferredRegistrations()
{
    synchronized (deferredRegistrations) {
        int deferredListSize = deferredRegistrations.size();
        for (int i = 0; i < deferredListSize; i++) {
            EventHandler eventHandler =
                (EventHandler)deferredRegistrations.get(i);
            if (orb.transportDebugFlag) {
                dprint(".handleDeferredRegistrations: " + eventHandler);
            }
            SelectableChannel channel = eventHandler.getChannel();
            SelectionKey selectionKey = null;
            try {
                selectionKey =
                    channel.register(selector,
                                     eventHandler.getInterestOps(),
                                     (Object)eventHandler);
            } catch (ClosedChannelException e) {
                if (orb.transportDebugFlag) {
                    dprint(".handleDeferredRegistrations: ", e);
                }
            }
            eventHandler.setSelectionKey(selectionKey);
        }
        deferredRegistrations.clear();
    }
}
 
源代码15 项目: Smack   文件: SmackReactor.java
public SelectionKey registerWithSelector(SelectableChannel channel, int ops, ChannelSelectedCallback callback)
        throws ClosedChannelException {
    SelectionKeyAttachment selectionKeyAttachment = new SelectionKeyAttachment(callback);

    registrationLock.lock();
    try {
        selector.wakeup();
        return channel.register(selector, ops, selectionKeyAttachment);
    } finally {
        registrationLock.unlock();
    }
}
 
源代码16 项目: craft-atom   文件: NioProcessor.java
private void register() throws ClosedChannelException {
	for (NioByteChannel channel = newChannels.poll(); channel != null; channel = newChannels.poll()) {
		SelectableChannel sc = channel.innerChannel();
		SelectionKey key = sc.register(selector, SelectionKey.OP_READ, channel);
		channel.setSelectionKey(key);
		idleTimer.add(channel);
		
		// fire channel opened event
		fireChannelOpened(channel);
	}
}
 
源代码17 项目: hadoop   文件: SocketIOWithTimeout.java
/**
 * Waits on the channel with the given timeout using one of the 
 * cached selectors. It also removes any cached selectors that are
 * idle for a few seconds.
 * 
 * @param channel
 * @param ops
 * @param timeout
 * @return
 * @throws IOException
 */
int select(SelectableChannel channel, int ops, long timeout) 
                                               throws IOException {
 
  SelectorInfo info = get(channel);
  
  SelectionKey key = null;
  int ret = 0;
  
  try {
    while (true) {
      long start = (timeout == 0) ? 0 : Time.now();

      key = channel.register(info.selector, ops);
      ret = info.selector.select(timeout);
      
      if (ret != 0) {
        return ret;
      }
      
      if (Thread.currentThread().isInterrupted()) {
        throw new InterruptedIOException("Interrupted while waiting for "
            + "IO on channel " + channel + ". " + timeout
            + " millis timeout left.");
      }

      /* Sometimes select() returns 0 much before timeout for 
       * unknown reasons. So select again if required.
       */
      if (timeout > 0) {
        timeout -= Time.now() - start;
        if (timeout <= 0) {
          return 0;
        }
      }
      
    }
  } finally {
    if (key != null) {
      key.cancel();
    }
    
    //clear the canceled key.
    try {
      info.selector.selectNow();
    } catch (IOException e) {
      LOG.info("Unexpected Exception while clearing selector : ", e);
      // don't put the selector back.
      info.close();
      return ret; 
    }
    
    release(info);
  }
}
 
源代码18 项目: hadoop-gpu   文件: SocketIOWithTimeout.java
/**
 * Waits on the channel with the given timeout using one of the 
 * cached selectors. It also removes any cached selectors that are
 * idle for a few seconds.
 * 
 * @param channel
 * @param ops
 * @param timeout
 * @return
 * @throws IOException
 */
int select(SelectableChannel channel, int ops, long timeout) 
                                               throws IOException {
 
  SelectorInfo info = get(channel);
  
  SelectionKey key = null;
  int ret = 0;
  
  try {
    while (true) {
      long start = (timeout == 0) ? 0 : System.currentTimeMillis();

      key = channel.register(info.selector, ops);
      ret = info.selector.select(timeout);
      
      if (ret != 0) {
        return ret;
      }
      
      /* Sometimes select() returns 0 much before timeout for 
       * unknown reasons. So select again if required.
       */
      if (timeout > 0) {
        timeout -= System.currentTimeMillis() - start;
        if (timeout <= 0) {
          return 0;
        }
      }
      
      if (Thread.currentThread().isInterrupted()) {
        throw new InterruptedIOException("Interruped while waiting for " +
                                         "IO on channel " + channel +
                                         ". " + timeout + 
                                         " millis timeout left.");
      }
    }
  } finally {
    if (key != null) {
      key.cancel();
    }
    
    //clear the canceled key.
    try {
      info.selector.selectNow();
    } catch (IOException e) {
      LOG.info("Unexpected Exception while clearing selector : " +
               StringUtils.stringifyException(e));
      // don't put the selector back.
      info.close();
      return ret; 
    }
    
    release(info);
  }
}
 
源代码19 项目: RDFS   文件: SocketIOWithTimeout.java
/**
 * Waits on the channel with the given timeout using one of the 
 * cached selectors. It also removes any cached selectors that are
 * idle for a few seconds.
 * 
 * @param channel
 * @param ops
 * @param timeout
 * @return
 * @throws IOException
 */
int select(SelectableChannel channel, int ops, long timeout) 
                                               throws IOException {
 
  SelectorInfo info = get(channel);
  
  SelectionKey key = null;
  int ret = 0;
  
  try {
    while (true) {
      long start = (timeout == 0) ? 0 : System.currentTimeMillis();

      key = channel.register(info.selector, ops);
      ret = info.selector.select(timeout);
      
      if (ret != 0) {
        return ret;
      }
      
      /* Sometimes select() returns 0 much before timeout for 
       * unknown reasons. So select again if required.
       */
      if (timeout > 0) {
        timeout -= System.currentTimeMillis() - start;
        if (timeout <= 0) {
          return 0;
        }
      }
      
      if (Thread.currentThread().isInterrupted()) {
        throw new InterruptedIOException("Interruped while waiting for " +
                                         "IO on channel " + channel +
                                         ". " + timeout + 
                                         " millis timeout left.");
      }
    }
  } finally {
    if (key != null) {
      key.cancel();
    }
    
    //clear the canceled key.
    try {
      info.selector.selectNow();
    } catch (IOException e) {
      LOG.info("Unexpected Exception while clearing selector : " +
               StringUtils.stringifyException(e));
      // don't put the selector back.
      info.close();
      return ret; 
    }
    
    release(info);
  }
}
 
源代码20 项目: attic-apex-malhar   文件: NetworkManager.java
public synchronized <T extends SelectableChannel> ChannelAction<T> registerAction(int port, ConnectionType type, ChannelListener<T> listener, int ops) throws IOException
{
  boolean startProc = (channels.size() == 0);
  SelectableChannel channel = null;
  SocketAddress address = new InetSocketAddress(port);
  ConnectionInfo connectionInfo = new ConnectionInfo();
  connectionInfo.address =  address;
  connectionInfo.connectionType = type;
  ChannelConfiguration channelConfiguration = channels.get(connectionInfo);
  if (channelConfiguration == null) {
    Object socket = null;
    if (type == ConnectionType.TCP) {
      SocketChannel schannel = SocketChannel.open();
      schannel.configureBlocking(false);
      Socket ssocket = schannel.socket();
      ssocket.bind(address);
      socket = ssocket;
      channel = schannel;
    } else if (type == ConnectionType.UDP) {
      DatagramChannel dchannel = DatagramChannel.open();
      dchannel.configureBlocking(false);
      DatagramSocket dsocket = dchannel.socket();
      dsocket.bind(address);
      socket = dsocket;
      channel = dchannel;
    }
    if (channel == null) {
      throw new IOException("Unsupported connection type");
    }
    channelConfiguration = new ChannelConfiguration();
    channelConfiguration.actions = new ConcurrentLinkedQueue<ChannelAction>();
    channelConfiguration.channel = channel;
    channelConfiguration.connectionInfo = connectionInfo;
    channels.put(connectionInfo, channelConfiguration);
    channelConfigurations.put(channel, channelConfiguration);
  } else {
    channel = channelConfiguration.channel;
  }
  ChannelAction channelAction = new ChannelAction();
  channelAction.channelConfiguration = channelConfiguration;
  channelAction.listener = listener;
  channelAction.ops = ops;
  channelConfiguration.actions.add(channelAction);
  if (startProc) {
    startProcess();
  }
  if (listener != null) {
    channel.register(selector, ops);
  }
  return channelAction;
}