下面列出了怎么用java.nio.channels.Selector的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
void selectorRemovesKeysOnChannelCloseWhenSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
source.close();
assertTrue(selector.keys().contains(key));
selector.selectNow();
assertFalse(selector.keys().contains(key));
}
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();
// Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
public SelectorThread(final SelectorConfig sc, final IMMOExecutor<T> executor, final IPacketHandler<T> packetHandler, final IClientFactory<T> clientFactory, final IAcceptFilter acceptFilter) throws IOException
{
super.setName("SelectorThread-" + super.getId());
HELPER_BUFFER_SIZE = sc.HELPER_BUFFER_SIZE;
HELPER_BUFFER_COUNT = sc.HELPER_BUFFER_COUNT;
MAX_SEND_PER_PASS = sc.MAX_SEND_PER_PASS;
MAX_READ_PER_PASS = sc.MAX_READ_PER_PASS;
SLEEP_TIME = sc.SLEEP_TIME;
DIRECT_WRITE_BUFFER = ByteBuffer.allocateDirect(sc.WRITE_BUFFER_SIZE) .order(BYTE_ORDER);
WRITE_BUFFER = ByteBuffer.wrap(new byte[sc.WRITE_BUFFER_SIZE]).order(BYTE_ORDER);
READ_BUFFER = ByteBuffer.wrap(new byte[sc.READ_BUFFER_SIZE]).order(BYTE_ORDER);
STRING_BUFFER = new NioNetStringBuffer(64 * 1024);
_pendingClose = new NioNetStackList<MMOConnection<T>>();
_bufferPool = new ArrayList<>();
for (int i = 0; i < HELPER_BUFFER_COUNT; i++)
_bufferPool.add(_bufferPool.size(), ByteBuffer.wrap(new byte[HELPER_BUFFER_SIZE]).order(BYTE_ORDER));
_acceptFilter = acceptFilter;
_packetHandler = packetHandler;
_clientFactory = clientFactory;
_executor = executor;
_selector = Selector.open();
}
/**
* @tests AbstractSelectableChannel#configureBlocking(boolean)
*/
public void test_configureBlocking_Z_IllegalBlockingMode() throws Exception {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
Selector acceptSelector = SelectorProvider.provider().openSelector();
SelectionKey acceptKey = sc.register(acceptSelector,
SelectionKey.OP_READ, null);
assertEquals(sc.keyFor(acceptSelector), acceptKey);
SelectableChannel getChannel = sc.configureBlocking(false);
assertEquals(getChannel, sc);
try {
sc.configureBlocking(true);
fail("Should throw IllegalBlockingModeException");
} catch (IllegalBlockingModeException e) {
// expected
}
}
@Override
public void run() {
final Selector selector = this.selector;
for (;;) {
++acceptCount;
try {
selector.select(1000L);
Set<SelectionKey> keys = selector.selectedKeys();
try {
for (SelectionKey key : keys) {
if (key.isValid() && key.isAcceptable()) {
accept();
} else {
key.cancel();
}
}
} finally {
keys.clear();
}
} catch (Throwable e) {
LOGGER.warn(getName(), e);
}
}
}
public static void main(String[] args) throws Exception {
final Selector sel = Selector.open();
Runnable r = new Runnable() {
public void run() {
try {
sel.select();
} catch (IOException x) {
x.printStackTrace();
}
}
};
// start thread to block in Selector
Thread t = new Thread(r);
t.start();
// give thread time to start
Thread.sleep(1000);
// interrupt, close, and wakeup is the magic sequence to provoke the NPE
t.interrupt();
sel.close();
sel.wakeup();
}
protected void clearOpWrite(AbstractNioChannel<?> channel) {
Selector selector = this.selector;
SelectionKey key = channel.channel.keyFor(selector);
if (key == null) {
return;
}
if (!key.isValid()) {
close(key);
return;
}
int interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
channel.setRawInterestOpsNow(interestOps);
}
}
public static void main(String argv[]) throws Exception {
int waitTime = 4000;
Selector selector = Selector.open();
try {
selector.wakeup();
long t1 = System.currentTimeMillis();
selector.select(waitTime);
long t2 = System.currentTimeMillis();
long totalTime = t2 - t1;
if (totalTime > waitTime)
throw new RuntimeException("Test failed");
} finally {
selector.close();
}
}
protected Selector getSharedSelector() throws IOException {
if (SHARED && SHARED_SELECTOR == null) {
synchronized ( NioSelectorPool.class ) {
if ( SHARED_SELECTOR == null ) {
synchronized (Selector.class) {
// Selector.open() isn't thread safe
// http://bugs.sun.com/view_bug.do?bug_id=6427854
// Affects 1.6.0_29, fixed in 1.7.0_01
SHARED_SELECTOR = Selector.open();
}
log.info("Using a shared selector for servlet write/read");
}
}
}
return SHARED_SELECTOR;
}
public static NioSelectedKeySet keySet(final Selector selector)
{
NioSelectedKeySet tmpSet = null;
if (null != PUBLIC_SELECTED_KEYS_FIELD)
{
try
{
tmpSet = new NioSelectedKeySet();
SELECTED_KEYS_FIELD.set(selector, tmpSet);
PUBLIC_SELECTED_KEYS_FIELD.set(selector, tmpSet);
}
catch (final Exception ignore)
{
tmpSet = null;
}
}
return tmpSet;
}
private void startSelector()
{
try {
selector = Selector.open();
} catch (IOException e) {
if (orb.transportDebugFlag) {
dprint(".startSelector: Selector.open: IOException: ", e);
}
// REVISIT - better handling/reporting
RuntimeException rte =
new RuntimeException(".startSelector: Selector.open exception");
rte.initCause(e);
throw rte;
}
setDaemon(true);
start();
selectorStarted = true;
if (orb.transportDebugFlag) {
dprint(".startSelector: selector.start completed.");
}
}
public NIOConnector(String name, NIOReactorPool reactorPool)
throws IOException {
super.setName(name);
this.name = name;
this.selector = Selector.open();
this.reactorPool = reactorPool;
this.connectQueue = new LinkedBlockingQueue<AbstractConnection>();
}
private SelectionKey findKey(Selector sel) {
synchronized (keyLock) {
if (keys == null)
return null;
for (int i = 0; i < keys.length; i++)
if ((keys[i] != null) && (keys[i].selector() == sel))
return keys[i];
return null;
}
}
/** Waits for up to 1 second for the channel to process the op */
private static void await(SocketChannel channel, int op) throws IOException {
Selector selector = Selector.open();
SelectionKey key = channel.register(selector, op);
try {
assertEquals(true, awaitOp(selector, key, op));
} finally {
// Cancel key and close selector
key.cancel();
selector.close();
}
}
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("localhost", 5454));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(256);
while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
register(selector, serverSocket);
}
if (key.isReadable()) {
answerWithEcho(buffer, key);
}
iter.remove();
}
}
}
private void acceptOP(SelectionKey key, Selector selector) throws IOException {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("Incoming connection from: " + socketChannel.getRemoteAddress());
// write an welcome message
socketChannel.write(ByteBuffer.wrap("Hello!\n".getBytes("UTF-8")));
// register channel with selector for further I/O
keepDataTrack.put(socketChannel, new ArrayList<byte[]>());
socketChannel.register(selector, SelectionKey.OP_READ);
}
public static List<String> download(List<URL> urlList) {
int numURLs = urlList.size();
numUnfinished = numURLs;
timeout = estimateTimeOut(numURLs);
urls = urlList.toArray(new URL[numURLs]);
works = new Work[numURLs];
documents = new ArrayList<String>(numURLs);
log.info("Connecting to servers (this may take a while)... ");
startTime = System.currentTimeMillis();
resolveHostNames();
try {
selector = Selector.open();
} catch (IOException e) {
log.error("IO error: " + e);
}
for (Work work : works) {
if (work == null)
numUnfinished--;
else add(work);
}
Helper.printElapsedTime(startTime);
log.info("Retrieving webpages... (will time out in " + timeout / 1000.0 + " seconds)");
startTime = System.currentTimeMillis();
retrieveWebPages();
log.info("Number of unfinished URL(s): " + numUnfinished);
Helper.printElapsedTime(startTime);
log.info("Post processing each retrieved webpage...");
startTime = System.currentTimeMillis();
processDocuments();
Helper.printElapsedTime(startTime);
return documents;
}
public void start() throws IOException {
Selector selector = Selector.open();
//通过OPEN方法来打开一个未绑定的ServerSocketChannel 实例
ServerSocketChannel server = ServerSocketChannel.open();
//将该ServerSocketChannel绑定到指定ip
server.bind(new InetSocketAddress(NIOServer.PORT));
//设置是NIO 非阻塞模式
server.configureBlocking(false);
//将sever注册到指定Selector对象上
server.register(selector, SelectionKey.OP_ACCEPT);
while (!stop) {
selector.select(2000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(selector, key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}
}
SelectorManager(HttpClientImpl ref) throws IOException {
super(null, null, "SelectorManager", 0, false);
ownerRef = new WeakReference<>(ref);
readyList = new ArrayList<>();
registrations = new ArrayList<>();
selector = Selector.open();
}
public DispatchHandler(InetAddress address, int port, CountDownLatch startSignal, TinyHttpd httpServer) throws IOException {
mSelector = Selector.open();
mServer = ServerSocketChannel.open();
mServer.socket().bind(new InetSocketAddress(address, port));
mServer.configureBlocking(false);
mServer.register(mSelector, SelectionKey.OP_ACCEPT);
mHttpServer = httpServer;
mThreadStartSignal = startSignal;
}
ServerImpl(HttpServer wrapper, String protocol, InetSocketAddress addr, int backlog) throws IOException {
this.protocol = protocol;
this.wrapper = wrapper;
this.logger = Logger.getLogger("com.sun.net.httpserver");
ServerConfig.checkLegacyProperties(this.logger);
this.https = protocol.equalsIgnoreCase("https");
this.address = addr;
this.contexts = new ContextList();
this.schan = ServerSocketChannel.open();
if (addr != null) {
ServerSocket socket = this.schan.socket();
socket.bind(addr, backlog);
this.bound = true;
}
this.selector = Selector.open();
this.schan.configureBlocking(false);
this.listenerKey = this.schan.register(this.selector, 16);
this.dispatcher = new ServerImpl.Dispatcher();
this.idleConnections = Collections.synchronizedSet(new HashSet());
this.allConnections = Collections.synchronizedSet(new HashSet());
this.reqConnections = Collections.synchronizedSet(new HashSet());
this.rspConnections = Collections.synchronizedSet(new HashSet());
this.time = System.currentTimeMillis();
this.timer = new Timer("server-timer", true);
this.timer.schedule(new ServerImpl.ServerTimerTask(), (long)CLOCK_TICK, (long)CLOCK_TICK);
if (timer1Enabled) {
this.timer1 = new Timer("server-timer1", true);
this.timer1.schedule(new ServerImpl.ServerTimerTask1(), TIMER_MILLIS, TIMER_MILLIS);
this.logger.config("HttpServer timer1 enabled period in ms: " + TIMER_MILLIS);
this.logger.config("MAX_REQ_TIME: " + MAX_REQ_TIME);
this.logger.config("MAX_RSP_TIME: " + MAX_RSP_TIME);
}
this.events = new LinkedList();
this.logger.config("HttpServer created " + protocol + " " + addr);
}
/**
*
* @param block Should the write be blocking or not?
* @param from the ByteBuffer containing the data to be written
* 数据刷新到页面。
* @throws IOException
*/
@Override
protected void doWrite(boolean block, ByteBuffer from) throws IOException {
long writeTimeout = getWriteTimeout();
Selector selector = null;
try {
selector = pool.get();
} catch (IOException x) {
// Ignore
}
try {
/**
* 将数据返回给页面。
* {@link NioSelectorPool#write(java.nio.ByteBuffer, org.apache.tomcat.util.net.NioChannel, java.nio.channels.Selector, long, boolean)}
*/
pool.write(from, getSocket(), selector, writeTimeout, block);
if (block) {
// Make sure we are flushed
do {
if (getSocket().flush(true, selector, writeTimeout)) {
break;
}
} while (true);
}
updateLastWrite();
} finally {
if (selector != null) {
pool.put(selector);
}
}
// If there is data left in the buffer the socket will be registered for
// write further up the stack. This is to ensure the socket is only
// registered for write once as both container and user code can trigger
// write registration.
}
/**
* Receive messages. Invoke the message listener on each message. Continue
* until an End of Session packet is received or the end-of-stream is
* reached.
*
* @param address the address
* @param username the username
* @param password the password
* @param listener a message listener
* @throws IOException if an I/O error occurs
*/
public static void receive(InetSocketAddress address, String username,
String password, MessageListener listener) throws IOException {
SocketChannel channel = SocketChannel.open();
channel.connect(address);
channel.configureBlocking(false);
StatusListener statusListener = new StatusListener();
try (Selector selector = Selector.open();
SoupBinTCPClient client = new SoupBinTCPClient(channel, listener, statusListener)) {
channel.register(selector, SelectionKey.OP_READ);
LoginRequest message = new LoginRequest();
message.setUsername(username);
message.setPassword(password);
message.setRequestedSession("");
message.setRequestedSequenceNumber(1);
client.login(message);
while (statusListener.receive) {
int numKeys = selector.select(TIMEOUT_MILLIS);
if (numKeys > 0) {
if (client.receive() < 0)
break;
selector.selectedKeys().clear();
}
client.keepAlive();
}
}
}
/**
* Register the given channel with the given selector for
* the given operations of interest
*/
protected void registerChannel(Selector selector,
SelectableChannel channel,
int ops,
Object attach) throws Exception {
if (channel == null)return; // could happen
// set the new channel non-blocking
channel.configureBlocking(false);
// register it with the selector
channel.register(selector, ops, attach);
}
public ShadowsocksTunnel(ShadowsocksConfig config,Selector selector) throws Exception {
super(config.ServerAddress,selector);
if(config.Encryptor==null){
throw new Exception("Error: The Encryptor for ShadowsocksTunnel is null.");
}
m_Config=config;
m_Encryptor=config.Encryptor;
}
public Reactor(int port, boolean isMaster) throws IOException {
this.port = port;
this.isMaster = isMaster;
this.selector = Selector.open();
if (isMaster) {
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey selectionKey = this.serverSocket.register(selector, SelectionKey.OP_ACCEPT);
selectionKey.attach(new Object());
System.out.println("start on " + port + " ...");
} else {
// TODO 如何
}
}
final Selector getSelector() {
try {
initSelector();
} catch (IOException e) {
throw new RuntimeException(e);
}
return selector;
}
public NioEventLoop(BaseConfig config, ChunkPool chunkPool) {
this.config = config;
this.chunkPool = chunkPool;
this.workerThreadPool = new ThreadPool(ThreadPool.FixedThread, 2);
//初始化数据输出类
nioBufferWriter = new NioBufferWriter(chunkPool, config.getBufferWriterQueueSize(), config.getChunkPoolBlockTime());
try {
selector = new SelectedSelector(Selector.open());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException {
stopped = false;
datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
if (maxBufferSize > 0) {
datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
if (actualReceiveBufSize < maxBufferSize) {
logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
+ actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's "
+ "maximum receive buffer");
}
}
// we don't have to worry about nicAddress being null here because InetSocketAddress already handles it
datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
datagramChannel.socket().bind(new InetSocketAddress(nicAddress, port));
// if a sending host and port were provided then connect to that specific address to only receive
// datagrams from that host/port, otherwise we can receive datagrams from any host/port
if (sendingHost != null && sendingPort != null) {
datagramChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
}
selector = Selector.open();
datagramChannel.register(selector, SelectionKey.OP_READ);
}
public void dumpKeyState(List<Object> dumpto)
{
Selector selector=_selector;
Set<SelectionKey> keys = selector.keys();
dumpto.add(selector + " keys=" + keys.size());
for (SelectionKey key: keys)
{
if (key.isValid())
dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
else
dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
}
}