下面列出了java.nio.channels.Selector#keys() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static void closeHard(final Selector selector) {
try {
// Close channels
for (final SelectionKey key : selector.keys()) {
// Close the channel
closeHard(key.channel());
}
// Close the selector
selector.selectNow();
selector.close();
} catch (final Exception e) {
ignoreException(e, "closing hard");
}
}
protected void socketTimeouts() {
long now = System.currentTimeMillis();
if ( (now-lastCheck) < getSelectorTimeout() ) return;
//timeout
Selector tmpsel = this.selector.get();
Set<SelectionKey> keys = (isListening()&&tmpsel!=null)?tmpsel.keys():null;
if ( keys == null ) return;
for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext();) {
SelectionKey key = iter.next();
try {
// if (key.interestOps() == SelectionKey.OP_READ) {
// //only timeout sockets that we are waiting for a read from
// ObjectReader ka = (ObjectReader) key.attachment();
// long delta = now - ka.getLastAccess();
// if (delta > (long) getTimeout()) {
// cancelledKey(key);
// }
// }
// else
if ( key.interestOps() == 0 ) {
//check for keys that didn't make it in.
ObjectReader ka = (ObjectReader) key.attachment();
if ( ka != null ) {
long delta = now - ka.getLastAccess();
if (delta > getTimeout() && (!ka.isAccessed())) {
if (log.isWarnEnabled())
log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(ka.getLastAccess())+" Possible cause: all threads used, perform thread dump");
ka.setLastAccess(now);
//key.interestOps(SelectionKey.OP_READ);
}//end if
} else {
cancelledKey(key);
}//end if
}//end if
}catch ( CancelledKeyException ckx ) {
cancelledKey(key);
}
}
lastCheck = System.currentTimeMillis();
}
void debugPrint(Selector selector) {
System.err.println("Selector: debugprint start");
Set<SelectionKey> keys = selector.keys();
for (SelectionKey key : keys) {
SelectableChannel c = key.channel();
int ops = key.interestOps();
System.err.printf("selector chan:%s ops:%d\n", c, ops);
}
System.err.println("Selector: debugprint end");
}
protected void socketTimeouts() {
long now = System.currentTimeMillis();
if ( (now-lastCheck) < getSelectorTimeout() ) return;
//timeout
Selector tmpsel = this.selector.get();
Set<SelectionKey> keys = (isListening()&&tmpsel!=null)?tmpsel.keys():null;
if ( keys == null ) return;
for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext();) {
SelectionKey key = iter.next();
try {
// if (key.interestOps() == SelectionKey.OP_READ) {
// //only timeout sockets that we are waiting for a read from
// ObjectReader ka = (ObjectReader) key.attachment();
// long delta = now - ka.getLastAccess();
// if (delta > (long) getTimeout()) {
// cancelledKey(key);
// }
// }
// else
if ( key.interestOps() == 0 ) {
//check for keys that didn't make it in.
ObjectReader ka = (ObjectReader) key.attachment();
if ( ka != null ) {
long delta = now - ka.getLastAccess();
if (delta > getTimeout() && (!ka.isAccessed())) {
if (log.isWarnEnabled())
log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(ka.getLastAccess())+" Possible cause: all threads used, perform thread dump");
ka.setLastAccess(now);
//key.interestOps(SelectionKey.OP_READ);
}//end if
} else {
cancelledKey(key);
}//end if
}//end if
}catch ( CancelledKeyException ckx ) {
cancelledKey(key);
}
}
lastCheck = System.currentTimeMillis();
}
private void renewSelector()
{
try
{
synchronized (this)
{
Selector selector=_selector;
if (selector==null)
return;
final Selector new_selector = Selector.open();
for (SelectionKey k: selector.keys())
{
if (!k.isValid() || k.interestOps()==0)
continue;
final SelectableChannel channel = k.channel();
final Object attachment = k.attachment();
if (attachment==null)
addChange(channel);
else
addChange(channel,attachment);
}
_selector.close();
_selector=new_selector;
}
}
catch(IOException e)
{
throw new RuntimeException("recreating selector",e);
}
}
public void dumpKeyState(List<Object> dumpto)
{
Selector selector=_selector;
Set<SelectionKey> keys = selector.keys();
dumpto.add(selector + " keys=" + keys.size());
for (SelectionKey key: keys)
{
if (key.isValid())
dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
else
dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
}
}
private void renewSelector()
{
try
{
synchronized (this)
{
Selector selector=_selector;
if (selector==null)
return;
final Selector new_selector = Selector.open();
for (SelectionKey k: selector.keys())
{
if (!k.isValid() || k.interestOps()==0)
continue;
final SelectableChannel channel = k.channel();
final Object attachment = k.attachment();
if (attachment==null)
addChange(channel);
else
addChange(channel,attachment);
}
_selector.close();
_selector=new_selector;
}
}
catch(IOException e)
{
throw new RuntimeException("recreating selector",e);
}
}
public void dumpKeyState(List<Object> dumpto)
{
Selector selector=_selector;
Set<SelectionKey> keys = selector.keys();
dumpto.add(selector + " keys=" + keys.size());
for (SelectionKey key: keys)
{
if (key.isValid())
dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
else
dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
}
}
private void renewSelector()
{
try
{
synchronized (this)
{
Selector selector=_selector;
if (selector==null)
return;
final Selector new_selector = Selector.open();
for (SelectionKey k: selector.keys())
{
if (!k.isValid() || k.interestOps()==0)
continue;
final SelectableChannel channel = k.channel();
final Object attachment = k.attachment();
if (attachment==null)
addChange(channel);
else
addChange(channel,attachment);
}
_selector.close();
_selector=new_selector;
}
}
catch(IOException e)
{
throw new RuntimeException("recreating selector",e);
}
}
public void dumpKeyState(List<Object> dumpto)
{
Selector selector=_selector;
Set<SelectionKey> keys = selector.keys();
dumpto.add(selector + " keys=" + keys.size());
for (SelectionKey key: keys)
{
if (key.isValid())
dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
else
dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
}
}
protected void socketTimeouts() {
long now = System.currentTimeMillis();
if ( (now-lastCheck) < getSelectorTimeout() ) return;
//timeout
Selector tmpsel = this.selector.get();
Set<SelectionKey> keys = (isListening()&&tmpsel!=null)?tmpsel.keys():null;
if ( keys == null ) return;
for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext();) {
SelectionKey key = iter.next();
try {
// if (key.interestOps() == SelectionKey.OP_READ) {
// //only timeout sockets that we are waiting for a read from
// ObjectReader ka = (ObjectReader) key.attachment();
// long delta = now - ka.getLastAccess();
// if (delta > (long) getTimeout()) {
// cancelledKey(key);
// }
// }
// else
if ( key.interestOps() == 0 ) {
//check for keys that didn't make it in.
ObjectReader ka = (ObjectReader) key.attachment();
if ( ka != null ) {
long delta = now - ka.getLastAccess();
if (delta > getTimeout() && (!ka.isAccessed())) {
if (log.isWarnEnabled())
log.warn(sm.getString(
"nioReceiver.threadsExhausted",
Integer.valueOf(getTimeout()),
Boolean.valueOf(ka.isCancelled()),
key,
new java.sql.Timestamp(ka.getLastAccess())));
ka.setLastAccess(now);
//key.interestOps(SelectionKey.OP_READ);
}//end if
} else {
cancelledKey(key);
}//end if
}//end if
}catch ( CancelledKeyException ckx ) {
cancelledKey(key);
}
}
lastCheck = System.currentTimeMillis();
}
private void readContent(Selector selector, SelectionKey sk) throws IOException {
SocketChannel sc = (SocketChannel) sk.channel();
ByteBuffer buff = ByteBuffer.allocate(1024);
StringBuilder content = new StringBuilder();
try {
int s;
while ((s = sc.read(buff)) > 0) {
buff.flip();
content.append(charset.decode(buff));
}
buff.clear();
if (s == -1) {
System.out.println("close");
sc.close();
}
if (content.length() == 0) {
return;
}
log.info("receive msg: {}", content.toString());
// TimeUnit.MILLISECONDS.sleep(700);
if ("stop".equalsIgnoreCase(content.toString())) {
stop();
log.info("stop the sever");
return;
}
sk.interestOps(SelectionKey.OP_READ);//设置成准备下次读取
} catch (Exception e) {
//从Selector中删除指定的SelectionKey
sk.cancel();
ResourceTool.close(sk.channel());
}
if (content.length() <= 0) {
return;
}
// 遍历selector里注册的所有SelectionKey
// 也就是广播消息出去
for (SelectionKey key : selector.keys()) {
Channel targetChannel = key.channel();//获取Channel
//如果改Channel是SocketChannel是SocketChannel对象
if (targetChannel instanceof SocketChannel) {
//将读到的内容写入到该Channel中去
SocketChannel dest = (SocketChannel) targetChannel;
dest.write(charset.encode(content.toString()));
}
}
}