下面列出了 java.nio.channels.Selector#selectNow() 的实例代码;您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Registers and cancels a key on the same channel/selector pair 1000 times and checks that,
 * at the start of every round, the channel no longer reports a key for the selector.
 *
 * <p>The selector and both ends of the pipe are closed when the test finishes (pass or fail)
 * so that repeated runs do not leak file descriptors.
 */
@Test
void cancelledKeyRemovedFromChannel() throws Exception {
    Pipe pipe = Pipe.open();
    try (SelectableChannel source = pipe.source();
         SelectableChannel sink = pipe.sink();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        for (int i = 0; i < 1000; ++i) {
            // The key cancelled in the previous round must be gone by now.
            assertNull(source.keyFor(selector));

            SelectionKey key = source.register(selector, OP_READ);
            selector.selectedKeys().clear();
            selector.selectNow();

            key.cancel();
            selector.wakeup();

            // A selection operation is what actually deregisters a cancelled key.
            selector.selectedKeys().clear();
            selector.selectNow();
        }
    }
}
/**
 * Verifies that a cancelled key stays visible in {@code selector.keys()} and
 * {@code channel.keyFor(selector)} until the next selection operation, and is removed from
 * both by {@code selectNow()}.
 *
 * <p>The selector and both ends of the pipe are closed when the test finishes (pass or fail)
 * so that no file descriptors are leaked.
 */
@Test
void selectorRemovesKeysOnCancelWhenSelecting() throws Exception {
    Pipe pipe = Pipe.open();
    try (SelectableChannel source = pipe.source();
         SelectableChannel sink = pipe.sink();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        SelectionKey key = source.register(selector, OP_READ);
        assertTrue(selector.keys().contains(key));

        key.cancel();
        // Cancellation alone does not deregister the key.
        assertTrue(selector.keys().contains(key));
        assertSame(key, source.keyFor(selector));

        // The selection operation flushes the cancelled key.
        selector.selectNow();
        assertFalse(selector.keys().contains(key));
        assertNull(source.keyFor(selector));
    }
}
/**
 * Registers and cancels a key on the same channel/selector pair 1000 times and checks that,
 * at the start of every round, the channel no longer reports a key for the selector.
 *
 * <p>The selector and both ends of the pipe are closed when the test finishes (pass or fail)
 * so that repeated runs do not leak file descriptors.
 */
@Test
void cancelledKeyRemovedFromChannel() throws Exception {
    Pipe pipe = Pipe.open();
    try (SelectableChannel source = pipe.source();
         SelectableChannel sink = pipe.sink();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        for (int i = 0; i < 1000; ++i) {
            // The key cancelled in the previous round must be gone by now.
            assertNull(source.keyFor(selector));

            SelectionKey key = source.register(selector, OP_READ);
            selector.selectedKeys().clear();
            selector.selectNow();

            key.cancel();
            selector.wakeup();

            // A selection operation is what actually deregisters a cancelled key.
            selector.selectedKeys().clear();
            selector.selectNow();
        }
    }
}
/**
 * Blocks on the current selector until there is a reason to return to the caller.
 *
 * <p>The loop ends when any of the following holds:
 * <ul>
 *   <li>tasks are queued and this call wins the {@code wakenUp} false-to-true race (a
 *       non-blocking {@code selectNow()} is performed first);</li>
 *   <li>a timed select reported at least one ready key;</li>
 *   <li>{@code oldWakenUp} is set, {@code wakenUp} is set, or tasks are queued after the
 *       timed select;</li>
 *   <li>the current thread was interrupted (the interrupt flag is cleared by the check).</li>
 * </ul>
 *
 * @param oldWakenUp the wakenUp state supplied by the caller; when {@code true} the method
 *                   returns after the first timed select
 * @throws IOException if the selector fails
 */
private void select(boolean oldWakenUp) throws IOException {
    final Selector activeSelector = this.currSelector;
    for (;;) {
        // From the Netty docs: a task submitted while wakenUp was already true never got the
        // chance to call Selector#wakeup, so the task queue has to be re-checked before
        // blocking. Otherwise the task could stay pending until the select times out (or
        // until the idle timeout if an IdleStateHandler is in the pipeline).
        if (this.eventBus.hasTasks() && wakenUp.compareAndSet(false, true)) {
            activeSelector.selectNow();
            return;
        }

        final int readyCount = activeSelector.select(timeoutMillis);

        // Stop when something was selected, a wake-up was requested, or work is pending.
        final boolean hasReasonToStop =
            readyCount != 0 || oldWakenUp || wakenUp.get() || this.eventBus.hasTasks();
        if (hasReasonToStop || Thread.interrupted()) {
            return;
        }
    }
    // TODO: handle spin lock (epoll error)
    // see: <a
    // href="https://github.com/netty/netty/blob/4.1/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java#L738"></a>
}
/**
* Performs a write using the bytebuffer for data to be written and a
* selector to block (if blocking is requested). If the
* <code>selector</code> parameter is null, and blocking is requested then
* it will perform a busy write that could take up a lot of CPU cycles.
* @param buf The buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code>
* @param socket The socket to write data to
* @param selector The selector to use for blocking, if null then a busy write will be initiated
* @param writeTimeout The timeout for this write operation in milliseconds, -1 means no timeout
* @param block <code>true</code> to perform a blocking write
* otherwise a non-blocking write will be performed
* @return int - returns the number of bytes written
* @throws EOFException if write returns -1
* @throws SocketTimeoutException if the write times out
* @throws IOException if an IO Exception occurs in the underlying socket logic
*
* Returns the data to the page.
*
*/
public int write(ByteBuffer buf, NioChannel socket, Selector selector,
long writeTimeout, boolean block) throws IOException {
if ( SHARED && block ) {
/**
* Write the data to the network through the NioChannel, using the NioBlockingSelector.
*/
return blockingSelector.write(buf,socket,writeTimeout);
}
// Registered lazily, only once a write returns 0 and a selector is available.
SelectionKey key = null;
int written = 0;
boolean timedout = false;
int keycount = 1; //assume we can write
long time = System.currentTimeMillis(); //start the timeout timer
try {
while ( (!timedout) && buf.hasRemaining() ) {
int cnt = 0;
if ( keycount > 0 ) { //only write if we were registered for a write
cnt = socket.write(buf); //write the data
if (cnt == -1) throw new EOFException();
written += cnt;
if (cnt > 0) {
time = System.currentTimeMillis(); //reset our timeout timer
continue; //we successfully wrote, try again without a selector
}
if (cnt==0 && (!block)) break; //don't block
}
if ( selector != null ) {
//register OP_WRITE to the selector
if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
else key.interestOps(SelectionKey.OP_WRITE);
// writeTimeout == 0: do not wait at all, time out if data is still pending.
// writeTimeout < 0: wait without a time limit.
// writeTimeout > 0: wait at most that many milliseconds.
if (writeTimeout==0) {
timedout = buf.hasRemaining();
} else if (writeTimeout<0) {
keycount = selector.select();
} else {
keycount = selector.select(writeTimeout);
}
}
// With a positive timeout, the write has timed out once writeTimeout ms have passed since the last progress.
if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout;
}//while
if ( timedout ) throw new SocketTimeoutException();
} finally {
if (key != null) {
key.cancel();
if (selector != null) selector.selectNow();//removes the key from this selector
}
}
return written;
}
/**
 * Races a helper thread that widens key[0]'s interest set against the main
 * thread, which cancels, flushes and re-registers keys on the same selector.
 * <p>
 * The helper performs 15 x 10000 rounds. In each round it requests
 * OP_READ | OP_WRITE on key[0] and waits on notifyLock until the main thread
 * sets {@code notified}. If more than 5000 ms pass without that happening it
 * records failure (succTermination = false); after all rounds it records
 * success (succTermination = true). In both cases it wakes the selector so the
 * main loop can observe the result.
 * <p>
 * Relies on notifyLock, selectorLock, notified and succTermination, which are
 * declared outside this method.
 *
 * @param args not used
 * @throws Exception if channel setup or selection fails; a RuntimeException is
 *         thrown when the helper thread reported a timeout
 */
public static void main(String[] args) throws Exception {
// A null host name resolves to the loopback address.
InetAddress addr = InetAddress.getByName(null);
// Bind the listener to port 0 so the system picks a free port.
ServerSocketChannel sc = ServerSocketChannel.open();
sc.socket().bind(new InetSocketAddress(addr, 0));
// First loopback connection: the client end is not kept, only the accepted end is used.
SocketChannel.open(new InetSocketAddress(addr,
sc.socket().getLocalPort()));
SocketChannel accepted = sc.accept();
accepted.configureBlocking(false);
// Second loopback connection, set up the same way.
SocketChannel.open(new InetSocketAddress(addr,
sc.socket().getLocalPort()));
SocketChannel accepted2 = sc.accept();
accepted2.configureBlocking(false);
final Selector sel = Selector.open();
SelectionKey key2 = accepted2.register(sel, SelectionKey.OP_READ);
// One-element array so the anonymous thread can see the key after main replaces it.
final SelectionKey[] key = new SelectionKey[]{
accepted.register(sel, SelectionKey.OP_READ)};
// thread that will be changing key[0].interestOps to OP_READ | OP_WRITE
new Thread() {
public void run() {
try {
for (int k = 0; k < 15; k++) {
for (int i = 0; i < 10000; i++) {
synchronized (notifyLock) {
// selectorLock is held across wakeup() and the interest-ops change.
synchronized (selectorLock) {
sel.wakeup();
key[0].interestOps(SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
notified = false;
long beginTime = System.currentTimeMillis();
// Wait for the main thread's notification; loop to tolerate early returns from wait().
while (true) {
notifyLock.wait(5000);
if (notified) {
break;
}
long endTime = System.currentTimeMillis();
// More than 5 seconds without notification: report failure.
if (endTime - beginTime > 5000) {
succTermination = false;
// wake up main thread doing select()
sel.wakeup();
return;
}
}
}
}
}
succTermination = true;
// wake up main thread doing select()
sel.wakeup();
} catch (Exception e) {
// NOTE(review): an exception in this thread is printed and then recorded as success - confirm this is intended.
System.out.println(e);
succTermination = true;
// wake up main thread doing select()
sel.wakeup();
}
}
}.start();
// main thread will be doing registering/deregistering with the sel
while (true) {
sel.select();
// Stop once the helper thread has published a result; otherwise keep going.
if (Boolean.TRUE.equals(succTermination)) {
System.out.println("Test passed");
sel.close();
sc.close();
break;
} else if (Boolean.FALSE.equals(succTermination)) {
System.out.println("Failed to pass the test");
sel.close();
sc.close();
throw new RuntimeException("Failed to pass the test");
}
// Empty block: main pauses here until the helper has left its selectorLock section.
synchronized (selectorLock) {
}
if (sel.selectedKeys().contains(key[0]) && key[0].isWritable()) {
synchronized (notifyLock) {
notified = true;
notifyLock.notify();
// Cancel key[0], flush cancelled keys with selectNow(), then register both channels again.
key[0].cancel();
sel.selectNow();
key2 = accepted2.register(sel, SelectionKey.OP_READ);
key[0] = accepted.register(sel, SelectionKey.OP_READ);
}
}
// key2 is cancelled on every pass and the selected-key set is reset for the next select().
key2.cancel();
sel.selectedKeys().clear();
}
}
/**
 * Races a helper thread that widens key[0]'s interest set against the main
 * thread, which cancels, flushes and re-registers keys on the same selector.
 * <p>
 * The helper performs 15 x 10000 rounds. In each round it requests
 * OP_READ | OP_WRITE on key[0] and waits on notifyLock until the main thread
 * sets {@code notified}. If more than 5000 ms pass without that happening it
 * records failure (succTermination = false); after all rounds it records
 * success (succTermination = true). In both cases it wakes the selector so the
 * main loop can observe the result.
 * <p>
 * Relies on notifyLock, selectorLock, notified and succTermination, which are
 * declared outside this method.
 *
 * @param args not used
 * @throws Exception if channel setup or selection fails; a RuntimeException is
 *         thrown when the helper thread reported a timeout
 */
public static void main(String[] args) throws Exception {
// A null host name resolves to the loopback address.
InetAddress addr = InetAddress.getByName(null);
// Bind the listener to port 0 so the system picks a free port.
ServerSocketChannel sc = ServerSocketChannel.open();
sc.socket().bind(new InetSocketAddress(addr, 0));
// First loopback connection: the client end is not kept, only the accepted end is used.
SocketChannel.open(new InetSocketAddress(addr,
sc.socket().getLocalPort()));
SocketChannel accepted = sc.accept();
accepted.configureBlocking(false);
// Second loopback connection, set up the same way.
SocketChannel.open(new InetSocketAddress(addr,
sc.socket().getLocalPort()));
SocketChannel accepted2 = sc.accept();
accepted2.configureBlocking(false);
final Selector sel = Selector.open();
SelectionKey key2 = accepted2.register(sel, SelectionKey.OP_READ);
// One-element array so the anonymous thread can see the key after main replaces it.
final SelectionKey[] key = new SelectionKey[]{
accepted.register(sel, SelectionKey.OP_READ)};
// thread that will be changing key[0].interestOps to OP_READ | OP_WRITE
new Thread() {
public void run() {
try {
for (int k = 0; k < 15; k++) {
for (int i = 0; i < 10000; i++) {
synchronized (notifyLock) {
// selectorLock is held across wakeup() and the interest-ops change.
synchronized (selectorLock) {
sel.wakeup();
key[0].interestOps(SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
notified = false;
long beginTime = System.currentTimeMillis();
// Wait for the main thread's notification; loop to tolerate early returns from wait().
while (true) {
notifyLock.wait(5000);
if (notified) {
break;
}
long endTime = System.currentTimeMillis();
// More than 5 seconds without notification: report failure.
if (endTime - beginTime > 5000) {
succTermination = false;
// wake up main thread doing select()
sel.wakeup();
return;
}
}
}
}
}
succTermination = true;
// wake up main thread doing select()
sel.wakeup();
} catch (Exception e) {
// NOTE(review): an exception in this thread is printed and then recorded as success - confirm this is intended.
System.out.println(e);
succTermination = true;
// wake up main thread doing select()
sel.wakeup();
}
}
}.start();
// main thread will be doing registering/deregistering with the sel
while (true) {
sel.select();
// Stop once the helper thread has published a result; otherwise keep going.
if (Boolean.TRUE.equals(succTermination)) {
System.out.println("Test passed");
sel.close();
sc.close();
break;
} else if (Boolean.FALSE.equals(succTermination)) {
System.out.println("Failed to pass the test");
sel.close();
sc.close();
throw new RuntimeException("Failed to pass the test");
}
// Empty block: main pauses here until the helper has left its selectorLock section.
synchronized (selectorLock) {
}
if (sel.selectedKeys().contains(key[0]) && key[0].isWritable()) {
synchronized (notifyLock) {
notified = true;
notifyLock.notify();
// Cancel key[0], flush cancelled keys with selectNow(), then register both channels again.
key[0].cancel();
sel.selectNow();
key2 = accepted2.register(sel, SelectionKey.OP_READ);
key[0] = accepted.register(sel, SelectionKey.OP_READ);
}
}
// key2 is cancelled on every pass and the selected-key set is reset for the next select().
key2.cancel();
sel.selectedKeys().clear();
}
}
/**
 * Races a helper thread that widens key[0]'s interest set against the main
 * thread, which cancels, flushes and re-registers keys on the same selector.
 * <p>
 * The helper performs 15 x 10000 rounds. In each round it requests
 * OP_READ | OP_WRITE on key[0] and waits on notifyLock until the main thread
 * sets {@code notified}. If more than 5000 ms pass without that happening it
 * records failure (succTermination = false); after all rounds it records
 * success (succTermination = true). In both cases it wakes the selector so the
 * main loop can observe the result.
 * <p>
 * Relies on notifyLock, selectorLock, notified and succTermination, which are
 * declared outside this method.
 *
 * @param args not used
 * @throws Exception if channel setup or selection fails; a RuntimeException is
 *         thrown when the helper thread reported a timeout
 */
public static void main(String[] args) throws Exception {
// A null host name resolves to the loopback address.
InetAddress addr = InetAddress.getByName(null);
// Bind the listener to port 0 so the system picks a free port.
ServerSocketChannel sc = ServerSocketChannel.open();
sc.socket().bind(new InetSocketAddress(addr, 0));
// First loopback connection: the client end is not kept, only the accepted end is used.
SocketChannel.open(new InetSocketAddress(addr,
sc.socket().getLocalPort()));
SocketChannel accepted = sc.accept();
accepted.configureBlocking(false);
// Second loopback connection, set up the same way.
SocketChannel.open(new InetSocketAddress(addr,
sc.socket().getLocalPort()));
SocketChannel accepted2 = sc.accept();
accepted2.configureBlocking(false);
final Selector sel = Selector.open();
SelectionKey key2 = accepted2.register(sel, SelectionKey.OP_READ);
// One-element array so the anonymous thread can see the key after main replaces it.
final SelectionKey[] key = new SelectionKey[]{
accepted.register(sel, SelectionKey.OP_READ)};
// thread that will be changing key[0].interestOps to OP_READ | OP_WRITE
new Thread() {
public void run() {
try {
for (int k = 0; k < 15; k++) {
for (int i = 0; i < 10000; i++) {
synchronized (notifyLock) {
// selectorLock is held across wakeup() and the interest-ops change.
synchronized (selectorLock) {
sel.wakeup();
key[0].interestOps(SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
notified = false;
long beginTime = System.currentTimeMillis();
// Wait for the main thread's notification; loop to tolerate early returns from wait().
while (true) {
notifyLock.wait(5000);
if (notified) {
break;
}
long endTime = System.currentTimeMillis();
// More than 5 seconds without notification: report failure.
if (endTime - beginTime > 5000) {
succTermination = false;
// wake up main thread doing select()
sel.wakeup();
return;
}
}
}
}
}
succTermination = true;
// wake up main thread doing select()
sel.wakeup();
} catch (Exception e) {
// NOTE(review): an exception in this thread is printed and then recorded as success - confirm this is intended.
System.out.println(e);
succTermination = true;
// wake up main thread doing select()
sel.wakeup();
}
}
}.start();
// main thread will be doing registering/deregistering with the sel
while (true) {
sel.select();
// Stop once the helper thread has published a result; otherwise keep going.
if (Boolean.TRUE.equals(succTermination)) {
System.out.println("Test passed");
sel.close();
sc.close();
break;
} else if (Boolean.FALSE.equals(succTermination)) {
System.out.println("Failed to pass the test");
sel.close();
sc.close();
throw new RuntimeException("Failed to pass the test");
}
// Empty block: main pauses here until the helper has left its selectorLock section.
synchronized (selectorLock) {
}
if (sel.selectedKeys().contains(key[0]) && key[0].isWritable()) {
synchronized (notifyLock) {
notified = true;
notifyLock.notify();
// Cancel key[0], flush cancelled keys with selectNow(), then register both channels again.
key[0].cancel();
sel.selectNow();
key2 = accepted2.register(sel, SelectionKey.OP_READ);
key[0] = accepted.register(sel, SelectionKey.OP_READ);
}
}
// key2 is cancelled on every pass and the selected-key set is reset for the next select().
key2.cancel();
sel.selectedKeys().clear();
}
}
/**
 * Races a helper thread that widens key[0]'s interest set against the main
 * thread, which cancels, flushes and re-registers keys on the same selector.
 * <p>
 * The helper performs 15 x 10000 rounds. In each round it requests
 * OP_READ | OP_WRITE on key[0] and waits on notifyLock until the main thread
 * sets {@code notified}. If more than 5000 ms pass without that happening it
 * records failure (succTermination = false); after all rounds it records
 * success (succTermination = true). In both cases it wakes the selector so the
 * main loop can observe the result.
 * <p>
 * Relies on notifyLock, selectorLock, notified and succTermination, which are
 * declared outside this method.
 *
 * @param args not used
 * @throws Exception if channel setup or selection fails; a RuntimeException is
 *         thrown when the helper thread reported a timeout
 */
public static void main(String[] args) throws Exception {
// A null host name resolves to the loopback address.
InetAddress addr = InetAddress.getByName(null);
// Bind the listener to port 0 so the system picks a free port.
ServerSocketChannel sc = ServerSocketChannel.open();
sc.socket().bind(new InetSocketAddress(addr, 0));
// First loopback connection: the client end is not kept, only the accepted end is used.
SocketChannel.open(new InetSocketAddress(addr,
sc.socket().getLocalPort()));
SocketChannel accepted = sc.accept();
accepted.configureBlocking(false);
// Second loopback connection, set up the same way.
SocketChannel.open(new InetSocketAddress(addr,
sc.socket().getLocalPort()));
SocketChannel accepted2 = sc.accept();
accepted2.configureBlocking(false);
final Selector sel = Selector.open();
SelectionKey key2 = accepted2.register(sel, SelectionKey.OP_READ);
// One-element array so the anonymous thread can see the key after main replaces it.
final SelectionKey[] key = new SelectionKey[]{
accepted.register(sel, SelectionKey.OP_READ)};
// thread that will be changing key[0].interestOps to OP_READ | OP_WRITE
new Thread() {
public void run() {
try {
for (int k = 0; k < 15; k++) {
for (int i = 0; i < 10000; i++) {
synchronized (notifyLock) {
// selectorLock is held across wakeup() and the interest-ops change.
synchronized (selectorLock) {
sel.wakeup();
key[0].interestOps(SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
notified = false;
long beginTime = System.currentTimeMillis();
// Wait for the main thread's notification; loop to tolerate early returns from wait().
while (true) {
notifyLock.wait(5000);
if (notified) {
break;
}
long endTime = System.currentTimeMillis();
// More than 5 seconds without notification: report failure.
if (endTime - beginTime > 5000) {
succTermination = false;
// wake up main thread doing select()
sel.wakeup();
return;
}
}
}
}
}
succTermination = true;
// wake up main thread doing select()
sel.wakeup();
} catch (Exception e) {
// NOTE(review): an exception in this thread is printed and then recorded as success - confirm this is intended.
System.out.println(e);
succTermination = true;
// wake up main thread doing select()
sel.wakeup();
}
}
}.start();
// main thread will be doing registering/deregistering with the sel
while (true) {
sel.select();
// Stop once the helper thread has published a result; otherwise keep going.
if (Boolean.TRUE.equals(succTermination)) {
System.out.println("Test passed");
sel.close();
sc.close();
break;
} else if (Boolean.FALSE.equals(succTermination)) {
System.out.println("Failed to pass the test");
sel.close();
sc.close();
throw new RuntimeException("Failed to pass the test");
}
// Empty block: main pauses here until the helper has left its selectorLock section.
synchronized (selectorLock) {
}
if (sel.selectedKeys().contains(key[0]) && key[0].isWritable()) {
synchronized (notifyLock) {
notified = true;
notifyLock.notify();
// Cancel key[0], flush cancelled keys with selectNow(), then register both channels again.
key[0].cancel();
sel.selectNow();
key2 = accepted2.register(sel, SelectionKey.OP_READ);
key[0] = accepted.register(sel, SelectionKey.OP_READ);
}
}
// key2 is cancelled on every pass and the selected-key set is reset for the next select().
key2.cancel();
sel.selectedKeys().clear();
}
}
/**
 * Measures how long a single {@code selectNow()} takes after a large batch of channels has
 * been registered and an earlier batch has been closed.
 *
 * <p>Sequence: accept and register {@code initCount} connections and select once; accept and
 * register {@code massCount} further connections without selecting; close the initial batch;
 * then time the final {@code selectNow()}.
 *
 * @param initCount     number of connections in the initial batch
 * @param massCount     number of connections in the second, large batch
 * @param maxSelectTime upper bound in milliseconds for the final {@code selectNow()};
 *                      exceeding it prints a failure message and exits the JVM with status 1
 * @throws Exception if setup fails or fewer connections than requested were accepted
 */
static void runTest(int initCount, int massCount, int maxSelectTime)
        throws Exception {
    testStartTime = System.nanoTime();

    final InetSocketAddress listenAddress = new InetSocketAddress("127.0.0.1", 7359);

    // Server side: register the listening channel, then select once so the registration is
    // processed by the selector (the original notes call this "run epoll_ctl").
    log("Setting up server");
    final Selector acceptSelector = Selector.open();
    final ServerSocketChannel listener = ServerSocketChannel.open();
    listener.configureBlocking(false);
    listener.socket().bind(listenAddress, 5000);
    listener.register(acceptSelector, SelectionKey.OP_ACCEPT);
    acceptSelector.selectNow();

    log("Setting up client");
    final ClientThread connector = new ClientThread(listenAddress);
    connector.start();
    Thread.sleep(100);

    // Initial batch of client sockets.
    log("Starting initial client connections");
    connector.connectClients(initCount);
    Thread.sleep(500); // give the connections time to arrive

    // Accept the initial batch, register it, and select once.
    log("Accepting initial connections");
    final List<SocketChannel> initialChannels =
        acceptAndAddAll(acceptSelector, listener, initCount);
    final int initialAccepted = initialChannels.size();
    if (initialAccepted != initCount) {
        throw new Exception("Accepted " + initialAccepted + " instead of " + initCount);
    }
    acceptSelector.selectNow();

    // Large batch of client sockets.
    log("Requesting mass client connections");
    connector.connectClients(massCount);
    Thread.sleep(500); // give the connections time to arrive

    // Accept the large batch and register it, deliberately WITHOUT selecting afterwards.
    log("Accepting mass connections");
    final List<SocketChannel> massChannels =
        acceptAndAddAll(acceptSelector, listener, massCount);
    final int massAccepted = massChannels.size();
    if (massAccepted != massCount) {
        throw new Exception("Accepted " + massAccepted + " instead of " + massCount);
    }

    log("Closing initial connections");
    closeAll(initialChannels);

    // Time the select call that has to deal with all of the above at once.
    log("Running the final select call");
    final long selectStart = System.nanoTime();
    acceptSelector.selectNow();
    final long elapsedMillis = durationMillis(selectStart);
    log("Init count = " + initCount
        + ", mass count = " + massCount
        + ", duration = " + elapsedMillis + "ms");

    if (elapsedMillis <= maxSelectTime) {
        return;
    }
    System.out.println("\n\n\n\n\nFAILURE: The final selectNow() took "
        + elapsedMillis + "ms "
        + "- seems like O(N^2) bug is still here\n\n");
    System.exit(1);
}
/**
 * Measures how long a single {@code selectNow()} takes after a large batch of channels has
 * been registered and an earlier batch has been closed.
 *
 * <p>Sequence: accept and register {@code initCount} connections and select once; accept and
 * register {@code massCount} further connections without selecting; close the initial batch;
 * then time the final {@code selectNow()}.
 *
 * @param initCount     number of connections in the initial batch
 * @param massCount     number of connections in the second, large batch
 * @param maxSelectTime upper bound in milliseconds for the final {@code selectNow()};
 *                      exceeding it prints a failure message and exits the JVM with status 1
 * @throws Exception if setup fails or fewer connections than requested were accepted
 */
static void runTest(int initCount, int massCount, int maxSelectTime)
        throws Exception {
    testStartTime = System.nanoTime();

    final InetSocketAddress listenAddress = new InetSocketAddress("127.0.0.1", 7359);

    // Server side: register the listening channel, then select once so the registration is
    // processed by the selector (the original notes call this "run epoll_ctl").
    log("Setting up server");
    final Selector acceptSelector = Selector.open();
    final ServerSocketChannel listener = ServerSocketChannel.open();
    listener.configureBlocking(false);
    listener.socket().bind(listenAddress, 5000);
    listener.register(acceptSelector, SelectionKey.OP_ACCEPT);
    acceptSelector.selectNow();

    log("Setting up client");
    final ClientThread connector = new ClientThread(listenAddress);
    connector.start();
    Thread.sleep(100);

    // Initial batch of client sockets.
    log("Starting initial client connections");
    connector.connectClients(initCount);
    Thread.sleep(500); // give the connections time to arrive

    // Accept the initial batch, register it, and select once.
    log("Accepting initial connections");
    final List<SocketChannel> initialChannels =
        acceptAndAddAll(acceptSelector, listener, initCount);
    final int initialAccepted = initialChannels.size();
    if (initialAccepted != initCount) {
        throw new Exception("Accepted " + initialAccepted + " instead of " + initCount);
    }
    acceptSelector.selectNow();

    // Large batch of client sockets.
    log("Requesting mass client connections");
    connector.connectClients(massCount);
    Thread.sleep(500); // give the connections time to arrive

    // Accept the large batch and register it, deliberately WITHOUT selecting afterwards.
    log("Accepting mass connections");
    final List<SocketChannel> massChannels =
        acceptAndAddAll(acceptSelector, listener, massCount);
    final int massAccepted = massChannels.size();
    if (massAccepted != massCount) {
        throw new Exception("Accepted " + massAccepted + " instead of " + massCount);
    }

    log("Closing initial connections");
    closeAll(initialChannels);

    // Time the select call that has to deal with all of the above at once.
    log("Running the final select call");
    final long selectStart = System.nanoTime();
    acceptSelector.selectNow();
    final long elapsedMillis = durationMillis(selectStart);
    log("Init count = " + initCount
        + ", mass count = " + massCount
        + ", duration = " + elapsedMillis + "ms");

    if (elapsedMillis <= maxSelectTime) {
        return;
    }
    System.out.println("\n\n\n\n\nFAILURE: The final selectNow() took "
        + elapsedMillis + "ms "
        + "- seems like O(N^2) bug is still here\n\n");
    System.exit(1);
}
/**
 * Measures how long a single {@code selectNow()} takes after a large batch of channels has
 * been registered and an earlier batch has been closed.
 *
 * <p>Sequence: accept and register {@code initCount} connections and select once; accept and
 * register {@code massCount} further connections without selecting; close the initial batch;
 * then time the final {@code selectNow()}.
 *
 * @param initCount     number of connections in the initial batch
 * @param massCount     number of connections in the second, large batch
 * @param maxSelectTime upper bound in milliseconds for the final {@code selectNow()};
 *                      exceeding it prints a failure message and exits the JVM with status 1
 * @throws Exception if setup fails or fewer connections than requested were accepted
 */
static void runTest(int initCount, int massCount, int maxSelectTime)
        throws Exception {
    testStartTime = System.nanoTime();

    final InetSocketAddress listenAddress = new InetSocketAddress("127.0.0.1", 7359);

    // Server side: register the listening channel, then select once so the registration is
    // processed by the selector (the original notes call this "run epoll_ctl").
    log("Setting up server");
    final Selector acceptSelector = Selector.open();
    final ServerSocketChannel listener = ServerSocketChannel.open();
    listener.configureBlocking(false);
    listener.socket().bind(listenAddress, 5000);
    listener.register(acceptSelector, SelectionKey.OP_ACCEPT);
    acceptSelector.selectNow();

    log("Setting up client");
    final ClientThread connector = new ClientThread(listenAddress);
    connector.start();
    Thread.sleep(100);

    // Initial batch of client sockets.
    log("Starting initial client connections");
    connector.connectClients(initCount);
    Thread.sleep(500); // give the connections time to arrive

    // Accept the initial batch, register it, and select once.
    log("Accepting initial connections");
    final List<SocketChannel> initialChannels =
        acceptAndAddAll(acceptSelector, listener, initCount);
    final int initialAccepted = initialChannels.size();
    if (initialAccepted != initCount) {
        throw new Exception("Accepted " + initialAccepted + " instead of " + initCount);
    }
    acceptSelector.selectNow();

    // Large batch of client sockets.
    log("Requesting mass client connections");
    connector.connectClients(massCount);
    Thread.sleep(500); // give the connections time to arrive

    // Accept the large batch and register it, deliberately WITHOUT selecting afterwards.
    log("Accepting mass connections");
    final List<SocketChannel> massChannels =
        acceptAndAddAll(acceptSelector, listener, massCount);
    final int massAccepted = massChannels.size();
    if (massAccepted != massCount) {
        throw new Exception("Accepted " + massAccepted + " instead of " + massCount);
    }

    log("Closing initial connections");
    closeAll(initialChannels);

    // Time the select call that has to deal with all of the above at once.
    log("Running the final select call");
    final long selectStart = System.nanoTime();
    acceptSelector.selectNow();
    final long elapsedMillis = durationMillis(selectStart);
    log("Init count = " + initCount
        + ", mass count = " + massCount
        + ", duration = " + elapsedMillis + "ms");

    if (elapsedMillis <= maxSelectTime) {
        return;
    }
    System.out.println("\n\n\n\n\nFAILURE: The final selectNow() took "
        + elapsedMillis + "ms "
        + "- seems like O(N^2) bug is still here\n\n");
    System.exit(1);
}
/**
 * Writes the remaining content of {@code buf} to {@code socket}, optionally waiting on
 * {@code selector} for the channel to become writable again.
 *
 * <p>When {@code SHARED} is set and {@code block} is {@code true}, the call is delegated to
 * {@code blockingSelector}. Otherwise the method writes directly and, whenever a write makes
 * no progress, registers for {@code OP_WRITE} on {@code selector} and waits. If
 * {@code selector} is {@code null} and {@code block} is {@code true}, it keeps retrying the
 * write in a busy loop.
 *
 * @param buf          buffer holding the data; written for as long as it has remaining bytes
 * @param socket       channel to write to
 * @param selector     selector used to wait for writability; may be {@code null}
 * @param writeTimeout timeout in milliseconds; zero or a negative value waits on the selector
 *                     without a time limit
 * @param block        {@code true} to keep trying until everything is written or the
 *                     timeout expires; {@code false} to stop as soon as a write returns 0
 * @return the number of bytes written
 * @throws EOFException           if the channel write returns -1
 * @throws SocketTimeoutException if the write times out
 * @throws IOException            if the underlying channel or selector fails
 */
public int write(ByteBuffer buf, NioChannel socket, Selector selector,
        long writeTimeout, boolean block) throws IOException {
    if ( SHARED && block ) {
        return blockingSelector.write(buf,socket,writeTimeout);
    }
    SelectionKey key = null;
    int written = 0;
    boolean timedout = false;
    int keycount = 1; //assume we can write
    long time = System.currentTimeMillis(); //start the timeout timer
    try {
        while ( (!timedout) && buf.hasRemaining() ) {
            int cnt = 0;
            if ( keycount > 0 ) { //only write if we were registered for a write
                cnt = socket.write(buf); //write the data
                if (cnt == -1) throw new EOFException();
                written += cnt;
                if (cnt > 0) {
                    time = System.currentTimeMillis(); //reset our timeout timer
                    continue; //we successfully wrote, try again without a selector
                }
                if (cnt==0 && (!block)) break; //don't block
            }
            if ( selector != null ) {
                //register OP_WRITE to the selector
                if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
                else key.interestOps(SelectionKey.OP_WRITE);
                // Selector.select(long) throws IllegalArgumentException for a negative
                // timeout, so "no timeout" (e.g. -1) must use the untimed select().
                if (writeTimeout < 0) {
                    keycount = selector.select();
                } else {
                    keycount = selector.select(writeTimeout);
                }
            }
            // With a positive timeout, give up once that long has passed since the last progress.
            if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout;
        }//while
        if ( timedout ) throw new SocketTimeoutException();
    } finally {
        if (key != null) {
            key.cancel();
            if (selector != null) selector.selectNow();//removes the key from this selector
        }
    }
    return written;
}
/**
 * Echo loop: receives datagrams on the ping port and sends the first two longs of each one
 * (sequence number and timestamp) to the pong port on localhost.
 *
 * <p>The selector is polled with {@code selectNow()} in a busy loop; the loop exits when the
 * {@code SigInt} callback clears the running flag and no keys are ready. Both channels and
 * the selector are closed on every exit path, including exceptions.
 *
 * @throws IOException if a channel or the selector cannot be opened, configured or polled
 */
private void run() throws IOException
{
    final InetSocketAddress sendAddress = new InetSocketAddress("localhost", Common.PONG_PORT);
    final ByteBuffer buffer = ByteBuffer.allocateDirect(Configuration.MTU_LENGTH_DEFAULT);

    try (DatagramChannel receiveChannel = DatagramChannel.open();
        DatagramChannel sendChannel = DatagramChannel.open();
        Selector selector = Selector.open())
    {
        Common.init(receiveChannel);
        receiveChannel.bind(new InetSocketAddress("localhost", Common.PING_PORT));
        Common.init(sendChannel);

        // Attached to the selection key and invoked whenever the receive channel is readable.
        final IntSupplier handler =
            () ->
            {
                try
                {
                    buffer.clear();
                    receiveChannel.receive(buffer);

                    final long receivedSequenceNumber = buffer.getLong(0);
                    final long receivedTimestamp = buffer.getLong(SIZE_OF_LONG);

                    // Reuse the same buffer for the reply.
                    buffer.clear();
                    buffer.putLong(receivedSequenceNumber);
                    buffer.putLong(receivedTimestamp);
                    buffer.flip();

                    sendChannel.send(buffer, sendAddress);
                }
                catch (final IOException ex)
                {
                    // Best effort: report the failure and keep serving.
                    ex.printStackTrace();
                }

                return 1;
            };

        receiveChannel.register(selector, OP_READ, handler);

        final AtomicBoolean running = new AtomicBoolean(true);
        SigInt.register(() -> running.set(false));

        while (true)
        {
            // Spin until at least one key is ready, checking for shutdown in between.
            while (selector.selectNow() == 0)
            {
                if (!running.get())
                {
                    return;
                }
            }

            final Set<SelectionKey> selectedKeys = selector.selectedKeys();
            final Iterator<SelectionKey> iter = selectedKeys.iterator();

            while (iter.hasNext())
            {
                final SelectionKey key = iter.next();
                if (key.isReadable())
                {
                    ((IntSupplier)key.attachment()).getAsInt();
                }

                // Selected keys must be removed manually or they are reported again.
                iter.remove();
            }
        }
    }
}
/**
 * Races a helper thread that widens key[0]'s interest set against the main
 * thread, which cancels, flushes and re-registers keys on the same selector.
 * <p>
 * The helper performs 15 x 10000 rounds. In each round it requests
 * OP_READ | OP_WRITE on key[0] and waits on notifyLock until the main thread
 * sets {@code notified}. If more than 5000 ms pass without that happening it
 * records failure (succTermination = false); after all rounds it records
 * success (succTermination = true). In both cases it wakes the selector so the
 * main loop can observe the result.
 * <p>
 * Relies on notifyLock, selectorLock, notified and succTermination, which are
 * declared outside this method.
 *
 * @param args not used
 * @throws Exception if channel setup or selection fails; a RuntimeException is
 *         thrown when the helper thread reported a timeout
 */
public static void main(String[] args) throws Exception {
// A null host name resolves to the loopback address.
InetAddress addr = InetAddress.getByName(null);
// Bind the listener to port 0 so the system picks a free port.
ServerSocketChannel sc = ServerSocketChannel.open();
sc.socket().bind(new InetSocketAddress(addr, 0));
// First loopback connection: the client end is not kept, only the accepted end is used.
SocketChannel.open(new InetSocketAddress(addr,
sc.socket().getLocalPort()));
SocketChannel accepted = sc.accept();
accepted.configureBlocking(false);
// Second loopback connection, set up the same way.
SocketChannel.open(new InetSocketAddress(addr,
sc.socket().getLocalPort()));
SocketChannel accepted2 = sc.accept();
accepted2.configureBlocking(false);
final Selector sel = Selector.open();
SelectionKey key2 = accepted2.register(sel, SelectionKey.OP_READ);
// One-element array so the anonymous thread can see the key after main replaces it.
final SelectionKey[] key = new SelectionKey[]{
accepted.register(sel, SelectionKey.OP_READ)};
// thread that will be changing key[0].interestOps to OP_READ | OP_WRITE
new Thread() {
public void run() {
try {
for (int k = 0; k < 15; k++) {
for (int i = 0; i < 10000; i++) {
synchronized (notifyLock) {
// selectorLock is held across wakeup() and the interest-ops change.
synchronized (selectorLock) {
sel.wakeup();
key[0].interestOps(SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
notified = false;
long beginTime = System.currentTimeMillis();
// Wait for the main thread's notification; loop to tolerate early returns from wait().
while (true) {
notifyLock.wait(5000);
if (notified) {
break;
}
long endTime = System.currentTimeMillis();
// More than 5 seconds without notification: report failure.
if (endTime - beginTime > 5000) {
succTermination = false;
// wake up main thread doing select()
sel.wakeup();
return;
}
}
}
}
}
succTermination = true;
// wake up main thread doing select()
sel.wakeup();
} catch (Exception e) {
// NOTE(review): an exception in this thread is printed and then recorded as success - confirm this is intended.
System.out.println(e);
succTermination = true;
// wake up main thread doing select()
sel.wakeup();
}
}
}.start();
// main thread will be doing registering/deregistering with the sel
while (true) {
sel.select();
// Stop once the helper thread has published a result; otherwise keep going.
if (Boolean.TRUE.equals(succTermination)) {
System.out.println("Test passed");
sel.close();
sc.close();
break;
} else if (Boolean.FALSE.equals(succTermination)) {
System.out.println("Failed to pass the test");
sel.close();
sc.close();
throw new RuntimeException("Failed to pass the test");
}
// Empty block: main pauses here until the helper has left its selectorLock section.
synchronized (selectorLock) {
}
if (sel.selectedKeys().contains(key[0]) && key[0].isWritable()) {
synchronized (notifyLock) {
notified = true;
notifyLock.notify();
// Cancel key[0], flush cancelled keys with selectNow(), then register both channels again.
key[0].cancel();
sel.selectNow();
key2 = accepted2.register(sel, SelectionKey.OP_READ);
key[0] = accepted.register(sel, SelectionKey.OP_READ);
}
}
// key2 is cancelled on every pass and the selected-key set is reset for the next select().
key2.cancel();
sel.selectedKeys().clear();
}
}
/**
 * Measures how long a single {@code selectNow()} takes after a large batch of channels has
 * been registered and an earlier batch has been closed.
 *
 * <p>Sequence: accept and register {@code initCount} connections and select once; accept and
 * register {@code massCount} further connections without selecting; close the initial batch;
 * then time the final {@code selectNow()}.
 *
 * @param initCount     number of connections in the initial batch
 * @param massCount     number of connections in the second, large batch
 * @param maxSelectTime upper bound in milliseconds for the final {@code selectNow()};
 *                      exceeding it prints a failure message and exits the JVM with status 1
 * @throws Exception if setup fails or fewer connections than requested were accepted
 */
static void runTest(int initCount, int massCount, int maxSelectTime)
        throws Exception {
    testStartTime = System.nanoTime();

    final InetSocketAddress listenAddress = new InetSocketAddress("127.0.0.1", 7359);

    // Server side: register the listening channel, then select once so the registration is
    // processed by the selector (the original notes call this "run epoll_ctl").
    log("Setting up server");
    final Selector acceptSelector = Selector.open();
    final ServerSocketChannel listener = ServerSocketChannel.open();
    listener.configureBlocking(false);
    listener.socket().bind(listenAddress, 5000);
    listener.register(acceptSelector, SelectionKey.OP_ACCEPT);
    acceptSelector.selectNow();

    log("Setting up client");
    final ClientThread connector = new ClientThread(listenAddress);
    connector.start();
    Thread.sleep(100);

    // Initial batch of client sockets.
    log("Starting initial client connections");
    connector.connectClients(initCount);
    Thread.sleep(500); // give the connections time to arrive

    // Accept the initial batch, register it, and select once.
    log("Accepting initial connections");
    final List<SocketChannel> initialChannels =
        acceptAndAddAll(acceptSelector, listener, initCount);
    final int initialAccepted = initialChannels.size();
    if (initialAccepted != initCount) {
        throw new Exception("Accepted " + initialAccepted + " instead of " + initCount);
    }
    acceptSelector.selectNow();

    // Large batch of client sockets.
    log("Requesting mass client connections");
    connector.connectClients(massCount);
    Thread.sleep(500); // give the connections time to arrive

    // Accept the large batch and register it, deliberately WITHOUT selecting afterwards.
    log("Accepting mass connections");
    final List<SocketChannel> massChannels =
        acceptAndAddAll(acceptSelector, listener, massCount);
    final int massAccepted = massChannels.size();
    if (massAccepted != massCount) {
        throw new Exception("Accepted " + massAccepted + " instead of " + massCount);
    }

    log("Closing initial connections");
    closeAll(initialChannels);

    // Time the select call that has to deal with all of the above at once.
    log("Running the final select call");
    final long selectStart = System.nanoTime();
    acceptSelector.selectNow();
    final long elapsedMillis = durationMillis(selectStart);
    log("Init count = " + initCount
        + ", mass count = " + massCount
        + ", duration = " + elapsedMillis + "ms");

    if (elapsedMillis <= maxSelectTime) {
        return;
    }
    System.out.println("\n\n\n\n\nFAILURE: The final selectNow() took "
        + elapsedMillis + "ms "
        + "- seems like O(N^2) bug is still here\n\n");
    System.exit(1);
}
/**
 * Races a helper thread that widens key[0]'s interest set against the main
 * thread, which cancels, flushes and re-registers keys on the same selector.
 * <p>
 * The helper performs 15 x 10000 rounds. In each round it requests
 * OP_READ | OP_WRITE on key[0] and waits on notifyLock until the main thread
 * sets {@code notified}. If more than 5000 ms pass without that happening it
 * records failure (succTermination = false); after all rounds it records
 * success (succTermination = true). In both cases it wakes the selector so the
 * main loop can observe the result.
 * <p>
 * Relies on notifyLock, selectorLock, notified and succTermination, which are
 * declared outside this method.
 *
 * @param args not used
 * @throws Exception if channel setup or selection fails; a RuntimeException is
 *         thrown when the helper thread reported a timeout
 */
public static void main(String[] args) throws Exception {
// A null host name resolves to the loopback address.
InetAddress addr = InetAddress.getByName(null);
// Bind the listener to port 0 so the system picks a free port.
ServerSocketChannel sc = ServerSocketChannel.open();
sc.socket().bind(new InetSocketAddress(addr, 0));
// First loopback connection: the client end is not kept, only the accepted end is used.
SocketChannel.open(new InetSocketAddress(addr,
sc.socket().getLocalPort()));
SocketChannel accepted = sc.accept();
accepted.configureBlocking(false);
// Second loopback connection, set up the same way.
SocketChannel.open(new InetSocketAddress(addr,
sc.socket().getLocalPort()));
SocketChannel accepted2 = sc.accept();
accepted2.configureBlocking(false);
final Selector sel = Selector.open();
SelectionKey key2 = accepted2.register(sel, SelectionKey.OP_READ);
// One-element array so the anonymous thread can see the key after main replaces it.
final SelectionKey[] key = new SelectionKey[]{
accepted.register(sel, SelectionKey.OP_READ)};
// thread that will be changing key[0].interestOps to OP_READ | OP_WRITE
new Thread() {
public void run() {
try {
for (int k = 0; k < 15; k++) {
for (int i = 0; i < 10000; i++) {
synchronized (notifyLock) {
// selectorLock is held across wakeup() and the interest-ops change.
synchronized (selectorLock) {
sel.wakeup();
key[0].interestOps(SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
notified = false;
long beginTime = System.currentTimeMillis();
// Wait for the main thread's notification; loop to tolerate early returns from wait().
while (true) {
notifyLock.wait(5000);
if (notified) {
break;
}
long endTime = System.currentTimeMillis();
// More than 5 seconds without notification: report failure.
if (endTime - beginTime > 5000) {
succTermination = false;
// wake up main thread doing select()
sel.wakeup();
return;
}
}
}
}
}
succTermination = true;
// wake up main thread doing select()
sel.wakeup();
} catch (Exception e) {
// NOTE(review): an exception in this thread is printed and then recorded as success - confirm this is intended.
System.out.println(e);
succTermination = true;
// wake up main thread doing select()
sel.wakeup();
}
}
}.start();
// main thread will be doing registering/deregistering with the sel
while (true) {
sel.select();
// Stop once the helper thread has published a result; otherwise keep going.
if (Boolean.TRUE.equals(succTermination)) {
System.out.println("Test passed");
sel.close();
sc.close();
break;
} else if (Boolean.FALSE.equals(succTermination)) {
System.out.println("Failed to pass the test");
sel.close();
sc.close();
throw new RuntimeException("Failed to pass the test");
}
// Empty block: main pauses here until the helper has left its selectorLock section.
synchronized (selectorLock) {
}
if (sel.selectedKeys().contains(key[0]) && key[0].isWritable()) {
synchronized (notifyLock) {
notified = true;
notifyLock.notify();
// Cancel key[0], flush cancelled keys with selectNow(), then register both channels again.
key[0].cancel();
sel.selectNow();
key2 = accepted2.register(sel, SelectionKey.OP_READ);
key[0] = accepted.register(sel, SelectionKey.OP_READ);
}
}
// key2 is cancelled on every pass and the selected-key set is reset for the next select().
key2.cancel();
sel.selectedKeys().clear();
}
}
/**
 * Measures how long a single {@code selectNow()} takes after a large batch of channels has
 * been registered and an earlier batch has been closed.
 *
 * <p>Sequence: accept and register {@code initCount} connections and select once; accept and
 * register {@code massCount} further connections without selecting; close the initial batch;
 * then time the final {@code selectNow()}.
 *
 * @param initCount     number of connections in the initial batch
 * @param massCount     number of connections in the second, large batch
 * @param maxSelectTime upper bound in milliseconds for the final {@code selectNow()};
 *                      exceeding it prints a failure message and exits the JVM with status 1
 * @throws Exception if setup fails or fewer connections than requested were accepted
 */
static void runTest(int initCount, int massCount, int maxSelectTime)
        throws Exception {
    testStartTime = System.nanoTime();

    final InetSocketAddress listenAddress = new InetSocketAddress("127.0.0.1", 7359);

    // Server side: register the listening channel, then select once so the registration is
    // processed by the selector (the original notes call this "run epoll_ctl").
    log("Setting up server");
    final Selector acceptSelector = Selector.open();
    final ServerSocketChannel listener = ServerSocketChannel.open();
    listener.configureBlocking(false);
    listener.socket().bind(listenAddress, 5000);
    listener.register(acceptSelector, SelectionKey.OP_ACCEPT);
    acceptSelector.selectNow();

    log("Setting up client");
    final ClientThread connector = new ClientThread(listenAddress);
    connector.start();
    Thread.sleep(100);

    // Initial batch of client sockets.
    log("Starting initial client connections");
    connector.connectClients(initCount);
    Thread.sleep(500); // give the connections time to arrive

    // Accept the initial batch, register it, and select once.
    log("Accepting initial connections");
    final List<SocketChannel> initialChannels =
        acceptAndAddAll(acceptSelector, listener, initCount);
    final int initialAccepted = initialChannels.size();
    if (initialAccepted != initCount) {
        throw new Exception("Accepted " + initialAccepted + " instead of " + initCount);
    }
    acceptSelector.selectNow();

    // Large batch of client sockets.
    log("Requesting mass client connections");
    connector.connectClients(massCount);
    Thread.sleep(500); // give the connections time to arrive

    // Accept the large batch and register it, deliberately WITHOUT selecting afterwards.
    log("Accepting mass connections");
    final List<SocketChannel> massChannels =
        acceptAndAddAll(acceptSelector, listener, massCount);
    final int massAccepted = massChannels.size();
    if (massAccepted != massCount) {
        throw new Exception("Accepted " + massAccepted + " instead of " + massCount);
    }

    log("Closing initial connections");
    closeAll(initialChannels);

    // Time the select call that has to deal with all of the above at once.
    log("Running the final select call");
    final long selectStart = System.nanoTime();
    acceptSelector.selectNow();
    final long elapsedMillis = durationMillis(selectStart);
    log("Init count = " + initCount
        + ", mass count = " + massCount
        + ", duration = " + elapsedMillis + "ms");

    if (elapsedMillis <= maxSelectTime) {
        return;
    }
    System.out.println("\n\n\n\n\nFAILURE: The final selectNow() took "
        + elapsedMillis + "ms "
        + "- seems like O(N^2) bug is still here\n\n");
    System.exit(1);
}
/**
 * Waits up to roughly one second for ready keys on the current selector.
 *
 * <p>Timed selects are repeated until one reports ready keys or the one-second deadline has
 * passed. If the number of consecutive selects that returned nothing reaches
 * {@code SELECTOR_AUTO_REBUILD_THRESHOLD}, the selector is rebuilt and polled once with
 * {@code selectNow()}. A {@link CancelledKeyException} raised during selection is logged at
 * debug level and otherwise ignored.
 *
 * @throws IOException if the selector fails
 */
private void select() throws IOException {
    Selector sel = this.selector;
    try {
        int rounds = 0;
        long now = System.nanoTime();
        final long deadline = now + TimeUnit.SECONDS.toNanos(1);

        while (true) {
            // Remaining time, rounded to the nearest millisecond.
            final long waitMillis = (deadline - now + 500000L) / 1000000L;
            if (waitMillis <= 0) {
                // Deadline already passed: make sure at least one (non-blocking) select ran.
                if (rounds == 0) {
                    sel.selectNow();
                    rounds = 1;
                }
                break;
            }

            final int ready = sel.select(waitMillis);
            rounds++;
            if (ready != 0) {
                break;
            }

            if (rounds >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                LOGGER.warn("Selector.select() returned prematurely {} times in a row; rebuilding selector.", rounds);
                rebuildSelector();
                sel = this.selector;
                // Select again to populate selectedKeys on the rebuilt selector.
                sel.selectNow();
                rounds = 1;
                break;
            }

            now = System.nanoTime();
        }

        if (rounds > MIN_PREMATURE_SELECTOR_RETURNS && LOGGER.isDebugEnabled()) {
            LOGGER.debug("Selector.select() returned prematurely {} times in a row.", rounds - 1);
        }
    } catch (CancelledKeyException e) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
        }
    }
}
/**
 * Writes the remaining content of {@code buf} to {@code socket}, optionally waiting on
 * {@code selector} for the channel to become writable again.
 *
 * <p>When {@code SHARED} is set and {@code block} is {@code true}, the call is delegated to
 * {@code blockingSelector}. Otherwise the method writes directly and, whenever a write makes
 * no progress, registers for {@code OP_WRITE} on {@code selector} and waits. If
 * {@code selector} is {@code null} and {@code block} is {@code true}, it keeps retrying the
 * write in a busy loop.
 *
 * @param buf          buffer holding the data; written for as long as it has remaining bytes
 * @param socket       channel to write to
 * @param selector     selector used to wait for writability; may be {@code null}
 * @param writeTimeout timeout in milliseconds; zero or a negative value waits on the selector
 *                     without a time limit
 * @param block        {@code true} to keep trying until everything is written or the
 *                     timeout expires; {@code false} to stop as soon as a write returns 0
 * @return the number of bytes written
 * @throws EOFException           if the channel write returns -1
 * @throws SocketTimeoutException if the write times out
 * @throws IOException            if the underlying channel or selector fails
 */
public int write(ByteBuffer buf, NioChannel socket, Selector selector,
        long writeTimeout, boolean block) throws IOException {
    if ( SHARED && block ) {
        return blockingSelector.write(buf,socket,writeTimeout);
    }
    SelectionKey key = null;
    int written = 0;
    boolean timedout = false;
    int keycount = 1; //assume we can write
    long time = System.currentTimeMillis(); //start the timeout timer
    try {
        while ( (!timedout) && buf.hasRemaining() ) {
            int cnt = 0;
            if ( keycount > 0 ) { //only write if we were registered for a write
                cnt = socket.write(buf); //write the data
                if (cnt == -1) throw new EOFException();
                written += cnt;
                if (cnt > 0) {
                    time = System.currentTimeMillis(); //reset our timeout timer
                    continue; //we successfully wrote, try again without a selector
                }
                if (cnt==0 && (!block)) break; //don't block
            }
            if ( selector != null ) {
                //register OP_WRITE to the selector
                if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
                else key.interestOps(SelectionKey.OP_WRITE);
                // Selector.select(long) throws IllegalArgumentException for a negative
                // timeout, so "no timeout" (e.g. -1) must use the untimed select().
                if (writeTimeout < 0) {
                    keycount = selector.select();
                } else {
                    keycount = selector.select(writeTimeout);
                }
            }
            // With a positive timeout, give up once that long has passed since the last progress.
            if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout;
        }//while
        if ( timedout ) throw new SocketTimeoutException();
    } finally {
        if (key != null) {
            key.cancel();
            if (selector != null) selector.selectNow();//removes the key from this selector
        }
    }
    return written;
}