类java.nio.channels.SelectionKey源码实例Demo

下面列出了怎么用java.nio.channels.SelectionKey的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: Tomcat7.0.67   文件: NioSender.java
public synchronized void setMessage(byte[] data,int offset, int length) throws IOException {
    if ( data != null ) {
        current = data;
        remaining = length;
        ackbuf.clear();
        if ( writebuf != null ) writebuf.clear();
        else writebuf = getBuffer(length);
        if ( writebuf.capacity() < length ) writebuf = getBuffer(length);
        
        //TODO use ByteBuffer.wrap to avoid copying the data.
        writebuf.put(data,offset,length);
        //writebuf.rewind();
        //set the limit so that we don't write non wanted data
        //writebuf.limit(length);
        writebuf.flip();
        if (isConnected()) {
            if (isUdpBased())
                dataChannel.register(getSelector(), SelectionKey.OP_WRITE, this);
            else
                socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this);
        }
    }
}
 
源代码2 项目: Tomcat8-Source-Read   文件: NioReceiver.java
/**
 * Sample data handler method for a channel with data ready to read.
 * @param key A SelectionKey object associated with a channel
 *  determined by the selector to be ready for reading.  If the
 *  channel returns an EOF condition, it is closed here, which
 *  automatically invalidates the associated key.  The selector
 *  will then de-register the channel on the next select call.
 * @throws Exception IO error with channel
 */
protected void readDataFromSocket(SelectionKey key) throws Exception {
    NioReplicationTask task = (NioReplicationTask) getTaskPool().getRxTask();
    if (task == null) {
        // No threads/tasks available, do nothing, the selection
        // loop will keep calling this method until a
        // thread becomes available, the thread pool itself has a waiting mechanism
        // so we will not wait here.
        if (log.isDebugEnabled()) log.debug("No TcpReplicationThread available");
    } else {
        // invoking this wakes up the worker thread then returns
        //add task to thread pool
        task.serviceChannel(key);
        getExecutor().execute(task);
    }
}
 
源代码3 项目: SuitAgent   文件: Client.java
/**
 * 注册读写事件,轮训发生的事件
 * @throws IOException
 */
public void talk() throws IOException {
    socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE|SelectionKey.OP_READ);
    while (selector.select() > 0){
        Set readyKeys = selector.selectedKeys();
        Iterator it = readyKeys.iterator();
        while (it.hasNext()){
            SelectionKey key = (SelectionKey) it.next();
            it.remove();
            if(key.isReadable()){
                receive(key);
            }
            if(shutdown){
                key.cancel();
                return;
            }
            if(key.isWritable()){
                send(key);
            }
        }
    }
}
 
源代码4 项目: openjdk-jdk8u   文件: SelectWrite.java
public static void main(String[] argv) throws Exception {
    try (ByteServer server = new ByteServer();
         SocketChannel sc = SocketChannel.open(server.address())) {

        server.acceptConnection();

        try (Selector sel = Selector.open()) {
            sc.configureBlocking(false);
            sc.register(sel, SelectionKey.OP_WRITE);
            sel.select();
            sel.selectedKeys().clear();
            if (sel.select() == 0) {
                throw new Exception("Select returned zero");
            }
        }
    }
}
 
源代码5 项目: starcor.xul   文件: XulHttpServer.java
void reply(XulHttpServerResponse serverResponse) {
	_response = serverResponse;

	serverResponse.addHeaderIfNotExists("Content-Type", "text/html")
		.addHeaderIfNotExists("Connection", "close");

	final String transferEncoding = _response.headers.get("Transfer-Encoding");
	_sendChunkedData = "chunked".equals(transferEncoding);
	serverResponse.prepareResponseData();

	_responseBuffer = ByteBuffer.wrap(serverResponse.getData(), 0, serverResponse.getDataSize());
	try {
		Selector selector = _server._selector;
		_socketChannel.register(selector, SelectionKey.OP_WRITE, this);
		selector.wakeup();
	} catch (ClosedChannelException e) {
		clear();
		XulLog.e(TAG, e);
	}
}
 
源代码6 项目: netty.book.kor   文件: NonBlockingServer.java
private void read(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();

    ByteBuffer buffer = ByteBuffer.allocate(8192);
    int numRead = -1;
    try {
        numRead = channel.read(buffer);
    }
    catch (IOException e) {
        e.printStackTrace();
    }

    if (numRead == -1) {
        this.dataMap.remove(channel);
        channel.close();
        key.cancel();
        return;
    }

    byte[] data = new byte[numRead];
    System.arraycopy(buffer.array(), 0, data, 0, numRead);

    // write back to client
    doEcho(key, data);
}
 
源代码7 项目: localization_nifi   文件: ReceivingClient.java
/**
 * Process the message that has arrived off the wire.
 */
@Override
void processData(SelectionKey selectionKey, ByteBuffer messageBuffer) throws IOException {
    byte[] message = new byte[messageBuffer.limit()];
    logger.debug("Received message(size=" + message.length + ")");
    messageBuffer.get(message);
    byte lastByteValue = message[message.length - 1];
    boolean partialMessage = false;
    if (lastByteValue != this.endOfMessageByte) {
        partialMessage = true;
        selectionKey.attach(1);
    } else {
        Integer wasLastPartial = (Integer) selectionKey.attachment();
        if (wasLastPartial != null) {
            if (wasLastPartial.intValue() == 1) {
                partialMessage = true;
                selectionKey.attach(0);
            }
        }
    }
    if (this.messageHandler != null) {
        this.messageHandler.handle(this.connectedAddress, message, partialMessage);
    }
}
 
源代码8 项目: netbeans   文件: WebSocketServerImpl.java
private void setHeaders(SelectionKey key , List<String> headerLines,
        byte[] content )
{
    if ( headerLines.size() >0 ){
        getContext(key).setRequest(headerLines.get(0));
    }
    Map<String,String> result = new HashMap<String, String>();
    for (String line : headerLines) {
        int index = line.indexOf(':');
        if ( index != -1 ){
            result.put( line.substring( 0, index), line.substring(index+1).trim());
        }
    }
    getContext(key).setHeaders(result);
    getContext(key).setContent(content);
}
 
源代码9 项目: simplewebserver   文件: HttpDecodeThread.java
public void doRead(SocketChannel channel, SelectionKey key) throws IOException {
    if (channel != null && channel.isOpen()) {
        Map.Entry<HttpRequestDeCoder, HttpResponse> codecEntry = applicationContext.getHttpDeCoderMap().get(channel.socket());
        ReadWriteSelectorHandler handler;
        if (codecEntry == null) {
            handler = simpleWebServer.getReadWriteSelectorHandlerInstance(channel, key);
            HttpRequestDeCoder requestDeCoder = new HttpRequestDecoderImpl(requestConfig, applicationContext, handler);
            codecEntry = new AbstractMap.SimpleEntry<HttpRequestDeCoder, HttpResponse>(requestDeCoder, new SimpleHttpResponse(requestDeCoder.getRequest(), responseConfig));
            applicationContext.getHttpDeCoderMap().put(channel.socket(), codecEntry);
        } else {
            handler = codecEntry.getKey().getRequest().getHandler();
        }
        LinkedBlockingDeque<RequestEvent> entryBlockingQueue = socketChannelBlockingQueueConcurrentHashMap.get(channel);
        if (entryBlockingQueue == null) {
            entryBlockingQueue = new LinkedBlockingDeque<>();
            socketChannelBlockingQueueConcurrentHashMap.put(channel, entryBlockingQueue);
        }
        entryBlockingQueue.add(new RequestEvent(key, FileCacheKit.generatorRequestTempFile(serverConfig.getPort(), handler.handleRead().array())));
        synchronized (this) {
            this.notify();
        }
    }
}
 
源代码10 项目: vespa   文件: Listener.java
/**
 * Perform write operation(s) on channel which is now ready for
 * writing
 */
private void performWrite(SelectionKey key) {
    if (Thread.currentThread().isInterrupted()) {
        return;
    }

    Connection c = (Connection) key.attachment();

    try {
        c.write();
    } catch (IOException e) {
        log.log(Level.FINE, " write failed", e);
        try {
            c.close();
        } catch (IOException e2) {// ignore
        }
    }
}
 
源代码11 项目: yuzhouwan   文件: NIOServer.java
void startServer() throws IOException {
    // 准备好一个选择器, 监控是否有链接 (OP_ACCEPT)
    SelectorLoop connectionBell = new SelectorLoop();

    // 准备好一个选择器, 监控是否有read事件 (OP_READ)
    readBell = new SelectorLoop();

    // 开启一个server channel来监听
    ServerSocketChannel ssc = ServerSocketChannel.open();
    // 开启非阻塞模式
    ssc.configureBlocking(false);

    ServerSocket socket = ssc.socket();
    socket.bind(new InetSocketAddress("localhost", SOCKET_PORT));

    // 给选择器规定好要监听报告的事件, 这个选择器只监听新连接事件
    ssc.register(connectionBell.getSelector(), SelectionKey.OP_ACCEPT);
    new Thread(connectionBell).start();
}
 
源代码12 项目: freehealth-connector   文件: ServerImpl.java
private void handleException(SelectionKey key, Exception e) {
   HttpConnection conn = (HttpConnection)key.attachment();
   if (e != null) {
      ServerImpl.this.logger.log(Level.FINER, "Dispatcher (2)", e);
   }

   ServerImpl.this.closeConnection(conn);
}
 
源代码13 项目: jlibs   文件: Reactor.java
void unregister(TCPServer server){
    if(DEBUG)
        println(server+".unregister");
    servers.remove(server);
    SelectionKey key = server.selectable.keyFor(selector);
    if(key!=null && key.isValid())
        key.cancel();
}
 
源代码14 项目: tribaltrouble   文件: NetworkSelector.java
final void cancelKey(SelectionKey key, Handler handler) {
	Object handler_key = null;
	if (!deterministic.isPlayback()) {
		handler_key = key.attachment();
		key.cancel();
	}
	handler_key = deterministic.log(handler_key);
	handler_map.remove(handler_key);
}
 
源代码15 项目: disruptor_thrift_server   文件: TDisruptorServer.java
/**
 * Must only be called in {@link AbstractSelectorThread#run()} and {@link AbstractSelectorThread#select()}. Overrides base
 * implementation.
 * <p>
 * Checks whether this selector thread is stopped. It's stopped, and returns true, if the enclosing {@code TDisruptorServer}
 * instance is stopped and:
 * <ul>
 * <li> this selector thread's {@code Selector} is closed, or
 * <li> all of this selector thread's registered {@code Message}s are inactive
 * </ul>
 * <p>
 * While checking the active state of messages, this method cleans up the selection key for inactive ones using
 * {@link #cleanupSelectionKey(SelectionKey)}. This method return false as soon as an active message is found.
 */
@Override
protected boolean isStopped()
{
    if (!isStopped)
    {
        return false;
    }
    else if (!selector.isOpen())
    {
        // selector thread closes itself, this must be called after drain
        return true;
    }
    // still in select loop
    Iterator<SelectionKey> keys = selector.keys().iterator();

    while(keys.hasNext())
    {
        SelectionKey key = keys.next();
        Message message = (Message) key.attachment();
        
        if (message != null && message.isActive())
        {
            return false;
        }
        // cleanup preemptively, key could still be in selected key set, but will be invalid
        cleanupSelectionKey(key);
    }
    return true;
}
 
源代码16 项目: j2objc   文件: ServerSocketChannelTest.java
public void testOpen() {
    MockServerSocketChannel testMSChnl = new MockServerSocketChannel(null);
    MockServerSocketChannel testMSChnlnotnull = new MockServerSocketChannel(
            SelectorProvider.provider());
    assertEquals(SelectionKey.OP_ACCEPT, testMSChnlnotnull.validOps());
    assertNull(testMSChnl.provider());
    assertNotNull(testMSChnlnotnull.provider());
    assertNotNull(this.serverChannel.provider());
    assertEquals(testMSChnlnotnull.provider(), this.serverChannel
            .provider());
}
 
源代码17 项目: jdk8u60   文件: SelectorImpl.java
private void handleDeferredRegistrations()
{
    synchronized (deferredRegistrations) {
        int deferredListSize = deferredRegistrations.size();
        for (int i = 0; i < deferredListSize; i++) {
            EventHandler eventHandler =
                (EventHandler)deferredRegistrations.get(i);
            if (orb.transportDebugFlag) {
                dprint(".handleDeferredRegistrations: " + eventHandler);
            }
            SelectableChannel channel = eventHandler.getChannel();
            SelectionKey selectionKey = null;
            try {
                selectionKey =
                    channel.register(selector,
                                     eventHandler.getInterestOps(),
                                     (Object)eventHandler);
            } catch (ClosedChannelException e) {
                if (orb.transportDebugFlag) {
                    dprint(".handleDeferredRegistrations: " + e);
                }
            }
            eventHandler.setSelectionKey(selectionKey);
        }
        deferredRegistrations.clear();
    }
}
 
源代码18 项目: bt   文件: DataReceivingLoop.java
@Override
public void registerChannel(SelectableChannel channel, ChannelHandlerContext context) {
    // use atomic wakeup-and-register to prevent blocking of registration,
    // if selection is resumed before call to register is performed
    // (there is a race between the message receiving loop and current thread)
    // TODO: move this to the main loop instead?
    selector.wakeupAndRegister(channel, SelectionKey.OP_READ, context);
}
 
源代码19 项目: TakinRPC   文件: NioClientProcessor.java
@Override
public void connect(SelectionKey key) {
    try {
        channel.socketChannel().finishConnect();
        key.attach(channel);
    } catch (IOException e) {
        eventHandler().exceptionCaught(channel, e);
        key.cancel();
        return;
    }
    key.interestOps(SelectionKey.OP_READ);
}
 
源代码20 项目: tracing-framework   文件: PubSubServer.java
public void UpdateSelector() {
    if (this.writer.canWrite()) {
        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    } else {
        key.interestOps(SelectionKey.OP_READ);
    }
}
 
源代码21 项目: localization_nifi   文件: ReceivingClient.java
private SocketChannel doConnect(InetSocketAddress addressToConnect) throws IOException {
    SocketChannel channel = SocketChannel.open();
    if (channel.connect(addressToConnect)) {
        channel.configureBlocking(false);
        channel.register(this.selector, SelectionKey.OP_READ);
    } else {
        throw new IllegalStateException("Failed to connect to Server at: " + addressToConnect);
    }
    return channel;
}
 
/**
 * Attempts to finish a connection
 * @param key
 */
void finishConnection(SelectionKey key)
{
	SocketChannel chan = (SocketChannel) key.channel();
	Session session = socChanMap.get(chan);

	if (chan.isConnectionPending())
	{
		try
		{
			if (session.getConnection().finishConnect())
			{
				session.halfConnected();
				session.login();
			}
			else
			{
				session.connecting();
			}
		}
		catch (IOException e)
		{
			session.markForRemoval();
			key.cancel();
			e.printStackTrace();
		}
	}
}
 
源代码23 项目: SmartProxy   文件: Tunnel.java
public void connect(InetSocketAddress destAddress) throws Exception{
	if(LocalVpnService.Instance.protect(m_InnerChannel.socket())){//����socket����vpn
		m_DestAddress=destAddress;
		m_InnerChannel.register(m_Selector, SelectionKey.OP_CONNECT,this);//ע�������¼�
		m_InnerChannel.connect(m_ServerEP);//����Ŀ��
	}else {
		throw new Exception("VPN protect socket failed.");
	}
}
 
源代码24 项目: WebSocket-for-Android   文件: SelectorManager.java
private void renewSelector()
{
    try
    {
        synchronized (this)
        {
            Selector selector=_selector;
            if (selector==null)
                return;
            final Selector new_selector = Selector.open();
            for (SelectionKey k: selector.keys())
            {
                if (!k.isValid() || k.interestOps()==0)
                    continue;

                final SelectableChannel channel = k.channel();
                final Object attachment = k.attachment();

                if (attachment==null)
                    addChange(channel);
                else
                    addChange(channel,attachment);
            }
            _selector.close();
            _selector=new_selector;
        }
    }
    catch(IOException e)
    {
        throw new RuntimeException("recreating selector",e);
    }
}
 
源代码25 项目: openjdk-jdk8u   文件: SelectorImpl.java
public void registerForEvent(EventHandler eventHandler)
{
    if (orb.transportDebugFlag) {
        dprint(".registerForEvent: " + eventHandler);
    }

    if (isClosed()) {
        if (orb.transportDebugFlag) {
            dprint(".registerForEvent: closed: " + eventHandler);
        }
        return;
    }

    if (eventHandler.shouldUseSelectThreadToWait()) {
        synchronized (deferredRegistrations) {
            deferredRegistrations.add(eventHandler);
        }
        if (! selectorStarted) {
            startSelector();
        }
        selector.wakeup();
        return;
    }

    switch (eventHandler.getInterestOps()) {
    case SelectionKey.OP_ACCEPT :
        createListenerThread(eventHandler);
        break;
    case SelectionKey.OP_READ :
        createReaderThread(eventHandler);
        break;
    default:
        if (orb.transportDebugFlag) {
            dprint(".registerForEvent: default: " + eventHandler);
        }
        throw new RuntimeException(
            "SelectorImpl.registerForEvent: unknown interest ops");
    }
}
 
源代码26 项目: TencentKona-8   文件: SctpChannelImpl.java
@Override
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
    int newOps = 0;
    if ((ops & SelectionKey.OP_READ) != 0)
        newOps |= Net.POLLIN;
    if ((ops & SelectionKey.OP_WRITE) != 0)
        newOps |= Net.POLLOUT;
    if ((ops & SelectionKey.OP_CONNECT) != 0)
        newOps |= Net.POLLCONN;
    sk.selector.putEventOps(sk, newOps);
}
 
源代码27 项目: dragonwell8_jdk   文件: ReadAfterConnect.java
public static void main(String[] argv) throws Exception {
    try (ByteServer server = new ByteServer();
         SocketChannel sc = SocketChannel.open(server.address())) {

        server.acceptConnection();

        try (Selector sel = Selector.open()) {
            sc.configureBlocking(false);
            sc.register(sel, SelectionKey.OP_READ);
            // Previously channel would get selected here, although there is nothing to read
            if (sel.selectNow() != 0)
                throw new Exception("Select returned nonzero value");
        }
    }
}
 
源代码28 项目: feeyo-redisproxy   文件: Connection.java
@Override
public void doNextWriteCheck() {
	
	//检查是否正在写,看CAS更新writing值是否成功
	if ( !writing.compareAndSet(false, true) ) {
		return;
	}
	
	try {
		//利用缓存队列和写缓冲记录保证写的可靠性,返回true则为全部写入成功
		boolean noMoreData = write0();	
		
	    //如果全部写入成功而且写入队列为空(有可能在写入过程中又有新的Bytebuffer加入到队列),则取消注册写事件
           //否则,继续注册写事件
		if ( noMoreData && writeQueue.isEmpty() ) {
			if ( (processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) != 0)) {
				disableWrite();
			}
		} else {
			if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) == 0)) {
				enableWrite(false);
			}
		}
		
	} catch (IOException e) {
		if ( LOGGER.isDebugEnabled() ) {
			LOGGER.debug("caught err:", e);
		}
		close("err:" + e);
	} finally {
		//CAS RESET
		writing.set(false);	
	}
}
 
源代码29 项目: jmeter-plugins   文件: SocketChannelWithTimeouts.java
protected SocketChannelWithTimeouts() throws IOException {
    super(null);
    log.debug("Creating socketChannel");
    selector = Selector.open();
    socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
    channelKey = socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
 
源代码30 项目: JavaInterview   文件: MultiplexerTimeServer.java
/**
 * 初始化多路复用器
 *
 * @param port
 *
 * */
public MultiplexerTimeServer(int port) {
    try {
        this.selector = Selector.open();
        this.serverSocketChannel = ServerSocketChannel.open();
        this.serverSocketChannel.configureBlocking(false);
        this.serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
        this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        System.out.println("The Multiplexer Time Server is start on port:" + port);
    } catch (IOException e) {
        e.printStackTrace();
        System.exit(-1);
    }
}
 
 类所在包
 同包方法