类java.nio.channels.Selector源码实例Demo

下面列出了如何使用 java.nio.channels.Selector 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。

源代码1 项目: cava   文件: SelectorTest.java
/**
 * Checks that a key registered for a channel is still present in the selector's key set right
 * after the channel is closed, and is gone once a selection operation has run.
 */
@Test
void selectorRemovesKeysOnChannelCloseWhenSelecting() throws Exception {
  Pipe pipe = Pipe.open();

  // try-with-resources closes the selector even when an assertion fails.
  try (Selector selector = Selector.open()) {
    SelectableChannel source = pipe.source();
    source.configureBlocking(false);

    SelectionKey key = source.register(selector, OP_READ);
    assertTrue(selector.keys().contains(key));

    // Closing the channel alone does not remove the key from the key set...
    source.close();
    assertTrue(selector.keys().contains(key));

    // ...the key only disappears after the next selection operation.
    selector.selectNow();
    assertFalse(selector.keys().contains(key));
  } finally {
    // Release both pipe ends; closing an already closed channel has no effect.
    pipe.sink().close();
    pipe.source().close();
  }
}
 
源代码2 项目: hadoop-gpu   文件: Server.java
/**
 * Creates the listener: opens a non-blocking server socket channel, binds it
 * to {@code bindAddress}/{@code port}, opens a selector and registers the
 * channel for accept events, then names this object after the bound port and
 * marks it as a daemon.
 *
 * @throws IOException if opening, binding or registering the channel fails
 */
public Listener() throws IOException {
  address = new InetSocketAddress(bindAddress, port);
  // Create a new server socket and set to non blocking mode
  acceptChannel = ServerSocketChannel.open();
  acceptChannel.configureBlocking(false);

  // Bind the server socket to the local host and port
  bind(acceptChannel.socket(), address, backlogLength);
  // Re-read the port from the bound socket so that the name set below uses the real port
  port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
  // create a selector;
  selector= Selector.open();

  // Register accepts on the server socket with the selector.
  acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
  this.setName("IPC Server listener on " + port);
  this.setDaemon(true);
}
 
源代码3 项目: L2jBrasil   文件: SelectorThread.java
public SelectorThread(final SelectorConfig sc, final IMMOExecutor<T> executor, final IPacketHandler<T> packetHandler, final IClientFactory<T> clientFactory, final IAcceptFilter acceptFilter) throws IOException
{
    super.setName("SelectorThread-" + super.getId());

    HELPER_BUFFER_SIZE = sc.HELPER_BUFFER_SIZE;
    HELPER_BUFFER_COUNT = sc.HELPER_BUFFER_COUNT;
    MAX_SEND_PER_PASS = sc.MAX_SEND_PER_PASS;
    MAX_READ_PER_PASS = sc.MAX_READ_PER_PASS;
    SLEEP_TIME = sc.SLEEP_TIME;
    DIRECT_WRITE_BUFFER = ByteBuffer.allocateDirect(sc.WRITE_BUFFER_SIZE) .order(BYTE_ORDER);
    WRITE_BUFFER = ByteBuffer.wrap(new byte[sc.WRITE_BUFFER_SIZE]).order(BYTE_ORDER);
    READ_BUFFER = ByteBuffer.wrap(new byte[sc.READ_BUFFER_SIZE]).order(BYTE_ORDER);
    STRING_BUFFER = new NioNetStringBuffer(64 * 1024);
    _pendingClose = new NioNetStackList<MMOConnection<T>>();
    _bufferPool = new ArrayList<>();
    
    for (int i = 0; i < HELPER_BUFFER_COUNT; i++)
        _bufferPool.add(_bufferPool.size(), ByteBuffer.wrap(new byte[HELPER_BUFFER_SIZE]).order(BYTE_ORDER));

    _acceptFilter = acceptFilter;
    _packetHandler = packetHandler;
    _clientFactory = clientFactory;
    _executor = executor;
    _selector = Selector.open();
}
 
源代码4 项目: j2objc   文件: AbstractSelectableChannelTest.java
/**
 * Verifies that a channel registered with a selector may be re-configured as
 * non-blocking but rejects a switch to blocking mode.
 *
 * @tests AbstractSelectableChannel#configureBlocking(boolean)
 */
public void test_configureBlocking_Z_IllegalBlockingMode() throws Exception {
    SocketChannel sc = SocketChannel.open();
    try {
        sc.configureBlocking(false);
        Selector acceptSelector = SelectorProvider.provider().openSelector();
        try {
            SelectionKey acceptKey = sc.register(acceptSelector,
                    SelectionKey.OP_READ, null);
            assertEquals(sc.keyFor(acceptSelector), acceptKey);
            // Staying non-blocking is allowed and returns the channel itself.
            SelectableChannel getChannel = sc.configureBlocking(false);
            assertEquals(getChannel, sc);
            try {
                sc.configureBlocking(true);
                fail("Should throw IllegalBlockingModeException");
            } catch (IllegalBlockingModeException e) {
                // expected
            }
        } finally {
            // Release the selector even when an assertion above fails.
            acceptSelector.close();
        }
    } finally {
        // Release the socket channel even when an assertion above fails.
        sc.close();
    }
}
 
源代码5 项目: Mycat-NIO   文件: NIOAcceptor.java
/**
 * Endless accept loop. Each pass bumps {@code acceptCount}, waits up to one
 * second on the selector, calls {@code accept()} for every selected key that
 * is valid and acceptable, and cancels every other selected key. The selected
 * key set is always cleared afterwards. Any throwable is logged and the loop
 * continues.
 */
@Override
public void run() {
	final Selector acceptSelector = this.selector;
	while (true) {
		++acceptCount;
		try {
			acceptSelector.select(1000L);
			final Set<SelectionKey> readyKeys = acceptSelector.selectedKeys();
			try {
				for (SelectionKey readyKey : readyKeys) {
					if (!readyKey.isValid() || !readyKey.isAcceptable()) {
						// Not something this loop can accept on: drop the key.
						readyKey.cancel();
						continue;
					}
					accept();
				}
			} finally {
				// Selected keys are not removed automatically; reset for the next pass.
				readyKeys.clear();
			}
		} catch (Throwable problem) {
			LOGGER.warn(getName(), problem);
		}
	}
}
 
源代码6 项目: jdk8u60   文件: WakeupAfterClose.java
/**
 * Regression scenario: a helper thread blocks in {@code select()}, then the
 * main thread interrupts it, closes the selector and calls {@code wakeup()}
 * on the closed selector, in exactly that order.
 */
public static void main(String[] args) throws Exception {
    final Selector sel = Selector.open();

    // start thread to block in Selector
    Thread blocker = new Thread(() -> {
        try {
            sel.select();
        } catch (IOException x) {
            x.printStackTrace();
        }
    });
    blocker.start();

    // give thread time to start
    Thread.sleep(1000);

    // interrupt, close, and wakeup is the magic sequence to provoke the NPE
    blocker.interrupt();
    sel.close();
    sel.wakeup();
}
 
源代码7 项目: android-netty   文件: AbstractNioWorker.java
/**
 * Removes {@code OP_WRITE} from the channel's interest set, if present.
 * Does nothing when the channel has no key for this worker's selector, and
 * closes the key when it is no longer valid.
 *
 * @param channel the channel whose write interest should be cleared
 */
protected void clearOpWrite(AbstractNioChannel<?> channel) {
    final SelectionKey selectionKey = channel.channel.keyFor(this.selector);
    if (selectionKey == null) {
        return;
    }
    if (!selectionKey.isValid()) {
        close(selectionKey);
        return;
    }

    final int currentOps = channel.getRawInterestOps();
    if ((currentOps & SelectionKey.OP_WRITE) == 0) {
        // Write interest is not set; nothing to clear.
        return;
    }

    // Update the key first, then mirror the new value on the channel.
    final int withoutWrite = currentOps & ~SelectionKey.OP_WRITE;
    selectionKey.interestOps(withoutWrite);
    channel.setRawInterestOpsNow(withoutWrite);
}
 
源代码8 项目: TencentKona-8   文件: WakeupSpeed.java
/**
 * Calls {@code wakeup()} on a fresh selector and then a timed
 * {@code select}; fails if that select takes longer than the timeout.
 * The selector is always closed.
 */
public static void main(String argv[]) throws Exception {
    final int waitTime = 4000;
    final Selector selector = Selector.open();
    try {
        selector.wakeup();

        final long started = System.currentTimeMillis();
        selector.select(waitTime);
        final long elapsed = System.currentTimeMillis() - started;

        if (elapsed > waitTime) {
            throw new RuntimeException("Test failed");
        }
    } finally {
        selector.close();
    }
}
 
源代码9 项目: tomcatsrc   文件: NioSelectorPool.java
/**
 * Returns {@code SHARED_SELECTOR}, opening it first when {@code SHARED} is
 * set and no shared selector exists yet.
 *
 * The null check is repeated while holding the {@code NioSelectorPool.class}
 * lock so that only one caller opens the selector.
 * NOTE(review): the declaration of SHARED_SELECTOR is not visible here —
 * confirm it is safe to read outside the lock (e.g. declared volatile).
 *
 * @return the current value of {@code SHARED_SELECTOR}; NOTE(review): this
 *         looks like it can be null when {@code SHARED} is false — confirm
 * @throws IOException if {@code Selector.open()} fails
 */
protected Selector getSharedSelector() throws IOException {
    if (SHARED && SHARED_SELECTOR == null) {
        synchronized ( NioSelectorPool.class ) {
            // Re-check under the lock: another thread may have opened it meanwhile
            if ( SHARED_SELECTOR == null )  {
                synchronized (Selector.class) {
                    // Selector.open() isn't thread safe
                    // http://bugs.sun.com/view_bug.do?bug_id=6427854
                    // Affects 1.6.0_29, fixed in 1.7.0_01
                    SHARED_SELECTOR = Selector.open();
                }
                log.info("Using a shared selector for servlet write/read");
            }
        }
    }
    return  SHARED_SELECTOR;
}
 
源代码10 项目: aeron   文件: Common.java
/**
 * Installs a fresh {@code NioSelectedKeySet} into the given selector through
 * the two reflective field handles.
 *
 * @param selector the selector whose key-set fields are overwritten
 * @return the installed set, or {@code null} when
 *         {@code PUBLIC_SELECTED_KEYS_FIELD} is unavailable or any step fails
 */
public static NioSelectedKeySet keySet(final Selector selector)
{
    if (null == PUBLIC_SELECTED_KEYS_FIELD)
    {
        return null;
    }

    try
    {
        final NioSelectedKeySet replacement = new NioSelectedKeySet();

        SELECTED_KEYS_FIELD.set(selector, replacement);
        PUBLIC_SELECTED_KEYS_FIELD.set(selector, replacement);

        return replacement;
    }
    catch (final Exception ignore)
    {
        // Any failure means the replacement set is not reported as installed.
        return null;
    }
}
 
源代码11 项目: TencentKona-8   文件: SelectorImpl.java
/**
 * Opens the selector, marks this object as a daemon, starts it and records
 * that in {@code selectorStarted}. An {@code IOException} from opening the
 * selector is rethrown as a {@code RuntimeException} with the original
 * exception as its cause.
 */
private void startSelector()
{
    final boolean debug = orb.transportDebugFlag;
    try {
        selector = Selector.open();
    } catch (IOException openFailure) {
        if (orb.transportDebugFlag) {
            dprint(".startSelector: Selector.open: IOException: ", openFailure);
        }
        // REVISIT - better handling/reporting
        throw new RuntimeException(
            ".startSelector: Selector.open exception", openFailure);
    }
    setDaemon(true);
    start();
    selectorStarted = true;
    if (orb.transportDebugFlag) {
        dprint(".startSelector: selector.start completed.");
    }
}
 
源代码12 项目: Mycat2   文件: NIOConnector.java
/**
 * Creates a connector with its own selector and an empty connect queue.
 *
 * @param name passed to the superclass via {@code setName} and kept in {@code this.name}
 * @param reactorPool stored in {@code this.reactorPool}
 * @throws IOException if the selector cannot be opened
 */
public NIOConnector(String name, NIOReactorPool reactorPool)
		throws IOException {
	super.setName(name);
	this.name = name;
	this.selector = Selector.open();
	this.reactorPool = reactorPool;
	// Unbounded queue (no capacity argument is given)
	this.connectQueue = new LinkedBlockingQueue<AbstractConnection>();
}
 
源代码13 项目: jdk8u_jdk   文件: AbstractSelectableChannel.java
/**
 * Looks up, under {@code keyLock}, the key of this channel that belongs to
 * the given selector.
 *
 * @param sel the selector to match (compared by identity)
 * @return the matching key, or {@code null} if there are no keys or none matches
 */
private SelectionKey findKey(Selector sel) {
    synchronized (keyLock) {
        if (keys != null) {
            for (SelectionKey candidate : keys) {
                // Slots may be empty, so skip nulls before comparing selectors.
                if (candidate != null && candidate.selector() == sel) {
                    return candidate;
                }
            }
        }
        return null;
    }
}
 
源代码14 项目: tracing-framework   文件: TestMessageIO.java
/**
 * Waits for up to 1 second for the channel to process the op, asserting that
 * {@code awaitOp} reports success.
 *
 * @param channel the channel to register with a temporary selector
 * @param op the interest operation to register and wait for
 * @throws IOException if opening the selector or registering the channel fails
 */
private static void await(SocketChannel channel, int op) throws IOException {
    // Open the selector as a resource so it is closed even if register() throws.
    try (Selector selector = Selector.open()) {
        SelectionKey key = channel.register(selector, op);
        try {
            assertEquals(true, awaitOp(selector, key, op));
        } finally {
            // Cancel the key before the selector itself is closed
            key.cancel();
        }
    }
}
 
源代码15 项目: tutorials   文件: EchoServer.java
/**
 * Runs the echo server on localhost:5454. A single selector watches the
 * server socket for accepts; every selected key is passed to
 * {@code register} when acceptable and to {@code answerWithEcho} when
 * readable, then removed from the selected set. Loops forever.
 *
 * @param args unused
 * @throws IOException if any channel or selector operation fails
 */
public static void main(String[] args) throws IOException {
    final Selector selector = Selector.open();
    final ServerSocketChannel serverSocket = ServerSocketChannel.open();
    serverSocket.bind(new InetSocketAddress("localhost", 5454));
    serverSocket.configureBlocking(false);
    serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    // One 256-byte buffer is handed to every answerWithEcho call.
    final ByteBuffer buffer = ByteBuffer.allocate(256);

    for (;;) {
        selector.select();
        final Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
        while (ready.hasNext()) {
            final SelectionKey key = ready.next();

            if (key.isAcceptable()) {
                register(selector, serverSocket);
            }

            if (key.isReadable()) {
                answerWithEcho(buffer, key);
            }

            // Selected keys must be removed by hand once handled.
            ready.remove();
        }
    }
}
 
源代码16 项目: netty.book.kor   文件: Main.java
/**
 * Accepts a pending connection on the server channel behind {@code key},
 * sends it a greeting, and registers it with {@code selector} for reads.
 * Returns without doing anything when no connection is actually pending.
 *
 * @param key the selected key whose channel is the listening server channel
 * @param selector the selector the accepted channel is registered with
 * @throws IOException if accepting, configuring, writing or registering fails
 */
private void acceptOP(SelectionKey key, Selector selector) throws IOException {

    ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
    SocketChannel socketChannel = serverChannel.accept();
    if (socketChannel == null) {
        // A non-blocking accept() returns null when no connection is pending
        // (e.g. the peer went away between select and accept); avoid the NPE.
        return;
    }
    socketChannel.configureBlocking(false);

    System.out.println("Incoming connection from: " + socketChannel.getRemoteAddress());

    // write a welcome message
    socketChannel.write(ByteBuffer.wrap("Hello!\n".getBytes("UTF-8")));

    // register channel with selector for further I/O
    keepDataTrack.put(socketChannel, new ArrayList<byte[]>());
    socketChannel.register(selector, SelectionKey.OP_READ);
}
 
源代码17 项目: SEAL   文件: NonBlockingFetcher.java
/**
 * Downloads the given URLs in three logged phases: connect, retrieve,
 * post-process. Resets the class-level state ({@code urls}, {@code works},
 * {@code documents}, counters and timeout) before starting.
 *
 * @param urlList the URLs to fetch
 * @return the {@code documents} list; it is returned as created (before any
 *         retrieval) when the selector cannot be opened
 */
public static List<String> download(List<URL> urlList) {
  int numURLs = urlList.size();
  numUnfinished = numURLs;
  timeout = estimateTimeOut(numURLs);
  urls = urlList.toArray(new URL[numURLs]);
  works = new Work[numURLs];
  documents = new ArrayList<String>(numURLs);
  
  log.info("Connecting to servers (this may take a while)... ");
  startTime = System.currentTimeMillis();
  resolveHostNames();
  try {
    selector = Selector.open();
  } catch (IOException e) {
    log.error("IO error: " + e);
    // Without a freshly opened selector the remaining phases cannot run;
    // stop here instead of continuing with a missing or stale selector.
    return documents;
  }
  for (Work work : works) {
    // A null slot counts as already finished; everything else gets added.
    if (work == null)
      numUnfinished--;
    else add(work);
  }
  Helper.printElapsedTime(startTime);
  
  log.info("Retrieving webpages... (will time out in " + timeout / 1000.0 + " seconds)");
  startTime = System.currentTimeMillis();
  retrieveWebPages();
  log.info("Number of unfinished URL(s): " + numUnfinished);
  Helper.printElapsedTime(startTime);
  
  log.info("Post processing each retrieved webpage...");
  startTime = System.currentTimeMillis();
  processDocuments();
  Helper.printElapsedTime(startTime);
  return documents;
}
 
源代码18 项目: JavaBase   文件: EchoNIOServer.java
/**
 * Runs the server loop until {@code stop} becomes true: accepts on
 * {@code NIOServer.PORT} and hands every selected key to {@code handleInput}.
 * A key whose handling throws is cancelled and its channel closed. The
 * selector and the server channel are closed when the loop exits, whether
 * normally or through an exception.
 *
 * @throws IOException if opening, binding, registering or selecting fails
 */
public void start() throws IOException {
  // Both resources are closed automatically when the loop ends or fails.
  try (Selector selector = Selector.open();
      // Open an unbound ServerSocketChannel instance
      ServerSocketChannel server = ServerSocketChannel.open()) {
    // Bind the ServerSocketChannel to the configured port
    server.bind(new InetSocketAddress(NIOServer.PORT));
    // Switch to non-blocking (NIO) mode
    server.configureBlocking(false);

    // Register the server channel with the selector for accept events
    server.register(selector, SelectionKey.OP_ACCEPT);

    while (!stop) {
      // Wake up at least every two seconds so the stop flag is re-checked
      selector.select(2000);
      Iterator<SelectionKey> it = selector.selectedKeys().iterator();
      while (it.hasNext()) {
        SelectionKey key = it.next();
        it.remove();
        try {
          handleInput(selector, key);
        } catch (Exception e) {
          // Handling failed: drop this key and its channel, keep serving the others.
          key.cancel();
          key.channel().close();
        }
      }
    }
  }
}
 
源代码19 项目: openjdk-jdk9   文件: HttpClientImpl.java
/**
 * Creates the selector manager for the given client: keeps only a weak
 * reference to the client, starts with empty ready and registration lists,
 * and opens its selector.
 *
 * @param ref the owning client, held through a {@code WeakReference}
 * @throws IOException if the selector cannot be opened
 */
SelectorManager(HttpClientImpl ref) throws IOException {
    // NOTE(review): presumably the Thread(group, target, name, stackSize,
    // inheritThreadLocals) constructor — confirm against the superclass.
    super(null, null, "SelectorManager", 0, false);
    ownerRef = new WeakReference<>(ref);
    readyList = new ArrayList<>();
    registrations = new ArrayList<>();
    selector = Selector.open();
}
 
源代码20 项目: MediaLoader   文件: DispatchHandler.java
public DispatchHandler(InetAddress address, int port, CountDownLatch startSignal, TinyHttpd httpServer) throws IOException {
    mSelector = Selector.open();
    mServer = ServerSocketChannel.open();
    mServer.socket().bind(new InetSocketAddress(address, port));
    mServer.configureBlocking(false);
    mServer.register(mSelector, SelectionKey.OP_ACCEPT);
    mHttpServer = httpServer;
    mThreadStartSignal = startSignal;
}
 
源代码21 项目: freehealth-connector   文件: ServerImpl.java
/**
 * Builds the server implementation: opens the server channel (binding it
 * when an address is given), opens the selector and registers the channel
 * with it, creates the dispatcher and the synchronized connection sets, and
 * schedules the timer task(s).
 *
 * @param wrapper stored in {@code this.wrapper}
 * @param protocol protocol name; {@code https} is set when it equals "https" ignoring case
 * @param addr address to bind to, or {@code null} to leave the channel unbound here
 * @param backlog backlog passed to {@code ServerSocket.bind}
 * @throws IOException if opening, binding or registering the channel fails
 */
ServerImpl(HttpServer wrapper, String protocol, InetSocketAddress addr, int backlog) throws IOException {
   this.protocol = protocol;
   this.wrapper = wrapper;
   this.logger = Logger.getLogger("com.sun.net.httpserver");
   ServerConfig.checkLegacyProperties(this.logger);
   this.https = protocol.equalsIgnoreCase("https");
   this.address = addr;
   this.contexts = new ContextList();
   this.schan = ServerSocketChannel.open();
   // Binding is optional at construction time; "bound" records whether it happened here
   if (addr != null) {
      ServerSocket socket = this.schan.socket();
      socket.bind(addr, backlog);
      this.bound = true;
   }

   this.selector = Selector.open();
   this.schan.configureBlocking(false);
   // 16 is the value of SelectionKey.OP_ACCEPT (1 << 4)
   this.listenerKey = this.schan.register(this.selector, 16);
   this.dispatcher = new ServerImpl.Dispatcher();
   this.idleConnections = Collections.synchronizedSet(new HashSet());
   this.allConnections = Collections.synchronizedSet(new HashSet());
   this.reqConnections = Collections.synchronizedSet(new HashSet());
   this.rspConnections = Collections.synchronizedSet(new HashSet());
   this.time = System.currentTimeMillis();
   // Daemon timer running ServerTimerTask every CLOCK_TICK
   this.timer = new Timer("server-timer", true);
   this.timer.schedule(new ServerImpl.ServerTimerTask(), (long)CLOCK_TICK, (long)CLOCK_TICK);
   // Optional second daemon timer, only created when timer1Enabled is set
   if (timer1Enabled) {
      this.timer1 = new Timer("server-timer1", true);
      this.timer1.schedule(new ServerImpl.ServerTimerTask1(), TIMER_MILLIS, TIMER_MILLIS);
      this.logger.config("HttpServer timer1 enabled period in ms:  " + TIMER_MILLIS);
      this.logger.config("MAX_REQ_TIME:  " + MAX_REQ_TIME);
      this.logger.config("MAX_RSP_TIME:  " + MAX_RSP_TIME);
   }

   this.events = new LinkedList();
   this.logger.config("HttpServer created " + protocol + " " + addr);
}
 
源代码22 项目: Tomcat8-Source-Read   文件: NioEndpoint.java
/**
 * Writes the buffer to the socket through the selector pool and, for
 * blocking writes, keeps flushing until the socket reports it is flushed.
 * A selector borrowed from the pool is always handed back.
 *
 * @param block Should the write be blocking or not?
 * @param from the ByteBuffer containing the data to be written
 * @throws IOException if the write or the flush fails
 */
@Override
protected void doWrite(boolean block, ByteBuffer from) throws IOException {
    final long timeout = getWriteTimeout();

    // Borrow a selector; if that fails the write proceeds with a null selector.
    Selector pooled;
    try {
        pooled = pool.get();
    } catch (IOException ignored) {
        pooled = null;
    }

    try {
        /**
         * Send the data back to the client.
         * {@link NioSelectorPool#write(java.nio.ByteBuffer, org.apache.tomcat.util.net.NioChannel, java.nio.channels.Selector, long, boolean)}
         */
        pool.write(from, getSocket(), pooled, timeout, block);
        if (block) {
            // Make sure we are flushed
            while (!getSocket().flush(true, pooled, timeout)) {
                // not fully flushed yet; try again
            }
        }
        updateLastWrite();
    } finally {
        if (pooled != null) {
            pool.put(pooled);
        }
    }
    // If there is data left in the buffer the socket will be registered for
    // write further up the stack. This is to ensure the socket is only
    // registered for write once as both container and user code can trigger
    // write registration.
}
 
源代码23 项目: nassau   文件: SoupBinTCP.java
/**
 * Receive messages. Invoke the message listener on each message. Continue
 * until the status listener clears its {@code receive} flag or the client
 * reports a negative receive result.
 *
 * <p>The socket channel, the selector and the client are all closed when
 * this method returns or throws.
 *
 * @param address the address
 * @param username the username
 * @param password the password
 * @param listener a message listener
 * @throws IOException if an I/O error occurs
 */
public static void receive(InetSocketAddress address, String username,
        String password, MessageListener listener) throws IOException {
    // Own the channel as a resource so it is released even when connect(),
    // Selector.open() or the client constructor fails. Closing a channel
    // that is already closed has no effect.
    try (SocketChannel channel = SocketChannel.open()) {
        channel.connect(address);
        channel.configureBlocking(false);

        StatusListener statusListener = new StatusListener();

        try (Selector selector = Selector.open();
                SoupBinTCPClient client = new SoupBinTCPClient(channel, listener, statusListener)) {
            channel.register(selector, SelectionKey.OP_READ);

            LoginRequest message = new LoginRequest();

            message.setUsername(username);
            message.setPassword(password);
            message.setRequestedSession("");
            message.setRequestedSequenceNumber(1);

            client.login(message);

            while (statusListener.receive) {
                int numKeys = selector.select(TIMEOUT_MILLIS);

                if (numKeys > 0) {
                    if (client.receive() < 0)
                        break;

                    // Reset the selected set so the next select reports new readiness
                    selector.selectedKeys().clear();
                }

                // Runs on every pass, including select timeouts
                client.keepAlive();
            }
        }
    }
}
 
源代码24 项目: tomcatsrc   文件: NioReceiver.java
/**
 * Switches the given channel to non-blocking mode and registers it with the
 * selector for the requested operations, attaching {@code attach} to the
 * resulting key. A {@code null} channel is silently ignored.
 *
 * @param selector the selector to register with
 * @param channel the channel to register; may be {@code null}
 * @param ops the interest operations
 * @param attach the attachment for the key
 * @throws Exception if configuring or registering the channel fails
 */
protected void registerChannel(Selector selector,
                               SelectableChannel channel,
                               int ops,
                               Object attach) throws Exception {
    if (channel != null) {
        // A channel has to be non-blocking before it can be registered.
        channel.configureBlocking(false);
        channel.register(selector, ops, attach);
    }
}
 
源代码25 项目: SmartProxy   文件: ShadowsocksTunnel.java
/**
 * Creates a tunnel for the given Shadowsocks configuration.
 *
 * @param config supplies the server address (passed to the superclass) and the encryptor
 * @param selector passed through to the superclass constructor
 * @throws Exception if {@code config.Encryptor} is null
 */
public ShadowsocksTunnel(ShadowsocksConfig config,Selector selector) throws Exception {
	super(config.ServerAddress,selector);
	// The check can only run after super(...), since the super call must come first
	if(config.Encryptor==null){
		throw new Exception("Error: The Encryptor for ShadowsocksTunnel is null.");
	}
	m_Config=config;
	m_Encryptor=config.Encryptor;
}
 
/**
 * Creates a reactor with its own selector. A master reactor additionally
 * opens a non-blocking server socket bound to {@code port}, registers it for
 * accept events with a placeholder attachment and prints a start message.
 *
 * @param port the port a master reactor listens on
 * @param isMaster whether this reactor owns the listening socket
 * @throws IOException if opening, binding or registering fails
 */
public Reactor(int port, boolean isMaster) throws IOException {
    this.port = port;
    this.isMaster = isMaster;
    this.selector = Selector.open();

    if (!isMaster) {
        // TODO: how to set up a non-master reactor (original note was left unfinished)
        return;
    }

    serverSocket = ServerSocketChannel.open();
    serverSocket.socket().bind(new InetSocketAddress(port));
    serverSocket.configureBlocking(false);
    final SelectionKey acceptKey = this.serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    acceptKey.attach(new Object());
    System.out.println("start on " + port + " ...");
}
 
源代码27 项目: tribaltrouble   文件: NetworkSelector.java
/**
 * Returns the selector after calling {@code initSelector()}.
 * NOTE(review): presumably initSelector() opens the selector only on first
 * use — confirm, since it is invoked on every call.
 *
 * @return the value of the {@code selector} field
 * @throws RuntimeException wrapping the {@code IOException} thrown by {@code initSelector()}
 */
final Selector getSelector() {
	try {
		initSelector();
	} catch (IOException e) {
		// Callers get an unchecked exception; the IOException is kept as the cause
		throw new RuntimeException(e);
	}
	return selector;
}
 
源代码28 项目: getty   文件: NioEventLoop.java
/**
 * Creates the event loop: opens its selector, then creates a worker thread
 * pool and the buffered output writer.
 *
 * @param config supplies the writer queue size and the chunk pool block time
 * @param chunkPool the chunk pool handed to the output writer
 * @throws IllegalStateException if the selector cannot be opened; the
 *         original {@code IOException} is kept as the cause
 */
public NioEventLoop(BaseConfig config, ChunkPool chunkPool) {
    this.config = config;
    this.chunkPool = chunkPool;
    // Open the selector first and fail fast: previously a failure was only
    // printed, leaving the event loop with a null selector.
    try {
        selector = new SelectedSelector(Selector.open());
    } catch (IOException e) {
        throw new IllegalStateException("Failed to open selector for NioEventLoop", e);
    }
    this.workerThreadPool = new ThreadPool(ThreadPool.FixedThread, 2);
    // Initialize the data output writer
    nioBufferWriter = new NioBufferWriter(chunkPool, config.getBufferWriterQueueSize(), config.getChunkPoolBlockTime());
}
 
/**
 * Opens a non-blocking datagram channel bound to the given address and
 * port, optionally connects it to the configured sending host/port, and
 * registers it with a new selector for reads. If any step after opening the
 * channel fails, the channel (and the selector, if already opened) is closed
 * before the exception propagates.
 *
 * @param nicAddress the local address to bind to; may be null
 * @param port the local port to bind to
 * @param maxBufferSize requested receive buffer size in bytes; ignored when not positive
 * @throws IOException if opening, configuring, binding or registering fails
 */
@Override
public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException {
    stopped = false;
    datagramChannel = DatagramChannel.open();
    Selector openedSelector = null;
    try {
        datagramChannel.configureBlocking(false);

        if (maxBufferSize > 0) {
            datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
            // The OS may grant less than requested, so read the value back and warn.
            final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
            if (actualReceiveBufSize < maxBufferSize) {
                logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
                        + actualReceiveBufSize + " bytes. You may want to consider changing the Operating System's "
                        + "maximum receive buffer");
            }
        }

        // we don't have to worry about nicAddress being null here because InetSocketAddress already handles it
        datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        datagramChannel.socket().bind(new InetSocketAddress(nicAddress, port));

        // if a sending host and port were provided then connect to that specific address to only receive
        // datagrams from that host/port, otherwise we can receive datagrams from any host/port
        if (sendingHost != null && sendingPort != null) {
            datagramChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
        }

        openedSelector = Selector.open();
        selector = openedSelector;
        datagramChannel.register(selector, SelectionKey.OP_READ);
    } catch (final IOException | RuntimeException e) {
        // Setup failed: do not leave an open (possibly bound) channel or selector behind.
        if (openedSelector != null) {
            try {
                openedSelector.close();
            } catch (final IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
        }
        try {
            datagramChannel.close();
        } catch (final IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
 
源代码30 项目: IoTgo_Android_App   文件: SelectorManager.java
/**
 * Appends a textual snapshot of the selector's keys to {@code dumpto}: one
 * header entry with the selector and its key count, then one entry per key
 * with its attachment, interest ops and ready ops. Invalid keys are reported
 * with {@code -1} for both op sets.
 *
 * @param dumpto the list that receives the dump entries
 */
public void dumpKeyState(List<Object> dumpto)
{
    final Selector current=_selector;
    final Set<SelectionKey> registered = current.keys();
    dumpto.add(current + " keys=" + registered.size());
    for (SelectionKey entry: registered)
    {
        // Op sets are only queried on valid keys.
        final String ops = entry.isValid()
            ? " iOps="+entry.interestOps()+" rOps="+entry.readyOps()
            : " iOps=-1 rOps=-1";
        dumpto.add(entry.attachment()+ops);
    }
}
 
 类所在包
 同包方法