下面列出了java.nio.channels.SelectableChannel#register() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
void selectorRemovesKeysOnCancelWhenSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
key.cancel();
assertTrue(selector.keys().contains(key));
assertSame(key, source.keyFor(selector));
selector.selectNow();
assertFalse(selector.keys().contains(key));
assertNull(source.keyFor(selector));
}
/**
* Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
* of this event loop. Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
* be executed by this event loop when the {@link SelectableChannel} is ready.
*/
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
if (ch == null) {
throw new NullPointerException("ch");
}
if (interestOps == 0) {
throw new IllegalArgumentException("interestOps must be non-zero.");
}
if ((interestOps & ~ch.validOps()) != 0) {
throw new IllegalArgumentException(
"invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
}
if (task == null) {
throw new NullPointerException("task");
}
if (isShutdown()) {
throw new IllegalStateException("event loop shut down");
}
try {
ch.register(selector, interestOps, task);
} catch (Exception e) {
throw new EventLoopException("failed to register a channel", e);
}
}
@Test
void selectorRemovesKeysOnChannelCloseWhenSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
source.close();
assertTrue(selector.keys().contains(key));
selector.selectNow();
assertFalse(selector.keys().contains(key));
}
@Test
void selectorRemovesKeysOnCancelWhenSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
key.cancel();
assertTrue(selector.keys().contains(key));
assertSame(key, source.keyFor(selector));
selector.selectNow();
assertFalse(selector.keys().contains(key));
assertNull(source.keyFor(selector));
}
/**
* Register the given channel with the given selector for
* the given operations of interest
*/
protected void registerChannel(Selector selector,
SelectableChannel channel,
int ops,
Object attach) throws Exception {
if (channel == null)return; // could happen
// set the new channel non-blocking
channel.configureBlocking(false);
// register it with the selector
channel.register(selector, ops, attach);
}
private void handleDeferredRegistrations()
{
synchronized (deferredRegistrations) {
int deferredListSize = deferredRegistrations.size();
for (int i = 0; i < deferredListSize; i++) {
EventHandler eventHandler =
(EventHandler)deferredRegistrations.get(i);
if (orb.transportDebugFlag) {
dprint(".handleDeferredRegistrations: " + eventHandler);
}
SelectableChannel channel = eventHandler.getChannel();
SelectionKey selectionKey = null;
try {
selectionKey =
channel.register(selector,
eventHandler.getInterestOps(),
(Object)eventHandler);
} catch (ClosedChannelException e) {
if (orb.transportDebugFlag) {
dprint(".handleDeferredRegistrations: ", e);
}
}
eventHandler.setSelectionKey(selectionKey);
}
deferredRegistrations.clear();
}
}
@Test
void selectorRemovesKeysOnChannelCloseWhileSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
CountDownLatch latch = new CountDownLatch(1);
Future<?> job = executor.submit(() -> {
latch.countDown();
try {
selector.select();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
latch.await();
Thread.sleep(100);
source.close();
selector.wakeup();
job.get();
assertFalse(selector.keys().contains(key));
}
@Test
void selectorRemovesKeysOnCancelWhileSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
CountDownLatch latch = new CountDownLatch(1);
Future<?> job = executor.submit(() -> {
latch.countDown();
try {
selector.select();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
latch.await();
Thread.sleep(100);
key.cancel();
assertTrue(selector.keys().contains(key));
assertSame(key, source.keyFor(selector));
selector.wakeup();
job.get();
assertFalse(selector.keys().contains(key));
assertNull(source.keyFor(selector));
}
public void register(SelectableChannel channel, int ops, Handler handler) {
try {
channel.register(selector, ops, handler);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
private void registerItem(PollController conn)
{
try {
SelectableChannel selChannel = conn.getSocket().selectableChannel();
SelectionKey key = selChannel.register(_selector,
SelectionKey.OP_READ,
conn);
if (key == null) {
log.warning(this + " selection failed for " + conn);
return;
}
if (! _lifecycle.isActive()) {
return;
}
if (_activeCount.decrementAndGet() == 0 && _lifecycle.isDestroyed()) {
destroy();
}
if (log.isLoggable(Level.FINER)) {
log.finer(conn + " add keepalive (select fd=" + key +
",timeout=" + (conn.getIdleExpireTime() - CurrentTime.currentTime()) + "ms)");
}
_keepaliveAsyncMeter.start();
} catch (Exception e) {
_lifecycle.toError();
log.log(Level.WARNING, e.toString(), e);
}
}
private void handleDeferredRegistrations()
{
synchronized (deferredRegistrations) {
int deferredListSize = deferredRegistrations.size();
for (int i = 0; i < deferredListSize; i++) {
EventHandler eventHandler =
(EventHandler)deferredRegistrations.get(i);
if (orb.transportDebugFlag) {
dprint(".handleDeferredRegistrations: " + eventHandler);
}
SelectableChannel channel = eventHandler.getChannel();
SelectionKey selectionKey = null;
try {
selectionKey =
channel.register(selector,
eventHandler.getInterestOps(),
(Object)eventHandler);
} catch (ClosedChannelException e) {
if (orb.transportDebugFlag) {
dprint(".handleDeferredRegistrations: ", e);
}
}
eventHandler.setSelectionKey(selectionKey);
}
deferredRegistrations.clear();
}
}
private void handleDeferredRegistrations()
{
synchronized (deferredRegistrations) {
int deferredListSize = deferredRegistrations.size();
for (int i = 0; i < deferredListSize; i++) {
EventHandler eventHandler =
(EventHandler)deferredRegistrations.get(i);
if (orb.transportDebugFlag) {
dprint(".handleDeferredRegistrations: " + eventHandler);
}
SelectableChannel channel = eventHandler.getChannel();
SelectionKey selectionKey = null;
try {
selectionKey =
channel.register(selector,
eventHandler.getInterestOps(),
(Object)eventHandler);
} catch (ClosedChannelException e) {
if (orb.transportDebugFlag) {
dprint(".handleDeferredRegistrations: " + e);
}
}
eventHandler.setSelectionKey(selectionKey);
}
deferredRegistrations.clear();
}
}
@Test
void selectorRemovesKeysOnChannelCloseWhileSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
CountDownLatch latch = new CountDownLatch(1);
Future<?> job = executor.submit(() -> {
latch.countDown();
try {
selector.select();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
latch.await();
Thread.sleep(100);
source.close();
selector.wakeup();
job.get();
assertFalse(selector.keys().contains(key));
}
private void handleDeferredRegistrations()
{
synchronized (deferredRegistrations) {
int deferredListSize = deferredRegistrations.size();
for (int i = 0; i < deferredListSize; i++) {
EventHandler eventHandler =
(EventHandler)deferredRegistrations.get(i);
if (orb.transportDebugFlag) {
dprint(".handleDeferredRegistrations: " + eventHandler);
}
SelectableChannel channel = eventHandler.getChannel();
SelectionKey selectionKey = null;
try {
selectionKey =
channel.register(selector,
eventHandler.getInterestOps(),
(Object)eventHandler);
} catch (ClosedChannelException e) {
if (orb.transportDebugFlag) {
dprint(".handleDeferredRegistrations: ", e);
}
}
eventHandler.setSelectionKey(selectionKey);
}
deferredRegistrations.clear();
}
}
public SelectionKey registerWithSelector(SelectableChannel channel, int ops, ChannelSelectedCallback callback)
throws ClosedChannelException {
SelectionKeyAttachment selectionKeyAttachment = new SelectionKeyAttachment(callback);
registrationLock.lock();
try {
selector.wakeup();
return channel.register(selector, ops, selectionKeyAttachment);
} finally {
registrationLock.unlock();
}
}
private void register() throws ClosedChannelException {
for (NioByteChannel channel = newChannels.poll(); channel != null; channel = newChannels.poll()) {
SelectableChannel sc = channel.innerChannel();
SelectionKey key = sc.register(selector, SelectionKey.OP_READ, channel);
channel.setSelectionKey(key);
idleTimer.add(channel);
// fire channel opened event
fireChannelOpened(channel);
}
}
/**
* Waits on the channel with the given timeout using one of the
* cached selectors. It also removes any cached selectors that are
* idle for a few seconds.
*
* @param channel
* @param ops
* @param timeout
* @return
* @throws IOException
*/
int select(SelectableChannel channel, int ops, long timeout)
throws IOException {
SelectorInfo info = get(channel);
SelectionKey key = null;
int ret = 0;
try {
while (true) {
long start = (timeout == 0) ? 0 : Time.now();
key = channel.register(info.selector, ops);
ret = info.selector.select(timeout);
if (ret != 0) {
return ret;
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedIOException("Interrupted while waiting for "
+ "IO on channel " + channel + ". " + timeout
+ " millis timeout left.");
}
/* Sometimes select() returns 0 much before timeout for
* unknown reasons. So select again if required.
*/
if (timeout > 0) {
timeout -= Time.now() - start;
if (timeout <= 0) {
return 0;
}
}
}
} finally {
if (key != null) {
key.cancel();
}
//clear the canceled key.
try {
info.selector.selectNow();
} catch (IOException e) {
LOG.info("Unexpected Exception while clearing selector : ", e);
// don't put the selector back.
info.close();
return ret;
}
release(info);
}
}
/**
* Waits on the channel with the given timeout using one of the
* cached selectors. It also removes any cached selectors that are
* idle for a few seconds.
*
* @param channel
* @param ops
* @param timeout
* @return
* @throws IOException
*/
int select(SelectableChannel channel, int ops, long timeout)
throws IOException {
SelectorInfo info = get(channel);
SelectionKey key = null;
int ret = 0;
try {
while (true) {
long start = (timeout == 0) ? 0 : System.currentTimeMillis();
key = channel.register(info.selector, ops);
ret = info.selector.select(timeout);
if (ret != 0) {
return ret;
}
/* Sometimes select() returns 0 much before timeout for
* unknown reasons. So select again if required.
*/
if (timeout > 0) {
timeout -= System.currentTimeMillis() - start;
if (timeout <= 0) {
return 0;
}
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedIOException("Interruped while waiting for " +
"IO on channel " + channel +
". " + timeout +
" millis timeout left.");
}
}
} finally {
if (key != null) {
key.cancel();
}
//clear the canceled key.
try {
info.selector.selectNow();
} catch (IOException e) {
LOG.info("Unexpected Exception while clearing selector : " +
StringUtils.stringifyException(e));
// don't put the selector back.
info.close();
return ret;
}
release(info);
}
}
/**
* Waits on the channel with the given timeout using one of the
* cached selectors. It also removes any cached selectors that are
* idle for a few seconds.
*
* @param channel
* @param ops
* @param timeout
* @return
* @throws IOException
*/
int select(SelectableChannel channel, int ops, long timeout)
throws IOException {
SelectorInfo info = get(channel);
SelectionKey key = null;
int ret = 0;
try {
while (true) {
long start = (timeout == 0) ? 0 : System.currentTimeMillis();
key = channel.register(info.selector, ops);
ret = info.selector.select(timeout);
if (ret != 0) {
return ret;
}
/* Sometimes select() returns 0 much before timeout for
* unknown reasons. So select again if required.
*/
if (timeout > 0) {
timeout -= System.currentTimeMillis() - start;
if (timeout <= 0) {
return 0;
}
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedIOException("Interruped while waiting for " +
"IO on channel " + channel +
". " + timeout +
" millis timeout left.");
}
}
} finally {
if (key != null) {
key.cancel();
}
//clear the canceled key.
try {
info.selector.selectNow();
} catch (IOException e) {
LOG.info("Unexpected Exception while clearing selector : " +
StringUtils.stringifyException(e));
// don't put the selector back.
info.close();
return ret;
}
release(info);
}
}
public synchronized <T extends SelectableChannel> ChannelAction<T> registerAction(int port, ConnectionType type, ChannelListener<T> listener, int ops) throws IOException
{
boolean startProc = (channels.size() == 0);
SelectableChannel channel = null;
SocketAddress address = new InetSocketAddress(port);
ConnectionInfo connectionInfo = new ConnectionInfo();
connectionInfo.address = address;
connectionInfo.connectionType = type;
ChannelConfiguration channelConfiguration = channels.get(connectionInfo);
if (channelConfiguration == null) {
Object socket = null;
if (type == ConnectionType.TCP) {
SocketChannel schannel = SocketChannel.open();
schannel.configureBlocking(false);
Socket ssocket = schannel.socket();
ssocket.bind(address);
socket = ssocket;
channel = schannel;
} else if (type == ConnectionType.UDP) {
DatagramChannel dchannel = DatagramChannel.open();
dchannel.configureBlocking(false);
DatagramSocket dsocket = dchannel.socket();
dsocket.bind(address);
socket = dsocket;
channel = dchannel;
}
if (channel == null) {
throw new IOException("Unsupported connection type");
}
channelConfiguration = new ChannelConfiguration();
channelConfiguration.actions = new ConcurrentLinkedQueue<ChannelAction>();
channelConfiguration.channel = channel;
channelConfiguration.connectionInfo = connectionInfo;
channels.put(connectionInfo, channelConfiguration);
channelConfigurations.put(channel, channelConfiguration);
} else {
channel = channelConfiguration.channel;
}
ChannelAction channelAction = new ChannelAction();
channelAction.channelConfiguration = channelConfiguration;
channelAction.listener = listener;
channelAction.ops = ops;
channelConfiguration.actions.add(channelAction);
if (startProc) {
startProcess();
}
if (listener != null) {
channel.register(selector, ops);
}
return channelAction;
}