下面列出了java.nio.channels.Selector#open() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Creates the network listener.
 *
 * <p>Stores the connection limits and host name, publishes the listen port through
 * {@code BTCLoader.listenPort}, copies the supplied blacklist entries into the peer
 * blacklist and opens the selector. When static addresses are supplied, each one is
 * flagged as static and inserted at the head of {@code BTCLoader.peerAddresses} and
 * into {@code BTCLoader.peerMap}, and the outbound limit is capped at their count.
 *
 * @param       maxConnections      The maximum number of connections
 * @param       maxOutbound         The maximum number of outbound connections
 * @param       hostName            The host name for this port or null
 * @param       listenPort          The port to listen on
 * @param       staticAddresses     Static peer addresses, or null when there are none
 * @param       blacklist           Peer blacklist
 * @throws      IOException         I/O error
 */
public NetworkHandler(int maxConnections, int maxOutbound, String hostName, int listenPort,
                      PeerAddress[] staticAddresses, List<BlacklistEntry> blacklist) throws IOException {
    this.maxConnections = maxConnections;
    this.maxOutbound = maxOutbound;
    this.hostName = hostName;
    BTCLoader.listenPort = listenPort;
    peerBlacklist.addAll(blacklist);

    // Selector used to listen for network events
    networkSelector = Selector.open();

    // Nothing more to do unless static peers were configured
    if (staticAddresses == null) {
        return;
    }

    // Static peers: never open more outbound connections than there are static addresses
    staticConnections = true;
    this.maxOutbound = Math.min(maxOutbound, staticAddresses.length);
    for (int index = 0; index < staticAddresses.length; index++) {
        PeerAddress staticPeer = staticAddresses[index];
        staticPeer.setStatic(true);
        BTCLoader.peerAddresses.add(0, staticPeer);
        BTCLoader.peerMap.put(staticPeer, staticPeer);
    }
}
/**
 * Verifies that closing a registered channel leaves its key in the selector's key
 * set until the next selection operation, after which the key is gone.
 *
 * @throws Exception if the pipe or the selector cannot be opened or used
 */
@Test
void selectorRemovesKeysOnChannelCloseWhenSelecting() throws Exception {
    Pipe pipe = Pipe.open();
    // Both pipe ends and the selector are released even when an assertion fails,
    // so a failing run does not leak descriptors into later tests.
    try (Pipe.SinkChannel sink = pipe.sink();
            Pipe.SourceChannel source = pipe.source();
            Selector selector = Selector.open()) {
        source.configureBlocking(false);
        SelectionKey key = source.register(selector, OP_READ);
        assertTrue(selector.keys().contains(key));

        // Closing the channel alone must not remove the key yet
        source.close();
        assertTrue(selector.keys().contains(key));

        // The selection operation is what drops the key of the closed channel
        selector.selectNow();
        assertFalse(selector.keys().contains(key));
    }
}
/**
 * Opens the selector, marks this instance as a daemon, starts it and records
 * that the selector has been started.
 *
 * @throws RuntimeException wrapping the IOException when the selector cannot be opened
 */
private void startSelector()
{
    try {
        selector = Selector.open();
    } catch (IOException openFailure) {
        if (orb.transportDebugFlag) {
            dprint(".startSelector: Selector.open: IOException: " + openFailure);
        }
        // REVISIT - better handling/reporting
        throw new RuntimeException(".startSelector: Selector.open exception", openFailure);
    }

    setDaemon(true);
    start();
    selectorStarted = true;

    if (orb.transportDebugFlag) {
        dprint(".startSelector: selector.start completed.");
    }
}
/**
 * In the case we are using the java select() method, this method is used to
 * trash the buggy selector and create a new one, registering all the
 * sockets on it.
 *
 * <p>The migration runs in two steps. Every channel is first registered on the
 * replacement selector; only once all registrations have succeeded are the sessions
 * pointed at their new keys and the old selector closed. If a registration fails,
 * the replacement selector is closed before the exception propagates, and both the
 * sessions and the current selector are left as they were.
 *
 * @throws IOException if the new selector cannot be opened, a channel cannot be
 *         registered on it, or the old selector cannot be closed
 */
@Override
protected void registerNewSelector() throws IOException {
    // NOTE(review): the monitor is the selector instance that gets replaced below,
    // so later callers lock a different object - confirm this is what callers expect.
    synchronized (selector) {
        Set<SelectionKey> keys = selector.keys();

        // Open a new selector
        Selector newSelector = Selector.open();
        java.util.List<SelectionKey> newKeys = new java.util.ArrayList<SelectionKey>(keys.size());
        boolean registered = false;

        try {
            // Register every channel on the new selector, keeping the session attached
            for (SelectionKey key : keys) {
                SelectableChannel ch = key.channel();
                NioSession session = (NioSession) key.attachment();
                newKeys.add(ch.register(newSelector, key.interestOps(), session));
            }
            registered = true;
        } finally {
            if (!registered) {
                // Do not leak the half-populated selector when a registration failed
                try {
                    newSelector.close();
                } catch (IOException ignored) {
                    // Best effort: the registration failure is the error worth reporting
                }
            }
        }

        // All registrations succeeded: hand each session its new key
        for (SelectionKey newKey : newKeys) {
            ((NioSession) newKey.attachment()).setSelectionKey(newKey);
        }

        // Now we can close the old selector and switch it
        selector.close();
        selector = newSelector;
    }
}
/**
 * Binds a server socket on port 7777, connects to port 7 of {@code Host.NAME},
 * accepts a single client and then runs two continuations on a selector-backed
 * scheduler, one built for (client, remote) and one for (remote, client).
 *
 * @param args ignored
 * @throws IOException if any channel or the selector cannot be opened or used
 */
@SuppressWarnings("resource")
public static void main(String[] args) throws IOException {
    ServerSocketChannel listener = ServerSocketChannel.open();
    listener.bind(new InetSocketAddress(7777));
    System.out.println("server bound to " + listener.getLocalAddress());

    // The outbound connection is made in blocking mode and switched to non-blocking afterwards
    SocketChannel upstream = SocketChannel.open();
    upstream.connect(new InetSocketAddress(InetAddress.getByName(Host.NAME), 7));
    upstream.configureBlocking(false);

    Selector events = Selector.open();
    Scheduler scheduler = new Scheduler(events);

    System.out.println("accepting ...");
    SocketChannel downstream = listener.accept();
    downstream.configureBlocking(false);

    // One continuation per direction
    Continuation clientToRemote = new Continuation(SCOPE, runnable(scheduler, downstream, upstream));
    Continuation remoteToClient = new Continuation(SCOPE, runnable(scheduler, upstream, downstream));
    scheduler.inject(clientToRemote);
    scheduler.inject(remoteToClient);

    scheduler.loop();
}
/**
 * Checks that a {@code wakeup()} issued before {@code select(timeout)} makes that
 * select return without waiting out the timeout.
 *
 * @param argv ignored
 * @throws Exception if the selector cannot be opened or used; a
 *         {@link RuntimeException} is thrown when the select call took longer
 *         than the timeout
 */
public static void main(String[] argv) throws Exception {
    final int waitTime = 4000;
    try (Selector selector = Selector.open()) {
        selector.wakeup();

        // Use the monotonic clock: wall-clock time can jump (NTP, manual changes)
        // and would make the elapsed-time check unreliable.
        long startNanos = System.nanoTime();
        selector.select(waitTime);
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;

        if (elapsedMillis > waitTime) {
            throw new RuntimeException("Test failed");
        }
    }
}
/**
 * Opens a selector and stores it, then marks this instance as a daemon, starts
 * it and sets the started flag.
 *
 * @throws RuntimeException carrying the IOException as its cause when
 *         {@code Selector.open()} fails
 */
private void startSelector()
{
    final Selector opened;
    try {
        opened = Selector.open();
    } catch (IOException ioe) {
        if (orb.transportDebugFlag) {
            dprint(".startSelector: Selector.open: IOException: ", ioe);
        }
        // REVISIT - better handling/reporting
        throw new RuntimeException(".startSelector: Selector.open exception", ioe);
    }
    selector = opened;

    setDaemon(true);
    start();
    selectorStarted = true;

    if (orb.transportDebugFlag) {
        dprint(".startSelector: selector.start completed.");
    }
}
public NIOAcceptor(String name, String bindIp, int port,
ConnectionFactory factory, NIOReactorPool reactorPool)
throws IOException {
super.setName(name);
this.port = port;
this.selector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
this.serverChannel.configureBlocking(false);
/** 设置TCP属性 */
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024 * 16 * 2);
// backlog=100
serverChannel.bind(new InetSocketAddress(bindIp, port), 100);
this.serverChannel.register(selector, SelectionKey.OP_ACCEPT);
this.factory = factory;
this.reactorPool = reactorPool;
}
/**
 * Creates the selector and launches this instance as a daemon.
 *
 * <p>A failure to open the selector is optionally traced through {@code dprint}
 * (when {@code orb.transportDebugFlag} is set) and rethrown as a RuntimeException
 * with the IOException as its cause.
 */
private void startSelector()
{
    try {
        selector = Selector.open();
    } catch (IOException cause) {
        if (orb.transportDebugFlag) {
            dprint(".startSelector: Selector.open: IOException: ", cause);
        }
        // REVISIT - better handling/reporting
        RuntimeException wrapped =
            new RuntimeException(".startSelector: Selector.open exception", cause);
        throw wrapped;
    }

    // Daemon flag has to be set before the start() call below
    setDaemon(true);
    start();
    selectorStarted = true;

    if (orb.transportDebugFlag) {
        dprint(".startSelector: selector.start completed.");
    }
}
public TcpProxyServer(int port) throws IOException {
m_Selector = Selector.open();
m_ServerSocketChannel = ServerSocketChannel.open();
m_ServerSocketChannel.configureBlocking(false);
m_ServerSocketChannel.socket().bind(new InetSocketAddress(port));
m_ServerSocketChannel.register(m_Selector, SelectionKey.OP_ACCEPT);
this.Port=(short) m_ServerSocketChannel.socket().getLocalPort();
System.out.printf("AsyncTcpServer listen on %d success.\n", this.Port&0xFFFF);
}
/**
 * Waits for up to 1 second for the server to be acceptable; the waiting itself is
 * delegated to {@code awaitOp}.
 *
 * <p>A temporary selector is opened for the check. It is closed on every exit
 * path, including when registering the channel fails.
 *
 * @param channel server channel to register for OP_ACCEPT
 * @throws IOException if the selector cannot be opened or the channel cannot be registered
 */
private static void awaitAcceptable(ServerSocketChannel channel) throws IOException {
    try (Selector selector = Selector.open()) {
        SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT);
        try {
            assertEquals(true, awaitOp(selector, key, SelectionKey.OP_ACCEPT));
        } finally {
            // Cancel the key; the selector is closed by the enclosing try-with-resources
            key.cancel();
        }
    }
}
/**
 * Remembers the given address, opens a selector for this instance and marks it
 * as a daemon.
 *
 * @param address address stored on this instance
 * @throws Exception if the selector cannot be opened
 */
ClientThread(SocketAddress address) throws Exception {
    this.address = address;
    this.selector = Selector.open();
    this.setDaemon(true);
}
/**
 * Starts the server.
 *
 * <p>Binds a non-blocking server channel to {@code port}, registers it for
 * OP_ACCEPT and then loops forever dispatching ready keys to {@code acceptNew}
 * and {@code readData}. An IOException ends the loop and is logged; the server
 * channel and the selector are closed on every exit path.
 *
 * @param port the port the service listens on
 * @param selectTimeout timeout, in milliseconds, the {@link Selector} waits for
 *        channels to become ready
 */
private static void startServer(int port, int selectTimeout) {
    try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
        serverChannel.configureBlocking(false);
        ServerSocket serverSocket = serverChannel.socket();
        serverSocket.bind(new InetSocketAddress(port));
        if (logger.isLoggable(Level.INFO)) {
            logger.info("NIO echo网络服务启动完毕,监听端口:" +port);
        }

        try (Selector selector = Selector.open()) {
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                int selectNum = selector.select(selectTimeout);
                if (0 == selectNum) {
                    continue;
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey selectionKey = it.next();
                    // A handler may cancel the key (for example by closing its channel);
                    // querying ready ops on a cancelled key throws CancelledKeyException,
                    // so validity is checked before each query.

                    // Accept a new socket connection
                    if (selectionKey.isValid() && selectionKey.isAcceptable()) {
                        acceptNew(selector, selectionKey);
                    }
                    // Read and process the socket's data
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        readData(selector, selectionKey);
                    }
                    it.remove();
                } // end of while iterator
            }
        }
    } catch (IOException e) {
        logger.log(Level.SEVERE, "处理网络连接出错", e);
    }
}
/**
 * Builds a client for {@code address}: stores the address, opens the selector
 * and sets the daemon flag.
 *
 * @param address address stored on this instance
 * @throws Exception if {@code Selector.open()} fails
 */
ClientThread(SocketAddress address) throws Exception {
    this.address = address;

    // The selector is opened eagerly, at construction time
    this.selector = Selector.open();

    setDaemon(true);
}
/**
 * Sets up the service.
 *
 * <p>When {@code isOAndBoot} is set and the device runs Android O or later, a
 * notification channel is created and the service is moved to the foreground; the
 * flag is then cleared. Afterwards the host file and the VPN are set up. If no VPN
 * interface is available the service is stopped. Otherwise the UDP/TCP selectors,
 * the packet queues, the selector locks and a five-thread executor are created and
 * the five worker tasks are submitted, followed by a "running" broadcast.
 */
@Override
public void onCreate() {
    //        registerNetReceiver();
    super.onCreate();

    if (isOAndBoot) {
        // Android 8.0 (O) and later: create a channel and go to the foreground
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
            NotificationChannel bootChannel =
                    new NotificationChannel("vhosts_channel_id", "System", NotificationManager.IMPORTANCE_NONE);
            NotificationManager notifications =
                    (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);
            notifications.createNotificationChannel(bootChannel);

            Notification foregroundNotice = new Notification.Builder(this, "vhosts_channel_id")
                    .setSmallIcon(R.mipmap.ic_launcher)
                    .setContentTitle("Virtual Hosts Running")
                    .build();
            startForeground(1, foregroundNotice);
        }
        isOAndBoot = false;
    }

    setupHostFile();
    setupVPN();

    // Without a VPN interface there is nothing to run
    if (vpnInterface == null) {
        LogUtils.d(TAG, "unknow error");
        stopVService();
        return;
    }

    isRunning = true;
    try {
        udpSelector = Selector.open();
        tcpSelector = Selector.open();

        deviceToNetworkUDPQueue = new ConcurrentLinkedQueue<>();
        deviceToNetworkTCPQueue = new ConcurrentLinkedQueue<>();
        networkToDeviceQueue = new ConcurrentLinkedQueue<>();

        udpSelectorLock = new ReentrantLock();
        tcpSelectorLock = new ReentrantLock();

        // One pool thread per worker submitted below
        executorService = Executors.newFixedThreadPool(5);
        executorService.submit(new UDPInput(networkToDeviceQueue, udpSelector, udpSelectorLock));
        executorService.submit(new UDPOutput(deviceToNetworkUDPQueue, networkToDeviceQueue, udpSelector, udpSelectorLock, this));
        executorService.submit(new TCPInput(networkToDeviceQueue, tcpSelector, tcpSelectorLock));
        executorService.submit(new TCPOutput(deviceToNetworkTCPQueue, networkToDeviceQueue, tcpSelector, tcpSelectorLock, this));
        VPNRunnable vpnWorker = new VPNRunnable(vpnInterface.getFileDescriptor(),
                deviceToNetworkUDPQueue, deviceToNetworkTCPQueue, networkToDeviceQueue);
        executorService.submit(vpnWorker);

        LocalBroadcastManager broadcaster = LocalBroadcastManager.getInstance(this);
        Intent runningState = new Intent(BROADCAST_VPN_STATE).putExtra("running", true);
        broadcaster.sendBroadcast(runningState);
        LogUtils.i(TAG, "Started");
    } catch (Exception e) {
        // TODO: Here and elsewhere, we should explicitly notify the user of any errors
        // and suggest that they stop the service, since we can't do it ourselves
        LogUtils.e(TAG, "Error starting service", e);
        stopVService();
    }
}
/**
 * Creates a connector with the given name, its own selector and an empty
 * connect queue.
 *
 * @param name name stored on this instance and passed to {@code super.setName}
 * @throws IOException if the selector cannot be opened
 */
public NIOConnector(String name) throws IOException {
    this.name = name;
    super.setName(name);
    this.selector = Selector.open();
    this.connectQueue = new LinkedBlockingQueue<>();
}
/**
 * Runs the closed-selector check for the timed select variant, once with a
 * timeout of 0 and once with {@code WAIT_TIME}.
 *
 * @tests java.nio.channels.Selector#select(long)
 */
public void test_selectJ_SelectorClosed() throws IOException {
    final SelectType timed = SelectType.TIMEOUT;

    assert_select_SelectorClosed(timed, 0);

    // A fresh selector is opened for the second run
    // (presumably the helper closes the one it is given - confirm in the helper).
    this.selector = Selector.open();
    assert_select_SelectorClosed(timed, WAIT_TIME);
}
/**
 * Checks selector behaviour after data has been read from a channel.
 *
 * <p>Scenario 1: the server writes one byte, the client reads it and only then
 * registers for OP_READ; {@code selectNow()} must report nothing ready.
 * Scenario 2: the server writes two bytes; the channel must be selected by both
 * {@code select(TIMEOUT)} and a following {@code selectNow()}, and after two
 * one-byte reads {@code selectNow()} must report nothing ready.
 *
 * @param argv ignored
 * @throws Exception if any expectation is not met or an I/O operation fails
 */
public static void main(String[] argv) throws Exception {
    // server: accept connection and write one byte
    try (ByteServer byteServer = new ByteServer();
            SocketChannel channel = SocketChannel.open(byteServer.address())) {
        byteServer.acceptConnection();
        byteServer.write(1);

        try (Selector selector = Selector.open()) {
            ByteBuffer oneByte = ByteBuffer.allocate(1);
            channel.read(oneByte);
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);

            // previously on Windows select would select channel here, although there was
            // nothing to read
            int ready = selector.selectNow();
            if (ready != 0) {
                throw new Exception("Select returned nonzero value");
            }
        }
    }

    // Now we will test a two reads combination
    // server: accept connection and write two bytes
    try (ByteServer byteServer = new ByteServer();
            SocketChannel channel = SocketChannel.open(byteServer.address())) {
        byteServer.acceptConnection();
        byteServer.write(2);

        try (Selector selector = Selector.open()) {
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);

            int firstSelect = selector.select(TIMEOUT);
            if (firstSelect != 1) {
                throw new Exception("One selected key expected");
            }
            selector.selectedKeys().clear();

            // previously on Windows a channel would get selected only once
            int secondSelect = selector.selectNow();
            if (secondSelect != 1) {
                throw new Exception("One selected key expected");
            }

            // Previously on Windows two consequent reads would cause select()
            // to select a channel, although there was nothing remaining to
            // read in the channel
            int firstRead = channel.read(ByteBuffer.allocate(1));
            if (firstRead != 1) {
                throw new Exception("One byte expected");
            }
            int secondRead = channel.read(ByteBuffer.allocate(1));
            if (secondRead != 1) {
                throw new Exception("One byte expected");
            }

            int afterReads = selector.selectNow();
            if (afterReads != 0) {
                throw new Exception("Select returned nonzero value");
            }
        }
    }
}
/**
 * Creates an instance with an empty register queue and a newly opened selector.
 *
 * @throws IOException if the selector cannot be opened
 */
private RW() throws IOException {
    registerQueue = new ConcurrentLinkedQueue<>();
    selector = Selector.open();
}
/**
 * Stress test in which a worker thread repeatedly changes the interest ops of a
 * key while the main thread selects, cancels and re-registers keys on the same
 * selector.
 *
 * <p>The worker runs 15 x 10000 rounds. In each round it sets key[0]'s interest
 * ops to OP_READ | OP_WRITE and then waits on {@code notifyLock} until the main
 * thread has seen the key as writable. If more than 5000 ms pass without that
 * notification, {@code succTermination} is set to false and the main thread
 * fails the test; when all rounds complete it is set to true.
 *
 * @param args ignored
 * @throws Exception on I/O failure; a RuntimeException is thrown when the test fails
 */
public static void main(String[] args) throws Exception {
// getByName(null) yields a loopback address; port 0 asks for an ephemeral port
InetAddress addr = InetAddress.getByName(null);
ServerSocketChannel sc = ServerSocketChannel.open();
sc.socket().bind(new InetSocketAddress(addr, 0));
// First client connection; only the accepted (server-side) end is kept
SocketChannel.open(new InetSocketAddress(addr,
sc.socket().getLocalPort()));
SocketChannel accepted = sc.accept();
accepted.configureBlocking(false);
// Second client connection, handled the same way
SocketChannel.open(new InetSocketAddress(addr,
sc.socket().getLocalPort()));
SocketChannel accepted2 = sc.accept();
accepted2.configureBlocking(false);
final Selector sel = Selector.open();
SelectionKey key2 = accepted2.register(sel, SelectionKey.OP_READ);
// Single-element array so the anonymous thread can see the key the main thread re-registers
final SelectionKey[] key = new SelectionKey[]{
accepted.register(sel, SelectionKey.OP_READ)};
// thread that will be changing key[0].interestOps to OP_READ | OP_WRITE
new Thread() {
public void run() {
try {
for (int k = 0; k < 15; k++) {
for (int i = 0; i < 10000; i++) {
synchronized (notifyLock) {
// selectorLock is held across wakeup + interestOps; the main thread
// acquires the same lock after each select() before inspecting keys
synchronized (selectorLock) {
sel.wakeup();
key[0].interestOps(SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
notified = false;
long beginTime = System.currentTimeMillis();
// Wait until the main thread reports that it saw the key as writable
while (true) {
notifyLock.wait(5000);
if (notified) {
break;
}
long endTime = System.currentTimeMillis();
// No notification for more than 5000 ms: report failure
if (endTime - beginTime > 5000) {
succTermination = false;
// wake up main thread doing select()
sel.wakeup();
return;
}
}
}
}
}
// All rounds completed
succTermination = true;
// wake up main thread doing select()
sel.wakeup();
} catch (Exception e) {
System.out.println(e);
// NOTE(review): an exception in the worker is reported as success here - confirm intended
succTermination = true;
// wake up main thread doing select()
sel.wakeup();
}
}
}.start();
// main thread will be doing registering/deregistering with the sel
while (true) {
sel.select();
// The worker has finished: close up and report the outcome
if (Boolean.TRUE.equals(succTermination)) {
System.out.println("Test passed");
sel.close();
sc.close();
break;
} else if (Boolean.FALSE.equals(succTermination)) {
System.out.println("Failed to pass the test");
sel.close();
sc.close();
throw new RuntimeException("Failed to pass the test");
}
synchronized (selectorLock) {
// Intentionally empty - presumably a barrier that waits for the worker to leave
// its selectorLock section (wakeup + interestOps) before keys are inspected
}
if (sel.selectedKeys().contains(key[0]) && key[0].isWritable()) {
synchronized (notifyLock) {
notified = true;
notifyLock.notify();
// Cancel key[0], let selectNow() process the cancellation, then register both channels again
key[0].cancel();
sel.selectNow();
key2 = accepted2.register(sel, SelectionKey.OP_READ);
key[0] = accepted.register(sel, SelectionKey.OP_READ);
}
}
// key2 is cancelled on every pass of the loop; it is only registered again in the branch above
key2.cancel();
sel.selectedKeys().clear();
}
}