下面列出了怎么用 io.netty.handler.codec.serialization.ObjectEncoder 的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Starts the Netty server on {@code port} and returns as soon as the listening socket is bound.
 *
 * <p>Every accepted connection gets an {@code ObjectEncoder}, an {@code ObjectDecoder} and a fresh
 * {@code ServerHandler}. On success the event loop groups keep running so the server stays up after
 * this method returns; if the bind fails or is interrupted they are shut down so no threads leak.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting for the bind
 */
private void bind() throws InterruptedException {
    EventLoopGroup boss = new NioEventLoopGroup();
    EventLoopGroup worker = new NioEventLoopGroup();
    boolean started = false;
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                // TCP_NODELAY is a per-connection option: it must be a childOption, the listening
                // channel does not support it and would only log an "unknown channel option" warning.
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline p = socketChannel.pipeline();
                        p.addLast(new ObjectEncoder());
                        // NOTE(review): Java-native deserialization of network data; only safe with trusted peers.
                        p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                        p.addLast(new ServerHandler());
                    }
                });
        ChannelFuture f = bootstrap.bind(port).sync();
        started = f.isSuccess();
        if (started) {
            log.info(String.format(">>>>>>>>>>>> ligthconf server started, port:%s", port));
        }
    } finally {
        // Only release the threads when the server did not come up; a running server still needs them.
        if (!started) {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
/**
 * Binds a Netty server to {@code address}:{@code port} and returns once the bind has completed.
 *
 * <p>Each accepted connection decodes Java-serialized objects (limited to 1 MiB per object),
 * encodes outgoing objects, and dispatches to the shared {@code handler}. The event loop groups
 * are intentionally left running after a successful bind because the method does not block on the
 * server channel; they are shut down only when the bind fails, so a failed start leaks no threads.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting for the bind
 */
public void doOpen() throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    boolean bound = false;
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                // NOTE(review): Java-native deserialization of network data; only safe with trusted peers.
                pipeline.addLast(new ObjectDecoder(1024 * 1024,
                        ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                pipeline.addLast(new ObjectEncoder());
                // NOTE(review): the same handler instance is added to every channel; confirm it is @Sharable.
                pipeline.addLast((SimpleChannelInboundHandler<?>) handler);
            }
        });
        serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        serverBootstrap.bind(address, port).sync();
        bound = true;
    } finally {
        // A successfully bound server keeps its threads; release them only on a failed start.
        if (!bound) {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
/**
 * Starts the Netty server on {@code port} and returns as soon as the listening socket is bound.
 *
 * <p>Every accepted connection gets an {@code ObjectEncoder}, an {@code ObjectDecoder} and a fresh
 * {@code NettyServerHandler}. On success the event loop groups keep running so the server stays up;
 * if the bind fails or is interrupted they are shut down so no threads leak.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting for the bind
 */
private void bind() throws InterruptedException {
    EventLoopGroup boss = new NioEventLoopGroup();
    EventLoopGroup worker = new NioEventLoopGroup();
    boolean started = false;
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, worker);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.option(ChannelOption.SO_BACKLOG, 128);
        // TCP_NODELAY is a per-connection option: it must be a childOption, the listening
        // channel does not support it and would only log an "unknown channel option" warning.
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline p = socketChannel.pipeline();
                p.addLast(new ObjectEncoder());
                // NOTE(review): Java-native deserialization of network data; only safe with trusted peers.
                p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                p.addLast(new NettyServerHandler());
            }
        });
        ChannelFuture f = bootstrap.bind(port).sync();
        started = f.isSuccess();
        if (started) {
            System.out.println("server start---------------");
        }
    } finally {
        // Only release the threads when the server did not come up; a running server still needs them.
        if (!started) {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
/**
 * Connects to {@code Constants.HOST}:{@code Constants.PORT} and blocks until that connection is closed.
 *
 * <p>The client pipeline consists of an {@code ObjectEncoder}, an {@code ObjectDecoder} without an
 * effective object size limit ({@code Integer.MAX_VALUE}) and a {@code PojoClientHandler}.
 * The event loop group is always shut down before the method returns.
 *
 * @throws InterruptedException if the calling thread is interrupted while connecting or waiting
 */
public void send() throws InterruptedException {
    EventLoopGroup eventLoop = new NioEventLoopGroup();
    try {
        Bootstrap client = new Bootstrap();
        client.group(eventLoop);
        client.channel(NioSocketChannel.class);
        client.option(ChannelOption.TCP_NODELAY, true);
        client.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new ObjectEncoder())
                        .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
                        .addLast(new PojoClientHandler());
            }
        });

        ChannelFuture connected = client.connect(Constants.HOST, Constants.PORT).sync();
        // Park the caller until the remote side (or a handler) closes the channel.
        connected.channel().closeFuture().sync();
    } finally {
        eventLoop.shutdownGracefully();
    }
}
/**
 * Opens a TLS-wrapped object channel to a server listening on the loopback address.
 *
 * <p>The pipeline is: {@code SslHandler} (client mode), {@code ObjectEncoder}, {@code ObjectDecoder}
 * limited to {@code OzymandiasServer.maxObjectSize}, and one {@code OzymandiasClientHandler} bound
 * to this instance.
 *
 * @param port TCP port on {@code 127.0.0.1} to connect to
 * @return the connected channel
 * @throws Exception if the connection attempt fails or the wait is interrupted
 */
@SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON")
private Channel connectServer(int port) throws Exception {
    final OzymandiasClientHandler handler = new OzymandiasClientHandler(this);
    Bootstrap client = new Bootstrap();
    client.group(group);
    client.channel(NioSocketChannel.class);
    client.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            // A fresh engine per connection, switched to the client side of the handshake.
            SSLEngine sslEngine = SSLContextFactory.getClientContext().createSSLEngine();
            sslEngine.setUseClientMode(true);

            SslHandler tls = new SslHandler(sslEngine);
            ObjectEncoder objectEncoder = new ObjectEncoder();
            ObjectDecoder objectDecoder =
                    new ObjectDecoder(OzymandiasServer.maxObjectSize, ClassResolvers.cacheDisabled(null));
            ch.pipeline().addLast(tls, objectEncoder, objectDecoder, handler);
        }
    });
    return client.connect("127.0.0.1", port).sync().channel();
}
/**
 * Runs the object echo server: binds to {@code PORT} and blocks until the server channel is closed.
 *
 * <p>When {@code SSL} is set, a self-signed certificate is generated and every accepted connection
 * is wrapped in TLS before the object codec and the {@code ObjectEchoServerHandler}.
 *
 * @param args ignored
 * @throws Exception if SSL setup or the bind fails, or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    final SslContext sslCtx;
    if (!SSL) {
        sslCtx = null;
    } else {
        SelfSignedCertificate selfSigned = new SelfSignedCertificate();
        sslCtx = SslContextBuilder.forServer(selfSigned.certificate(), selfSigned.privateKey()).build();
    }

    // One acceptor thread; default-sized pool for the accepted connections.
    EventLoopGroup acceptors = new NioEventLoopGroup(1);
    EventLoopGroup workers = new NioEventLoopGroup();
    try {
        ServerBootstrap server = new ServerBootstrap();
        server.group(acceptors, workers);
        server.channel(NioServerSocketChannel.class);
        server.handler(new LoggingHandler(LogLevel.INFO));
        server.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (sslCtx != null) {
                    pipeline.addLast(sslCtx.newHandler(ch.alloc()));
                }
                pipeline.addLast(
                        new ObjectEncoder(),
                        new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                        new ObjectEchoServerHandler());
            }
        });

        // Bind, then wait for the listening channel to be closed.
        server.bind(PORT).sync().channel().closeFuture().sync();
    } finally {
        acceptors.shutdownGracefully();
        workers.shutdownGracefully();
    }
}
/**
 * Runs the object echo client: connects to {@code HOST}:{@code PORT} and blocks until the
 * connection is closed.
 *
 * <p>When {@code SSL} is set, the connection is wrapped in TLS using
 * {@code InsecureTrustManagerFactory}, i.e. the server certificate is not verified.
 *
 * @param args ignored
 * @throws Exception if SSL setup or the connection fails, or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    final SslContext sslCtx = SSL
            ? SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
            : null;

    EventLoopGroup eventLoop = new NioEventLoopGroup();
    try {
        Bootstrap client = new Bootstrap();
        client.group(eventLoop);
        client.channel(NioSocketChannel.class);
        client.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (sslCtx != null) {
                    pipeline.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                }
                pipeline.addLast(
                        new ObjectEncoder(),
                        new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                        new ObjectEchoClientHandler());
            }
        });

        // Connect, then wait for the channel to be closed.
        client.connect(HOST, PORT).sync().channel().closeFuture().sync();
    } finally {
        eventLoop.shutdownGracefully();
    }
}
/**
 * Connects the controller client to {@code host}:{@code port}, retrying once per second until a
 * connection is established, and stores the resulting channel in {@code channel}.
 *
 * <p>If the calling thread is interrupted while sleeping or connecting, the retry loop stops,
 * the interrupt flag is restored, the event loop group is released, and {@code channel} is left
 * unassigned. Previously the interrupt was swallowed and the loop could never be cancelled.
 */
private void connect() {
    EventLoopGroup group = new NioEventLoopGroup(1);
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // support the native serialization of Java
                    // NOTE(review): no effective size limit (Integer.MAX_VALUE) and Java-native
                    // deserialization; only safe with trusted peers.
                    ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.
                            weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new ControllerClientHandler());
                }
            });
    while (true) {
        try {
            // Wait before every attempt (including the first) to give the server time to come up.
            Thread.sleep(1000);
            channel = bootstrap.connect(host, port).sync().channel();
            break;
        } catch (InterruptedException e) {
            // Stop retrying: keep the interrupt visible to the caller and free the I/O thread.
            Thread.currentThread().interrupt();
            group.shutdownGracefully();
            logger.error("\n\tController client startup interrupted!");
            return;
        } catch (Exception e) {
            // Connection refused etc. is expected while the server is still starting; retry.
            logger.error("\n\tController client startup fail!");
        }
    }
    logger.debug("\n\tController client startup successful!");
}
/**
 * Runs the controller server: binds to {@code port} and blocks until the server channel closes.
 *
 * <p>Both event loop groups use a single thread. Failures are logged rather than propagated; an
 * interrupt additionally restores the thread's interrupt flag. The groups are always shut down
 * before the method returns.
 */
private void bind() {
    // NioEventLoopGroup is a thread group specialized for network event processing
    // (effectively a 'Reactor' thread group); one for accepting, one for the connections.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup(1);
    try {
        // ServerBootstrap is Netty's helper class for setting up the NIO server
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // length of the queue of pending, not yet accepted connections
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        // NOTE(review): no effective size limit (Integer.MAX_VALUE) and Java-native
                        // deserialization; only safe with trusted peers.
                        ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.
                                weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                        ch.pipeline().addLast(new ObjectEncoder());
                        ch.pipeline().addLast(new ControllerServerHandler());
                    }
                });
        // bind the port and wait for the bind to complete
        ChannelFuture cf = bootstrap.bind(port).sync();
        logger.info("\n\tController server startup successful!");
        // block until the listening channel is closed; without this line the finally block
        // would shut the server down right after start-up
        cf.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        // Keep the interrupt visible to whoever owns this thread.
        Thread.currentThread().interrupt();
        logger.error("\n\tController server interrupted!", e);
    } catch (Exception e) {
        logger.error("\n\tController server failed!", e);
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
/**
 * Connects the data generator client to {@code host}:{@code port}, retrying once per second until
 * a connection is established, and stores the resulting channel in {@code channel}.
 *
 * <p>If the calling thread is interrupted while sleeping or connecting, the retry loop stops,
 * the interrupt flag is restored, the event loop group is released, and {@code channel} is left
 * unassigned. Previously the interrupt was swallowed and the loop could never be cancelled.
 */
private void connect() {
    // single I/O thread for the client
    EventLoopGroup group = new NioEventLoopGroup(1);
    // Bootstrap is Netty's helper class for setting up a client
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // NOTE(review): no effective size limit (Integer.MAX_VALUE) and Java-native
                    // deserialization; only safe with trusted peers.
                    ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.
                            weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new DataGeneratorClientHandler());
                }
            });
    while (true) {
        try {
            // Wait before every attempt (including the first) to give the server time to come up.
            Thread.sleep(1000);
            channel = bootstrap.connect(host, port).sync().channel();
            break;
        } catch (InterruptedException e) {
            // Stop retrying: keep the interrupt visible to the caller and free the I/O thread.
            Thread.currentThread().interrupt();
            group.shutdownGracefully();
            logger.info("\n\tData generator client startup interrupted!");
            return;
        } catch (Exception e) {
            // Connection refused etc. is expected while the server is still starting; retry.
            logger.info("\n\tData generator client startup fail!");
        }
    }
    logger.info("\n\tData generator client startup successful!");
}
/**
 * Runs the data generator server: binds to {@code port} and blocks until the server channel closes.
 *
 * <p>Both event loop groups use a single thread. Failures are logged rather than propagated; an
 * interrupt additionally restores the thread's interrupt flag. The groups are always shut down
 * before the method returns.
 */
private void bind() {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup(1);
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // length of the queue of pending, not yet accepted connections
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        // NOTE(review): no effective size limit (Integer.MAX_VALUE) and Java-native
                        // deserialization; only safe with trusted peers.
                        ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.
                                weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                        ch.pipeline().addLast(new ObjectEncoder());
                        ch.pipeline().addLast(new DataGeneratorServerHandler());
                    }
                });
        ChannelFuture cf = bootstrap.bind(port).sync();
        logger.info("\n\tData generator server startup successful!");
        // Block until the listening channel is closed; the finally block then stops the threads.
        cf.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        // Keep the interrupt visible to whoever owns this thread.
        Thread.currentThread().interrupt();
        logger.error("\n\tData generator server interrupted!", e);
    } catch (Exception e) {
        logger.error("\n\tData generator server failed!", e);
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
/**
 * Runs the federated worker server: binds to {@code _port} and blocks until the server channel
 * is closed or the thread is interrupted.
 *
 * <p>Each accepted connection gets an object decoder, an object encoder and a
 * {@code FederatedWorkerHandler} built from {@code _seq} and {@code _vars}. Both event loop groups
 * are sized by {@code _nrThreads} and are always shut down before the method returns. On
 * interruption the thread's interrupt flag is restored so callers can still observe it.
 */
public void run() {
    log.info("Setting up Federated Worker");
    EventLoopGroup bossGroup = new NioEventLoopGroup(_nrThreads);
    EventLoopGroup workerGroup = new NioEventLoopGroup(_nrThreads);
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                // NOTE(review): no effective size limit (Integer.MAX_VALUE) and Java-native
                // deserialization; only safe with trusted peers.
                ch.pipeline()
                    .addLast("ObjectDecoder",
                        new ObjectDecoder(Integer.MAX_VALUE,
                            ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())))
                    .addLast("ObjectEncoder", new ObjectEncoder())
                    .addLast("FederatedWorkerHandler", new FederatedWorkerHandler(_seq, _vars));
            }
        }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
    try {
        log.info("Starting Federated Worker server at port: " + _port);
        ChannelFuture f = b.bind(_port).sync();
        log.info("Started Federated Worker at port: " + _port);
        // Block here for the lifetime of the server.
        f.channel().closeFuture().sync();
    }
    catch (InterruptedException e) {
        // Restore the flag: catching InterruptedException clears it, hiding the interrupt from callers.
        Thread.currentThread().interrupt();
        log.error("Federated worker interrupted");
    }
    finally {
        log.info("Federated Worker Shutting down.");
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}
/**
 * Installs the handlers for a new channel: an object decoder that resolves classes through this
 * class's class loader (no caching), an object encoder, and a new {@code ServerHandler}.
 *
 * @param channel the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(Channel channel) throws Exception {
    ChannelPipeline pipeline = channel.pipeline();
    pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())))
            .addLast(new ObjectEncoder())
            .addLast(new ServerHandler());
}
/**
 * Installs the handlers for a new client channel.
 *
 * <p>Order: an {@code IdleStateHandler} used for heartbeat detection (reader idle 20, writer idle
 * 10, all-idle disabled; the int constructor takes seconds), an object decoder that resolves
 * classes through this class's class loader, an object encoder, and a new {@code ClientHandler}.
 *
 * @param channel the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(Channel channel) throws Exception {
    ChannelPipeline pipeline = channel.pipeline();
    // IdleStateHandler drives the heartbeat check.
    pipeline.addLast(new IdleStateHandler(20, 10, 0))
            .addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())))
            .addLast(new ObjectEncoder())
            .addLast(new ClientHandler());
}
/**
 * Runs the object echo server: binds to {@code PORT} and blocks until the server channel is closed.
 *
 * <p>When {@code SSL} is set, a self-signed certificate is generated and every accepted connection
 * is wrapped in TLS before the object codec and the {@code ObjectEchoServerHandler}.
 *
 * @param args ignored
 * @throws Exception if SSL setup or the bind fails, or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    final SslContext sslCtx;
    if (!SSL) {
        sslCtx = null;
    } else {
        SelfSignedCertificate selfSigned = new SelfSignedCertificate();
        sslCtx = SslContextBuilder.forServer(selfSigned.certificate(), selfSigned.privateKey()).build();
    }

    // One acceptor thread; default-sized pool for the accepted connections.
    EventLoopGroup acceptors = new NioEventLoopGroup(1);
    EventLoopGroup workers = new NioEventLoopGroup();
    try {
        ServerBootstrap server = new ServerBootstrap();
        server.group(acceptors, workers);
        server.channel(NioServerSocketChannel.class);
        server.handler(new LoggingHandler(LogLevel.INFO));
        server.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (sslCtx != null) {
                    pipeline.addLast(sslCtx.newHandler(ch.alloc()));
                }
                pipeline.addLast(
                        new ObjectEncoder(),
                        new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                        new ObjectEchoServerHandler());
            }
        });

        // Bind, then wait for the listening channel to be closed.
        server.bind(PORT).sync().channel().closeFuture().sync();
    } finally {
        acceptors.shutdownGracefully();
        workers.shutdownGracefully();
    }
}
/**
 * Runs the object echo client: connects to {@code HOST}:{@code PORT} and blocks until the
 * connection is closed.
 *
 * <p>When {@code SSL} is set, the connection is wrapped in TLS using
 * {@code InsecureTrustManagerFactory}, i.e. the server certificate is not verified.
 *
 * @param args ignored
 * @throws Exception if SSL setup or the connection fails, or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    final SslContext sslCtx = SSL
            ? SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
            : null;

    EventLoopGroup eventLoop = new NioEventLoopGroup();
    try {
        Bootstrap client = new Bootstrap();
        client.group(eventLoop);
        client.channel(NioSocketChannel.class);
        client.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ChannelPipeline pipeline = ch.pipeline();
                if (sslCtx != null) {
                    pipeline.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                }
                pipeline.addLast(
                        new ObjectEncoder(),
                        new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                        new ObjectEchoClientHandler());
            }
        });

        // Connect, then wait for the channel to be closed.
        client.connect(HOST, PORT).sync().channel().closeFuture().sync();
    } finally {
        eventLoop.shutdownGracefully();
    }
}
/**
 * Installs the handlers for a new channel: an object encoder, an object decoder whose class
 * resolver weakly caches classes loaded through this class's class loader, and a new
 * {@code ClientHandler}.
 *
 * @param socketChannel the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
    socketChannel.pipeline()
            .addLast(new ObjectEncoder())
            .addLast(new ObjectDecoder(ClassResolvers.weakCachingResolver(this.getClass().getClassLoader())))
            .addLast(new ClientHandler());
}
/**
 * Runs the server: binds to {@code port}, logs the bound port, and blocks until the server
 * channel is closed. Both event loop groups are shut down before the method returns.
 *
 * <p>Each accepted connection gets an {@code ObjectEncoder}, an {@code ObjectDecoder} without an
 * effective object size limit ({@code Integer.MAX_VALUE}) and a {@code PojoServerHandler}.
 *
 * @throws Exception if the bind fails or the wait is interrupted
 */
public void run() throws Exception {
    EventLoopGroup acceptors = new NioEventLoopGroup();
    EventLoopGroup workers = new NioEventLoopGroup();
    try {
        ServerBootstrap server = new ServerBootstrap();
        server.group(acceptors, workers);
        server.channel(NioServerSocketChannel.class);
        server.option(ChannelOption.SO_BACKLOG, 1024);
        server.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new ObjectEncoder())
                        .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
                        .addLast(new PojoServerHandler());
            }
        });

        // Start listening for incoming connections.
        ChannelFuture bound = server.bind(port).sync();
        logger.info("server bind port:{}", port);

        // Stay here until the listening socket is closed.
        bound.channel().closeFuture().sync();
    } finally {
        acceptors.shutdownGracefully();
        workers.shutdownGracefully();
    }
}
/**
 * Creates the initializer and stores the given collaborators in the corresponding fields.
 *
 * @param config                          value assigned to {@code this.config}
 * @param encoder                         value assigned to {@code this.encoder}
 * @param peerChannelHandlerExecutorGroup value assigned to {@code this.peerChannelHandlerExecutorGroup}
 * @param peerChannelHandler              value assigned to {@code this.peerChannelHandler}
 */
public PeerChannelInitializer(
        Config config,
        ObjectEncoder encoder,
        EventExecutorGroup peerChannelHandlerExecutorGroup,
        PeerChannelHandler peerChannelHandler) {
    this.peerChannelHandler = peerChannelHandler;
    this.peerChannelHandlerExecutorGroup = peerChannelHandlerExecutorGroup;
    this.encoder = encoder;
    this.config = config;
}
/**
 * Creates the service and stores the given collaborators in the corresponding fields.
 *
 * @param config                value assigned to {@code this.config}
 * @param networkEventLoopGroup value assigned to {@code this.networkEventLoopGroup}
 * @param peerEventLoopGroup    value assigned to {@code this.peerEventLoopGroup}
 * @param encoder               value assigned to {@code this.encoder}
 */
public ConnectionService(
        Config config,
        EventLoopGroup networkEventLoopGroup,
        EventLoopGroup peerEventLoopGroup,
        ObjectEncoder encoder) {
    this.encoder = encoder;
    this.peerEventLoopGroup = peerEventLoopGroup;
    this.networkEventLoopGroup = networkEventLoopGroup;
    this.config = config;
}
/**
 * Installs the client-side handlers on a new channel: a 30-second read timeout, an object
 * encoder, an object decoder with a soft-caching concurrent class resolver, and the shared
 * {@code clientHandler}.
 *
 * @param channel the channel being initialized
 */
@Override
public void initChannel(final SocketChannel channel) {
    final ReadTimeoutHandler readTimeout = new ReadTimeoutHandler(30, TimeUnit.SECONDS);
    final ObjectEncoder objectEncoder = new ObjectEncoder();
    final ObjectDecoder objectDecoder =
            new ObjectDecoder(ClassResolvers.softCachingConcurrentResolver(null));
    channel.pipeline().addLast(readTimeout, objectEncoder, objectDecoder, clientHandler);
}
/**
 * Installs the server-side handlers on a new channel: an object encoder, an object decoder with
 * a soft-caching concurrent class resolver, and the shared {@code serverHandler}.
 *
 * @param channel the channel being initialized
 */
@Override
public void initChannel(final SocketChannel channel) {
    final ObjectEncoder objectEncoder = new ObjectEncoder();
    final ObjectDecoder objectDecoder =
            new ObjectDecoder(ClassResolvers.softCachingConcurrentResolver(null));
    channel.pipeline().addLast(objectEncoder, objectDecoder, serverHandler);
}
/**
 * Runs the object echo server: binds to {@code PORT} and blocks until the server channel is closed.
 *
 * <p>When {@code SSL} is set, a self-signed certificate is generated and every accepted connection
 * is wrapped in TLS before the object codec and the {@code ObjectEchoServerHandler}.
 *
 * <p>NOTE(review): {@code SslContext.newServerContext} appears to be deprecated in later Netty 4.x
 * releases in favour of {@code SslContextBuilder}; confirm against the Netty version in use.
 *
 * @param args ignored
 * @throws Exception if SSL setup or the bind fails, or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    final SslContext sslCtx;
    if (!SSL) {
        sslCtx = null;
    } else {
        SelfSignedCertificate selfSigned = new SelfSignedCertificate();
        sslCtx = SslContext.newServerContext(selfSigned.certificate(), selfSigned.privateKey());
    }

    // One acceptor thread; default-sized pool for the accepted connections.
    EventLoopGroup acceptors = new NioEventLoopGroup(1);
    EventLoopGroup workers = new NioEventLoopGroup();
    try {
        ServerBootstrap server = new ServerBootstrap();
        server.group(acceptors, workers);
        server.channel(NioServerSocketChannel.class);
        server.handler(new LoggingHandler(LogLevel.INFO));
        server.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (sslCtx != null) {
                    pipeline.addLast(sslCtx.newHandler(ch.alloc()));
                }
                pipeline.addLast(
                        new ObjectEncoder(),
                        new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                        new ObjectEchoServerHandler());
            }
        });

        // Bind, then wait for the listening channel to be closed.
        server.bind(PORT).sync().channel().closeFuture().sync();
    } finally {
        acceptors.shutdownGracefully();
        workers.shutdownGracefully();
    }
}
/**
 * Runs the object echo client: connects to {@code HOST}:{@code PORT} and blocks until the
 * connection is closed.
 *
 * <p>When {@code SSL} is set, the connection is wrapped in TLS using
 * {@code InsecureTrustManagerFactory}, i.e. the server certificate is not verified.
 *
 * @param args ignored
 * @throws Exception if SSL setup or the connection fails, or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    final SslContext sslCtx = SSL
            ? SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE)
            : null;

    EventLoopGroup eventLoop = new NioEventLoopGroup();
    try {
        Bootstrap client = new Bootstrap();
        client.group(eventLoop);
        client.channel(NioSocketChannel.class);
        client.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (sslCtx != null) {
                    pipeline.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                }
                pipeline.addLast(
                        new ObjectEncoder(),
                        new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                        new ObjectEchoClientHandler());
            }
        });

        // Connect, then wait for the channel to be closed.
        client.connect(HOST, PORT).sync().channel().closeFuture().sync();
    } finally {
        eventLoop.shutdownGracefully();
    }
}
/**
 * Connects to {@code host}:{@code port} and blocks until the connection is closed.
 *
 * <p>The pipeline is an object decoder (weak-caching concurrent class resolver), an object
 * encoder, and an {@code EchoClientHandler} constructed with {@code filePath}. The event loop
 * group is always shut down before the method returns.
 *
 * @param port     remote TCP port
 * @param host     remote host
 * @param filePath value passed to the {@code EchoClientHandler} constructor
 * @throws Exception if the connection fails or the wait is interrupted
 */
public void connect(int port, String host, final String filePath) throws Exception {
    EventLoopGroup eventLoop = new NioEventLoopGroup();
    try {
        Bootstrap client = new Bootstrap();
        client.group(eventLoop);
        client.channel(NioSocketChannel.class);
        client.option(ChannelOption.TCP_NODELAY, true);
        client.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Java object serialization codec followed by the application handler.
                ch.pipeline()
                        .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)))
                        .addLast(new ObjectEncoder())
                        .addLast(new EchoClientHandler(filePath));
            }
        });

        ChannelFuture connected = client.connect(host, port).sync();
        // Park the caller until the channel is closed.
        connected.channel().closeFuture().sync();
    } finally {
        eventLoop.shutdownGracefully();
    }
}
/**
 * Installs the handlers for a new channel: an object decoder (weak-caching concurrent class
 * resolver), an object encoder, and an {@code EchoServerHandler} registered under the name
 * {@code "handler"}.
 *
 * @param ch the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel ch)
        throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    // Handlers run on the channel's own event loop; no separate executor group is used.
    pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)))
            .addLast(new ObjectEncoder())
            .addLast("handler", new EchoServerHandler());
}
/**
 * Runs the hello-world client: connects to {@code HOST}:{@code PORT} and blocks until the
 * connection is closed.
 *
 * <p>When {@code SSL} is set, the connection is wrapped in TLS using
 * {@code InsecureTrustManagerFactory}, i.e. the server certificate is not verified.
 *
 * @param args ignored
 * @throws Exception if SSL setup or the connection fails, or the wait is interrupted
 */
public static void main(String[] args)
        throws Exception {
    final SslContext sslCtx = SSL
            ? SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE)
            : null;

    EventLoopGroup eventLoop = new NioEventLoopGroup();
    try {
        Bootstrap client = new Bootstrap();
        client.group(eventLoop);
        client.channel(NioSocketChannel.class);
        client.option(ChannelOption.TCP_NODELAY, true);
        client.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch)
                    throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (sslCtx != null) {
                    pipeline.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                }
                pipeline.addLast(
                        new ObjectEncoder(),
                        new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                        new HelloWorldClientHandler());
            }
        });

        // Establish the connection.
        ChannelFuture connected = client.connect(HOST, PORT).sync();

        // Block until the connection has been closed.
        connected.channel().closeFuture().sync();
    } finally {
        // Stop the event loop so that all of its threads terminate.
        eventLoop.shutdownGracefully();
    }
}
/**
 * Installs the handlers for a new channel: an object decoder (weak-caching concurrent class
 * resolver), an object encoder, and an {@code EchoServerHandler} registered under the name
 * {@code "handler"}.
 *
 * @param ch the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    // Handlers run on the channel's own event loop; no separate executor group is used.
    pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)))
            .addLast(new ObjectEncoder())
            .addLast("handler", new EchoServerHandler());
}
/**
 * Connects to {@code host}:{@code port} and blocks until the connection is closed.
 *
 * <p>The pipeline is an object decoder (weak-caching concurrent class resolver), an object
 * encoder, and an {@code EchoClientHandler} constructed with {@code filePath}. The event loop
 * group is always shut down before the method returns.
 *
 * @param port     remote TCP port
 * @param host     remote host
 * @param filePath value passed to the {@code EchoClientHandler} constructor
 * @throws Exception if the connection fails or the wait is interrupted
 */
public void connect(int port, String host, final String filePath) throws Exception {
    EventLoopGroup eventLoop = new NioEventLoopGroup();
    try {
        Bootstrap client = new Bootstrap();
        client.group(eventLoop);
        client.channel(NioSocketChannel.class);
        client.option(ChannelOption.TCP_NODELAY, true);
        client.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Java object serialization codec followed by the application handler.
                ch.pipeline()
                        .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)))
                        .addLast(new ObjectEncoder())
                        .addLast(new EchoClientHandler(filePath));
            }
        });

        ChannelFuture connected = client.connect(host, port).sync();
        // Park the caller until the channel is closed.
        connected.channel().closeFuture().sync();
    } finally {
        eventLoop.shutdownGracefully();
    }
}
/**
 * Runs the federated worker server: binds to {@code _port} and blocks until the server channel
 * is closed or the thread is interrupted.
 *
 * <p>Each accepted connection gets an object decoder, an object encoder and a
 * {@code FederatedWorkerHandler} built from {@code _seq} and {@code _vars}. Both event loop groups
 * are sized by {@code _nrThreads} and are always shut down before the method returns. On
 * interruption the thread's interrupt flag is restored so callers can still observe it.
 */
public void run() {
    log.info("Setting up Federated Worker");
    EventLoopGroup bossGroup = new NioEventLoopGroup(_nrThreads);
    EventLoopGroup workerGroup = new NioEventLoopGroup(_nrThreads);
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                // NOTE(review): no effective size limit (Integer.MAX_VALUE) and Java-native
                // deserialization; only safe with trusted peers.
                ch.pipeline()
                    .addLast("ObjectDecoder",
                        new ObjectDecoder(Integer.MAX_VALUE,
                            ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())))
                    .addLast("ObjectEncoder", new ObjectEncoder())
                    .addLast("FederatedWorkerHandler", new FederatedWorkerHandler(_seq, _vars));
            }
        }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
    try {
        log.info("Starting Federated Worker server at port: " + _port);
        ChannelFuture f = b.bind(_port).sync();
        log.info("Started Federated Worker at port: " + _port);
        // Block here for the lifetime of the server.
        f.channel().closeFuture().sync();
    }
    catch (InterruptedException e) {
        // Restore the flag: catching InterruptedException clears it, hiding the interrupt from callers.
        Thread.currentThread().interrupt();
        log.error("Federated worker interrupted");
    }
    finally {
        log.info("Federated Worker Shutting down.");
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}