下面列出了io.netty.channel.ChannelFactory#io.netty.channel.socket.DatagramChannel 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void setupUDP(InetSocketAddress addr, int ssrc) {
tries.set(0);
remoteAddress = addr;
logger.info("Attempting UDP discovery, address: {}, ssrc: {}", addr, ssrc);
if (udpSocket != null && udpSocket.isConnected()) {
udpSocket.close();
}
setupNetty(addr, new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(DatagramChannel ch) {
udpSocket = ch;
ch.pipeline().addLast("handler", new UDPHolepunchHandler(VoiceWebsocket.this));
holePunch(addr, ssrc);
}
}).exceptionally(err -> {
close(ConnectionStatus.ERROR_UDP_UNABLE_TO_CONNECT);
return null;
});
}
public UdpServerChannel(int ioThreads) {
if (ioThreads < 1) {
throw new IllegalArgumentException("IO threads cound can't be less than 1");
}
boolean epollAvailabe = Epoll.isAvailable();
if (!epollAvailabe) {
ioThreads = 1;
}
group = epollAvailabe ? new EpollEventLoopGroup(ioThreads) : new NioEventLoopGroup(ioThreads);
Class<? extends DatagramChannel> channel = epollAvailabe ? EpollDatagramChannel.class : NioDatagramChannel.class;
ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
final ReadRouteChannelHandler ioReadRoute = new ReadRouteChannelHandler();
@Override
protected void initChannel(Channel ioChannel) throws Exception {
ioChannel.pipeline().addLast(ioReadRoute);
}
};
while (ioThreads-- > 0) {
Bootstrap ioBootstrap = new Bootstrap().group(group).channel(channel).handler(initializer);
if (epollAvailabe) {
ioBootstrap.option(UnixChannelOption.SO_REUSEPORT, true);
}
ioBootstraps.add(ioBootstrap);
}
}
@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass) {
if (channelClass.equals(SocketChannel.class)) {
return (CHANNEL) new EpollSocketChannel();
}
if (channelClass.equals(ServerSocketChannel.class)) {
return (CHANNEL) new EpollServerSocketChannel();
}
if (channelClass.equals(DatagramChannel.class)) {
return (CHANNEL) new EpollDatagramChannel();
}
if (channelClass.equals(DomainSocketChannel.class)) {
return (CHANNEL) new EpollDomainSocketChannel();
}
if (channelClass.equals(ServerDomainSocketChannel.class)) {
return (CHANNEL) new EpollServerDomainSocketChannel();
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}
@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass) {
if (channelClass.equals(SocketChannel.class)) {
return (CHANNEL) new KQueueSocketChannel();
}
if (channelClass.equals(ServerSocketChannel.class)) {
return (CHANNEL) new KQueueServerSocketChannel();
}
if (channelClass.equals(DatagramChannel.class)) {
return (CHANNEL) new KQueueDatagramChannel();
}
if (channelClass.equals(DomainSocketChannel.class)) {
return (CHANNEL) new KQueueDomainSocketChannel();
}
if (channelClass.equals(ServerDomainSocketChannel.class)) {
return (CHANNEL) new KQueueServerDomainSocketChannel();
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}
public void sendAck(int channelId, DatagramChannel ch) {
if (DEBUG_SEND) Log.debug(TAG, "sendAck");
int ack, ackBits;
synchronized (receivedPackets) {
ack = receivedPackets.generateAck();
ackBits = receivedPackets.generateAckBits(ack);
}
ByteBuf packet = ch.alloc().directBuffer(config.packetHeaderSize);
int headerSize = Packet.writeAck(packet, channelId, ack, ackBits);
if (headerSize < 0) {
Log.error(TAG, "failed to write ack");
ReliableEndpoint.stats.NUM_ACKS_INVALID++;
return;
}
channel.onPacketTransmitted(packet);
ch.writeAndFlush(packet);
}
/**
* Recursively binds the given bootstrap to the given interfaces.
*
* @param bootstrap the bootstrap to bind
* @param ifaces an iterator of interfaces to which to bind
* @param port the port to which to bind
* @param future the future to completed once the bootstrap has been bound to all provided interfaces
*/
private void bind(Bootstrap bootstrap, Iterator<String> ifaces, int port, CompletableFuture<Void> future) {
if (ifaces.hasNext()) {
String iface = ifaces.next();
bootstrap.bind(iface, port).addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
log.info("UDP server listening for connections on {}:{}", iface, port);
channel = (DatagramChannel) f.channel();
bind(bootstrap, ifaces, port, future);
} else {
log.warn("Failed to bind TCP server to port {}:{} due to {}", iface, port, f.cause());
future.completeExceptionally(f.cause());
}
});
} else {
future.complete(null);
}
}
public static void main(final String[] args) throws Exception {
final UAS uas = new UAS();
final EventLoopGroup udpGroup = new NioEventLoopGroup();
final Bootstrap b = new Bootstrap();
b.group(udpGroup)
.channel(NioDatagramChannel.class)
.handler(new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(final DatagramChannel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new SipMessageDatagramDecoder());
pipeline.addLast("encoder", new SipMessageEncoder());
pipeline.addLast("handler", uas);
}
});
final InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 5060);
b.bind(socketAddress).sync().channel().closeFuture().await();
}
/**
* @deprecated Override {@link #newNameResolver(EventLoop, ChannelFactory, DnsServerAddressStreamProvider)}.
*/
@Deprecated
protected AddressResolver<InetSocketAddress> newResolver(
EventLoop eventLoop, ChannelFactory<? extends DatagramChannel> channelFactory,
DnsServerAddressStreamProvider nameServerProvider) throws Exception {
final NameResolver<InetAddress> resolver = new InflightNameResolver<InetAddress>(
eventLoop,
newNameResolver(eventLoop, channelFactory, nameServerProvider),
resolvesInProgress,
resolveAllsInProgress);
return newAddressResolver(eventLoop, resolver);
}
/**
* Creates a new {@link NameResolver}. Override this method to create an alternative {@link NameResolver}
* implementation or override the default configuration.
*/
protected NameResolver<InetAddress> newNameResolver(EventLoop eventLoop,
ChannelFactory<? extends DatagramChannel> channelFactory,
DnsServerAddressStreamProvider nameServerProvider)
throws Exception {
return new DnsNameResolverBuilder(eventLoop)
.channelFactory(channelFactory)
.nameServerProvider(nameServerProvider)
.build();
}
private void ensureBound() {
if (!isActive()) {
throw new IllegalStateException(
DatagramChannel.class.getName() +
" must be bound to join a group.");
}
}
/**
* Test try to reproduce issue #1335
*/
@Test
public void testBindMultiple() throws Exception {
DefaultChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
NioEventLoopGroup group = new NioEventLoopGroup();
try {
for (int i = 0; i < 100; i++) {
Bootstrap udpBootstrap = new Bootstrap();
udpBootstrap.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Discard
ReferenceCountUtil.release(msg);
}
});
DatagramChannel datagramChannel = (DatagramChannel) udpBootstrap
.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
channelGroup.add(datagramChannel);
}
Assert.assertEquals(100, channelGroup.size());
} finally {
channelGroup.close().sync();
group.shutdownGracefully().sync();
}
}
/**
* Returns the correct {@link Class} to use with the given {@link EventLoopGroup}.
*
* @param group the {@link EventLoopGroup} for which the class is needed
* @return the class that should be used for bootstrapping
*/
public static Class<? extends DatagramChannel> datagramChannel(EventLoopGroup group) {
if (useEpoll(group)) {
return EpollDatagramChannel.class;
} else if (useKQueue(group)) {
return KQueueDatagramChannel.class;
} else {
return NioDatagramChannel.class;
}
}
private void createServer(Listener listener, EventLoopGroup eventLoopGroup, ChannelFactory<? extends DatagramChannel> channelFactory) {
this.eventLoopGroup = eventLoopGroup;
try {
Bootstrap b = new Bootstrap();
b.group(eventLoopGroup)//默认是根据机器情况创建Channel,如果机器支持ipv6,则无法使用ipv4的地址加入组播
.channelFactory(channelFactory)
.option(ChannelOption.SO_BROADCAST, true)
.handler(getChannelHandler());
initOptions(b);
//直接绑定端口,不要指定host,不然收不到组播消息
b.bind(port).addListener(future -> {
if (future.isSuccess()) {
logger.info("udp server start success on:{}", port);
if (listener != null) listener.onSuccess(port);
} else {
logger.error("udp server start failure on:{}", port, future.cause());
if (listener != null) listener.onFailure(future.cause());
}
});
} catch (Exception e) {
logger.error("udp server start exception", e);
if (listener != null) listener.onFailure(e);
throw new ServiceException("udp server start exception, port=" + port, e);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
connection.init(ctx.channel(), false);
if (multicastAddress != null) {
((DatagramChannel) ctx.channel()).joinGroup(multicastAddress, networkInterface, null).addListener(future -> {
if (future.isSuccess()) {
LOGGER.info("join multicast group success, channel={}, group={}", ctx.channel(), multicastAddress);
} else {
LOGGER.error("join multicast group error, channel={}, group={}", ctx.channel(), multicastAddress, future.cause());
}
});
}
LOGGER.info("init udp channel={}", ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
connection.close();
if (multicastAddress != null) {
((DatagramChannel) ctx.channel()).leaveGroup(multicastAddress, networkInterface, null).addListener(future -> {
if (future.isSuccess()) {
LOGGER.info("leave multicast group success, channel={}, group={}", ctx.channel(), multicastAddress);
} else {
LOGGER.error("leave multicast group error, channel={}, group={}", ctx.channel(), multicastAddress, future.cause());
}
});
}
LOGGER.info("disconnect udp channel={}, connection={}", ctx.channel(), connection);
}
protected ChannelHandler setupUdpChannel(MetricResolver metricResolver, UdpClientPool udpClientPool) {
return new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(DatagramChannel ch) throws Exception {
ch.pipeline().addLast("logger", new LoggingHandler());
ch.pipeline().addLast("packetDecoder", new UdpPacketToByteBuf());
ch.pipeline().addLast("buffer", new MetricsBufferDecoder());
ch.pipeline().addLast("frame", new DelimiterBasedFrameDecoder(65536, true, Delimiters.lineDelimiter()));
ch.pipeline().addLast("putDecoder", new UdpDecoder());
ch.pipeline().addLast("udpRelayHandler", new UdpRelayHandler(metricResolver, udpClientPool));
}
};
}
protected ChannelHandler setupUdpChannel() {
return new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(DatagramChannel ch) throws Exception {
ch.pipeline().addLast("logger", new LoggingHandler());
ch.pipeline().addLast("packetDecoder", new UdpPacketToByteBuf());
ch.pipeline().addLast("buffer", new MetricsBufferDecoder());
ch.pipeline().addLast("frame", new DelimiterBasedFrameDecoder(65536, true, Delimiters.lineDelimiter()));
ch.pipeline().addLast("putDecoder", new UdpDecoder());
ch.pipeline().addLast("putHandler", new TcpPutHandler(dataStore));
}
};
}
@Override
protected ChannelHandler setupUdpChannel() {
return new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(DatagramChannel ch) throws Exception {
ch.pipeline().addLast("logger", new LoggingHandler());
ch.pipeline().addLast("packetDecoder", new UdpPacketToByteBuf());
ch.pipeline().addLast("buffer", new MetricsBufferDecoder());
ch.pipeline().addLast("frame", new DelimiterBasedFrameDecoder(8192, true, Delimiters.lineDelimiter()));
ch.pipeline().addLast("putDecoder", new UdpDecoder());
ch.pipeline().addLast("capture", udpRequests);
}
};
}
@Override
protected Class<? extends Channel> channelType(boolean isDomainSocket) {
if (isDomainSocket) {
throw new UnsupportedOperationException();
}
return DatagramChannel.class;
}
@Override
protected ChannelFactory<? extends Channel> connectionFactory(EventLoopGroup elg, boolean isDomainSocket) {
if (isDomainSocket) {
throw new UnsupportedOperationException();
}
if (isPreferNative()) {
return () -> loopResources().onChannel(DatagramChannel.class, elg);
}
else {
return () -> new NioDatagramChannel(family());
}
}
@Override
protected Class<? extends Channel> channelType(boolean isDomainSocket) {
if (isDomainSocket) {
throw new UnsupportedOperationException();
}
return DatagramChannel.class;
}
@Override
protected ChannelFactory<? extends Channel> connectionFactory(EventLoopGroup elg, boolean isDomainSocket) {
if (isDomainSocket) {
throw new UnsupportedOperationException();
}
if (isPreferNative()) {
return () -> loopResources().onChannel(DatagramChannel.class, elg);
}
else {
return () -> new NioDatagramChannel(family());
}
}
/**
* When on the server, returns the bind address,
* when on the client, returns the remote address.
*
* @return {@link SocketAddress}
*/
default SocketAddress address(){
Channel c = channel();
if (c instanceof DatagramChannel) {
SocketAddress a = c.remoteAddress();
return a != null ? a : c.localAddress();
}
return c.remoteAddress();
}
@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> Class<? extends CHANNEL> getChannelClass(Class<CHANNEL> channelClass) {
if (channelClass.equals(SocketChannel.class)) {
return (Class<? extends CHANNEL>) EpollSocketChannel.class;
}
if (channelClass.equals(ServerSocketChannel.class)) {
return (Class<? extends CHANNEL>) EpollServerSocketChannel.class;
}
if (channelClass.equals(DatagramChannel.class)) {
return (Class<? extends CHANNEL>) EpollDatagramChannel.class;
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}
@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> Class<? extends CHANNEL> getChannelClass(Class<CHANNEL> channelClass) {
if (channelClass.equals(SocketChannel.class)) {
return (Class<? extends CHANNEL>) KQueueSocketChannel.class;
}
if (channelClass.equals(ServerSocketChannel.class)) {
return (Class<? extends CHANNEL>) KQueueServerSocketChannel.class;
}
if (channelClass.equals(DatagramChannel.class)) {
return (Class<? extends CHANNEL>) KQueueDatagramChannel.class;
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}
@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass) {
if (channelClass.equals(SocketChannel.class)) {
return (CHANNEL) new NioSocketChannel();
}
if (channelClass.equals(ServerSocketChannel.class)) {
return (CHANNEL) new NioServerSocketChannel();
}
if (channelClass.equals(DatagramChannel.class)) {
return (CHANNEL) new NioDatagramChannel();
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}
@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> Class<? extends CHANNEL> getChannelClass(Class<CHANNEL> channelClass) {
if (channelClass.equals(SocketChannel.class)) {
return (Class<? extends CHANNEL>) NioSocketChannel.class;
}
if (channelClass.equals(ServerSocketChannel.class)) {
return (Class<? extends CHANNEL>) NioServerSocketChannel.class;
}
if (channelClass.equals(DatagramChannel.class)) {
return (Class<? extends CHANNEL>) NioDatagramChannel.class;
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}
public static Class<? extends DatagramChannel> getDatagramChannelClass(EventLoopGroup eventLoopGroup) {
if (eventLoopGroup instanceof EpollEventLoopGroup) {
return EpollDatagramChannel.class;
} else {
return NioDatagramChannel.class;
}
}
private void ensureBound() {
if (!isActive()) {
throw new IllegalStateException(
DatagramChannel.class.getName() +
" must be bound to join a group.");
}
}
/**
* Test try to reproduce issue #1335
*/
@Test
public void testBindMultiple() throws Exception {
DefaultChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
NioEventLoopGroup group = new NioEventLoopGroup();
try {
for (int i = 0; i < 100; i++) {
Bootstrap udpBootstrap = new Bootstrap();
udpBootstrap.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Discard
ReferenceCountUtil.release(msg);
}
});
DatagramChannel datagramChannel = (DatagramChannel) udpBootstrap
.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
channelGroup.add(datagramChannel);
}
Assert.assertEquals(100, channelGroup.size());
} finally {
channelGroup.close().sync();
group.shutdownGracefully().sync();
}
}