下面列出了java.nio.channels.Selector#wakeup() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Close Selector.
*
* @see org.apache.catalina.tribes.transport.ReceiverBase#stop()
*/
protected void stopListening() {
setListen(false);
Selector selector = this.selector.get();
if (selector != null) {
try {
// Unlock the thread if is is blocked waiting for input
selector.wakeup();
// Wait for the receiver thread to finish
int count = 0;
while (running && count < 50) {
Thread.sleep(100);
count ++;
}
if (running) {
log.warn(sm.getString("NioReceiver.stop.threadRunning"));
}
closeSelector();
} catch (Exception x) {
log.error("Unable to close cluster receiver selector.", x);
} finally {
this.selector.set(null);
}
}
}
public static void main(String[] args) throws Exception {
final Selector sel = Selector.open();
Runnable r = new Runnable() {
public void run() {
try {
sel.select();
} catch (IOException x) {
x.printStackTrace();
}
}
};
// start thread to block in Selector
Thread t = new Thread(r);
t.start();
// give thread time to start
Thread.sleep(1000);
// interrupt, close, and wakeup is the magic sequence to provoke the NPE
t.interrupt();
sel.close();
sel.wakeup();
}
@Test
void cancelledKeyRemovedFromChannel() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
for (int i = 0; i < 1000; ++i) {
assertNull(source.keyFor(selector));
SelectionKey key = source.register(selector, OP_READ);
selector.selectedKeys().clear();
selector.selectNow();
key.cancel();
selector.wakeup();
selector.selectedKeys().clear();
selector.selectNow();
}
}
public static void main(String argv[]) throws Exception {
int waitTime = 4000;
Selector selector = Selector.open();
try {
selector.wakeup();
long t1 = System.currentTimeMillis();
selector.select(waitTime);
long t2 = System.currentTimeMillis();
long totalTime = t2 - t1;
if (totalTime > waitTime)
throw new RuntimeException("Test failed");
} finally {
selector.close();
}
}
public void wakeup()
{
try
{
Selector selector = _selector;
if (selector!=null)
selector.wakeup();
}
catch(Exception e)
{
addChange(new ChangeTask()
{
public void run()
{
renewSelector();
}
});
renewSelector();
}
}
protected final void registerTask(Runnable task) {
taskQueue.add(task);
Selector selector = this.selector;
if (selector != null) {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
} else {
if (taskQueue.remove(task)) {
// the selector was null this means the Worker has already been
// shutdown.
throw new RejectedExecutionException("Worker has already been shutdown");
}
}
}
public static void main(String argv[]) throws Exception {
int waitTime = 4000;
Selector selector = Selector.open();
try {
selector.wakeup();
long t1 = System.currentTimeMillis();
selector.select(waitTime);
long t2 = System.currentTimeMillis();
long totalTime = t2 - t1;
if (totalTime > waitTime)
throw new RuntimeException("Test failed");
} finally {
selector.close();
}
}
@Test
void cancelledKeyRemovedFromChannel() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
for (int i = 0; i < 1000; ++i) {
assertNull(source.keyFor(selector));
SelectionKey key = source.register(selector, OP_READ);
selector.selectedKeys().clear();
selector.selectNow();
key.cancel();
selector.wakeup();
selector.selectedKeys().clear();
selector.selectNow();
}
}
public static void main(String argv[]) throws Exception {
int waitTime = 4000;
Selector selector = Selector.open();
try {
selector.wakeup();
long t1 = System.currentTimeMillis();
selector.select(waitTime);
long t2 = System.currentTimeMillis();
long totalTime = t2 - t1;
if (totalTime > waitTime)
throw new RuntimeException("Test failed");
} finally {
selector.close();
}
}
/**
* Close Selector.
*
* @see org.apache.catalina.tribes.transport.ReceiverBase#stop()
*/
protected void stopListening() {
setListen(false);
Selector selector = this.selector.get();
if (selector != null) {
try {
// Unlock the thread if is is blocked waiting for input
selector.wakeup();
// Wait for the receiver thread to finish
int count = 0;
while (running && count < 50) {
Thread.sleep(100);
count ++;
}
if (running) {
log.warn(sm.getString("NioReceiver.stop.threadRunning"));
}
closeSelector();
} catch (Exception x) {
log.error("Unable to close cluster receiver selector.", x);
} finally {
this.selector.set(null);
}
}
}
public static void main(String argv[]) throws Exception {
int waitTime = 4000;
Selector selector = Selector.open();
try {
selector.wakeup();
long t1 = System.currentTimeMillis();
selector.select(waitTime);
long t2 = System.currentTimeMillis();
long totalTime = t2 - t1;
if (totalTime > waitTime)
throw new RuntimeException("Test failed");
} finally {
selector.close();
}
}
void reply(XulHttpServerResponse serverResponse) {
_response = serverResponse;
serverResponse.addHeaderIfNotExists("Content-Type", "text/html")
.addHeaderIfNotExists("Connection", "close");
final String transferEncoding = _response.headers.get("Transfer-Encoding");
_sendChunkedData = "chunked".equals(transferEncoding);
serverResponse.prepareResponseData();
_responseBuffer = ByteBuffer.wrap(serverResponse.getData(), 0, serverResponse.getDataSize());
try {
Selector selector = _server._selector;
_socketChannel.register(selector, SelectionKey.OP_WRITE, this);
selector.wakeup();
} catch (ClosedChannelException e) {
clear();
XulLog.e(TAG, e);
}
}
public void run(Timeout timeout) throws Exception {
// This is needed to prevent a possible race that can lead to a NPE
// when the selector is closed before this is run
//
// See https://github.com/netty/netty/issues/685
Selector selector = NioClientBoss.this.selector;
if (selector != null) {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
}
public void addEvent(Runnable event) {
Selector selector = this.selector.get();
if ( selector != null ) {
synchronized (events) {
events.add(event);
}
if ( log.isTraceEnabled() ) log.trace("Adding event to selector:"+event);
if ( isListening() ) selector.wakeup();
}
}
@Test
void selectorRemovesKeysOnChannelCloseWhileSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
CountDownLatch latch = new CountDownLatch(1);
Future<?> job = executor.submit(() -> {
latch.countDown();
try {
selector.select();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
latch.await();
Thread.sleep(100);
source.close();
selector.wakeup();
job.get();
assertFalse(selector.keys().contains(key));
}
@Test
void selectorRemovesKeysOnCancelWhileSelecting() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel source = pipe.source();
source.configureBlocking(false);
SelectionKey key = source.register(selector, OP_READ);
assertTrue(selector.keys().contains(key));
CountDownLatch latch = new CountDownLatch(1);
Future<?> job = executor.submit(() -> {
latch.countDown();
try {
selector.select();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
latch.await();
Thread.sleep(100);
key.cancel();
assertTrue(selector.keys().contains(key));
assertSame(key, source.keyFor(selector));
selector.wakeup();
job.get();
assertFalse(selector.keys().contains(key));
assertNull(source.keyFor(selector));
}
@Override
public synchronized void shutdown()
{
shutdownFlag.set(true);
Selector srvSel = serverSelector;
if (srvSel != null)
{
srvSel.wakeup();
}
}
public void setInterestOps(final AbstractNioChannel<?> channel, final ChannelFuture future, final int interestOps) {
boolean iothread = isIoThread(channel);
if (!iothread) {
channel.getPipeline().execute(new Runnable() {
public void run() {
setInterestOps(channel, future, interestOps);
}
});
return;
}
boolean changed = false;
try {
Selector selector = this.selector;
SelectionKey key = channel.channel.keyFor(selector);
// Override OP_WRITE flag - a user cannot change this flag.
int newInterestOps = interestOps & ~Channel.OP_WRITE | channel.getRawInterestOps() & Channel.OP_WRITE;
if (key == null || selector == null) {
if (channel.getRawInterestOps() != newInterestOps) {
changed = true;
}
// Not registered to the worker yet.
// Set the rawInterestOps immediately; RegisterTask will pick it up.
channel.setRawInterestOpsNow(newInterestOps);
future.setSuccess();
if (changed) {
if (iothread) {
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
}
}
return;
}
if (channel.getRawInterestOps() != newInterestOps) {
changed = true;
key.interestOps(newInterestOps);
if (Thread.currentThread() != thread &&
wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
channel.setRawInterestOpsNow(newInterestOps);
}
future.setSuccess();
if (changed) {
fireChannelInterestChanged(channel);
}
} catch (CancelledKeyException e) {
// setInterestOps() was called on a closed channel.
ClosedChannelException cce = new ClosedChannelException();
future.setFailure(cce);
fireExceptionCaught(channel, cce);
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
void notifyWritable() throws IOException {
if (_responseBuffer == null) {
return;
}
final SocketChannel socketChannel = _socketChannel;
socketChannel.write(_responseBuffer);
if (!_responseBuffer.hasRemaining()) {
if (_response.hasUserBodyStream()) {
final Selector selector = _server._selector;
final XulHttpServerHandler attachment = this;
socketChannel.register(selector, 0, attachment);
selector.wakeup();
_server._reactorPool.execute(new Runnable() {
@Override
public void run() {
try {
int beginOffset = _sendChunkedData ? 32 : 0;
int endOffset = _sendChunkedData ? 2 : 0;
int sizeLimit = _sendChunkedData ? 8192 : -1;
if (_response == null || !_response.prepareUserBodyData(beginOffset, endOffset, sizeLimit)) {
terminate();
return;
}
int dataSize = _response.getDataSize();
if (dataSize <= 0) {
if (_sendChunkedData) {
_response.writeStream(null);
_responseBuffer = ByteBuffer.wrap("0\r\n\r\n".getBytes());
} else {
terminate();
return;
}
} else {
final byte[] data = _response.getData();
if (_sendChunkedData) {
String dataLength = String.format("%X\r\n", dataSize);
final byte[] dataLengthBytes = dataLength.getBytes();
beginOffset -= dataLengthBytes.length;
System.arraycopy(dataLengthBytes, 0, data, beginOffset, dataLengthBytes.length);
dataSize += dataLengthBytes.length;
data[beginOffset + dataSize++] = '\r';
data[beginOffset + dataSize++] = '\n';
}
_responseBuffer = ByteBuffer.wrap(data, beginOffset, dataSize);
}
socketChannel.register(selector, SelectionKey.OP_WRITE, attachment);
selector.wakeup();
} catch (Exception e) {
terminate();
XulLog.e(TAG, e);
}
}
});
} else {
socketChannel.close();
}
return;
}
}
@Override
public Peer connect(InetSocketAddress address, Node node) throws IOException
{
Selector srvSel = serverSelector;
Peer peer;
if (srvSel != null)
{
SocketChannel socketChannel = null;
try
{
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.socket().setTcpNoDelay(true);
String peerId = address.getAddress().getHostAddress() + ":" + address.getPort();
SelectionKey connKey;
synchronized (syncObj)
{
srvSel.wakeup();
boolean connected = socketChannel.connect(address);
if (connected)
{
// if connect is true, we will never receive an OP_CONNECT
// even if we register for it.
// as the controller does not know about this peer (we didnt return yet)
// we will register for no operation.
// As soon as the controller tries to send a message, that will trigger the OP_WRITE anyways
connKey = socketChannel.register(srvSel, 0);
}
else
{
// if connect returns false we will receive OP_CONNECT
// and we will need to call the finishConnection()
connKey = socketChannel.register(srvSel, OP_CONNECT);
}
peer = createTcpConnectorPeer(peerId, connKey, true, node);
connKey.attach(peer);
if (connected)
{
// May throw SSLException
peer.connectionEstablished();
connObserver.outboundConnectionEstablished(peer);
}
else
{
connObserver.outboundConnectionEstablishing(peer);
}
try
{
node.setPeer(privilegedAccCtx, peer);
}
catch (AccessDeniedException accDeniedExc)
{
throw new ImplementationError(
"TcpConnectorService privileged access context not authorized for node.setPeer() " +
"called from connect()",
accDeniedExc
);
}
}
}
catch (IOException ioExc)
{
try
{
if (socketChannel != null)
{
socketChannel.close();
}
}
catch (IOException ignored)
{
}
throw ioExc;
}
catch (IllegalBlockingModeException blkModeException)
{
throw new IOException(
"Connect request failed - Non-blocking I/O mode requested, but not supported"
);
}
catch (IllegalSelectorException | ClosedSelectorException |
CancelledKeyException connExc)
{
throw new IOException(
"Connect request failed - Connector service '" + serviceInstanceName + "' state changed " +
"while the operation was in progress"
);
}
}
else
{
throw new IOException(
"Connect request failed - Connector service '" + serviceInstanceName + "' is stopped"
);
}
return peer;
}