下面列出了 io.netty.channel.socket.oio.OioSocketChannel#io.netty.channel.oio.OioEventLoopGroup 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Opens an {@code RxtxChannel} to the device named by {@code PORT} on a blocking
 * (OIO) event loop and blocks until that channel is closed.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if connecting or waiting on the channel fails
 */
public static void main(String[] args) throws Exception {
    EventLoopGroup ioGroup = new OioEventLoopGroup();
    try {
        // Builds the pipeline for the channel: line framing, string codec, client handler.
        ChannelInitializer<RxtxChannel> pipelineSetup = new ChannelInitializer<RxtxChannel>() {
            @Override
            public void initChannel(RxtxChannel channel) throws Exception {
                channel.pipeline().addLast(
                    new LineBasedFrameDecoder(32768),
                    new StringEncoder(),
                    new StringDecoder(),
                    new RxtxClientHandler()
                );
            }
        };

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(ioGroup);
        bootstrap.channel(RxtxChannel.class);
        bootstrap.handler(pipelineSetup);

        // Wait for the connect to complete, then park until the channel closes.
        ChannelFuture connected = bootstrap.connect(new RxtxDeviceAddress(PORT)).sync();
        connected.channel().closeFuture().sync();
    } finally {
        // Shutdown is requested but not awaited.
        ioGroup.shutdownGracefully();
    }
}
/**
 * Runs a blocking-I/O server on the given port that writes {@code "Hi!\r\n"} to every
 * accepted connection and closes the connection once the write has completed.
 * The call blocks until the server channel is closed.
 *
 * @param port local port to bind to
 * @throws Exception if binding, waiting, or shutting down fails
 */
public void server(int port) throws Exception {
    ByteBuf encodedGreeting = Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8"));
    final ByteBuf greeting = Unpooled.unreleasableBuffer(encodedGreeting);
    EventLoopGroup ioGroup = new OioEventLoopGroup();
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(ioGroup);
        bootstrap.channel(OioServerSocketChannel.class);
        bootstrap.localAddress(new InetSocketAddress(port));
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel accepted) throws Exception {
                accepted.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        // Send a duplicate view of the shared greeting, then close when written.
                        ChannelFuture written = ctx.writeAndFlush(greeting.duplicate());
                        written.addListener(ChannelFutureListener.CLOSE);
                    }
                });
            }
        });

        ChannelFuture bound = bootstrap.bind().sync();
        bound.channel().closeFuture().sync();
    } finally {
        // Wait for the event loop group to finish shutting down.
        ioGroup.shutdownGracefully().sync();
    }
}
/**
 * Entry point: connects an {@code RxtxChannel} to the address built from {@code PORT}
 * using an OIO event loop group, then waits for the channel to close.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if the connection attempt or the wait is interrupted or fails
 */
public static void main(String[] args) throws Exception {
    EventLoopGroup eventLoops = new OioEventLoopGroup();
    try {
        Bootstrap client = new Bootstrap();
        client.group(eventLoops);
        client.channel(RxtxChannel.class);
        client.handler(new ChannelInitializer<RxtxChannel>() {
            @Override
            public void initChannel(RxtxChannel rxtx) throws Exception {
                // Handlers are registered in this order: framing, encoder, decoder, client logic.
                rxtx.pipeline().addLast(
                    new LineBasedFrameDecoder(32768),
                    new StringEncoder(),
                    new StringDecoder(),
                    new RxtxClientHandler()
                );
            }
        });

        ChannelFuture connectFuture = client.connect(new RxtxDeviceAddress(PORT)).sync();
        // Block the main thread for as long as the channel stays open.
        connectFuture.channel().closeFuture().sync();
    } finally {
        eventLoops.shutdownGracefully();
    }
}
/**
 * Starts a blocking-I/O server on port 8888 whose child channels are handled by
 * {@code EchoServerHandler}, and blocks until the server channel is closed.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if binding or waiting on the server channel fails
 */
public static void main(String[] args) throws Exception {
    // One thread for the accepting group; the worker group uses its default sizing.
    EventLoopGroup acceptGroup = new OioEventLoopGroup(1);
    EventLoopGroup ioGroup = new OioEventLoopGroup();
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(acceptGroup, ioGroup);
        bootstrap.channel(OioServerSocketChannel.class);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel accepted) {
                accepted.pipeline().addLast(new EchoServerHandler());
            }
        });

        ChannelFuture bound = bootstrap.bind(8888).sync();
        bound.channel().closeFuture().sync();
    } finally {
        // Workers first, then the accepting group; neither shutdown is awaited.
        ioGroup.shutdownGracefully();
        acceptGroup.shutdownGracefully();
    }
}
/**
 * Schedules start-up of the built-in server on a pooled thread.
 *
 * <p>Only the call that flips {@code started} from {@code false} to {@code true}
 * schedules the work; every other call returns {@code null}.
 *
 * @return the future of the scheduled start-up task, or {@code null} if start-up
 *         was already triggered
 */
private Future<?> startServerInPooledThread() {
    boolean firstCaller = started.compareAndSet(false, true);
    if (!firstCaller) {
        return null;
    }

    return myApplication.executeOnPooledThread(() -> {
        try {
            ImportantFolderLocker folderLocker = StartupUtil.getLocker();
            BuiltInServer sharedServer = null;
            if (folderLocker instanceof ImportantFolderLockerViaBuiltInServer) {
                sharedServer = ((ImportantFolderLockerViaBuiltInServer)folderLocker).getServer();
            }

            // Reuse the existing server's event loop group only when there is one and it is not OIO-based.
            if (sharedServer != null && !(sharedServer.getEventLoopGroup() instanceof OioEventLoopGroup)) {
                server = BuiltInServer.start(sharedServer.getEventLoopGroup(), false, getDefaultPort(), PORTS_COUNT, true, null);
            }
            else {
                server = BuiltInServer.start(1, getDefaultPort(), PORTS_COUNT, false, null);
            }
            bindCustomPorts(server);
        }
        catch (Throwable e) {
            // Start-up failure is logged and reported to the user; nothing is registered afterwards.
            LOG.info(e);
            String message = "Cannot start internal HTTP server. Git integration, Some plugins may operate with errors. " +
                             "Please check your firewall settings and restart " + ApplicationNamesInfo.getInstance().getFullProductName();
            NOTIFICATION_GROUP.getValue().createNotification(message, NotificationType.ERROR).notify(null);
            return;
        }

        LOG.info("built-in server started, port " + server.getPort());
        // Register the server with the Disposer under the application.
        Disposer.register(myApplication, server);
    });
}
/**
 * Starts a blocking (old I/O) server bound to {@code port}. Each accepted connection
 * is sent {@code "Hi!\r\n"} and closed as soon as that write completes. The method
 * blocks until the server channel is closed.
 *
 * @param port local port to bind to
 * @throws Exception if binding, waiting, or shutting down fails
 */
public void server(int port)
    throws Exception {
    ByteBuf encodedGreeting = Unpooled.copiedBuffer("Hi!\r\n", StandardCharsets.UTF_8);
    final ByteBuf greeting = Unpooled.unreleasableBuffer(encodedGreeting);
    EventLoopGroup ioGroup = new OioEventLoopGroup();
    try {
        // Invoked for every accepted connection.
        ChannelInitializer<SocketChannel> perConnectionSetup = new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel accepted)
                throws Exception {
                // Add a ChannelInboundHandlerAdapter to intercept and handle events.
                accepted.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelActive(ChannelHandlerContext ctx)
                        throws Exception {
                        // Write the message to the client and close the connection
                        // as soon as the write has finished.
                        ChannelFuture written = ctx.writeAndFlush(greeting.duplicate());
                        written.addListener(ChannelFutureListener.CLOSE);
                    }
                });
            }
        };

        // Create the ServerBootstrap.
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(ioGroup);
        // OioEventLoopGroup is paired with the OIO server channel to allow blocking mode (old I/O).
        bootstrap.channel(OioServerSocketChannel.class);
        bootstrap.localAddress(new InetSocketAddress(port));
        bootstrap.childHandler(perConnectionSetup);

        // Bind the server to accept connections.
        ChannelFuture bound = bootstrap.bind().sync();
        bound.channel().closeFuture().sync();
    } finally {
        // Release all resources.
        ioGroup.shutdownGracefully().sync();
    }
}
/**
 * Runs the shared {@code testAddInListener} scenario with an OIO socket channel and a
 * single-threaded OIO event loop group; the test fails if it takes longer than 3000 ms.
 */
@Test(timeout = 3000)
public void testAddInListenerOio() throws Throwable {
    OioSocketChannel oioChannel = new OioSocketChannel();
    OioEventLoopGroup oioGroup = new OioEventLoopGroup(1);
    testAddInListener(oioChannel, oioGroup);
}
/**
 * Supplies a new blocking-I/O event loop group.
 *
 * @return a freshly created {@code OioEventLoopGroup}
 */
@Override
protected EventLoopGroup newEventLoopGroup() {
    EventLoopGroup oioGroup = new OioEventLoopGroup();
    return oioGroup;
}
/**
 * Creates a client for a blocking-I/O event loop group by delegating to the
 * three-argument constructor with {@code OioSocketChannel} as the channel class.
 *
 * @param eventLoopGroup OIO event loop group passed through to the delegate constructor
 * @param eventExecutor  executor group passed through to the delegate constructor
 */
public RpcClient(OioEventLoopGroup eventLoopGroup, EventExecutorGroup eventExecutor) {
this(eventLoopGroup, eventExecutor, OioSocketChannel.class);
}
/**
 * Creates a server for a blocking-I/O event loop group by delegating to the
 * four-argument constructor with {@code OioServerSocketChannel} as the channel class.
 *
 * @param eventLoopGroup OIO event loop group passed through to the delegate constructor
 * @param eventExecutor  executor group passed through to the delegate constructor
 * @param address        socket address passed through to the delegate constructor
 *                       (annotated {@code @Assisted})
 */
public RpcServer(OioEventLoopGroup eventLoopGroup, EventExecutorGroup eventExecutor, @Assisted SocketAddress address) {
this(eventLoopGroup, eventExecutor, OioServerSocketChannel.class, address);
}
/**
 * Verifies that the factory resolved for an {@code OioEventLoopGroup} creates
 * {@code OioSocketChannel} instances.
 */
@Test
public void worksWithOioEventLoopGroupFactory() {
    OioEventLoopGroup oioGroup = new OioEventLoopGroup();
    assertThat(resolveSocketChannelFactory(oioGroup).newChannel())
        .isInstanceOf(OioSocketChannel.class);
}
/**
 * Builds a client {@code Bootstrap} for blocking I/O: a new {@code OioEventLoopGroup}
 * constructed with {@code 1} and {@code PooledThreadExecutor.INSTANCE},
 * {@code OioSocketChannel} as the channel class, and both {@code TCP_NODELAY} and
 * {@code SO_KEEPALIVE} enabled.
 *
 * @return the configured bootstrap; a new instance (with a new event loop group) on every call
 */
public static Bootstrap oioClientBootstrap() {
    Bootstrap client = new Bootstrap();
    client.group(new OioEventLoopGroup(1, PooledThreadExecutor.INSTANCE));
    client.channel(OioSocketChannel.class);
    client.option(ChannelOption.TCP_NODELAY, true);
    client.option(ChannelOption.SO_KEEPALIVE, true);
    return client;
}