下面列出了怎么用java.nio.channels.spi.SelectorProvider的API类实例代码及写法,或者点击链接到github查看源代码。
public static void main(String[] args) throws Exception {
SelectorProvider sp = SelectorProvider.provider();
Pipe p = sp.openPipe();
Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source();
byte[] someBytes = new byte[0];
ByteBuffer outgoingdata = ByteBuffer.wrap(someBytes);
int totalWritten = 0;
int written = sink.write(outgoingdata);
if (written < 0)
throw new Exception("Write failed");
ByteBuffer incomingdata = ByteBuffer.allocateDirect(0);
int read = source.read(incomingdata);
if (read < 0)
throw new Exception("Read EOF");
sink.close();
source.close();
}
/**
* Initialize the selector
* @return The selector we'll be monitoring
* @throws IOException When the selector can't be created (Usually when the port is already in use)
*/
private Selector initSelector() throws IOException {
// Create a new selector
Selector socketSelector = SelectorProvider.provider().openSelector();
// Create a new non-blocking server socket channel
this.serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
// Bind the server socket to the specified address and port
InetSocketAddress isa = new InetSocketAddress(this.port);
serverChannel.socket().bind(isa);
// Register the server socket channel, indicating an interest in
// accepting new connections
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
log.info("Listening on port " + this.port + "...");
return socketSelector;
}
public static void main(String[] args) throws Exception {
SelectorProvider sp = SelectorProvider.provider();
Pipe p = sp.openPipe();
Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source();
byte[] someBytes = new byte[0];
ByteBuffer outgoingdata = ByteBuffer.wrap(someBytes);
int totalWritten = 0;
int written = sink.write(outgoingdata);
if (written < 0)
throw new Exception("Write failed");
ByteBuffer incomingdata = ByteBuffer.allocateDirect(0);
int read = source.read(incomingdata);
if (read < 0)
throw new Exception("Read EOF");
sink.close();
source.close();
}
public static void main(String[] args) throws Exception {
SelectorProvider sp = SelectorProvider.provider();
Pipe p = sp.openPipe();
Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source();
byte[] someBytes = new byte[0];
ByteBuffer outgoingdata = ByteBuffer.wrap(someBytes);
int totalWritten = 0;
int written = sink.write(outgoingdata);
if (written < 0)
throw new Exception("Write failed");
ByteBuffer incomingdata = ByteBuffer.allocateDirect(0);
int read = source.read(incomingdata);
if (read < 0)
throw new Exception("Read EOF");
sink.close();
source.close();
}
static void test(TestServers.DayTimeServer dayTimeServer) throws Exception {
InetSocketAddress isa
= new InetSocketAddress(dayTimeServer.getAddress(),
dayTimeServer.getPort());
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.connect(isa);
// Prepare a selector
Selector selector = SelectorProvider.provider().openSelector();
try {
SelectionKey key = sc.register(selector, SelectionKey.OP_CONNECT);
int keysAdded = selector.select();
if (keysAdded > 0) {
keysAdded = selector.select(1000);
if (keysAdded > 0)
throw new Exception("Same key reported added twice");
}
} finally {
selector.close();
sc.close();
}
}
public static void main(String[] args) throws Exception {
SelectorProvider sp = SelectorProvider.provider();
Pipe p = sp.openPipe();
Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source();
byte[] someBytes = new byte[0];
ByteBuffer outgoingdata = ByteBuffer.wrap(someBytes);
int totalWritten = 0;
int written = sink.write(outgoingdata);
if (written < 0)
throw new Exception("Write failed");
ByteBuffer incomingdata = ByteBuffer.allocateDirect(0);
int read = source.read(incomingdata);
if (read < 0)
throw new Exception("Read EOF");
sink.close();
source.close();
}
public void testOpen() {
MockServerSocketChannel testMSChnl = new MockServerSocketChannel(null);
MockServerSocketChannel testMSChnlnotnull = new MockServerSocketChannel(
SelectorProvider.provider());
assertEquals(SelectionKey.OP_ACCEPT, testMSChnlnotnull.validOps());
assertNull(testMSChnl.provider());
assertNotNull(testMSChnlnotnull.provider());
assertNotNull(this.serverChannel.provider());
assertEquals(testMSChnlnotnull.provider(), this.serverChannel
.provider());
}
/**
* Initializes a new instance of this class.
*/
public SctpServerChannelImpl(SelectorProvider provider)
throws IOException {
//TODO: update provider remove public modifier
super(provider);
this.fd = SctpNet.socket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ChannelState.INUSE;
}
/**
* Constructor for normal connecting sockets
*/
public SctpChannelImpl(SelectorProvider provider) throws IOException {
//TODO: update provider remove public modifier
super(provider);
this.fd = SctpNet.socket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ChannelState.UNCONNECTED;
}
/**
* Creates a new client manager which uses Java NIO for socket management. Uses a single thread to handle all select
* calls.
*/
public NioClientManager() {
try {
selector = SelectorProvider.provider().openSelector();
} catch (IOException e) {
throw new RuntimeException(e); // Shouldn't ever happen
}
}
SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)
throws IOException
{
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
synchronized (stateLock) {
this.localAddress = Net.localAddress(fd);
this.remoteAddress = isa;
this.state = ST_CONNECTED;
}
}
/**
* Initializes a new instance of this class.
*/
public SctpServerChannelImpl(SelectorProvider provider)
throws IOException {
//TODO: update provider remove public modifier
super(provider);
this.fd = SctpNet.socket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ChannelState.INUSE;
}
private static DatagramChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each DatagramChannel.open() otherwise.
*
* See <a href="See https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openDatagramChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
@Override
public void start(int port) throws IOException {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("0.0.0.0", port));
serverSocketChannel.configureBlocking(false);
selector = SelectorProvider.provider().openSelector();
serverThread = new ServerThread();
serverThread.start();
}
public ProxyThreadPools(SelectorProvider selectorProvider, int incomingAcceptorThreads, int incomingWorkerThreads, int outgoingWorkerThreads, String serverGroupName, int serverGroupId) {
clientToProxyAcceptorPool = new NioEventLoopGroup(incomingAcceptorThreads, new CategorizedThreadFactory(serverGroupName, "ClientToProxyAcceptor", serverGroupId), selectorProvider);
clientToProxyWorkerPool = new NioEventLoopGroup(incomingWorkerThreads, new CategorizedThreadFactory(serverGroupName, "ClientToProxyWorker", serverGroupId), selectorProvider);
clientToProxyWorkerPool.setIoRatio(90);
proxyToServerWorkerPool = new NioEventLoopGroup(outgoingWorkerThreads, new CategorizedThreadFactory(serverGroupName, "ProxyToServerWorker", serverGroupId), selectorProvider);
proxyToServerWorkerPool.setIoRatio(90);
}
public static void main(String[] args) throws Exception {
for (int x=0; x<100; x++) {
SelectorProvider sp = SelectorProvider.provider();
Pipe p = sp.openPipe();
Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source();
ByteBuffer outgoingdata = ByteBuffer.allocateDirect(10);
byte[] someBytes = new byte[10];
generator.nextBytes(someBytes);
outgoingdata.put(someBytes);
outgoingdata.flip();
int totalWritten = 0;
while (totalWritten < 10) {
int written = sink.write(outgoingdata);
if (written < 0)
throw new Exception("Write failed");
totalWritten += written;
}
ByteBuffer incomingdata = ByteBuffer.allocateDirect(10);
int totalRead = 0;
do {
int bytesRead = source.read(incomingdata);
if (bytesRead > 0)
totalRead += bytesRead;
} while(totalRead < 10);
for(int i=0; i<10; i++)
if (outgoingdata.get(i) != incomingdata.get(i))
throw new Exception("Pipe failed");
sink.close();
source.close();
}
}
static void scaleTest(TestServers.DayTimeServer daytimeServer)
throws Exception
{
InetAddress myAddress = daytimeServer.getAddress();
InetSocketAddress isa
= new InetSocketAddress(myAddress, daytimeServer.getPort());
for (int j=0; j<LIMIT; j++) {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
boolean connected = sc.connect(isa);
if (!connected) {
Selector RSelector = SelectorProvider.provider().openSelector();
SelectionKey RKey = sc.register (RSelector, SelectionKey.OP_CONNECT);
while (!connected) {
int keysAdded = RSelector.select(100);
if (keysAdded > 0) {
Set<SelectionKey> readyKeys = RSelector.selectedKeys();
Iterator<SelectionKey> i = readyKeys.iterator();
while (i.hasNext()) {
SelectionKey sk = i.next();
SocketChannel nextReady = (SocketChannel)sk.channel();
connected = nextReady.finishConnect();
}
readyKeys.clear();
}
}
RSelector.close();
}
readAndClose(sc);
}
}
private void init() throws IOException {
provider = SelectorProvider.provider();
datagramChannel = provider.openDatagramChannel();
datagramChannel.socket().setSoTimeout(this.readTimeout);
InetSocketAddress address = new InetSocketAddress(this.host, this.port);
session = new UdpSession(this, address);
connectModel = ConnectModel.CLIENT;
this.connectType = ConnectType.UDP;
}
public void testValidOps() {
MockDatagramChannel testMock = new MockDatagramChannel(SelectorProvider
.provider());
MockDatagramChannel testMocknull = new MockDatagramChannel(null);
int val = this.channel1.validOps();
assertEquals(5, val);
assertEquals(val, testMock.validOps());
assertEquals(val, testMocknull.validOps());
}
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
static void test(TestServers.DayTimeServer daytimeServer) throws Exception {
InetSocketAddress isa
= new InetSocketAddress(daytimeServer.getAddress(),
daytimeServer.getPort());
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
final boolean immediatelyConnected = sc.connect(isa);
Selector selector = SelectorProvider.provider().openSelector();
try {
SelectionKey key = sc.register(selector, SelectionKey.OP_CONNECT);
int keysAdded = selector.select();
if (keysAdded > 0) {
boolean result = sc.finishConnect();
if (result) {
keysAdded = selector.select(5000);
// 4750573: keysAdded should not be incremented when op is dropped
// from a key already in the selected key set
if (keysAdded > 0)
throw new Exception("Test failed: 4750573 detected");
Set<SelectionKey> sel = selector.selectedKeys();
Iterator<SelectionKey> i = sel.iterator();
SelectionKey sk = i.next();
// 4737146: isConnectable should be false while connected
if (sk.isConnectable())
throw new Exception("Test failed: 4737146 detected");
}
} else {
if (!immediatelyConnected) {
throw new Exception("Select failed");
} else {
System.out.println("IsConnectable couldn't be fully tested for "
+ System.getProperty("os.name"));
}
}
} finally {
sc.close();
selector.close();
}
}
static void test1(TestServers.DayTimeServer daytimeServer) throws Exception {
Selector selector = SelectorProvider.provider().openSelector();
InetAddress myAddress = daytimeServer.getAddress();
InetSocketAddress isa
= new InetSocketAddress(myAddress,
daytimeServer.getPort());
for (int j=0; j<LIMIT; j++) {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
boolean result = sc.connect(isa);
// On some platforms - given that we're using a local server,
// we may not enter into the if () { } statement below...
if (!result) {
SelectionKey key = sc.register(selector,
SelectionKey.OP_CONNECT);
while (!result) {
int keysAdded = selector.select(100);
if (keysAdded > 0) {
Set readyKeys = selector.selectedKeys();
Iterator i = readyKeys.iterator();
while (i.hasNext()) {
SelectionKey sk = (SelectionKey)i.next();
SocketChannel nextReady =
(SocketChannel)sk.channel();
result = nextReady.finishConnect();
}
}
}
key.cancel();
}
read(sc);
}
selector.close();
}
public static void main(String[] args) throws Exception {
// Load necessary classes ahead of time
DatagramChannel dc = DatagramChannel.open();
Exception se = new SocketException();
SelectorProvider sp = SelectorProvider.provider();
Pipe p = sp.openPipe();
ServerSocketChannel ssc = ServerSocketChannel.open();
test1();
test2();
test3();
test4();
}
public SctpMultiChannelImpl(SelectorProvider provider)
throws IOException {
//TODO: update provider, remove public modifier
super(provider);
this.fd = SctpNet.socket(false /*one-to-many*/);
this.fdVal = IOUtil.fdVal(fd);
}
/**
* Initializes a new instance of this class.
*/
public SctpServerChannelImpl(SelectorProvider provider)
throws IOException {
//TODO: update provider remove public modifier
super(provider);
this.fd = SctpNet.socket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ChannelState.INUSE;
}
/**
* Initializes a new instance of this class.
*/
public SctpServerChannelImpl(SelectorProvider provider)
throws IOException {
//TODO: update provider remove public modifier
super(provider);
this.fd = SctpNet.socket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ChannelState.INUSE;
}
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
public SctpMultiChannelImpl(SelectorProvider provider)
throws IOException {
//TODO: update provider, remove public modifier
super(provider);
this.fd = SctpNet.socket(false /*one-to-many*/);
this.fdVal = IOUtil.fdVal(fd);
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
private static DatagramChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each DatagramChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openDatagramChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}