java.nio.channels.Selector#select()源码实例Demo

下面列出了java.nio.channels.Selector#select() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: jdk8u_jdk   文件: WakeupAfterClose.java
public static void main(String[] args) throws Exception {
    final Selector sel = Selector.open();

    Runnable r = new Runnable() {
        public void run() {
            try {
                sel.select();
            } catch (IOException x) {
                x.printStackTrace();
            }
        }
    };

    // start thread to block in Selector
    Thread t = new Thread(r);
    t.start();

    // give thread time to start
    Thread.sleep(1000);

    // interrupt, close, and wakeup is the magic sequence to provoke the NPE
    t.interrupt();
    sel.close();
    sel.wakeup();
}
 
源代码2 项目: dragonwell8_jdk   文件: WakeupAfterClose.java
public static void main(String[] args) throws Exception {
    final Selector sel = Selector.open();

    Runnable r = new Runnable() {
        public void run() {
            try {
                sel.select();
            } catch (IOException x) {
                x.printStackTrace();
            }
        }
    };

    // start thread to block in Selector
    Thread t = new Thread(r);
    t.start();

    // give thread time to start
    Thread.sleep(1000);

    // interrupt, close, and wakeup is the magic sequence to provoke the NPE
    t.interrupt();
    sel.close();
    sel.wakeup();
}
 
源代码3 项目: openjdk-jdk8u   文件: WakeupAfterClose.java
public static void main(String[] args) throws Exception {
    final Selector sel = Selector.open();

    Runnable r = new Runnable() {
        public void run() {
            try {
                sel.select();
            } catch (IOException x) {
                x.printStackTrace();
            }
        }
    };

    // start thread to block in Selector
    Thread t = new Thread(r);
    t.start();

    // give thread time to start
    Thread.sleep(1000);

    // interrupt, close, and wakeup is the magic sequence to provoke the NPE
    t.interrupt();
    sel.close();
    sel.wakeup();
}
 
源代码4 项目: openjdk-jdk9   文件: WakeupSpeed.java
public static void main(String argv[]) throws Exception {
    int waitTime = 4000;
    Selector selector = Selector.open();
    try {
        selector.wakeup();

        long t1 = System.currentTimeMillis();
        selector.select(waitTime);
        long t2 = System.currentTimeMillis();
        long totalTime = t2 - t1;

        if (totalTime > waitTime)
            throw new RuntimeException("Test failed");
    } finally {
        selector.close();
    }
}
 
源代码5 项目: jdk8u60   文件: WakeupSpeed.java
public static void main(String argv[]) throws Exception {
    int waitTime = 4000;
    Selector selector = Selector.open();
    try {
        selector.wakeup();

        long t1 = System.currentTimeMillis();
        selector.select(waitTime);
        long t2 = System.currentTimeMillis();
        long totalTime = t2 - t1;

        if (totalTime > waitTime)
            throw new RuntimeException("Test failed");
    } finally {
        selector.close();
    }
}
 
源代码6 项目: Mycat-JCache   文件: TCPNIOAcceptor.java
@Override
public void run() {
    final Selector selector = this.selector;
    for (; ; ) {
        try {
            selector.select(500L);
            Set<SelectionKey> keys = selector.selectedKeys();
            try {
                keys.forEach(this::handleKey);
            } finally {
                keys.clear();
            }
        } catch (Throwable e) {
            logger.warn(getName(), e);
        }
    }
}
 
源代码7 项目: feeyo-redisproxy   文件: NIOAcceptor.java
@Override
public void run() {
	final Selector selector = this.selector;
	for (;;) {
		++acceptCount;
		try {
			selector.select( 1000L ); 
			Set<SelectionKey> keys = selector.selectedKeys();
			try {
				for (SelectionKey key : keys) {
					if (key.isValid() && key.isAcceptable()) {
						accept();							
					} else {
						key.cancel();
					}
				}
			} finally {
				keys.clear();
			}
		} catch (Throwable e) {
			LOGGER.warn(getName(), e);
		}
	}
}
 
源代码8 项目: Mycat-NIO   文件: NIOAcceptor.java
@Override
public void run() {
	final Selector selector = this.selector;
	for (;;) {
		++acceptCount;
		try {
			selector.select(1000L);
			Set<SelectionKey> keys = selector.selectedKeys();
			try {
				for (SelectionKey key : keys) {
					if (key.isValid() && key.isAcceptable()) {
						accept();
					} else {
						key.cancel();
					}
				}
			} finally {
				keys.clear();
			}
		} catch (Throwable e) {
			LOGGER.warn(getName(), e);
		}
	}
}
 
源代码9 项目: JavaBase   文件: EchoNIOServer.java
public void start() throws IOException {
  Selector selector = Selector.open();
  //通过OPEN方法来打开一个未绑定的ServerSocketChannel 实例
  ServerSocketChannel server = ServerSocketChannel.open();
  //将该ServerSocketChannel绑定到指定ip
  server.bind(new InetSocketAddress(NIOServer.PORT));
  //设置是NIO 非阻塞模式
  server.configureBlocking(false);

  //将sever注册到指定Selector对象上
  server.register(selector, SelectionKey.OP_ACCEPT);

  while (!stop) {
    selector.select(2000);
    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    Iterator<SelectionKey> it = selectedKeys.iterator();
    SelectionKey key;
    while (it.hasNext()) {
      key = it.next();
      it.remove();
      try {
        handleInput(selector, key);
      } catch (Exception e) {
        if (key != null) {
          key.cancel();
          if (key.channel() != null) {
            key.channel().close();
          }
        }
      }
    }
  }
}
 
源代码10 项目: heisenberg   文件: NIOReactor.java
@Override
public void run() {
    final Selector selector = this.selector;
    for (;;) {
        ++reactCount;
        try {
            selector.select(1000L);
            register(selector);
            Set<SelectionKey> keys = selector.selectedKeys();
            try {
                for (SelectionKey key : keys) {
                    Object att = key.attachment();
                    if (att != null && key.isValid()) {
                        int readyOps = key.readyOps();
                        if ((readyOps & SelectionKey.OP_READ) != 0) {
                            read((NIOConnection) att);
                        } else if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                            write((NIOConnection) att);
                        } else {
                            key.cancel();
                        }
                    } else {
                        key.cancel();
                    }
                }
            } finally {
                keys.clear();
            }
        } catch (Throwable e) {
            LOGGER.warn(name, e);
        }
    }
}
 
源代码11 项目: aion   文件: MainIOLoop.java
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.currSelector;

    while (true) {

        // from netty docs:
        // If a task was submitted when wakenUp value was true, the task didn't get a chance to
        // call
        // {@link Selector#wakeup}. So we need to check task queue again before executing select
        // operation.
        // If we don't, the task might be pended until select operation was timed out.
        // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
        // -- end netty notes
        if (this.eventBus.hasTasks() && wakenUp.compareAndSet(false, true)) {
            selector.selectNow();
            break;
        }

        int selectedKeys = selector.select(timeoutMillis);

        if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || this.eventBus.hasTasks()) {
            // break when we:
            // 1) selected something
            // 2) user submitted a task for us to run
            // 3) the task queue has an already pending task
            break;
        }

        if (Thread.interrupted()) {
            break;
        }
    }

    // TODO: handle spin lock (epoll error)
    // see: <a
    // href="https://github.com/netty/netty/blob/4.1/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java#L738"></a>
}
 
源代码12 项目: Tomcat8-Source-Read   文件: NioSelectorPool.java
/**
 * Performs a write using the bytebuffer for data to be written and a
 * selector to block (if blocking is requested). If the
 * <code>selector</code> parameter is null, and blocking is requested then
 * it will perform a busy write that could take up a lot of CPU cycles.
 * @param buf           The buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code>
 * @param socket        The socket to write data to
 * @param selector      The selector to use for blocking, if null then a busy write will be initiated
 * @param writeTimeout  The timeout for this write operation in milliseconds, -1 means no timeout
 * @param block         <code>true</code> to perform a blocking write
 *                      otherwise a non-blocking write will be performed
 * @return int - returns the number of bytes written
 * @throws EOFException if write returns -1
 * @throws SocketTimeoutException if the write times out
 * @throws IOException if an IO Exception occurs in the underlying socket logic
 *
 * 将数据返回给页面。
 *
 */
public int write(ByteBuffer buf, NioChannel socket, Selector selector,
                 long writeTimeout, boolean block) throws IOException {
    if ( SHARED && block ) {
        /**
         * 通过NioBlockingSelector利用NioChannel将数据写入网络中。
         */
        return blockingSelector.write(buf,socket,writeTimeout);
    }
    SelectionKey key = null;
    int written = 0;
    boolean timedout = false;
    int keycount = 1; //assume we can write
    long time = System.currentTimeMillis(); //start the timeout timer
    try {
        while ( (!timedout) && buf.hasRemaining() ) {
            int cnt = 0;
            if ( keycount > 0 ) { //only write if we were registered for a write
                cnt = socket.write(buf); //write the data
                if (cnt == -1) throw new EOFException();

                written += cnt;
                if (cnt > 0) {
                    time = System.currentTimeMillis(); //reset our timeout timer
                    continue; //we successfully wrote, try again without a selector
                }
                if (cnt==0 && (!block)) break; //don't block
            }
            if ( selector != null ) {
                //register OP_WRITE to the selector
                if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
                else key.interestOps(SelectionKey.OP_WRITE);
                if (writeTimeout==0) {
                    timedout = buf.hasRemaining();
                } else if (writeTimeout<0) {
                    keycount = selector.select();
                } else {
                    keycount = selector.select(writeTimeout);
                }
            }
            if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout;
        }//while
        if ( timedout ) throw new SocketTimeoutException();
    } finally {
        if (key != null) {
            key.cancel();
            if (selector != null) selector.selectNow();//removes the key from this selector
        }
    }
    return written;
}
 
public static void main(String[] args) throws Exception {
    Selector selector = Selector.open();
    Member mbr = new MemberImpl("localhost", 9999, 0);
    byte seq = 0;
    byte[] buf = new byte[50000];
    Arrays.fill(buf,seq);
    int len = buf.length;
    BigDecimal total = new BigDecimal((double)0);
    BigDecimal bytes = new BigDecimal((double)len);
    NioSender sender = new NioSender();
    sender.setDestination(mbr);
    sender.setDirectBuffer(true);
    sender.setSelector(selector);
    sender.connect();
    sender.setMessage(buf);
    System.out.println("Writing to 9999");
    long start = 0;
    double mb = 0;
    boolean first = true;
    int count = 0;

    DecimalFormat df = new DecimalFormat("##.00");
    while (count<100000) {
        if (first) {
            first = false;
            start = System.currentTimeMillis();
        }
        sender.setMessage(buf);
        int selectedKeys = 0;
        try {
            selectedKeys = selector.select(0);
        } catch (Exception e) {
            e.printStackTrace();
            continue;
        }

        if (selectedKeys == 0) {
            continue;
        }

        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey sk = it.next();
            it.remove();
            try {
                int readyOps = sk.readyOps();
                sk.interestOps(sk.interestOps() & ~readyOps);
                if (sender.process(sk, false)) {
                    total = total.add(bytes);
                    sender.reset();
                    seq++;
                    Arrays.fill(buf,seq);
                    sender.setMessage(buf);
                    mb += ( (double) len) / 1024 / 1024;
                    if ( ( (++count) % 10000) == 0) {
                        long time = System.currentTimeMillis();
                        double seconds = ( (double) (time - start)) / 1000;
                        System.out.println("Throughput " + df.format(mb / seconds) + " MB/seconds, total "+mb+" MB, total "+total+" bytes.");
                    }
                }

            } catch (Throwable t) {
                t.printStackTrace();
                return;
            }
        }
    }
    System.out.println("Complete, sleeping 15 seconds");
    Thread.sleep(15000);
}
 
源代码14 项目: netty4.0.27Learn   文件: NioEventLoop.java
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                        selectCnt);

                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
        }
        // Harmless exception - log anyway
    }
}
 
源代码15 项目: ExpectIt   文件: SingleInputExpect.java
public <R extends Result> R expect(long timeoutMs, Matcher<R> matcher) throws IOException {
    if (copierFuture == null) {
        throw new IllegalStateException("Not started");
    }

    final long timeToStop = System.currentTimeMillis() + timeoutMs;
    final boolean isInfiniteTimeout = timeoutMs == ExpectImpl.INFINITE_TIMEOUT;
    long timeElapsed = timeoutMs;
    ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
    Selector selector = Selector.open();

    try {
        source.register(selector, SelectionKey.OP_READ);
        R result = matcher.matches(buffer.toString(), copierFuture.isDone());
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine(
                    String.format(
                            "Initial matcher %s result: %s",
                            toDebugString(matcher),
                            toDebugString(result)));
        }
        while (!(result.isSuccessful() || result.canStopMatching())
                && (isInfiniteTimeout || timeElapsed > 0)) {
            int keys = isInfiniteTimeout ? selector.select() : selector.select(timeElapsed);
            // if thread was interrupted the selector returns immediately
            // and keep the thread status, so we need to check it
            if (Thread.currentThread().isInterrupted()) {
                LOG.fine("Thread was interrupted");
                throw new ClosedByInterruptException();
            }

            if (!isInfiniteTimeout) {
                timeElapsed = timeToStop - System.currentTimeMillis();
            }

            if (keys == 0) {
                LOG.fine("Selector returns 0 key");
                continue;
            }

            selector.selectedKeys().clear();
            int len = source.read(byteBuffer);

            if (len > 0) {
                String string = new String(byteBuffer.array(), 0, len, charset);
                processString(string);
                byteBuffer.clear();
            }

            final boolean isEof = len == -1;
            result = matcher.matches(buffer.toString(), isEof);
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine(
                        String.format(
                                "Matcher %s result: %s. Operation time: %d ms",
                                toDebugString(matcher),
                                toDebugString(result),
                                timeoutMs - timeElapsed));
            }
            if (isEof) {
                break;
            }
        }
        if (result.isSuccessful()) {
            buffer.delete(0, result.end());
        } else if (copierFuture.isDone() && buffer.length() == 0) {
            throw new EOFException("Input closed");
        }
        return result;
    } finally {
        selector.close();
    }
}
 
源代码16 项目: tomcatsrc   文件: SocketNioSend.java
public static void main(String[] args) throws Exception {
    Selector selector;
    synchronized (Selector.class) {
        // Selector.open() isn't thread safe
        // http://bugs.sun.com/view_bug.do?bug_id=6427854
        // Affects 1.6.0_29, fixed in 1.7.0_01
        selector = Selector.open();
    }
    Member mbr = new MemberImpl("localhost", 9999, 0);
    ChannelData data = new ChannelData();
    data.setOptions(Channel.SEND_OPTIONS_BYTE_MESSAGE);
    data.setAddress(mbr);
    byte[] buf = new byte[8192 * 4];
    data.setMessage(new XByteBuffer(buf,false));
    buf = XByteBuffer.createDataPackage(data);
    int len = buf.length;
    BigDecimal total = new BigDecimal((double)0);
    BigDecimal bytes = new BigDecimal((double)len);
    NioSender sender = new NioSender();
    sender.setDestination(mbr);
    sender.setDirectBuffer(true);
    sender.setSelector(selector);
    sender.setTxBufSize(1024*1024);
    sender.connect();
    sender.setMessage(buf);
    System.out.println("Writing to 9999");
    long start = 0;
    double mb = 0;
    boolean first = true;
    int count = 0;
    DecimalFormat df = new DecimalFormat("##.00");
    while (count<100000) {
        if (first) {
            first = false;
            start = System.currentTimeMillis();
        }
        sender.setMessage(buf);
        int selectedKeys = 0;
        try {
            selectedKeys = selector.select(0);
        } catch (Exception e) {
            e.printStackTrace();
            continue;
        }

        if (selectedKeys == 0) {
            continue;
        }

        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey sk = it.next();
            it.remove();
            try {
                int readyOps = sk.readyOps();
                sk.interestOps(sk.interestOps() & ~readyOps);
                if (sender.process(sk, false)) {
                    total = total.add(bytes);
                    sender.reset();
                    sender.setMessage(buf);
                    mb += ( (double) len) / 1024 / 1024;
                    if ( ( (++count) % 10000) == 0) {
                        long time = System.currentTimeMillis();
                        double seconds = ( (double) (time - start)) / 1000;
                        System.out.println("Throughput " + df.format(mb / seconds) + " MB/seconds, total "+mb+" MB, total "+total+" bytes.");
                    }
                }

            } catch (Throwable t) {
                t.printStackTrace();
                return;
            }
        }
        selector.selectedKeys().clear();
    }
    System.out.println("Complete, sleeping 15 seconds");
    Thread.sleep(15000);
}
 
源代码17 项目: netty-4.1.22   文件: NioEventLoop.java
private void select(boolean oldWakenUp) throws IOException {
//        获取选择器
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
//            出来延迟队列中的任务
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
//                        开始监听
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call 如果在wakenUp值为true时提交了一个任务,那么这个任务就没有机会调用
                // Selector#wakeup. So we need to check task queue again before executing select operation. 选择器#唤醒。因此,在执行select操作之前,我们需要再次检查任务队列。
                // If we don't, the task might be pended until select operation was timed out.如果我们不这样做,任务可能会被挂起,直到选择操作超时。
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.如果在管道中存在IdleStateHandler,则可能将其挂起,直到空闲超时
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

//                获取监听到的事件
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing-选定的东西,
// -用户唤醒,或
// -任务队列有一个挂起的任务。
// -已安排的任务已准备好处理
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.//线程被中断,所以重置选定的键并中断,这样我们就不会进入繁忙的循环。
//因为这很可能是一个错误的处理程序的用户或它的客户端库,我们会
//把它记录下来。
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.timeoutMillis没有选择任何内容。
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.//选择器在一行中多次提前返回。
//重新构建选择器来解决问题。
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

//                    重新构造选择器
                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.再次选择以填充selectedKeys。
                    selector.selectNow();
//                    重置selector轮询次数
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }
 
源代码18 项目: openjdk-jdk9   文件: SelectTimeout.java
private static boolean test(final long timeout)
    throws InterruptedException, IOException {
    AtomicReference<Exception> theException =
        new AtomicReference<>();
    AtomicBoolean isTimedOut = new AtomicBoolean();

    Selector selector = Selector.open();

    Thread t = new Thread(() -> {
        try {
            selector.select(timeout);
            isTimedOut.set(true);
        } catch (IOException ioe) {
            theException.set(ioe);
        }
    });
    t.start();

    t.join(SLEEP_MILLIS);

    boolean result;
    if (theException.get() == null) {
        if (timeout > SLEEP_MILLIS && isTimedOut.get()) {
            System.err.printf("Test timed out early with timeout %d%n",
                timeout);
            result = false;
        } else {
            System.out.printf("Test succeeded with timeout %d%n", timeout);
            result = true;
        }
    } else {
        System.err.printf("Test failed with timeout %d%n", timeout);
        theException.get().printStackTrace();
        result = false;
    }

    t.interrupt();
    selector.close();

    return result;
}
 
源代码19 项目: Tomcat7.0.67   文件: SocketNioValidateSend.java
public static void main(String[] args) throws Exception {
    Selector selector;
    synchronized (Selector.class) {
        // Selector.open() isn't thread safe
        // http://bugs.sun.com/view_bug.do?bug_id=6427854
        // Affects 1.6.0_29, fixed in 1.7.0_01
        selector = Selector.open();
    }
    Member mbr = new MemberImpl("localhost", 9999, 0);
    byte seq = 0;
    byte[] buf = new byte[50000];
    Arrays.fill(buf,seq);
    int len = buf.length;
    BigDecimal total = new BigDecimal((double)0);
    BigDecimal bytes = new BigDecimal((double)len);
    NioSender sender = new NioSender();
    sender.setDestination(mbr);
    sender.setDirectBuffer(true);
    sender.setSelector(selector);
    sender.connect();
    sender.setMessage(buf);
    System.out.println("Writing to 9999");
    long start = 0;
    double mb = 0;
    boolean first = true;
    int count = 0;

    DecimalFormat df = new DecimalFormat("##.00");
    while (count<100000) {
        if (first) {
            first = false;
            start = System.currentTimeMillis();
        }
        sender.setMessage(buf);
        int selectedKeys = 0;
        try {
            selectedKeys = selector.select(0);
        } catch (Exception e) {
            e.printStackTrace();
            continue;
        }

        if (selectedKeys == 0) {
            continue;
        }

        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey sk = it.next();
            it.remove();
            try {
                int readyOps = sk.readyOps();
                sk.interestOps(sk.interestOps() & ~readyOps);
                if (sender.process(sk, false)) {
                    total = total.add(bytes);
                    sender.reset();
                    seq++;
                    Arrays.fill(buf,seq);
                    sender.setMessage(buf);
                    mb += ( (double) len) / 1024 / 1024;
                    if ( ( (++count) % 10000) == 0) {
                        long time = System.currentTimeMillis();
                        double seconds = ( (double) (time - start)) / 1000;
                        System.out.println("Throughput " + df.format(mb / seconds) + " MB/seconds, total "+mb+" MB, total "+total+" bytes.");
                    }
                }

            } catch (Throwable t) {
                t.printStackTrace();
                return;
            }
        }
    }
    System.out.println("Complete, sleeping 15 seconds");
    Thread.sleep(15000);
}
 
源代码20 项目: database   文件: HAReceiveService.java
/**
 * Blocking wait for a client connection.
 * 
 * @throws IOException
 *             if something goes wrong.
 */
protected void awaitAccept() throws IOException {

    // blocking wait for a client connection.
    final Selector serverSelector = Selector.open();
    try {

        final SelectionKey serverKey = server.register(serverSelector,
                SelectionKey.OP_ACCEPT);

        try {

            serverSelector.select(); // blocks

            final Set<SelectionKey> keys = serverSelector
                    .selectedKeys();

            final Iterator<SelectionKey> iter = keys.iterator();
            
            while (iter.hasNext()) {

                final SelectionKey key = (SelectionKey) iter.next();

                iter.remove();

                if (key != serverKey)
                    throw new AssertionError();

                break;
            }

        } finally {
            serverKey.cancel();
        }

    } finally {
        serverSelector.close();
    }

}