下面列出了怎么用io.netty.channel.AbstractChannel的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void startService() {
super.startService();
if(this.channel==null){
eventLoopGroup = new NioEventLoopGroup(3);
Bootstrap boot = NettyUtils.buildBootStrap(eventLoopGroup,this);
boot.remoteAddress(this.getHost(), this.getPort());
try {
ChannelFuture f = boot.connect().sync();
f.await();
this.channel = (AbstractChannel)f.channel();
this.fireStartNetListeners();
} catch (InterruptedException e) {
logger.info("interrupted start to exist");
this.stopService();
}
}
}
public static ChannelInitializer<AbstractChannel> createChannelWithDecoder(
final @NonNull BmpSessionFactory sessionFactory, final @NonNull BmpHandlerFactory hf,
final @NonNull BmpSessionListenerFactory slf) {
return new ChannelInitializer<AbstractChannel>() {
@Override
protected void initChannel(final AbstractChannel ch) throws Exception {
ch.pipeline().addLast(hf.getDecoders());
ch.pipeline().addLast(sessionFactory.getSession(ch, slf));
}
};
}
public static ChannelInitializer<AbstractChannel> createChannelWithEncoder(
final @NonNull BmpSessionFactory sessionFactory, final @NonNull BmpHandlerFactory hf,
final @NonNull BmpSessionListenerFactory slf) {
return new ChannelInitializer<AbstractChannel>() {
@Override
protected void initChannel(final AbstractChannel ch) throws Exception {
ch.pipeline().addLast(hf.getEncoders());
ch.pipeline().addLast(sessionFactory.getSession(ch, slf));
}
};
}
public RpcNettyConnector(AbstractChannel channel){
super(null);
this.channel = channel;
if(this.channel!=null){
this.setAddress(channel);
}
}
private RpcNettyConnector newNettyConnector(AbstractChannel channel){
RpcNettyConnector connector = new RpcNettyConnector(channel);
if(parentAcceptor!=null){
parentAcceptor.addConnectorListeners(connector);
connector.setExecutorService(parentAcceptor.getExecutorService());
connector.setExecutorSharable(true);
}else if(parentConnector!=null){
parentConnector.addConnectorListeners(connector);
}
return connector;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
RpcNettyConnector connector = this.newNettyConnector((AbstractChannel)ctx.channel());
connector.startService();
String channelKey = this.getChannelKey(ctx.channel());
if(channelKey!=null){
connectorMap.put(channelKey, connector);
}
super.channelRegistered(ctx);
}
@Test
public void testWriteFailsFastOnClosedChannel() throws Exception {
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
cb.group(group1)
.channel(LocalChannel.class)
.handler(new TestHandler());
sb.group(group2)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new TestHandler());
}
});
Channel sc = null;
Channel cc = null;
try {
// Start server
sc = sb.bind(TEST_ADDRESS).sync().channel();
// Connect to the server
cc = cb.connect(sc.localAddress()).sync().channel();
// Close the channel and write something.
cc.close().sync();
try {
cc.writeAndFlush(new Object()).sync();
fail("must raise a ClosedChannelException");
} catch (Exception e) {
assertThat(e, is(instanceOf(ClosedChannelException.class)));
// Ensure that the actual write attempt on a closed channel was never made by asserting that
// the ClosedChannelException has been created by AbstractUnsafe rather than transport implementations.
if (e.getStackTrace().length > 0) {
assertThat(
e.getStackTrace()[0].getClassName(), is(AbstractChannel.class.getName() +
"$AbstractUnsafe"));
e.printStackTrace();
}
}
} finally {
closeChannel(cc);
closeChannel(sc);
}
}
@Test
public void testWriteFailsFastOnClosedChannel() throws Exception {
EventLoopGroup clientGroup = new LocalEventLoopGroup();
EventLoopGroup serverGroup = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
cb.group(clientGroup)
.channel(LocalChannel.class)
.handler(new TestHandler());
sb.group(serverGroup)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new TestHandler());
}
});
// Start server
sb.bind(addr).sync();
// Connect to the server
final Channel cc = cb.connect(addr).sync().channel();
// Close the channel and write something.
cc.close().sync();
try {
cc.writeAndFlush(new Object()).sync();
fail("must raise a ClosedChannelException");
} catch (Exception e) {
assertThat(e, is(instanceOf(ClosedChannelException.class)));
// Ensure that the actual write attempt on a closed channel was never made by asserting that
// the ClosedChannelException has been created by AbstractUnsafe rather than transport implementations.
if (e.getStackTrace().length > 0) {
assertThat(
e.getStackTrace()[0].getClassName(), is(AbstractChannel.class.getName() + "$AbstractUnsafe"));
e.printStackTrace();
}
}
serverGroup.shutdownGracefully();
clientGroup.shutdownGracefully();
serverGroup.terminationFuture().sync();
clientGroup.terminationFuture().sync();
}
ChannelInitializer<AbstractChannel> create(@NonNull BmpSessionFactory sessionFactory,
@NonNull BmpHandlerFactory hf, @NonNull BmpSessionListenerFactory slf);
private void setAddress(AbstractChannel channel){
InetSocketAddress address = (InetSocketAddress)channel.remoteAddress();
this.setHost(address.getHostName());
this.setPort(address.getPort());
}