下面列出了怎么用java.nio.channels.SocketChannel的API类实例代码及写法,或者点击链接到github查看源代码。
void initialise(SSLContext ctx, SocketChannel channel) {
try {
channel.configureBlocking(false);
engine = ctx.createSSLEngine();
engine.setUseClientMode(!isAcceptor);
if (isAcceptor) {
engine.setNeedClientAuth(true);
}
outboundApplicationData = ByteBuffer.allocateDirect(engine.getSession().getApplicationBufferSize());
outboundEncodedData = ByteBuffer.allocateDirect(engine.getSession().getPacketBufferSize());
inboundApplicationData = ByteBuffer.allocateDirect(engine.getSession().getApplicationBufferSize());
inboundEncodedData = ByteBuffer.allocateDirect(engine.getSession().getPacketBufferSize());
// eliminates array creation on each call to SSLEngine.wrap()
precomputedWrapArray = new ByteBuffer[]{outboundApplicationData};
precomputedUnwrapArray = new ByteBuffer[]{inboundApplicationData};
new Handshaker().performHandshake(engine, channel);
} catch (IOException e) {
throw new RuntimeException("Unable to perform handshake at " + Instant.now(), e);
}
}
@SuppressWarnings("resource")
public static void main(String[] args) throws IOException {
var server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(7777));
System.out.println("server bound to " + server.getLocalAddress());
var remote = SocketChannel.open();
remote.connect(new InetSocketAddress(InetAddress.getByName(Host.NAME), 7));
//remote.configureBlocking(false);
System.out.println("accepting ...");
var client = server.accept();
//client.configureBlocking(false);
var executor = Executors.newSingleThreadExecutor();
//var executor = ForkJoinPool.commonPool();
Thread.builder().virtual(executor).task(runnable(client, remote)).start();
Thread.builder().virtual(executor).task(runnable(remote, client)).start();
}
protected SSLEngine newSslEngine(SocketChannel channel) throws IOException
{
SSLEngine sslEngine;
if (channel != null)
{
String peerHost = channel.socket().getInetAddress().getHostAddress();
int peerPort = channel.socket().getPort();
sslEngine = _sslContextFactory.newSslEngine(peerHost, peerPort);
}
else
{
sslEngine = _sslContextFactory.newSslEngine();
}
sslEngine.setUseClientMode(true);
sslEngine.beginHandshake();
return sslEngine;
}
protected ChannelDispatcher createChannelReader(final ProcessContext context, final String protocol, final BlockingQueue<ByteBuffer> bufferPool,
final BlockingQueue<RawSyslogEvent> events, final int maxConnections,
final SSLContextService sslContextService, final Charset charset) throws IOException {
final EventFactory<RawSyslogEvent> eventFactory = new RawSyslogEventFactory();
if (UDP_VALUE.getValue().equals(protocol)) {
return new DatagramChannelDispatcher(eventFactory, bufferPool, events, getLogger());
} else {
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
SslContextFactory.ClientAuth clientAuth = null;
if (sslContextService != null) {
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
sslContext = sslContextService.createSSLContext(SslContextFactory.ClientAuth.valueOf(clientAuthValue));
clientAuth = SslContextFactory.ClientAuth.valueOf(clientAuthValue);
}
final ChannelHandlerFactory<RawSyslogEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, charset);
}
}
Session accept() {
try {
SocketChannel fix = serverChannel.accept();
if (fix == null)
return null;
try {
fix.setOption(StandardSocketOptions.TCP_NODELAY, true);
fix.configureBlocking(false);
return new Session(orderEntry, fix, config, instruments);
} catch (IOException e1) {
fix.close();
return null;
}
} catch (IOException e2) {
return null;
}
}
public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) {
SocketChannel sc = null;
try {
sc = SocketChannel.open();
sc.configureBlocking(true);
sc.socket().setSoLinger(false, -1);
sc.socket().setTcpNoDelay(true);
sc.socket().setReceiveBufferSize(1024 * 64);
sc.socket().setSendBufferSize(1024 * 64);
sc.socket().connect(remote, timeoutMillis);
sc.configureBlocking(false);
return sc;
} catch (Exception e) {
if (sc != null) {
try {
sc.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
return null;
}
/** Register a channel
* @param channel
* @param att Attached Object
*/
public void register(SocketChannel channel, Object att)
{
// The ++ increment here is not atomic, but it does not matter.
// so long as the value changes sometimes, then connections will
// be distributed over the available sets.
int s=_set++;
if (s<0)
s=-s;
s=s%_selectSets;
SelectSet[] sets=_selectSet;
if (sets!=null)
{
SelectSet set=sets[s];
set.addChange(channel,att);
set.wakeup();
}
}
private final void finishConnection(final SelectionKey key, final MMOConnection<T> con)
{
try
{
((SocketChannel) key.channel()).finishConnect();
}
catch (IOException e)
{
con.getClient().onForcedDisconnection();
closeConnectionImpl(key, con);
}
// key might have been invalidated on finishConnect()
if (key.isValid())
{
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
}
}
private static boolean doRead(SocketChannel channel, Work work) throws IOException {
buffer.clear();
boolean done = false;
int numBytesRead = channel.read(buffer);
if (numBytesRead == -1) {
work.success = true;
done = true;
} else if (numBytesRead > 0) {
buffer.flip();
int bufSizeRemaining = work.buffer.remaining();
// see if the bytes read can fit into the buffer
if (bufSizeRemaining >= numBytesRead) {
// buffer has enough space left
work.buffer.put(buffer);
} else if (bufSizeRemaining > 0) {
// buffer doesn't have enough space, will discard bytes that don't fit
buffer.limit(bufSizeRemaining);
work.buffer.put(buffer);
}
}
return done;
}
/**
* todo: check that code is right
*
* @return
*/
public boolean checkOpen() {
SocketChannel channel = channel();
boolean open = !hasClosed() && channel.isOpen() && channel.isConnected();
if (open) {
ByteBuffer allocate = ByteBuffer.allocate(0);
boolean close;
try {
close = -1 == channel.read(allocate);
} catch (IOException e) {
close = true;
}
if (close) {
this.close(false, "check open");
return false;
}
return true;
}
return false;
}
@Test
public void testProcess() throws IOException {
// TODO FIX
AbstractReferenceCounted.disableReferenceTracing();
try (@NotNull EventLoop eg = new EventGroup(true)) {
eg.start();
TCPRegistry.createServerSocketChannelFor(desc);
@NotNull AcceptorEventHandler eah = new AcceptorEventHandler(desc,
simpleTcpEventHandlerFactory(EchoRequestHandler::new, wireType),
VanillaNetworkContext::new);
eg.addHandler(eah);
SocketChannel sc = TCPRegistry.createSocketChannel(desc);
sc.configureBlocking(false);
// testThroughput(sc);
testLatency(desc, wireType, sc);
eg.stop();
TcpChannelHub.closeAllHubs();
}
}
private void sendFile(String filename, SocketChannel sock) throws IOException {
if (filename.isEmpty()) {
return;
}
FileInputStream is = new FileInputStream(new File(filename));
FileChannel source = is.getChannel();
ByteBuffer sendBuf = ByteBuffer.allocateDirect(fileSendingChunk);
while (source.read(sendBuf) > 0) {
sendBuf.flip();
if (log.isDebugEnabled()) {
log.debug("Sending " + sendBuf);
}
sock.write(sendBuf);
sendBuf.rewind();
}
source.close();
}
/**
* 该方法仅Reactor自身创建的主动连接使用
*/
@SuppressWarnings("unchecked")
protected void processConnectKey(SelectionKey curKey) throws IOException {
T session = (T) curKey.attachment();
setCurSession(session);
SocketChannel channel = (SocketChannel) curKey.channel();
NIOHandler curNIOHandler = session.getCurNIOHandler();
if (curNIOHandler instanceof BackendNIOHandler) {
BackendNIOHandler handler = (BackendNIOHandler) curNIOHandler;
try {
if (channel.finishConnect()) {
handler.onConnect(curKey, session, true, null);
}else {
handler.onConnect(curKey, session, false, new ConnectException());
}
} catch (Exception e) {
handler.onConnect(curKey, session, false, e);
}
}
}
/**
* Initialize the SocketStream with a new Socket.
*
* @param s the new socket.
*/
public void init(SocketChannel s)
{
_s = s;
try {
s.setOption(StandardSocketOptions.TCP_NODELAY, true);
} catch (Exception e) {
e.printStackTrace();;
}
//_is = null;
//_os = null;
_needsFlush = false;
_readBuffer.clear().flip();
_writeBuffer.clear();
}
@Override
public void handle(SocketChannel socketChannel, IOException e, ChannelContext channelContext, String customMsg)
{
String reasonString = "";
if (channelContext != null)
{
StringBuilder buffer = new StringBuilder();
String _customMsg = customMsg == null ? "IOException" : customMsg;
buffer.append(channelContext.getId() + " " + _customMsg);
if (e != null)
{
log.error(buffer.toString(), e);
reasonString = e.getMessage();
} else
{
log.error(buffer.toString());
}
}
NioUtils.remove(channelContext, reasonString);
}
/**
* @throws IOException
*/
private void initialize() throws IOException {
try {
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketChannel.connect(new InetSocketAddress(hostIp, hostListenningPort));
} catch (Exception e) {
e.printStackTrace();
}
new TCPClientReadThread(this, selector, imei);
}
public FrontendConnection make(SocketChannel channel) throws IOException {
Socket socket = channel.socket();
socket.setReceiveBufferSize(socketRecvBuffer);
socket.setSendBufferSize(socketSendBuffer);
socket.setTcpNoDelay(true);
socket.setKeepAlive(true);
FrontendConnection c = getConnection(channel);
c.setPacketHeaderSize(packetHeaderSize);
c.setMaxPacketSize(maxPacketSize);
c.setWriteQueue(new BufferQueue(writeQueueCapcity));
c.setIdleTimeout(idleTimeout);
c.setCharset(charset);
return c;
}
private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
{
SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
LOG.debug("created {}",endp);
endPointOpened(endp);
_endPoints.put(endp,this);
return endp;
}
public void setTcpNoDelay(boolean on) {
this.tcpNoDelay = on;
if (this.session != null && this.session instanceof TCPSession) {
try {
((SocketChannel) ((TCPSession) this.session).getChannel()).socket().setTcpNoDelay(on);
} catch (Exception ex) {
logger.error(ex.getLocalizedMessage());
}
}
}
@SuppressWarnings("unchecked")
private void finishConnect(SelectionKey key, Object att) {
ClosableConnection c = (ClosableConnection) att;
try {
//做原生NIO连接是否完成的判断和操作
if (finishConnect(c, (SocketChannel) c.getSocketChannel())) {
clearSelectionKey(key);
//c.setId( ConnectIdGenerator.getINSTNCE().getId() );
//与特定NIOReactor绑定监听读写
NIOReactor reactor = reactorPool.getNextReactor();
reactor.postRegister(c);
}
} catch (Throwable e) {
String host = "";
int port = 0;
if (c != null) {
host = c.getHost();
port = c.getPort();
}
LOGGER.warn("caught err : host=" + host + ":" + port, e);
//异常, 将key清空
clearSelectionKey(key);
c.close(e.toString());
c.getHandler().onConnectFailed(c, new Exception(e.getMessage()));
}
}
public ClearVolumeTCPClientRunnable(SocketChannel pSocketChannel,
VolumeSinkInterface pVolumeSink,
int pMaxInUseVolumes)
{
mSocketChannel = pSocketChannel;
mVolumeSink = pVolumeSink;
mVolumeManager = mVolumeSink.getManager();
}
protected void logTrace(final Exception e, final SelectionKey key, final int loc) {
if (s_logger.isTraceEnabled()) {
Socket socket = null;
if (key != null) {
final SocketChannel ch = (SocketChannel)key.channel();
if (ch != null) {
socket = ch.socket();
}
}
s_logger.trace("Location " + loc + ": Socket " + socket + " closed on read. Probably -1 returned.");
}
}
private void writeOP(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
List<byte[]> channelData = keepDataTrack.get(socketChannel);
Iterator<byte[]> its = channelData.iterator();
while (its.hasNext()) {
byte[] it = its.next();
its.remove();
socketChannel.write(ByteBuffer.wrap(it));
}
key.interestOps(SelectionKey.OP_READ);
}
private void close(SocketChannel sc) {
if (sc == null) { return; }
try {
sc.close();
} catch (IOException ex) {
LOG.error("[CRAFT-ATOM-NIO] Close exception", ex);
}
}
private void writeBuffer(SocketChannel channel, HTTPResponse response) throws IOException
{
int byteRead = channel.write(response.buffer);
if(byteRead == -1)
{
throw new IOException("End of Stream");
}
}
/**
* Send a response to a request id
* @param requestID request id
* @param message message
* @return true if response was accepted
*/
public boolean sendResponse(RequestID requestID, Message message) {
if (!requestChannels.containsKey(requestID)) {
LOG.log(Level.SEVERE, "Trying to send a response to non-existing request");
return false;
}
SocketChannel channel = requestChannels.get(requestID);
if (channel == null) {
LOG.log(Level.SEVERE, "Channel is NULL for response");
}
if (!workerChannels.containsKey(channel) && !channel.equals(clientChannel)) {
LOG.log(Level.WARNING, "Failed to send response on disconnected socket");
return false;
}
TCPMessage tcpMessage = sendMessage(message, requestID, channel);
if (tcpMessage != null) {
requestChannels.remove(requestID);
return true;
} else {
return false;
}
}
private byte[] readFrameDataCheck(ByteBuffer byteBuffer,
SocketChannel socketChannel, int frameType, long length ,
boolean hasMask ) throws IOException
{
int shift = (int)(length>>32);
if ( shift != 0 ){
throw new RuntimeException("Data frame is too big. " +
"Cannot handle it. Implementation should be rewritten.");
}
return readFrameData(byteBuffer, socketChannel, frameType, (int)length , hasMask );
}
ChannelHandler(SocketChannel socket, int ops, Transport transport) throws IOException {
this.socket = socket;
socket.configureBlocking(false);
key = socket.register(selector, ops, this);
this.transport = transport;
transport.setContext(this);
}
public StandardCommsSession(final String hostname, final int port, final int timeoutMillis) throws IOException {
socketChannel = SocketChannel.open();
socketChannel.socket().connect(new InetSocketAddress(hostname, port), timeoutMillis);
socketChannel.configureBlocking(false);
in = new SocketChannelInputStream(socketChannel);
bufferedIn = new InterruptableInputStream(new BufferedInputStream(in));
out = new SocketChannelOutputStream(socketChannel);
bufferedOut = new InterruptableOutputStream(new BufferedOutputStream(out));
this.hostname = hostname;
this.port = port;
}
@Test(enabled=false, timeOut = 15000)
public void testDirectConnectionToEchoServer() throws IOException {
SocketChannel client = SocketChannel.open();
try {
client.connect(new InetSocketAddress("localhost", doubleEchoServer.getServerSocketPort()));
writeToSocket(client, "Knock\n".getBytes());
String response = readFromSocket(client);
client.close();
assertEquals(response, "Knock Knock\n");
} finally {
client.close();
}
}