下面列出了java.nio.channels.ClosedSelectorException#java.nio.channels.SelectionKey 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public synchronized void setMessage(byte[] data,int offset, int length) throws IOException {
if ( data != null ) {
current = data;
remaining = length;
ackbuf.clear();
if ( writebuf != null ) writebuf.clear();
else writebuf = getBuffer(length);
if ( writebuf.capacity() < length ) writebuf = getBuffer(length);
//TODO use ByteBuffer.wrap to avoid copying the data.
writebuf.put(data,offset,length);
//writebuf.rewind();
//set the limit so that we don't write non wanted data
//writebuf.limit(length);
writebuf.flip();
if (isConnected()) {
if (isUdpBased())
dataChannel.register(getSelector(), SelectionKey.OP_WRITE, this);
else
socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this);
}
}
}
/**
* Sample data handler method for a channel with data ready to read.
* @param key A SelectionKey object associated with a channel
* determined by the selector to be ready for reading. If the
* channel returns an EOF condition, it is closed here, which
* automatically invalidates the associated key. The selector
* will then de-register the channel on the next select call.
* @throws Exception IO error with channel
*/
protected void readDataFromSocket(SelectionKey key) throws Exception {
NioReplicationTask task = (NioReplicationTask) getTaskPool().getRxTask();
if (task == null) {
// No threads/tasks available, do nothing, the selection
// loop will keep calling this method until a
// thread becomes available, the thread pool itself has a waiting mechanism
// so we will not wait here.
if (log.isDebugEnabled()) log.debug("No TcpReplicationThread available");
} else {
// invoking this wakes up the worker thread then returns
//add task to thread pool
task.serviceChannel(key);
getExecutor().execute(task);
}
}
/**
* 注册读写事件,轮训发生的事件
* @throws IOException
*/
public void talk() throws IOException {
socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE|SelectionKey.OP_READ);
while (selector.select() > 0){
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
while (it.hasNext()){
SelectionKey key = (SelectionKey) it.next();
it.remove();
if(key.isReadable()){
receive(key);
}
if(shutdown){
key.cancel();
return;
}
if(key.isWritable()){
send(key);
}
}
}
}
public static void main(String[] argv) throws Exception {
try (ByteServer server = new ByteServer();
SocketChannel sc = SocketChannel.open(server.address())) {
server.acceptConnection();
try (Selector sel = Selector.open()) {
sc.configureBlocking(false);
sc.register(sel, SelectionKey.OP_WRITE);
sel.select();
sel.selectedKeys().clear();
if (sel.select() == 0) {
throw new Exception("Select returned zero");
}
}
}
}
void reply(XulHttpServerResponse serverResponse) {
_response = serverResponse;
serverResponse.addHeaderIfNotExists("Content-Type", "text/html")
.addHeaderIfNotExists("Connection", "close");
final String transferEncoding = _response.headers.get("Transfer-Encoding");
_sendChunkedData = "chunked".equals(transferEncoding);
serverResponse.prepareResponseData();
_responseBuffer = ByteBuffer.wrap(serverResponse.getData(), 0, serverResponse.getDataSize());
try {
Selector selector = _server._selector;
_socketChannel.register(selector, SelectionKey.OP_WRITE, this);
selector.wakeup();
} catch (ClosedChannelException e) {
clear();
XulLog.e(TAG, e);
}
}
private void read(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(8192);
int numRead = -1;
try {
numRead = channel.read(buffer);
}
catch (IOException e) {
e.printStackTrace();
}
if (numRead == -1) {
this.dataMap.remove(channel);
channel.close();
key.cancel();
return;
}
byte[] data = new byte[numRead];
System.arraycopy(buffer.array(), 0, data, 0, numRead);
// write back to client
doEcho(key, data);
}
/**
* Process the message that has arrived off the wire.
*/
@Override
void processData(SelectionKey selectionKey, ByteBuffer messageBuffer) throws IOException {
byte[] message = new byte[messageBuffer.limit()];
logger.debug("Received message(size=" + message.length + ")");
messageBuffer.get(message);
byte lastByteValue = message[message.length - 1];
boolean partialMessage = false;
if (lastByteValue != this.endOfMessageByte) {
partialMessage = true;
selectionKey.attach(1);
} else {
Integer wasLastPartial = (Integer) selectionKey.attachment();
if (wasLastPartial != null) {
if (wasLastPartial.intValue() == 1) {
partialMessage = true;
selectionKey.attach(0);
}
}
}
if (this.messageHandler != null) {
this.messageHandler.handle(this.connectedAddress, message, partialMessage);
}
}
private void setHeaders(SelectionKey key , List<String> headerLines,
byte[] content )
{
if ( headerLines.size() >0 ){
getContext(key).setRequest(headerLines.get(0));
}
Map<String,String> result = new HashMap<String, String>();
for (String line : headerLines) {
int index = line.indexOf(':');
if ( index != -1 ){
result.put( line.substring( 0, index), line.substring(index+1).trim());
}
}
getContext(key).setHeaders(result);
getContext(key).setContent(content);
}
public void doRead(SocketChannel channel, SelectionKey key) throws IOException {
if (channel != null && channel.isOpen()) {
Map.Entry<HttpRequestDeCoder, HttpResponse> codecEntry = applicationContext.getHttpDeCoderMap().get(channel.socket());
ReadWriteSelectorHandler handler;
if (codecEntry == null) {
handler = simpleWebServer.getReadWriteSelectorHandlerInstance(channel, key);
HttpRequestDeCoder requestDeCoder = new HttpRequestDecoderImpl(requestConfig, applicationContext, handler);
codecEntry = new AbstractMap.SimpleEntry<HttpRequestDeCoder, HttpResponse>(requestDeCoder, new SimpleHttpResponse(requestDeCoder.getRequest(), responseConfig));
applicationContext.getHttpDeCoderMap().put(channel.socket(), codecEntry);
} else {
handler = codecEntry.getKey().getRequest().getHandler();
}
LinkedBlockingDeque<RequestEvent> entryBlockingQueue = socketChannelBlockingQueueConcurrentHashMap.get(channel);
if (entryBlockingQueue == null) {
entryBlockingQueue = new LinkedBlockingDeque<>();
socketChannelBlockingQueueConcurrentHashMap.put(channel, entryBlockingQueue);
}
entryBlockingQueue.add(new RequestEvent(key, FileCacheKit.generatorRequestTempFile(serverConfig.getPort(), handler.handleRead().array())));
synchronized (this) {
this.notify();
}
}
}
/**
* Perform write operation(s) on channel which is now ready for
* writing
*/
private void performWrite(SelectionKey key) {
if (Thread.currentThread().isInterrupted()) {
return;
}
Connection c = (Connection) key.attachment();
try {
c.write();
} catch (IOException e) {
log.log(Level.FINE, " write failed", e);
try {
c.close();
} catch (IOException e2) {// ignore
}
}
}
void startServer() throws IOException {
// 准备好一个选择器, 监控是否有链接 (OP_ACCEPT)
SelectorLoop connectionBell = new SelectorLoop();
// 准备好一个选择器, 监控是否有read事件 (OP_READ)
readBell = new SelectorLoop();
// 开启一个server channel来监听
ServerSocketChannel ssc = ServerSocketChannel.open();
// 开启非阻塞模式
ssc.configureBlocking(false);
ServerSocket socket = ssc.socket();
socket.bind(new InetSocketAddress("localhost", SOCKET_PORT));
// 给选择器规定好要监听报告的事件, 这个选择器只监听新连接事件
ssc.register(connectionBell.getSelector(), SelectionKey.OP_ACCEPT);
new Thread(connectionBell).start();
}
private void handleException(SelectionKey key, Exception e) {
HttpConnection conn = (HttpConnection)key.attachment();
if (e != null) {
ServerImpl.this.logger.log(Level.FINER, "Dispatcher (2)", e);
}
ServerImpl.this.closeConnection(conn);
}
void unregister(TCPServer server){
if(DEBUG)
println(server+".unregister");
servers.remove(server);
SelectionKey key = server.selectable.keyFor(selector);
if(key!=null && key.isValid())
key.cancel();
}
final void cancelKey(SelectionKey key, Handler handler) {
Object handler_key = null;
if (!deterministic.isPlayback()) {
handler_key = key.attachment();
key.cancel();
}
handler_key = deterministic.log(handler_key);
handler_map.remove(handler_key);
}
/**
* Must only be called in {@link AbstractSelectorThread#run()} and {@link AbstractSelectorThread#select()}. Overrides base
* implementation.
* <p>
* Checks whether this selector thread is stopped. It's stopped, and returns true, if the enclosing {@code TDisruptorServer}
* instance is stopped and:
* <ul>
* <li> this selector thread's {@code Selector} is closed, or
* <li> all of this selector thread's registered {@code Message}s are inactive
* </ul>
* <p>
* While checking the active state of messages, this method cleans up the selection key for inactive ones using
* {@link #cleanupSelectionKey(SelectionKey)}. This method return false as soon as an active message is found.
*/
@Override
protected boolean isStopped()
{
if (!isStopped)
{
return false;
}
else if (!selector.isOpen())
{
// selector thread closes itself, this must be called after drain
return true;
}
// still in select loop
Iterator<SelectionKey> keys = selector.keys().iterator();
while(keys.hasNext())
{
SelectionKey key = keys.next();
Message message = (Message) key.attachment();
if (message != null && message.isActive())
{
return false;
}
// cleanup preemptively, key could still be in selected key set, but will be invalid
cleanupSelectionKey(key);
}
return true;
}
public void testOpen() {
MockServerSocketChannel testMSChnl = new MockServerSocketChannel(null);
MockServerSocketChannel testMSChnlnotnull = new MockServerSocketChannel(
SelectorProvider.provider());
assertEquals(SelectionKey.OP_ACCEPT, testMSChnlnotnull.validOps());
assertNull(testMSChnl.provider());
assertNotNull(testMSChnlnotnull.provider());
assertNotNull(this.serverChannel.provider());
assertEquals(testMSChnlnotnull.provider(), this.serverChannel
.provider());
}
private void handleDeferredRegistrations()
{
synchronized (deferredRegistrations) {
int deferredListSize = deferredRegistrations.size();
for (int i = 0; i < deferredListSize; i++) {
EventHandler eventHandler =
(EventHandler)deferredRegistrations.get(i);
if (orb.transportDebugFlag) {
dprint(".handleDeferredRegistrations: " + eventHandler);
}
SelectableChannel channel = eventHandler.getChannel();
SelectionKey selectionKey = null;
try {
selectionKey =
channel.register(selector,
eventHandler.getInterestOps(),
(Object)eventHandler);
} catch (ClosedChannelException e) {
if (orb.transportDebugFlag) {
dprint(".handleDeferredRegistrations: " + e);
}
}
eventHandler.setSelectionKey(selectionKey);
}
deferredRegistrations.clear();
}
}
@Override
public void registerChannel(SelectableChannel channel, ChannelHandlerContext context) {
// use atomic wakeup-and-register to prevent blocking of registration,
// if selection is resumed before call to register is performed
// (there is a race between the message receiving loop and current thread)
// TODO: move this to the main loop instead?
selector.wakeupAndRegister(channel, SelectionKey.OP_READ, context);
}
@Override
public void connect(SelectionKey key) {
try {
channel.socketChannel().finishConnect();
key.attach(channel);
} catch (IOException e) {
eventHandler().exceptionCaught(channel, e);
key.cancel();
return;
}
key.interestOps(SelectionKey.OP_READ);
}
public void UpdateSelector() {
if (this.writer.canWrite()) {
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
} else {
key.interestOps(SelectionKey.OP_READ);
}
}
private SocketChannel doConnect(InetSocketAddress addressToConnect) throws IOException {
SocketChannel channel = SocketChannel.open();
if (channel.connect(addressToConnect)) {
channel.configureBlocking(false);
channel.register(this.selector, SelectionKey.OP_READ);
} else {
throw new IllegalStateException("Failed to connect to Server at: " + addressToConnect);
}
return channel;
}
/**
* Attempts to finish a connection
* @param key
*/
void finishConnection(SelectionKey key)
{
SocketChannel chan = (SocketChannel) key.channel();
Session session = socChanMap.get(chan);
if (chan.isConnectionPending())
{
try
{
if (session.getConnection().finishConnect())
{
session.halfConnected();
session.login();
}
else
{
session.connecting();
}
}
catch (IOException e)
{
session.markForRemoval();
key.cancel();
e.printStackTrace();
}
}
}
public void connect(InetSocketAddress destAddress) throws Exception{
if(LocalVpnService.Instance.protect(m_InnerChannel.socket())){//����socket����vpn
m_DestAddress=destAddress;
m_InnerChannel.register(m_Selector, SelectionKey.OP_CONNECT,this);//ע�������¼�
m_InnerChannel.connect(m_ServerEP);//����Ŀ��
}else {
throw new Exception("VPN protect socket failed.");
}
}
private void renewSelector()
{
try
{
synchronized (this)
{
Selector selector=_selector;
if (selector==null)
return;
final Selector new_selector = Selector.open();
for (SelectionKey k: selector.keys())
{
if (!k.isValid() || k.interestOps()==0)
continue;
final SelectableChannel channel = k.channel();
final Object attachment = k.attachment();
if (attachment==null)
addChange(channel);
else
addChange(channel,attachment);
}
_selector.close();
_selector=new_selector;
}
}
catch(IOException e)
{
throw new RuntimeException("recreating selector",e);
}
}
public void registerForEvent(EventHandler eventHandler)
{
if (orb.transportDebugFlag) {
dprint(".registerForEvent: " + eventHandler);
}
if (isClosed()) {
if (orb.transportDebugFlag) {
dprint(".registerForEvent: closed: " + eventHandler);
}
return;
}
if (eventHandler.shouldUseSelectThreadToWait()) {
synchronized (deferredRegistrations) {
deferredRegistrations.add(eventHandler);
}
if (! selectorStarted) {
startSelector();
}
selector.wakeup();
return;
}
switch (eventHandler.getInterestOps()) {
case SelectionKey.OP_ACCEPT :
createListenerThread(eventHandler);
break;
case SelectionKey.OP_READ :
createReaderThread(eventHandler);
break;
default:
if (orb.transportDebugFlag) {
dprint(".registerForEvent: default: " + eventHandler);
}
throw new RuntimeException(
"SelectorImpl.registerForEvent: unknown interest ops");
}
}
@Override
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
int newOps = 0;
if ((ops & SelectionKey.OP_READ) != 0)
newOps |= Net.POLLIN;
if ((ops & SelectionKey.OP_WRITE) != 0)
newOps |= Net.POLLOUT;
if ((ops & SelectionKey.OP_CONNECT) != 0)
newOps |= Net.POLLCONN;
sk.selector.putEventOps(sk, newOps);
}
public static void main(String[] argv) throws Exception {
try (ByteServer server = new ByteServer();
SocketChannel sc = SocketChannel.open(server.address())) {
server.acceptConnection();
try (Selector sel = Selector.open()) {
sc.configureBlocking(false);
sc.register(sel, SelectionKey.OP_READ);
// Previously channel would get selected here, although there is nothing to read
if (sel.selectNow() != 0)
throw new Exception("Select returned nonzero value");
}
}
}
@Override
public void doNextWriteCheck() {
//检查是否正在写,看CAS更新writing值是否成功
if ( !writing.compareAndSet(false, true) ) {
return;
}
try {
//利用缓存队列和写缓冲记录保证写的可靠性,返回true则为全部写入成功
boolean noMoreData = write0();
//如果全部写入成功而且写入队列为空(有可能在写入过程中又有新的Bytebuffer加入到队列),则取消注册写事件
//否则,继续注册写事件
if ( noMoreData && writeQueue.isEmpty() ) {
if ( (processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) != 0)) {
disableWrite();
}
} else {
if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) == 0)) {
enableWrite(false);
}
}
} catch (IOException e) {
if ( LOGGER.isDebugEnabled() ) {
LOGGER.debug("caught err:", e);
}
close("err:" + e);
} finally {
//CAS RESET
writing.set(false);
}
}
protected SocketChannelWithTimeouts() throws IOException {
super(null);
log.debug("Creating socketChannel");
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
channelKey = socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
/**
* 初始化多路复用器
*
* @param port
*
* */
public MultiplexerTimeServer(int port) {
try {
this.selector = Selector.open();
this.serverSocketChannel = ServerSocketChannel.open();
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("The Multiplexer Time Server is start on port:" + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(-1);
}
}