下面列出了io.netty.channel.Channel#localAddress ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel userChannel = ctx.channel();
InetSocketAddress sa = (InetSocketAddress) userChannel.localAddress();
int port = sa.getPort();
log.info("a new connection connect to port:{}", port);
NatClientImage natClientImage = clientManager.getClient(port);
if (natClientImage == null) {
log.error("nat channel not ready! reject connect");
ctx.close();
return;
}
Long seq = natClientImage.onNewConnection(ctx.channel());
log.info("create a new connect from user endpoint, with port:{} ,client:{} ,seq:{}", port, natClientImage.getClientId(), seq);
userChannel.config().setOption(ChannelOption.AUTO_READ, false);
NatMessage natMessage = new NatMessage();
natMessage.setType(NatMessage.TYPE_CONNECT);
natMessage.setSerialNumber(seq);
natClientImage.getNatChannel().writeAndFlush(natMessage);
super.channelActive(ctx);
}
@Override
public void bind(SocketAddress address) throws IOException {
InetSocketAddress inetAddress = (InetSocketAddress) address;
ChannelFuture f = bootstrap.bind(inetAddress);
Channel channel = f.channel();
channelGroup.add(channel);
try {
f.sync();
final SocketAddress bound = channel.localAddress();
boundAddresses.put(bound, channel);
channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
boundAddresses.remove(bound);
}
});
} catch (Exception e) {
throw Helper.toIOException(e);
}
}
private String getRequestURL(Channel channel,HttpRequest req){
StringBuffer url = new StringBuffer();
String scheme = isSecure(channel)?"https":"http";
InetSocketAddress addr = (InetSocketAddress)channel.localAddress();
int port = addr.getPort();
String urlPath = req.getUri();
url.append(scheme); // http, https
url.append("://");
url.append(EnFactory.getEnHost().getHostAddress());
if (("http".equalsIgnoreCase(scheme) && port != 80)
|| ("https".equalsIgnoreCase(scheme) && port != 443)) {
url.append(':');
url.append(port);
}
url.append(urlPath);
return url.toString();
}
@Override
public void bind(SocketAddress address) throws IOException {
InetSocketAddress inetAddress = (InetSocketAddress) address;
ChannelFuture f = bootstrap.bind(inetAddress);
Channel channel = f.channel();
channelGroup.add(channel);
try {
f.sync();
SocketAddress bound = channel.localAddress();
boundAddresses.put(bound, channel);
channel.closeFuture().addListener(fut -> {
boundAddresses.remove(bound);
});
} catch (Exception e) {
throw Helper.toIOException(e);
}
}
@Override
public void bind(SocketAddress address) throws IOException {
InetSocketAddress inetAddress = (InetSocketAddress) address;
ChannelFuture f = bootstrap.bind(inetAddress);
Channel channel = f.channel();
channelGroup.add(channel);
try {
f.sync();
SocketAddress bound = channel.localAddress();
boundAddresses.put(bound, channel);
channel.closeFuture().addListener(fut -> {
boundAddresses.remove(bound);
});
} catch (Exception e) {
throw Helper.toIOException(e);
}
}
private void removeListeningPorts(Collection<Integer> ports) {
if (cg == null) {
return;
}
Iterator<Channel> itr = cg.iterator();
while (itr.hasNext()) {
Channel c = itr.next();
SocketAddress addr = c.localAddress();
if (addr instanceof InetSocketAddress) {
InetSocketAddress inetAddr = (InetSocketAddress) addr;
Integer port = inetAddr.getPort();
if (ports.contains(port)) {
log.info("No longer listening for OF switch connections on {}", port);
c.close();
itr.remove();
}
}
}
}
/**
* Locates the best matching FilterChain to the channel from the current listener and if found
* returns the DownstreamTlsContext from that FilterChain, else null.
*/
@Nullable
public DownstreamTlsContext getDownstreamTlsContext(Channel channel) {
if (curListener != null && channel != null) {
SocketAddress localAddress = channel.localAddress();
checkState(
localAddress instanceof InetSocketAddress,
"Channel localAddress is expected to be InetSocketAddress");
InetSocketAddress localInetAddr = (InetSocketAddress) localAddress;
checkState(
port == localInetAddr.getPort(),
"Channel localAddress port does not match requested listener port");
List<FilterChain> filterChains = curListener.getFilterChains();
FilterChainComparator comparator = new FilterChainComparator(localInetAddr);
FilterChain bestMatch =
filterChains.isEmpty() ? null : Collections.max(filterChains, comparator);
if (bestMatch != null && comparator.isMatching(bestMatch.getFilterChainMatch())) {
return bestMatch.getDownstreamTlsContext();
}
}
return null;
}
private void closeChannle(ChannelHandlerContext ctx) {
if (ctx != null && ctx.channel() != null && ctx.channel().isActive()) {
Channel userChannel = ctx.channel();
InetSocketAddress sa = (InetSocketAddress) userChannel.localAddress();
logger.info("客户端({})认证失败或者连接异常", sa.getHostName());
ctx.channel().close();
}
}
public static Object getKey(String domain, Channel userChannel) {
Matcher matcher = ipPattern.matcher(domain);
if (matcher.find() || domain.startsWith("localhost")) {
// 通过ip 访问,则返回端口
InetSocketAddress sa = (InetSocketAddress) userChannel.localAddress();
return sa.getPort();
}
String[] all = domain.split(":");
domain = all[0];
return domain;
}
/**
* Parse the local address of the channel.
*
* @param channel
* @return
*/
public static String parseLocalAddress(final Channel channel) {
if (null == channel) {
return StringUtils.EMPTY;
}
final SocketAddress local = channel.localAddress();
return doParse(local != null ? local.toString().trim() : StringUtils.EMPTY);
}
/**
* Parse the local host port of the channel.
*
* @param channel
* @return int
*/
public static int parseLocalPort(final Channel channel) {
if (null == channel) {
return -1;
}
final InetSocketAddress local = (InetSocketAddress) channel.localAddress();
if (local != null) {
return local.getPort();
}
return -1;
}
@Override
protected void startUp() throws Exception {
channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
EventLoopGroup bossGroup = new NioEventLoopGroup(NUM_BOSS_THREADS,
new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("boss-thread").build());
EventLoopGroup workerGroup = new NioEventLoopGroup(NUM_WORKER_THREADS,
new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("worker-thread#%d").build());
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
channelGroup.add(ch);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("codec", new HttpServerCodec());
pipeline.addLast("compressor", new HttpContentCompressor());
pipeline.addLast("aggregator", new HttpObjectAggregator(MAX_INPUT_SIZE));
pipeline.addLast("handler", new ReportHandler());
}
});
Channel serverChannel = bootstrap.bind(new InetSocketAddress(host, 0)).sync().channel();
channelGroup.add(serverChannel);
bindAddress = (InetSocketAddress) serverChannel.localAddress();
url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort())).toURL();
LOG.info("Tracker service started at {}", url);
}
/**
* When on the server, returns the bind address,
* when on the client, returns the remote address.
*
* @return {@link SocketAddress}
*/
default SocketAddress address(){
Channel c = channel();
if (c instanceof DatagramChannel) {
SocketAddress a = c.remoteAddress();
return a != null ? a : c.localAddress();
}
return c.remoteAddress();
}
/**
* Extract the local address from the current server.
* <p>
* Because no api is available to accomplish this, it currently uses a very ugly reflexive
* approach.
*
* @param server Server to extract local address from.
* @return an InetSocketAddress
* @throws Exception if something goes wrong (which it should).
*/
private InetSocketAddress extractInetSocketAddress(final Server server) throws Exception {
final ServerImpl impl = (ServerImpl) server;
final Field transportServerField = ServerImpl.class.getDeclaredField("transportServer");
transportServerField.setAccessible(true);
final Object transportServer = transportServerField.get(impl);
final Field channelField = transportServer.getClass().getDeclaredField("channel");
channelField.setAccessible(true);
final Channel channel = (Channel) channelField.get(transportServer);
return (InetSocketAddress) channel.localAddress();
}
@Nullable
@Override
@SuppressWarnings("unchecked")
public <A extends SocketAddress> A localAddress() {
final Channel ch = channel();
return ch != null ? (A) ch.localAddress() : null;
}
public void testMulticast(Bootstrap sb, Bootstrap cb) throws Throwable {
MulticastTestHandler mhandler = new MulticastTestHandler();
sb.handler(new SimpleChannelInboundHandler<Object>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// Nothing will be sent.
}
});
cb.handler(mhandler);
sb.option(ChannelOption.IP_MULTICAST_IF, NetUtil.LOOPBACK_IF);
sb.option(ChannelOption.SO_REUSEADDR, true);
cb.option(ChannelOption.IP_MULTICAST_IF, NetUtil.LOOPBACK_IF);
cb.option(ChannelOption.SO_REUSEADDR, true);
Channel sc = sb.bind(newSocketAddress()).sync().channel();
InetSocketAddress addr = (InetSocketAddress) sc.localAddress();
cb.localAddress(addr.getPort());
if (sc instanceof OioDatagramChannel) {
// skip the test for OIO, as it fails because of
// No route to host which makes no sense.
// Maybe a JDK bug ?
sc.close().awaitUninterruptibly();
return;
}
DatagramChannel cc = (DatagramChannel) cb.bind().sync().channel();
String group = "230.0.0.1";
InetSocketAddress groupAddress = SocketUtils.socketAddress(group, addr.getPort());
cc.joinGroup(groupAddress, NetUtil.LOOPBACK_IF).sync();
sc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), groupAddress)).sync();
assertTrue(mhandler.await());
// leave the group
cc.leaveGroup(groupAddress, NetUtil.LOOPBACK_IF).sync();
// sleep a second to make sure we left the group
Thread.sleep(1000);
// we should not receive a message anymore as we left the group before
sc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), groupAddress)).sync();
mhandler.await();
sc.close().awaitUninterruptibly();
cc.close().awaitUninterruptibly();
}
@SuppressWarnings("deprecation")
private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
final byte[] bytes, int count, WrapType wrapType)
throws Throwable {
Channel sc = null;
Channel cc = null;
try {
cb.handler(new SimpleChannelInboundHandler<Object>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception {
// Nothing will be sent.
}
});
final CountDownLatch latch = new CountDownLatch(count);
sc = setupServerChannel(sb, bytes, latch);
if (bindClient) {
cc = cb.bind(newSocketAddress()).sync().channel();
} else {
cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
cc = cb.register().sync().channel();
}
InetSocketAddress addr = (InetSocketAddress) sc.localAddress();
for (int i = 0; i < count; i++) {
switch (wrapType) {
case DUP:
cc.write(new DatagramPacket(buf.retainedDuplicate(), addr));
break;
case SLICE:
cc.write(new DatagramPacket(buf.retainedSlice(), addr));
break;
case READ_ONLY:
cc.write(new DatagramPacket(buf.retain().asReadOnly(), addr));
break;
case NONE:
cc.write(new DatagramPacket(buf.retain(), addr));
break;
default:
throw new Error("unknown wrap type: " + wrapType);
}
}
// release as we used buf.retain() before
cc.flush();
assertTrue(latch.await(10, TimeUnit.SECONDS));
} finally {
// release as we used buf.retain() before
buf.release();
closeChannel(cc);
closeChannel(sc);
}
}
/**
* 处理tcp 请求
*/
private void tcpHandler(ChannelHandlerContext ctx, ByteBuf buf, Integer proxyType) throws Exception {
Channel userChannel = ctx.channel();
InetSocketAddress sa = (InetSocketAddress) userChannel.localAddress();
ProxyChannel proxyChannel = ServerBeanManager.getProxyChannelService().getServerProxy(sa.getPort());
if (proxyChannel == null) {
// 该端口还没有代理客户端
logger.error("端口{} 没有代理客户端", sa.getPort());
userChannel.close();
ReferenceCountUtil.release(buf);
return;
}
ClientNode node = ServerBeanManager.getClientService().get(proxyChannel.getClientKey());
if (node == null || node.getChannel() == null || node.getStatus() != CommonConstant.ClientStatus.ONLINE) {
logger.error("端口{} 没有代理客户端", sa.getPort());
userChannel.close();
ReferenceCountUtil.release(buf);
return;
}
//封装消息
Long sessionID = ServerBeanManager.getUserSessionService().getSessionID(userChannel);
//ProxyRealServer realServer = node.getServerPort2RealServer().get(sa.getPort());
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
buf.release();
Message message = new Message();
message.setClientChannel(node.getChannel());
message.setData(data);
message.setsPort(sa.getPort());
message.setSessionID(sessionID);
message.setType(CommonConstant.MessageType.TYPE_TRANSFER);
message.setProxyType(proxyType.byteValue());
logger.debug("来自{}端口的请求转发至客户端({})", sa.getPort(), node.getClientKey());
ServerBeanManager.getTransferService().toClient(message);
}
@Test
public void waitUntilActiveHandler_firesNegotiation() throws Exception {
EventLoopGroup elg = new DefaultEventLoopGroup(1);
SocketAddress addr = new LocalAddress("addr");
final AtomicReference<Object> event = new AtomicReference<>();
ChannelHandler next = new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
event.set(evt);
ctx.close();
}
};
Channel s = new ServerBootstrap()
.childHandler(new ChannelInboundHandlerAdapter())
.group(elg)
.channel(LocalServerChannel.class)
.bind(addr)
.sync()
.channel();
Channel c = new Bootstrap()
.handler(new WaitUntilActiveHandler(next))
.channel(LocalChannel.class).group(group)
.connect(addr)
.sync()
.channel();
c.pipeline().fireUserEventTriggered(ProtocolNegotiationEvent.DEFAULT);
SocketAddress localAddr = c.localAddress();
ProtocolNegotiationEvent expectedEvent = ProtocolNegotiationEvent.DEFAULT
.withAttributes(
Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, localAddr)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, addr)
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
.build());
c.closeFuture().sync();
assertThat(event.get()).isInstanceOf(ProtocolNegotiationEvent.class);
ProtocolNegotiationEvent actual = (ProtocolNegotiationEvent) event.get();
assertThat(actual).isEqualTo(expectedEvent);
s.close();
elg.shutdownGracefully();
}
@Override
protected TransportEvent getTransportEvent(final byte[] message, final Channel channel) {
return new TransportEvent(message, channel.localAddress(), channel.remoteAddress());
}