下面列出了io.netty.channel.ChannelInitializer#io.netty.channel.socket.nio.NioSocketChannel 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void start(int maxThread) {
EventLoopGroup mainGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup(maxThread);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(mainGroup, workGroup).
channel(NioServerSocketChannel.class).
childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
setupPipeline(ch.pipeline());
}
});
try {
serverChannel = serverBootstrap.bind(port).sync().channel();
serverChannel.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testClient() throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress("localhost", PORT))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
/**
* This method will configure a worker EventLoopGroup and a Channel for use by a client. It will
* try to use the correct SocketChannel for the provided workerGroup.
*
* @param workerGroup uses EventLoopGroup in the ClientChannelConfiguration
* @return ClientChannelConfiguration
*/
public static ClientChannelConfiguration clientConfig(EventLoopGroup workerGroup) {
EventLoopGroup parent = workerGroup;
if (parent instanceof EventLoop) {
parent = ((EventLoop) workerGroup).parent();
}
Class<? extends Channel> channelClass;
if (parent instanceof EpollEventLoopGroup) {
channelClass = EpollSocketChannel.class;
} else if (parent instanceof NioEventLoopGroup) {
channelClass = NioSocketChannel.class;
} else {
throw new RuntimeException("Unsupported EventLoopGroup " + workerGroup.getClass());
}
return new ClientChannelConfiguration(workerGroup, channelClass);
}
/**
* 异步建立连接localHost
*
* @param hostAndPort 服务器地址
* @param sndBuffer socket发送缓冲区
* @param rcvBuffer socket接收缓冲区
* @param connectTimeoutMs 建立连接超时时间
* @param initializer channel初始化类,根据使用的协议(eg:tcp,ws) 和 序列化方式(eg:json,protoBuf)确定
* @return channelFuture 注意使用{@link ChannelFuture#sync()} 会抛出异常。
* 使用{@link ChannelFuture#await()} 和{@link ChannelFuture#isSuccess()} 安全处理。
* 此外,使用channel 需要调用 {@link Channel#isActive()}检查是否成功和远程建立连接
*/
public ChannelFuture connectAsyn(HostAndPort hostAndPort, int sndBuffer, int rcvBuffer, int connectTimeoutMs,
ChannelInitializer<SocketChannel> initializer) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(initializer);
bootstrap.option(ChannelOption.SO_KEEPALIVE, false);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.SO_SNDBUF, sndBuffer);
bootstrap.option(ChannelOption.SO_RCVBUF, rcvBuffer);
bootstrap.option(ChannelOption.SO_LINGER, 0);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs);
return bootstrap.connect(hostAndPort.getHost(), hostAndPort.getPort());
}
/**
* Establishing TCP connection to server
*
* @param remoteAddress remote address
* */
public ChannelFuture connectToServer(final InetSocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new IllegalStateException("remote address is null");
}
Bootstrap bootstrap = new Bootstrap().group(_upstreamWorkerGroup);
bootstrap.channelFactory(NioSocketChannel::new);
ServerChannelHandler serverChannelHandler = new ServerChannelHandler(this);
bootstrap.handler(new ChannelInitializer<Channel>() {
protected void initChannel(Channel ch)
throws Exception {
initChannelPipeline(ch.pipeline(), serverChannelHandler, _serverConnectionIdleTimeoutMsec);
_serverChannel = ch;
}
});
LOG.debug("Server channel is ready. About to connect....");
return bootstrap.connect(remoteAddress);
}
@SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON")
private Channel connectServer(int port) throws Exception {
final OzymandiasClientHandler handler = new OzymandiasClientHandler(this);
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
SSLEngine engine = SSLContextFactory.getClientContext().createSSLEngine();
engine.setUseClientMode(true);
ch.pipeline().addLast(
new SslHandler(engine),
new ObjectEncoder(),
new ObjectDecoder(OzymandiasServer.maxObjectSize, ClassResolvers.cacheDisabled(null)),
handler);
}
});
return b.connect("127.0.0.1", port).sync().channel();
}
public void start() {
ClientChannelInitializer clientChannelInitializer = new ClientChannelInitializer();
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(clientChannelInitializer)
;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
doConnect();
}
private HttpClient(){
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
ch.pipeline().addLast(new HttpResponseDecoder());
// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
ch.pipeline().addLast(new HttpRequestEncoder());
try {
SSLContext context = SslUtil.createSSLContext("JKS","inchat.jks","123456");
SSLEngine engine = context.createSSLEngine();
engine.setUseClientMode(true);
// engine.setNeedClientAuth(false);
ch.pipeline().addLast("ssl",new SslHandler(engine));
}catch (Exception e){
e.printStackTrace();
}
}
});
this.bootstrap = b;
}
public void connectTo(final Peer peer, final String host, final int port, final CompletableFuture<Void> futureToNotify) {
final PeerChannelHandler handler = new PeerChannelHandler(config, peer);
final PeerChannelInitializer initializer = new PeerChannelInitializer(config, encoder, peerEventLoopGroup, handler);
final Bootstrap clientBootstrap = new Bootstrap();
clientBootstrap.group(networkEventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(initializer);
final ChannelFuture connectFuture = clientBootstrap.connect(host, port);
if (futureToNotify != null) {
connectFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
futureToNotify.complete(null);
LOGGER.info("Successfully connect to {}:{}", host, port);
} else {
futureToNotify.completeExceptionally(future.cause());
LOGGER.error("Could not connect to " + host + ":" + port, future.cause());
}
}
});
}
}
@Override
protected void doOpen() throws Throwable {
com.alibaba.dubbo.remoting.transport.netty4.NettyHelper.setNettyLoggerFactory();
bootstrap = new Bootstrap();
// config
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(WORKER_GROUP);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
final com.alibaba.dubbo.remoting.transport.netty4.NettyHandler nettyHandler = new com.alibaba.dubbo.remoting.transport.netty4.NettyHandler(getUrl(), this);
bootstrap.handler(new ChannelInitializer() {
public void initChannel(Channel ch) {
com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter adapter = new com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast("decoder", adapter.getDecoder());
channelPipeline.addLast("encoder", adapter.getEncoder());
channelPipeline.addLast("handler", nettyHandler);
}
});
}
public void start() {
new Thread(() -> {
group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(30232)) //"woodswang",
.handler(KyChannelInitializer.newInstance());
ChannelFuture channelFuture = null;
try {
channelFuture = bootstrap.connect().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
startListenerHandle(channelFuture, launchListener);
}).start();
}
public NettyServerClient(int serverType,int serverId, String host, int port) {
this.serverType = serverType;
this.serverId = serverId;
this.address = new InetSocketAddress(host, port);
netEventService = BeanHelper.getServiceBean(NetEventService.class);
eventService = BeanHelper.getServiceBean(EventService.class);
bootstrap = new Bootstrap(); // (1)
bootstrap.group(eventLoopGroup); // (2)
bootstrap.channel(NioSocketChannel.class); // (3)
bootstrap.option(ChannelOption.SO_KEEPALIVE, true); // (4)
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new DefaultNettyEncoder(),
new DefaultNettyDecoder(),
new NettyClientHandler()
);
}
});
}
@Override
protected void doStart(Listener listener) throws Throwable {
workerGroup = new NioEventLoopGroup(http_work, new DefaultThreadFactory(ThreadNames.T_HTTP_CLIENT));
b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.TCP_NODELAY, true);
b.option(ChannelOption.SO_REUSEADDR, true);
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new HttpResponseDecoder());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(maxContentLength));
ch.pipeline().addLast("encoder", new HttpRequestEncoder());
ch.pipeline().addLast("handler", new HttpClientHandler(NettyHttpClient.this));
}
});
timer = new HashedWheelTimer(new NamedThreadFactory(T_HTTP_TIMER), 1, TimeUnit.SECONDS, 64);
listener.onSuccess();
}
/**
* 初始化 连接后端真正服务器
*/
private void initRealServerBoot() {
//初始化
realServerBootstrap = new Bootstrap();
realServerGroup = new NioEventLoopGroup();
realServerBootstrap.group(realServerGroup);
realServerBootstrap.channel(NioSocketChannel.class);
realServerBootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TCPHandler());
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast(new HttpObjectAggregator(maxContentLength));
ch.pipeline().addLast(new HttpSendHandler());
}
});
}
private static Channel newClient(SocketAddress server) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(GROUP)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
fail("In this test the client is never receiving a message from the server.");
}
});
return bootstrap.connect(server)
.syncUninterruptibly()
.channel();
}
@Override
protected ChannelHandler newChannelInitializer() {
return new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline cp = ch.pipeline();
cp.addLast("frame", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0, 2));
cp.addLast("prepender", FRAME_PREPENDER);
cp.addLast("decoder", GPB_DECODER_HANDLER);
cp.addLast("encoder", GPB_ENCODER_HANDLER);
// handler
cp.addLast("handler", serverHandler);
// cp.addLast("handler", new ServerHandler());
}
};
}
@SuppressWarnings("unchecked")
@Override
public List<BootstrapFactory<Bootstrap>> clientSocket() {
return Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueSocketChannel.class);
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class);
}
}
);
}
@Test
public void responseConnectionReused_shouldReleaseChannel() throws Exception {
ChannelFactory channelFactory = mock(ChannelFactory.class);
EventLoopGroup customEventLoopGroup = new NioEventLoopGroup(1);
NioSocketChannel channel = new NioSocketChannel();
when(channelFactory.newChannel()).thenAnswer((Answer<NioSocketChannel>) invocationOnMock -> channel);
SdkEventLoopGroup eventLoopGroup = SdkEventLoopGroup.create(customEventLoopGroup, channelFactory);
NettyNioAsyncHttpClient customClient =
(NettyNioAsyncHttpClient) NettyNioAsyncHttpClient.builder()
.eventLoopGroup(eventLoopGroup)
.maxConcurrency(1)
.build();
makeSimpleRequest(customClient);
verifyChannelRelease(channel);
assertThat(channel.isShutdown()).isFalse();
customClient.close();
eventLoopGroup.eventLoopGroup().shutdownGracefully().awaitUninterruptibly();
}
private void startSendMessage() {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.DEBUG));
p.addLast(new NettySimpleMessageHandler());
p.addLast(new SetMessageHandler(PsyncLatencyTest.this));
}
});
b.connect(master);
}
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new Bootstrap();
// config
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(WORKER_GROUP);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.handler(new ChannelInitializer() {
public void initChannel(Channel ch) {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast("decoder", adapter.getDecoder());
channelPipeline.addLast("encoder", adapter.getEncoder());
channelPipeline.addLast("handler", nettyHandler);
}
});
}
public void connect(String host, int port) throws Exception{
this.bootstrap = new Bootstrap();
final CountDownLatch latch = new CountDownLatch(1);
bootstrap.group(new NioEventLoopGroup(1))
.channel(NioSocketChannel.class)
.handler(new TrafficGeneratorClientHandler());
System.out.println("Creating connection");
ChannelFuture cf = bootstrap.connect(host,port);
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("Connected");
if (future.isSuccess()) {
channel = future.channel();
} else {
future.cause().printStackTrace();
future.channel().close();
throw new RuntimeException(future.cause());
}
latch.countDown();
}
});
latch.await();
}
/**
* 重连
*/
public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
try {
if (bootstrap != null) {
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new NettyClientInitializer());
bootstrap.remoteAddress(host, port);
f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
final EventLoop eventLoop = futureListener.channel().eventLoop();
if (!futureListener.isSuccess()) {
log.info("与服务端断开连接!在10s之后准备尝试重连!");
eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
}
});
if(initFalg){
log.info("Netty客户端启动成功!");
initFalg=false;
}
}
} catch (Exception e) {
log.info("客户端连接失败!"+e.getMessage());
}
}
public static void newClientBootstrap(String host, int port, ChannelInitializer<SocketChannel> initializer){
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
ChannelFuture f = b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(initializer)
.connect(host, port).sync();
f.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
@Test
public void simple() throws Exception {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup(1))
.channel(NioSocketChannel.class)
.handler(new TrafficGeneratorClientHandler());
final CountDownLatch latch = new CountDownLatch(1);
bootstrap.connect("localhost",8010).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()){
future.channel().writeAndFlush(Unpooled.buffer().capacity(256).writeZero(256));
}else {
System.err.println("Connection attempt failed");
future.cause().printStackTrace();
}
latch.countDown();
}
});
latch.await();
}
public static SslClientInitializer<NioSocketChannel>
createSslClientInitializerWithSystemTrustStoreAndClientAuthentication(
SslProvider sslProvider,
Function<Channel, String> hostProvider,
Function<Channel, Integer> portProvider,
Supplier<PrivateKey> privateKeySupplier,
Supplier<ImmutableList<X509Certificate>> certificateChainSupplier) {
return new SslClientInitializer<>(
sslProvider,
hostProvider,
portProvider,
ImmutableList.of(),
privateKeySupplier,
certificateChainSupplier);
}
public NetTCPClient(DataManager data, MessageReceivedListener listener) {
super(data, listener, "TCP Client");
EventLoopGroup bossGroup = new NioEventLoopGroup();
client = new Bootstrap();
client.group(bossGroup);
client.channel(NioSocketChannel.class);
client.handler(initializer);
}
public SlackSender() {
try {
sslCtx = SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE);
bootstrap.group(group)
.channel(Epoll.isAvailable() ? EpollSocketChannel.class : NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true);
} catch (SSLException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String host = "127.0.0.1" ;
int port = 11211 ;
EventLoopGroup group = new NioEventLoopGroup() ;
try {
Bootstrap bootstrap = new Bootstrap() ;
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new CustomerHandleInitializer())
;
ChannelFuture future = bootstrap.connect(host, port).sync();
CustomProtocol customProtocol = new CustomProtocol() ;
customProtocol.setHeader(99999L);
customProtocol.setContent("你好 netty");
future.channel().writeAndFlush(customProtocol) ;
future.channel().closeFuture().sync() ;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
@Test
public void connectionInactive_shouldReleaseChannel() throws Exception {
ChannelFactory channelFactory = mock(ChannelFactory.class);
EventLoopGroup customEventLoopGroup = new NioEventLoopGroup(1);
NioSocketChannel channel = new NioSocketChannel();
when(channelFactory.newChannel()).thenAnswer((Answer<NioSocketChannel>) invocationOnMock -> channel);
SdkEventLoopGroup eventLoopGroup = SdkEventLoopGroup.create(customEventLoopGroup, channelFactory);
NettyNioAsyncHttpClient customClient =
(NettyNioAsyncHttpClient) NettyNioAsyncHttpClient.builder()
.eventLoopGroup(eventLoopGroup)
.maxConcurrency(1)
.build();
String body = randomAlphabetic(10);
URI uri = URI.create("http://localhost:" + mockServer.port());
SdkHttpRequest request = createRequest(uri);
RecordingResponseHandler recorder = new RecordingResponseHandler();
stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withBody(body)
.withStatus(500)
.withFault(Fault.RANDOM_DATA_THEN_CLOSE)));
customClient.execute(AsyncExecuteRequest.builder()
.request(request)
.requestContentPublisher(createProvider(""))
.responseHandler(recorder).build());
verifyChannelRelease(channel);
assertThat(channel.isShutdown()).isTrue();
customClient.close();
eventLoopGroup.eventLoopGroup().shutdownGracefully().awaitUninterruptibly();
}
private HttpClient() {
// Configure the client.
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new HttpClientInitializer(mHttpClientListener));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECTION_TIMEOUT_MILLIS);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.SO_RCVBUF, 65536 * 3); // added in 2017-07-14
bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator( 65536 * 3 )); // added in 2017-07-14
}