下面列出了怎么用io.netty.channel.socket.nio.NioDatagramChannel的API类实例代码及写法,或者点击链接到github查看源代码。
public static void main(final String[] args) throws Exception {
final UAS uas = new UAS();
final EventLoopGroup udpGroup = new NioEventLoopGroup();
final Bootstrap b = new Bootstrap();
b.group(udpGroup)
.channel(NioDatagramChannel.class)
.handler(new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(final DatagramChannel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new SipMessageDatagramDecoder());
pipeline.addLast("encoder", new SipMessageEncoder());
pipeline.addLast("handler", uas);
}
});
final InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 5060);
b.bind(socketAddress).sync().channel().closeFuture().await();
}
public List<BootstrapFactory<Bootstrap>> datagramSocket() {
return Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channel(NioDatagramChannel.class);
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(oioWorkerGroup).channel(OioDatagramChannel.class)
.option(ChannelOption.SO_TIMEOUT, OIO_SO_TIMEOUT);
}
}
);
}
private static DnsNameResolverBuilder newResolver(boolean decodeToUnicode,
DnsServerAddressStreamProvider dnsServerAddressStreamProvider) {
DnsNameResolverBuilder builder = new DnsNameResolverBuilder(group.next())
.dnsQueryLifecycleObserverFactory(new TestRecursiveCacheDnsQueryLifecycleObserverFactory())
.channelType(NioDatagramChannel.class)
.maxQueriesPerResolve(1)
.decodeIdn(decodeToUnicode)
.optResourceEnabled(false)
.ndots(1);
if (dnsServerAddressStreamProvider == null) {
builder.nameServerProvider(new SingletonDnsServerAddressStreamProvider(dnsServer.localAddress()));
} else {
builder.nameServerProvider(new MultiDnsServerAddressStreamProvider(dnsServerAddressStreamProvider,
new SingletonDnsServerAddressStreamProvider(dnsServer.localAddress())));
}
return builder;
}
TestReceiver(SpanBytesDecoder decoder) throws Exception {
channel = new Bootstrap()
.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.RCVBUF_ALLOCATOR, DEFAULT_RECV_BUF_ALLOCATOR)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
byte[] b = new byte[msg.content().readableBytes()];
msg.content().readBytes(b);
decoder.decode(b, queue);
}
});
}
})
.localAddress(localAddress(0))
.bind().sync().channel();
}
private CompletableFuture<Bootstrap> setupNetty(InetSocketAddress address, ChannelHandler handler) {
var future = new CompletableFuture<Bootstrap>();
var bootstrap = new Bootstrap()
.group(vertx.nettyEventLoopGroup());
if (Epoll.isAvailable()) {
logger.info("epoll support is available, using it for UDP connections.");
bootstrap.channel(EpollDatagramChannel.class);
} else {
logger.info("epoll unavailable, falling back to NIO.");
bootstrap.channel(NioDatagramChannel.class);
}
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.option(ChannelOption.IP_TOS, 0x10 | 0x08); // IPTOS_LOWDELAY | IPTOS_THROUGHPUT
bootstrap.handler(handler).connect(address).addListener(res -> {
if (res.isSuccess()) {
future.complete(bootstrap);
} else {
future.completeExceptionally(res.cause());
}
});
return future;
}
public UDPServerSocket(ThreadedLogger logger, int port, String interfaz) {
this.logger = logger;
try {
bootstrap = new Bootstrap();
group = new NioEventLoopGroup();
bootstrap
.group(group)
.channel(NioDatagramChannel.class)
.handler(this);
channel = bootstrap.bind(interfaz, port).sync().channel();
} catch (Exception e) {
this.logger.critical(FastAppender.get(interfaz, ":", port, " 上でサーバーを開けませんでした。"));
this.logger.critical("同じポートで複数のサーバーを一度に開いていませんか?");
System.exit(1);
}
}
public UDPServerSocket(ThreadedLogger logger, int port, String interfaz) {
this.logger = logger;
try {
bootstrap = new Bootstrap()
.channel(EPOLL ? EpollDatagramChannel.class : NioDatagramChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(this)
.group(EPOLL ? new EpollEventLoopGroup() : new NioEventLoopGroup());
this.logger.info("Epoll Status is " + EPOLL);
channel = bootstrap.bind(interfaz, port).sync().channel();
} catch (Exception e) {
this.logger.critical("**** FAILED TO BIND TO " + interfaz + ":" + port + "!");
this.logger.critical("Perhaps a server is already running on that port?");
System.exit(1);
}
}
public UDPServerSocket(ThreadedLogger logger, int port, String interfaz) {
this.logger = logger;
try {
bootstrap = new Bootstrap();
group = new NioEventLoopGroup();
bootstrap
.group(group)
.channel(NioDatagramChannel.class)
.handler(this);
channel = bootstrap.bind(interfaz, port).sync().channel();
} catch (Exception e) {
this.logger.critical("**** FAILED TO BIND TO " + interfaz + ":" + port + "!");
this.logger.critical("Perhaps a server is already running on that port?");
System.exit(1);
}
}
public UDPServerSocket(ThreadedLogger logger, int port, String interfaz) {
this.logger = logger;
try {
bootstrap = new Bootstrap();
group = new NioEventLoopGroup();
bootstrap
.group(group)
.channel(NioDatagramChannel.class)
.handler(this);
channel = bootstrap.bind(interfaz, port).sync().channel();
} catch (InterruptedException e) {
this.logger.critical("**** FAILED TO BIND TO " + interfaz + ":" + port + "!");
this.logger.critical("-------------------------------------------------");
this.logger.critical("There may be another server running on that port!");
this.logger.critical("--------------------------------------------------");
System.exit(1);
}
}
public UdpServerChannel(int ioThreads) {
if (ioThreads < 1) {
throw new IllegalArgumentException("IO threads cound can't be less than 1");
}
boolean epollAvailabe = Epoll.isAvailable();
if (!epollAvailabe) {
ioThreads = 1;
}
group = epollAvailabe ? new EpollEventLoopGroup(ioThreads) : new NioEventLoopGroup(ioThreads);
Class<? extends DatagramChannel> channel = epollAvailabe ? EpollDatagramChannel.class : NioDatagramChannel.class;
ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
final ReadRouteChannelHandler ioReadRoute = new ReadRouteChannelHandler();
@Override
protected void initChannel(Channel ioChannel) throws Exception {
ioChannel.pipeline().addLast(ioReadRoute);
}
};
while (ioThreads-- > 0) {
Bootstrap ioBootstrap = new Bootstrap().group(group).channel(channel).handler(initializer);
if (epollAvailabe) {
ioBootstrap.option(UnixChannelOption.SO_REUSEPORT, true);
}
ioBootstraps.add(ioBootstrap);
}
}
static AsyncFuture<Server> setup(
final AsyncFramework async, final CollectdChannelHandler handler, final InetAddress host,
final int port
) {
final EventLoopGroup group = new NioEventLoopGroup();
final Bootstrap b = new Bootstrap();
b
.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(handler);
final ResolvableFuture<Server> future = async.future();
b.bind(host, port).addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
future.resolve(new Server(async, f.channel()));
} else {
future.fail(
f.cause() != null ? f.cause() : new RuntimeException("Failed to bind"));
}
});
return future;
}
private CompletableFuture<Void> bootstrapServer() {
Bootstrap serverBootstrap = new Bootstrap()
.group(group)
.channelFactory(() -> new NioDatagramChannel(InternetProtocolFamily.IPv4))
.handler(new SimpleChannelInboundHandler<Object>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// Nothing will be sent.
}
})
.option(ChannelOption.IP_MULTICAST_IF, iface)
.option(ChannelOption.SO_REUSEADDR, true);
CompletableFuture<Void> future = new CompletableFuture<>();
serverBootstrap.bind(localAddress).addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
serverChannel = f.channel();
future.complete(null);
} else {
future.completeExceptionally(f.cause());
}
});
return future;
}
private CompletableFuture<Void> bootstrap() {
Bootstrap serverBootstrap = new Bootstrap()
.group(group)
.channel(NioDatagramChannel.class)
.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
protected void channelRead0(ChannelHandlerContext context, DatagramPacket packet) throws Exception {
byte[] payload = new byte[packet.content().readInt()];
packet.content().readBytes(payload);
Message message = SERIALIZER.decode(payload);
Map<BiConsumer<Address, byte[]>, Executor> listeners = NettyUnicastService.this.listeners.get(message.subject());
if (listeners != null) {
listeners.forEach((consumer, executor) ->
executor.execute(() -> consumer.accept(message.source(), message.payload())));
}
}
})
.option(ChannelOption.RCVBUF_ALLOCATOR, new DefaultMaxBytesRecvByteBufAllocator())
.option(ChannelOption.SO_BROADCAST, true)
.option(ChannelOption.SO_REUSEADDR, true);
return bind(serverBootstrap);
}
@Bean(name = "serverBootstrap")
public Bootstrap bootstrap() {
group = group();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioDatagramChannel.class)
.handler(channelInitializer);
Map<ChannelOption<?>, Object> udpChannelOptions = udpChannelOptions();
Set<ChannelOption<?>> keySet = udpChannelOptions.keySet();
for (@SuppressWarnings("rawtypes")
ChannelOption option : keySet) {
b.option(option, udpChannelOptions.get(option));
}
return b;
}
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new QuoteOfTheMomentServerHandler());
b.bind(PORT).sync().channel().closeFuture().await();
} finally {
group.shutdownGracefully();
}
}
public List<BootstrapComboFactory<Bootstrap, Bootstrap>> datagram() {
// Make the list of Bootstrap factories.
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel() {
return new NioDatagramChannel(InternetProtocolFamily.IPv4);
}
@Override
public String toString() {
return NioDatagramChannel.class.getSimpleName() + ".class";
}
});
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(oioWorkerGroup).channel(OioDatagramChannel.class)
.option(ChannelOption.SO_TIMEOUT, OIO_SO_TIMEOUT);
}
}
);
// Populare the combinations.
return combo(bfs, bfs);
}
private DnsNameResolverBuilder newResolver() {
return new DnsNameResolverBuilder(group.next())
.channelType(NioDatagramChannel.class)
.nameServerProvider(new SingletonDnsServerAddressStreamProvider(dnsServer.localAddress()))
.maxQueriesPerResolve(1)
.optResourceEnabled(false)
.ndots(1);
}
private static DnsNameResolverBuilder newResolver(EventLoopGroup group) {
return new DnsNameResolverBuilder(group.next())
.channelType(NioDatagramChannel.class)
.nameServerProvider(
new SingletonDnsServerAddressStreamProvider(SocketUtils.socketAddress("8.8.8.8", 53)))
.maxQueriesPerResolve(1)
.optResourceEnabled(false)
.ndots(1);
}
/**
* Test try to reproduce issue #1335
*/
@Test
public void testBindMultiple() throws Exception {
DefaultChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
NioEventLoopGroup group = new NioEventLoopGroup();
try {
for (int i = 0; i < 100; i++) {
Bootstrap udpBootstrap = new Bootstrap();
udpBootstrap.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Discard
ReferenceCountUtil.release(msg);
}
});
DatagramChannel datagramChannel = (DatagramChannel) udpBootstrap
.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
channelGroup.add(datagramChannel);
}
Assert.assertEquals(100, channelGroup.size());
} finally {
channelGroup.close().sync();
group.shutdownGracefully().sync();
}
}
@Override
public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram() {
// Make the list of Bootstrap factories.
@SuppressWarnings("unchecked")
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel() {
return new NioDatagramChannel(InternetProtocolFamily.IPv4);
}
@Override
public String toString() {
return NioDatagramChannel.class.getSimpleName() + ".class";
}
});
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueDatagramChannel.class);
}
}
);
return combo(bfs, bfs);
}
@Override
public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram() {
// Make the list of Bootstrap factories.
@SuppressWarnings("unchecked")
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel() {
return new NioDatagramChannel(InternetProtocolFamily.IPv4);
}
@Override
public String toString() {
return NioDatagramChannel.class.getSimpleName() + ".class";
}
});
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollDatagramChannel.class);
}
}
);
return combo(bfs, bfs);
}
public void setup(int port, SimpleChannelInboundHandler<DatagramPacket> handler) throws InterruptedException {
Bootstrap b = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true).handler(handler);
b.bind(port).sync();// .channel().closeFuture().await();
logger.info(String.format("UdpServer bind:%s", port));
}
public void setup(UdpClientHandler handler, boolean wait) throws InterruptedException {
if (!isInit) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.option(ChannelOption.SO_BROADCAST, true);
b.group(group).channel(NioDatagramChannel.class).handler(handler);
channel = b.bind(0).sync().channel();
sender = UdpSenderFactory.getInstance().get(channel);
isInit = true;
if (wait) {
channel.closeFuture().await();
}
}
}
/**
* Returns the correct {@link Class} to use with the given {@link EventLoopGroup}.
*
* @param group the {@link EventLoopGroup} for which the class is needed
* @return the class that should be used for bootstrapping
*/
public static Class<? extends DatagramChannel> datagramChannel(EventLoopGroup group) {
if (useEpoll(group)) {
return EpollDatagramChannel.class;
} else if (useKQueue(group)) {
return KQueueDatagramChannel.class;
} else {
return NioDatagramChannel.class;
}
}
public void enable(QueryUdpConfig config) throws Exception {
Bootstrap bootstrap = new Bootstrap().group(this.eventGroup = new NioEventLoopGroup())
.channel(NioDatagramChannel.class)
.localAddress(config.queryudp_getBindAddress())
.handler(new QueryUdpHandler(config.queryudp_getPlayable()));
bootstrap.bind().sync();
this.running = true;
}
private void start() throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup(1);
try {
while (!shutdown) {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioDatagramChannel.class)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
public void initChannel(NioDatagramChannel ch)
throws Exception {
ch.pipeline().addLast(stats.udp);
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(new PacketDecoder());
MessageHandler messageHandler = new MessageHandler(ch, backupManager);
backupManager.setMessageHandler(messageHandler);
ch.pipeline().addLast(messageHandler);
}
});
channel = b.bind(port).sync().channel();
logger.info("Backup server started, bind port {}", port);
channel.closeFuture().sync();
if (shutdown) {
logger.info("Shutdown backup BackupServer");
break;
}
logger.warn("Restart backup server ...");
}
} catch (Exception e) {
logger.error("Start backup server with port {} failed.", port, e);
} finally {
group.shutdownGracefully().sync();
}
}
public void init(ChannelConfig channelConfig) {
if(channelConfig.isUseConvChannel()){
int convIndex = 0;
if(channelConfig.KcpTag){
convIndex+=Ukcp.KCP_TAG;
}
if(channelConfig.getFecDataShardCount()!=0&&channelConfig.getFecParityShardCount()!=0){
convIndex+= Fec.fecHeaderSizePlus2;
}
channelManager = new ConvChannelManager(convIndex);
}else{
channelManager = new ClientAddressChannelManager();
}
int cpuNum = Runtime.getRuntime().availableProcessors();
if (disruptorExecutorPool == null) {
this.disruptorExecutorPool = new DisruptorExecutorPool();
for (int i = 0; i < cpuNum; i++) {
disruptorExecutorPool.createDisruptorProcessor("disruptorExecutorPool" + i);
}
}
nioEventLoopGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
bootstrap = new Bootstrap();
bootstrap.channel(NioDatagramChannel.class);
bootstrap.group(nioEventLoopGroup);
bootstrap.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel ch) {
ChannelPipeline cp = ch.pipeline();
if(channelConfig.isCrc32Check()){
Crc32Encode crc32Encode = new Crc32Encode();
Crc32Decode crc32Decode = new Crc32Decode();
cp.addLast(crc32Encode);
cp.addLast(crc32Decode);
}
cp.addLast(new ClientChannelHandler(channelManager));
}
});
Runtime.getRuntime().addShutdownHook(new Thread(() -> stop()));
}
public NetUDPServer(DataManager data, MessageReceivedListener listener) {
super(data, "UDP");
b = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true)
.handler(new UDPHandler(data, listener));
}
private void createNioServer(Listener listener) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(
1, new DefaultThreadFactory(ThreadNames.T_GATEWAY_WORKER)
);
eventLoopGroup.setIoRatio(100);
createServer(listener, eventLoopGroup, () -> new NioDatagramChannel(IPv4));//默认是根据机器情况创建Channel,如果机器支持ipv6,则无法使用ipv4的地址加入组播
}
/**
* <p>Creates a new service using the specified {@link InetSocketAddress} to listen on and utilizing
* the callback specified to notify listeners of source log events</p>
*
* @param listenAddress
* An {@link InetSocketAddress} where the listen service will bind or listen on
* @param logEventCallback
* A {@link Consumer} callback that will be called once a log event has been received
*/
public SourceLogListenService(InetSocketAddress listenAddress, Consumer<SourceLogEntry> logEventCallback) {
this.listenAddress = listenAddress;
bootstrap = new Bootstrap()
.localAddress(this.listenAddress)
.channel(NioDatagramChannel.class)
.group(listenWorkGroup)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel ch) throws Exception {
ch.pipeline().addLast(new SourceLogListenHandler(logEventCallback));
}
});
SourceLogListenService service = this;
//Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
log.debug("Service Interrupted. Shutting down gracefully.");
service.shutdown();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
});
}