下面列出了java.nio.channels.SelectableChannel#configureBlocking() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Create a new instance
*
* @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
* @param ch the underlying {@link SelectableChannel} on which it operates
* @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
*/
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
@Test
void selectorRemovesKeysOnChannelCloseWhenSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
source.close();
assertTrue(selector.keys().contains(key));
selector.selectNow();
assertFalse(selector.keys().contains(key));
}
@Test
void selectorRemovesKeysOnCancelWhenSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
key.cancel();
assertTrue(selector.keys().contains(key));
assertSame(key, source.keyFor(selector));
selector.selectNow();
assertFalse(selector.keys().contains(key));
assertNull(source.keyFor(selector));
}
@Test
void cancelledKeyRemovedFromChannel() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
for (int i = 0; i < 1000; ++i) {
assertNull(source.keyFor(selector));
SelectionKey key = source.register(selector, OP_READ);
selector.selectedKeys().clear();
selector.selectNow();
key.cancel();
selector.wakeup();
selector.selectedKeys().clear();
selector.selectNow();
}
}
protected void startNIOServer() throws Exception {
SelectableChannel server = null;
int interestKey;
//1. Start reactor service
selectorManager.start();
//2. Start server on the specified port
if (this.udpMode) {
server = DatagramChannel.open();
((DatagramChannel) server).socket().bind(new InetSocketAddress(host, port));
interestKey = SelectionKey.OP_READ;
} else {
server = ServerSocketChannel.open();
((ServerSocketChannel) server).socket().bind(new InetSocketAddress(host, port), 1024);
interestKey = SelectionKey.OP_ACCEPT;
}
server.configureBlocking(false);
//3. Choose one reactor to handle NIO event
selectorManager.getReactor(0).registerChannel(server, interestKey);
System.out.println("INFO: NAMI Server started on port " + String.valueOf(port) + "...");
}
@Test
void selectorRemovesKeysOnChannelCloseWhenSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
source.close();
assertTrue(selector.keys().contains(key));
selector.selectNow();
assertFalse(selector.keys().contains(key));
}
@Test
void selectorRemovesKeysOnCancelWhenSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
key.cancel();
assertTrue(selector.keys().contains(key));
assertSame(key, source.keyFor(selector));
selector.selectNow();
assertFalse(selector.keys().contains(key));
assertNull(source.keyFor(selector));
}
@Test
void cancelledKeyRemovedFromChannel() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
for (int i = 0; i < 1000; ++i) {
assertNull(source.keyFor(selector));
SelectionKey key = source.register(selector, OP_READ);
selector.selectedKeys().clear();
selector.selectNow();
key.cancel();
selector.wakeup();
selector.selectedKeys().clear();
selector.selectNow();
}
}
@Test
void selectorRemovesKeysOnChannelCloseWhileSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
CountDownLatch latch = new CountDownLatch(1);
Future<?> job = executor.submit(() -> {
latch.countDown();
try {
selector.select();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
latch.await();
Thread.sleep(100);
source.close();
selector.wakeup();
job.get();
assertFalse(selector.keys().contains(key));
}
/**
* Register the given channel with the given selector for
* the given operations of interest
*/
protected void registerChannel(Selector selector,
SelectableChannel channel,
int ops,
Object attach) throws Exception {
if (channel == null)return; // could happen
// set the new channel non-blocking
channel.configureBlocking(false);
// register it with the selector
channel.register(selector, ops, attach);
}
SocketIOWithTimeout(SelectableChannel channel, long timeout)
throws IOException {
checkChannelValidity(channel);
this.channel = channel;
this.timeout = timeout;
// Set non-blocking
channel.configureBlocking(false);
}
@Test
void selectorRemovesKeysOnChannelCloseWhileSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
CountDownLatch latch = new CountDownLatch(1);
Future<?> job = executor.submit(() -> {
latch.countDown();
try {
selector.select();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
latch.await();
Thread.sleep(100);
source.close();
selector.wakeup();
job.get();
assertFalse(selector.keys().contains(key));
}
SocketIOWithTimeout(SelectableChannel channel, long timeout)
throws IOException {
checkChannelValidity(channel);
this.channel = channel;
this.timeout = timeout;
// Set non-blocking
channel.configureBlocking(false);
}
/**
* Register the given channel with the given selector for
* the given operations of interest
*/
protected void registerChannel(Selector selector,
SelectableChannel channel,
int ops,
Object attach) throws Exception {
if (channel == null)return; // could happen
// set the new channel non-blocking
channel.configureBlocking(false);
// register it with the selector
channel.register(selector, ops, attach);
}
SocketIOWithTimeout(SelectableChannel channel, long timeout)
throws IOException {
checkChannelValidity(channel);
this.channel = channel;
this.timeout = timeout;
// Set non-blocking
channel.configureBlocking(false);
}
/**
* Switch this guy to blocking mode so we can use oldIO to read and write msgs.
*/
public void makeBlocking() throws IOException {
//logger.info("DEBUG: makeBlocking " + this);
// if (this.sKey != null) {
// this.sKey = null;
// }
SelectableChannel c = this.theSocket.getChannel();
c.configureBlocking(true);
}
SocketIOWithTimeout(SelectableChannel channel, long timeout)
throws IOException {
checkChannelValidity(channel);
this.channel = channel;
this.timeout = timeout;
// Set non-blocking
channel.configureBlocking(false);
}
protected synchronized void reConnect() throws IOException {
//0. Don't send connect request if it is connecting.
if (isNotConnected()) {
SocketAddress server = new InetSocketAddress(this.host, this.port);
SelectableChannel channel = null;
Session session2 = null;
int event;
if (this.udpMode) {
//1. Create socket channel
channel = DatagramChannel.open();
channel.configureBlocking(false);
//2. Create NioSession for each client connection
session2 = new UDPSession(this.selectorManager);
((UDPSession) session2).setBufferSize(bufferSize);
((UDPSession) session2).setTarget(server);
event = SelectionKey.OP_READ;
session2.setStatus(SessionStatus.CLIENT_CONNECTED);
} else {
//1. Create socket channel
channel = SocketChannel.open();
channel.configureBlocking(false);
try {
if (this.tc != INVALID_TRAFFIC_CLASS_VALUE) ((SocketChannel) channel).socket().setTrafficClass(this.tc);
} catch (Exception ex) {
ex.printStackTrace();
}
((SocketChannel) channel).connect(server);
//2. Create NioSession for each client connection
session2 = new TCPSession(this.selectorManager);
((TCPSession) session2).setTcpNoDelay(this.tcpNoDelay);
event = SelectionKey.OP_CONNECT;
}
session2.setChannel(channel);
session2.setKeepAlive(selectorManager.isKeepAlive());
//3. Register event
this.selectorManager.nextReactor().registerChannel(channel, event, session2);
if (!this.udpMode) {
//4. Wait to connect
if (!session2.waitToConnect(this.connectTimeout)) {
session2.asyncClose();
throw new IOException("connect timed out to " + this.host + ":" + this.port);
}
//5. Handle exception
if (session2.getStatus() == SessionStatus.NOT_CONNECTED) {
session2.asyncClose();
throw new IOException("connect failed to " + this.host + ":" + this.port);
} else if (session2.getStatus() == SessionStatus.CLOSED) { //Already closed
throw new IOException("connect failed to " + this.host + ":" + this.port); //Please see stderr.log for more details.
}
}
this.session = session2;
}
}
protected synchronized void reConnect() throws IOException {
if (isNotConnected()) {
SocketAddress server = new InetSocketAddress(this.host, this.port);
SelectableChannel channel = null;
Session temp = null;
int event;
if (this.udpMode) {
channel = DatagramChannel.open();
channel.configureBlocking(false);
temp = new UDPSession(this.selectorManager);
((UDPSession) temp).setBufferSize(bufferSize);
((UDPSession) temp).setTarget(server);
event = SelectionKey.OP_READ;
temp.setStatus(SessionStatus.CLIENT_CONNECTED);
} else {
channel = SocketChannel.open();
channel.configureBlocking(false);
try {
if (this.tc != INVALID_TRAFFIC_CLASS_VALUE) {
((SocketChannel) channel).socket().setTrafficClass(this.tc);
}
} catch (Exception ex) {
logger.error(ex.getLocalizedMessage());
}
((SocketChannel) channel).connect(server);
temp = new TCPSession(this.selectorManager);
((TCPSession) temp).setTcpNoDelay(this.tcpNoDelay);
event = SelectionKey.OP_CONNECT;
}
temp.setChannel(channel);
temp.setKeepAlive(selectorManager.isKeepAlive());
this.selectorManager.nextReactor().registerChannel(channel, event, temp);
if (!this.udpMode) {
if (!temp.waitToConnect(this.connectTimeout)) {
temp.asyncClose();
throw new TimeoutException("connect " + this.connectTimeout + "ms timed out to " + this.getAddress());
}
if (temp.getStatus() == SessionStatus.NOT_CONNECTED) {
temp.asyncClose();
throw new NotConnectedException("connect failed to " + this.getAddress());
} else if (temp.getStatus() == SessionStatus.CLOSED) {
throw new NotConnectedException("connect failed to " + this.getAddress());
}
}
this.session = temp;
}
}
@Override
protected void init(NioSession session) throws Exception {
SelectableChannel ch = (SelectableChannel) session.getChannel();
ch.configureBlocking(false);
session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
}