类io.netty.channel.socket.nio.NioDatagramChannel源码实例Demo

下面列出了 io.netty.channel.socket.nio.NioDatagramChannel 类的 API 用法示例代码,您也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: sipstack   文件: UAS.java
/**
 * Starts a SIP user agent server bound to UDP 127.0.0.1:5060 and blocks until its channel closes.
 *
 * <p>Every channel pipeline consists of a datagram decoder, a SIP message encoder and the single
 * {@code UAS} instance created here.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if binding fails or the waiting thread is interrupted
 */
public static void main(final String[] args) throws Exception {
    final UAS uas = new UAS();
    final EventLoopGroup udpGroup = new NioEventLoopGroup();

    try {
        final Bootstrap b = new Bootstrap();
        b.group(udpGroup)
        .channel(NioDatagramChannel.class)
        .handler(new ChannelInitializer<DatagramChannel>() {
            @Override
            protected void initChannel(final DatagramChannel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast("decoder", new SipMessageDatagramDecoder());
                pipeline.addLast("encoder", new SipMessageEncoder());
                pipeline.addLast("handler", uas);
            }
        });

        final InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 5060);
        b.bind(socketAddress).sync().channel().closeFuture().await();
    } finally {
        // Release the event loop threads whether the channel closed normally or the bind failed;
        // without this the group created above is never shut down.
        udpGroup.shutdownGracefully();
    }
}
 
源代码2 项目: netty-4.1.22   文件: SocketTestPermutation.java
/**
 * Supplies the bootstrap factories used for datagram socket tests.
 *
 * @return a two-element list: first a factory producing NIO datagram bootstraps, then one producing
 *         OIO datagram bootstraps configured with {@code OIO_SO_TIMEOUT}
 */
public List<BootstrapFactory<Bootstrap>> datagramSocket() {
    final BootstrapFactory<Bootstrap> nioFactory = new BootstrapFactory<Bootstrap>() {
        @Override
        public Bootstrap newInstance() {
            final Bootstrap nio = new Bootstrap();
            nio.group(nioWorkerGroup);
            nio.channel(NioDatagramChannel.class);
            return nio;
        }
    };

    final BootstrapFactory<Bootstrap> oioFactory = new BootstrapFactory<Bootstrap>() {
        @Override
        public Bootstrap newInstance() {
            final Bootstrap oio = new Bootstrap();
            oio.group(oioWorkerGroup);
            oio.channel(OioDatagramChannel.class);
            oio.option(ChannelOption.SO_TIMEOUT, OIO_SO_TIMEOUT);
            return oio;
        }
    };

    return Arrays.asList(nioFactory, oioFactory);
}
 
源代码3 项目: netty-4.1.22   文件: DnsNameResolverTest.java
/**
 * Creates a resolver builder that queries the test DNS server over NIO datagram channels.
 *
 * @param decodeToUnicode value passed to {@code decodeIdn}
 * @param dnsServerAddressStreamProvider optional provider consulted before the test DNS server;
 *        when {@code null} only the test DNS server is used
 * @return the configured builder
 */
private static DnsNameResolverBuilder newResolver(boolean decodeToUnicode,
                                                  DnsServerAddressStreamProvider dnsServerAddressStreamProvider) {
    final DnsNameResolverBuilder resolverBuilder = new DnsNameResolverBuilder(group.next())
            .dnsQueryLifecycleObserverFactory(new TestRecursiveCacheDnsQueryLifecycleObserverFactory())
            .channelType(NioDatagramChannel.class)
            .maxQueriesPerResolve(1)
            .decodeIdn(decodeToUnicode)
            .optResourceEnabled(false)
            .ndots(1);

    // The test DNS server is always part of the provider chain; a caller-supplied provider goes first.
    final DnsServerAddressStreamProvider testServer =
            new SingletonDnsServerAddressStreamProvider(dnsServer.localAddress());
    final DnsServerAddressStreamProvider effectiveProvider = dnsServerAddressStreamProvider == null
            ? testServer
            : new MultiDnsServerAddressStreamProvider(dnsServerAddressStreamProvider, testServer);

    resolverBuilder.nameServerProvider(effectiveProvider);
    return resolverBuilder;
}
 
源代码4 项目: servicetalk   文件: UdpReporterTest.java
/**
 * Binds a UDP receiver whose handler copies each datagram's bytes and passes them to the decoder,
 * which writes its results into {@code queue}.
 *
 * @param decoder decoder applied to the raw bytes of every received datagram
 * @throws Exception if binding the channel fails or is interrupted
 */
TestReceiver(SpanBytesDecoder decoder) throws Exception {
    final Bootstrap receiver = new Bootstrap();
    receiver.group(group);
    receiver.channel(NioDatagramChannel.class);
    receiver.option(ChannelOption.RCVBUF_ALLOCATOR, DEFAULT_RECV_BUF_ALLOCATOR);
    receiver.handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
                    // Copy the readable bytes out of the datagram before decoding.
                    final int size = msg.content().readableBytes();
                    final byte[] payload = new byte[size];
                    msg.content().readBytes(payload);
                    decoder.decode(payload, queue);
                }
            });
        }
    });
    receiver.localAddress(localAddress(0));

    channel = receiver.bind().sync().channel();
}
 
源代码5 项目: kyoko   文件: VoiceWebsocket.java
/**
 * Configures a UDP bootstrap on the Vert.x event loop group and connects it to the given address.
 *
 * @param address remote address to connect to
 * @param handler handler installed on the bootstrap
 * @return a future completed with the bootstrap once the connect succeeds, or completed
 *         exceptionally with the connect failure cause
 */
private CompletableFuture<Bootstrap> setupNetty(InetSocketAddress address, ChannelHandler handler) {
    var result = new CompletableFuture<Bootstrap>();
    var udp = new Bootstrap().group(vertx.nettyEventLoopGroup());

    // Prefer the epoll transport when it is available, otherwise use NIO.
    if (!Epoll.isAvailable()) {
        logger.info("epoll unavailable, falling back to NIO.");
        udp.channel(NioDatagramChannel.class);
    } else {
        logger.info("epoll support is available, using it for UDP connections.");
        udp.channel(EpollDatagramChannel.class);
    }

    udp.option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.IP_TOS, 0x10 | 0x08) // IPTOS_LOWDELAY | IPTOS_THROUGHPUT
            .handler(handler);

    udp.connect(address).addListener(outcome -> {
        if (!outcome.isSuccess()) {
            result.completeExceptionally(outcome.cause());
            return;
        }
        result.complete(udp);
    });

    return result;
}
 
源代码6 项目: Jupiter   文件: UDPServerSocket.java
/**
 * Opens a UDP server socket on the given interface and port, using this object as channel handler.
 *
 * <p>On any failure two critical messages are logged and the JVM exits with status 1.
 *
 * @param logger   logger used for failure reporting
 * @param port     UDP port to bind
 * @param interfaz host name or address of the interface to bind
 */
public UDPServerSocket(ThreadedLogger logger, int port, String interfaz) {
    this.logger = logger;
    try {
        bootstrap = new Bootstrap();
        group = new NioEventLoopGroup();

        bootstrap.group(group);
        bootstrap.channel(NioDatagramChannel.class);
        bootstrap.handler(this);

        // Block until the bind completes; a failure surfaces as an exception here.
        channel = bootstrap.bind(interfaz, port).sync().channel();
    } catch (Exception bindFailure) {
        this.logger.critical(FastAppender.get(interfaz, ":", port, " 上でサーバーを開けませんでした。"));
        this.logger.critical("同じポートで複数のサーバーを一度に開いていませんか?");
        System.exit(1);
    }
}
 
源代码7 项目: Nukkit   文件: UDPServerSocket.java
/**
 * Opens a UDP server socket on the given interface and port, choosing the epoll or NIO transport
 * according to the {@code EPOLL} flag and using this object as channel handler.
 *
 * <p>On any failure two critical messages are logged and the JVM exits with status 1.
 *
 * @param logger   logger used for status and failure reporting
 * @param port     UDP port to bind
 * @param interfaz host name or address of the interface to bind
 */
public UDPServerSocket(ThreadedLogger logger, int port, String interfaz) {
    this.logger = logger;
    try {
        Bootstrap udp = new Bootstrap();
        if (EPOLL) {
            udp.channel(EpollDatagramChannel.class);
        } else {
            udp.channel(NioDatagramChannel.class);
        }
        udp.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        udp.handler(this);
        if (EPOLL) {
            udp.group(new EpollEventLoopGroup());
        } else {
            udp.group(new NioEventLoopGroup());
        }
        bootstrap = udp;

        this.logger.info("Epoll Status is " + EPOLL);
        channel = bootstrap.bind(interfaz, port).sync().channel();
    } catch (Exception bindFailure) {
        this.logger.critical("**** FAILED TO BIND TO " + interfaz + ":" + port + "!");
        this.logger.critical("Perhaps a server is already running on that port?");
        System.exit(1);
    }
}
 
源代码8 项目: Nemisys   文件: UDPServerSocket.java
public UDPServerSocket(ThreadedLogger logger, int port, String interfaz) {
    this.logger = logger;
    try {
        bootstrap = new Bootstrap();
        group = new NioEventLoopGroup();
        bootstrap
                .group(group)
                .channel(NioDatagramChannel.class)
                .handler(this);
        channel = bootstrap.bind(interfaz, port).sync().channel();
    } catch (Exception e) {
        this.logger.critical("**** FAILED TO BIND TO " + interfaz + ":" + port + "!");
        this.logger.critical("Perhaps a server is already running on that port?");
        System.exit(1);
    }
}
 
源代码9 项目: BukkitPE   文件: UDPServerSocket.java
/**
 * Opens a NIO UDP server socket on the given interface and port with this object as handler.
 *
 * <p>If binding fails for any reason, the failure is reported through the logger and the JVM
 * exits with status 1.
 *
 * @param logger   logger used for failure reporting
 * @param port     UDP port to bind
 * @param interfaz host name or address of the interface to bind
 */
public UDPServerSocket(ThreadedLogger logger, int port, String interfaz) {
    this.logger = logger;
    try {
        bootstrap = new Bootstrap();
        group = new NioEventLoopGroup();
        bootstrap
                .group(group)
                .channel(NioDatagramChannel.class)
                .handler(this);
        channel = bootstrap.bind(interfaz, port).sync().channel();
    } catch (Exception e) {
        // sync() rethrows the cause of a failed bind (e.g. the port already being in use), not just
        // InterruptedException. Catching only InterruptedException let real bind failures escape
        // without printing the diagnostics below, so catch broadly here.
        this.logger.critical("**** FAILED TO BIND TO " + interfaz + ":" + port + "!");
        this.logger.critical("-------------------------------------------------");
        this.logger.critical("There may be another server running on that port!");
        this.logger.critical("--------------------------------------------------");
        System.exit(1);
    }
}
 
源代码10 项目: UdpServerSocketChannel   文件: UdpServerChannel.java
public UdpServerChannel(int ioThreads) {
	if (ioThreads < 1) {
		throw new IllegalArgumentException("IO threads cound can't be less than 1");
	}
	boolean epollAvailabe = Epoll.isAvailable();
	if (!epollAvailabe) {
		ioThreads = 1;
	}
	group = epollAvailabe ? new EpollEventLoopGroup(ioThreads) : new NioEventLoopGroup(ioThreads);
	Class<? extends DatagramChannel> channel = epollAvailabe ? EpollDatagramChannel.class : NioDatagramChannel.class;
	ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
		final ReadRouteChannelHandler ioReadRoute = new ReadRouteChannelHandler();
		@Override
		protected void initChannel(Channel ioChannel) throws Exception {
			ioChannel.pipeline().addLast(ioReadRoute);
		}
	};
	while (ioThreads-- > 0) {
		Bootstrap ioBootstrap = new Bootstrap().group(group).channel(channel).handler(initializer);
		if (epollAvailabe) {
			ioBootstrap.option(UnixChannelOption.SO_REUSEPORT, true);
		}
		ioBootstraps.add(ioBootstrap);
	}
}
 
源代码11 项目: heroic   文件: Server.java
/**
 * Binds a broadcast-enabled NIO UDP channel and wraps it in a {@code Server}.
 *
 * @param async   framework used to create the returned future and passed on to the server
 * @param handler channel handler installed on the bootstrap
 * @param host    local address to bind
 * @param port    local port to bind
 * @return a future resolved with the server once the bind succeeds, or failed with the bind
 *         failure cause (a generic {@link RuntimeException} when no cause is available)
 */
static AsyncFuture<Server> setup(
    final AsyncFramework async, final CollectdChannelHandler handler, final InetAddress host,
    final int port
) {
    final EventLoopGroup group = new NioEventLoopGroup();
    final Bootstrap b = new Bootstrap();

    b
        .group(group)
        .channel(NioDatagramChannel.class)
        .option(ChannelOption.SO_BROADCAST, true)
        .handler(handler);

    final ResolvableFuture<Server> future = async.future();

    b.bind(host, port).addListener((ChannelFutureListener) f -> {
        if (f.isSuccess()) {
            future.resolve(new Server(async, f.channel()));
            return;
        }

        // No Server is created on a failed bind, so nothing else would ever stop the event loop
        // group allocated above; shut it down here to avoid leaking its threads.
        group.shutdownGracefully();
        future.fail(
            f.cause() != null ? f.cause() : new RuntimeException("Failed to bind"));
    });

    return future;
}
 
源代码12 项目: atomix   文件: NettyBroadcastService.java
/**
 * Binds the IPv4 datagram channel for this service and stores it in {@code serverChannel}.
 *
 * @return a future completed with {@code null} after a successful bind, or completed
 *         exceptionally with the bind failure cause
 */
private CompletableFuture<Void> bootstrapServer() {
  final CompletableFuture<Void> bound = new CompletableFuture<>();

  final Bootstrap udp = new Bootstrap();
  udp.group(group);
  udp.channelFactory(() -> new NioDatagramChannel(InternetProtocolFamily.IPv4));
  udp.handler(new SimpleChannelInboundHandler<Object>() {
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
      // Nothing will be sent.
    }
  });
  udp.option(ChannelOption.IP_MULTICAST_IF, iface);
  udp.option(ChannelOption.SO_REUSEADDR, true);

  udp.bind(localAddress).addListener((ChannelFutureListener) bindResult -> {
    if (!bindResult.isSuccess()) {
      bound.completeExceptionally(bindResult.cause());
      return;
    }
    serverChannel = bindResult.channel();
    bound.complete(null);
  });
  return bound;
}
 
源代码13 项目: atomix   文件: NettyUnicastService.java
/**
 * Configures the unicast UDP bootstrap and binds it via {@code bind(Bootstrap)}.
 *
 * <p>Each datagram is expected to carry a 4-byte length prefix followed by that many bytes of
 * serialized {@code Message}. Decoded messages are dispatched to the listeners registered for the
 * message subject, each on its own executor. Datagrams whose length prefix is missing, negative or
 * larger than the remaining data are dropped.
 *
 * @return the future returned by {@code bind(Bootstrap)}
 */
private CompletableFuture<Void> bootstrap() {
  Bootstrap serverBootstrap = new Bootstrap()
      .group(group)
      .channel(NioDatagramChannel.class)
      .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
        @Override
        protected void channelRead0(ChannelHandlerContext context, DatagramPacket packet) throws Exception {
          // The length prefix comes straight from the network: validate it before using it to
          // size an allocation, otherwise a crafted datagram can request a huge or negative array.
          if (packet.content().readableBytes() < Integer.BYTES) {
            return;
          }
          int length = packet.content().readInt();
          if (length < 0 || length > packet.content().readableBytes()) {
            return;
          }

          byte[] payload = new byte[length];
          packet.content().readBytes(payload);
          Message message = SERIALIZER.decode(payload);
          Map<BiConsumer<Address, byte[]>, Executor> listeners = NettyUnicastService.this.listeners.get(message.subject());
          if (listeners != null) {
            listeners.forEach((consumer, executor) ->
                executor.execute(() -> consumer.accept(message.source(), message.payload())));
          }
        }
      })
      .option(ChannelOption.RCVBUF_ALLOCATOR, new DefaultMaxBytesRecvByteBufAllocator())
      .option(ChannelOption.SO_BROADCAST, true)
      .option(ChannelOption.SO_REUSEADDR, true);

  return bind(serverBootstrap);
}
 
源代码14 项目: Dodder   文件: NettyConfig.java
/**
 * Builds the UDP bootstrap bean: NIO datagram channel, the configured channel initializer and
 * every option returned by {@code udpChannelOptions()}.
 *
 * @return the configured bootstrap
 */
@Bean(name = "serverBootstrap")
public Bootstrap bootstrap() {
	group = group();

	Bootstrap udpBootstrap = new Bootstrap();
	udpBootstrap.group(group);
	udpBootstrap.channel(NioDatagramChannel.class);
	udpBootstrap.handler(channelInitializer);

	Map<ChannelOption<?>, Object> configuredOptions = udpChannelOptions();
	for (Map.Entry<ChannelOption<?>, Object> entry : configuredOptions.entrySet()) {
		// The map is keyed by wildcard options, so a raw reference is needed to pass the value.
		@SuppressWarnings("rawtypes")
		ChannelOption key = entry.getKey();
		udpBootstrap.option(key, entry.getValue());
	}
	return udpBootstrap;
}
 
源代码15 项目: netty-4.1.22   文件: QuoteOfTheMomentServer.java
/**
 * Runs the quote server: binds a broadcast-enabled UDP channel on {@code PORT}, waits until that
 * channel is closed and then shuts the event loop group down.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if binding fails or the waiting thread is interrupted
 */
public static void main(String[] args) throws Exception {
    final EventLoopGroup workers = new NioEventLoopGroup();
    try {
        final Bootstrap server = new Bootstrap();
        server.group(workers);
        server.channel(NioDatagramChannel.class);
        server.option(ChannelOption.SO_BROADCAST, true);
        server.handler(new QuoteOfTheMomentServerHandler());

        // Blocks here for the lifetime of the server channel.
        server.bind(PORT).sync().channel().closeFuture().await();
    } finally {
        workers.shutdownGracefully();
    }
}
 
源代码16 项目: netty-4.1.22   文件: SocketTestPermutation.java
/**
 * Builds every pairing of the available datagram bootstrap factories (NIO restricted to IPv4,
 * and OIO with {@code OIO_SO_TIMEOUT}).
 *
 * @return the combinations produced by {@code combo} over the factory list paired with itself
 */
public List<BootstrapComboFactory<Bootstrap, Bootstrap>> datagram() {
    final BootstrapFactory<Bootstrap> nioFactory = new BootstrapFactory<Bootstrap>() {
        @Override
        public Bootstrap newInstance() {
            // An explicit factory is used so the channel is created for IPv4.
            final ChannelFactory<Channel> ipv4Channels = new ChannelFactory<Channel>() {
                @Override
                public Channel newChannel() {
                    return new NioDatagramChannel(InternetProtocolFamily.IPv4);
                }

                @Override
                public String toString() {
                    return NioDatagramChannel.class.getSimpleName() + ".class";
                }
            };
            return new Bootstrap().group(nioWorkerGroup).channelFactory(ipv4Channels);
        }
    };

    final BootstrapFactory<Bootstrap> oioFactory = new BootstrapFactory<Bootstrap>() {
        @Override
        public Bootstrap newInstance() {
            final Bootstrap oio = new Bootstrap();
            oio.group(oioWorkerGroup);
            oio.channel(OioDatagramChannel.class);
            oio.option(ChannelOption.SO_TIMEOUT, OIO_SO_TIMEOUT);
            return oio;
        }
    };

    // Populate the combinations.
    final List<BootstrapFactory<Bootstrap>> factories = Arrays.asList(nioFactory, oioFactory);
    return combo(factories, factories);
}
 
源代码17 项目: netty-4.1.22   文件: SearchDomainTest.java
/**
 * Creates a resolver builder that sends its queries to the test DNS server.
 *
 * @return a builder using NIO datagram channels, one query per resolve, no OPT resource and
 *         {@code ndots} of 1
 */
private DnsNameResolverBuilder newResolver() {
    final DnsNameResolverBuilder resolverBuilder = new DnsNameResolverBuilder(group.next());
    resolverBuilder.channelType(NioDatagramChannel.class);

    final SingletonDnsServerAddressStreamProvider testServer =
        new SingletonDnsServerAddressStreamProvider(dnsServer.localAddress());
    resolverBuilder.nameServerProvider(testServer);

    resolverBuilder.maxQueriesPerResolve(1);
    resolverBuilder.optResourceEnabled(false);
    resolverBuilder.ndots(1);
    return resolverBuilder;
}
 
/**
 * Creates a resolver builder that sends its queries to the DNS server at 8.8.8.8:53.
 *
 * @param group event loop group from which the resolver's event loop is taken
 * @return a builder using NIO datagram channels, one query per resolve, no OPT resource and
 *         {@code ndots} of 1
 */
private static DnsNameResolverBuilder newResolver(EventLoopGroup group) {
    final DnsNameResolverBuilder resolverBuilder = new DnsNameResolverBuilder(group.next());
    resolverBuilder.channelType(NioDatagramChannel.class);

    final SingletonDnsServerAddressStreamProvider publicServer =
            new SingletonDnsServerAddressStreamProvider(SocketUtils.socketAddress("8.8.8.8", 53));
    resolverBuilder.nameServerProvider(publicServer);

    resolverBuilder.maxQueriesPerResolve(1);
    resolverBuilder.optResourceEnabled(false);
    resolverBuilder.ndots(1);
    return resolverBuilder;
}
 
源代码19 项目: netty-4.1.22   文件: NioDatagramChannelTest.java
/**
 * Attempts to reproduce issue #1335 by binding 100 datagram channels to ephemeral ports on one
 * event loop group and checking that all of them end up in the channel group.
 */
@Test
public void testBindMultiple() throws Exception {
    final DefaultChannelGroup boundChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    final NioEventLoopGroup loopGroup = new NioEventLoopGroup();
    try {
        int remaining = 100;
        while (remaining-- > 0) {
            final Bootstrap udp = new Bootstrap();
            udp.group(loopGroup);
            udp.channel(NioDatagramChannel.class);
            udp.option(ChannelOption.SO_BROADCAST, true);
            udp.handler(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    // Discard
                    ReferenceCountUtil.release(msg);
                }
            });

            // Port 0 lets the system pick a free port for every channel.
            final InetSocketAddress anyPort = new InetSocketAddress(0);
            final DatagramChannel bound = (DatagramChannel) udp.bind(anyPort).syncUninterruptibly().channel();
            boundChannels.add(bound);
        }
        Assert.assertEquals(100, boundChannels.size());
    } finally {
        boundChannels.close().sync();
        loopGroup.shutdownGracefully().sync();
    }
}
 
源代码20 项目: netty-4.1.22   文件: KQueueSocketTestPermutation.java
/**
 * Builds every pairing of the NIO (IPv4) and KQueue datagram bootstrap factories.
 *
 * @return the combinations produced by {@code combo} over the factory list paired with itself
 */
@Override
public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram() {
    final BootstrapFactory<Bootstrap> nioFactory = new BootstrapFactory<Bootstrap>() {
        @Override
        public Bootstrap newInstance() {
            // An explicit factory is used so the channel is created for IPv4.
            final ChannelFactory<Channel> ipv4Channels = new ChannelFactory<Channel>() {
                @Override
                public Channel newChannel() {
                    return new NioDatagramChannel(InternetProtocolFamily.IPv4);
                }

                @Override
                public String toString() {
                    return NioDatagramChannel.class.getSimpleName() + ".class";
                }
            };
            return new Bootstrap().group(nioWorkerGroup).channelFactory(ipv4Channels);
        }
    };

    final BootstrapFactory<Bootstrap> kqueueFactory = new BootstrapFactory<Bootstrap>() {
        @Override
        public Bootstrap newInstance() {
            final Bootstrap kqueue = new Bootstrap();
            kqueue.group(KQUEUE_WORKER_GROUP);
            kqueue.channel(KQueueDatagramChannel.class);
            return kqueue;
        }
    };

    // Make the list of Bootstrap factories.
    @SuppressWarnings("unchecked")
    List<BootstrapFactory<Bootstrap>> factories = Arrays.asList(nioFactory, kqueueFactory);
    return combo(factories, factories);
}
 
源代码21 项目: netty-4.1.22   文件: EpollSocketTestPermutation.java
/**
 * Builds every pairing of the NIO (IPv4) and epoll datagram bootstrap factories.
 *
 * @return the combinations produced by {@code combo} over the factory list paired with itself
 */
@Override
public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram() {
    final BootstrapFactory<Bootstrap> nioFactory = new BootstrapFactory<Bootstrap>() {
        @Override
        public Bootstrap newInstance() {
            // An explicit factory is used so the channel is created for IPv4.
            final ChannelFactory<Channel> ipv4Channels = new ChannelFactory<Channel>() {
                @Override
                public Channel newChannel() {
                    return new NioDatagramChannel(InternetProtocolFamily.IPv4);
                }

                @Override
                public String toString() {
                    return NioDatagramChannel.class.getSimpleName() + ".class";
                }
            };
            return new Bootstrap().group(nioWorkerGroup).channelFactory(ipv4Channels);
        }
    };

    final BootstrapFactory<Bootstrap> epollFactory = new BootstrapFactory<Bootstrap>() {
        @Override
        public Bootstrap newInstance() {
            final Bootstrap epoll = new Bootstrap();
            epoll.group(EPOLL_WORKER_GROUP);
            epoll.channel(EpollDatagramChannel.class);
            return epoll;
        }
    };

    // Make the list of Bootstrap factories.
    @SuppressWarnings("unchecked")
    List<BootstrapFactory<Bootstrap>> factories = Arrays.asList(nioFactory, epollFactory);
    return combo(factories, factories);
}
 
源代码22 项目: easymodbus4j   文件: UdpServer.java
/**
 * Binds a broadcast-enabled NIO UDP channel on the given port and returns once the bind is done;
 * it does not wait for the channel to close.
 *
 * @param port    local UDP port to bind
 * @param handler handler that receives the incoming datagrams
 * @throws InterruptedException if the thread is interrupted while waiting for the bind
 */
public void setup(int port, SimpleChannelInboundHandler<DatagramPacket> handler) throws InterruptedException {
	Bootstrap b = new Bootstrap();
	EventLoopGroup group = new NioEventLoopGroup();
	b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true).handler(handler);

	boolean bound = false;
	try {
		b.bind(port).sync();
		bound = true;
	} finally {
		if (!bound) {
			// The group is only reachable from this method; if the bind failed or was interrupted
			// nobody else can stop it, so release its threads before propagating the failure.
			group.shutdownGracefully();
		}
	}
	logger.info(String.format("UdpServer bind:%s", port));
}
 
源代码23 项目: easymodbus4j   文件: UdpClient.java
/**
 * Initializes the UDP client once: binds a broadcast-enabled channel on an ephemeral port and
 * obtains the sender for it. Subsequent calls do nothing.
 *
 * @param handler handler installed on the client channel
 * @param wait    when {@code true}, block until the channel is closed
 * @throws InterruptedException if the thread is interrupted while binding or waiting
 */
public void setup(UdpClientHandler handler, boolean wait) throws InterruptedException {
	if (isInit) {
		return;
	}

	EventLoopGroup workers = new NioEventLoopGroup();
	Bootstrap client = new Bootstrap();
	client.option(ChannelOption.SO_BROADCAST, true);
	client.group(workers);
	client.channel(NioDatagramChannel.class);
	client.handler(handler);

	// Port 0 lets the system choose the local port.
	channel = client.bind(0).sync().channel();
	sender = UdpSenderFactory.getInstance().get(channel);
	isInit = true;

	if (!wait) {
		return;
	}
	channel.closeFuture().await();
}
 
源代码24 项目: servicetalk   文件: BuilderUtils.java
/**
 * Selects the {@link DatagramChannel} implementation that matches the given {@link EventLoopGroup}.
 *
 * <p>Epoll is checked first, then KQueue; NIO is the fallback.
 *
 * @param group the {@link EventLoopGroup} for which the class is needed
 * @return the class that should be used for bootstrapping
 */
public static Class<? extends DatagramChannel> datagramChannel(EventLoopGroup group) {
    if (useEpoll(group)) {
        return EpollDatagramChannel.class;
    }
    if (useKQueue(group)) {
        return KQueueDatagramChannel.class;
    }
    return NioDatagramChannel.class;
}
 
源代码25 项目: JLilyPad   文件: QueryUdpService.java
/**
 * Starts the UDP query service: creates a new event loop group, binds a NIO datagram channel to
 * the configured address and marks the service as running.
 *
 * @param config source of the bind address and of the playable object given to the handler
 * @throws Exception if binding fails or is interrupted
 */
public void enable(QueryUdpConfig config) throws Exception {
	Bootstrap udp = new Bootstrap();
	udp.group(this.eventGroup = new NioEventLoopGroup());
	udp.channel(NioDatagramChannel.class);
	udp.localAddress(config.queryudp_getBindAddress());
	udp.handler(new QueryUdpHandler(config.queryudp_getPlayable()));

	udp.bind().sync();
	this.running = true;
}
 
源代码26 项目: gsc-core   文件: BackupServer.java
/**
 * Runs the backup UDP server on a single-threaded event loop group.
 *
 * <p>The server is bound on {@code port} and re-bound every time its channel closes, until the
 * {@code shutdown} flag is set. Failures are logged, and the event loop group is always shut down
 * before returning.
 *
 * @throws Exception if shutting down the event loop group fails
 */
private void start() throws Exception {
    final NioEventLoopGroup workers = new NioEventLoopGroup(1);
    try {
        while (!shutdown) {
            final Bootstrap udp = new Bootstrap();
            udp.group(workers);
            udp.channel(NioDatagramChannel.class);
            udp.handler(new ChannelInitializer<NioDatagramChannel>() {
                @Override
                public void initChannel(NioDatagramChannel ch)
                        throws Exception {
                    ch.pipeline()
                            .addLast(stats.udp)
                            .addLast(new ProtobufVarint32LengthFieldPrepender())
                            .addLast(new ProtobufVarint32FrameDecoder())
                            .addLast(new PacketDecoder());

                    // The manager is pointed at the handler of the most recently created channel.
                    final MessageHandler channelHandler = new MessageHandler(ch, backupManager);
                    backupManager.setMessageHandler(channelHandler);
                    ch.pipeline().addLast(channelHandler);
                }
            });

            channel = udp.bind(port).sync().channel();

            logger.info("Backup server started, bind port {}", port);

            // Block until the channel closes, then decide whether to stop or rebind.
            channel.closeFuture().sync();
            if (shutdown) {
                logger.info("Shutdown backup BackupServer");
                break;
            }
            logger.warn("Restart backup server ...");
        }
    } catch (Exception failure) {
        logger.error("Start backup server with port {} failed.", port, failure);
    } finally {
        workers.shutdownGracefully().sync();
    }
}
 
源代码27 项目: java-Kcp   文件: KcpClient.java
/**
 * Initializes the KCP client: chooses the channel manager, creates the disruptor executor pool if
 * none was supplied, prepares the UDP bootstrap and registers a JVM shutdown hook calling
 * {@code stop()}.
 *
 * @param channelConfig configuration controlling conv-based channel lookup, FEC shard counts and
 *                      the optional CRC32 handlers
 */
public void init(ChannelConfig channelConfig) {
    if (!channelConfig.isUseConvChannel()) {
        channelManager = new ClientAddressChannelManager();
    } else {
        // The conv index is shifted past the optional KCP tag and FEC header.
        int convIndex = 0;
        if (channelConfig.KcpTag) {
            convIndex += Ukcp.KCP_TAG;
        }
        final boolean fecConfigured = channelConfig.getFecDataShardCount() != 0
                && channelConfig.getFecParityShardCount() != 0;
        if (fecConfigured) {
            convIndex += Fec.fecHeaderSizePlus2;
        }
        channelManager = new ConvChannelManager(convIndex);
    }

    final int cpuNum = Runtime.getRuntime().availableProcessors();
    if (disruptorExecutorPool == null) {
        this.disruptorExecutorPool = new DisruptorExecutorPool();
        // One disruptor processor per available processor.
        int index = 0;
        while (index < cpuNum) {
            disruptorExecutorPool.createDisruptorProcessor("disruptorExecutorPool" + index);
            index++;
        }
    }

    nioEventLoopGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
    bootstrap = new Bootstrap();
    bootstrap.channel(NioDatagramChannel.class);
    bootstrap.group(nioEventLoopGroup);
    bootstrap.handler(new ChannelInitializer<NioDatagramChannel>() {
        @Override
        protected void initChannel(NioDatagramChannel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            if (channelConfig.isCrc32Check()) {
                final Crc32Encode encoder = new Crc32Encode();
                final Crc32Decode decoder = new Crc32Decode();
                pipeline.addLast(encoder);
                pipeline.addLast(decoder);
            }
            pipeline.addLast(new ClientChannelHandler(channelManager));
        }
    });

    Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
}
 
源代码28 项目: lippen-network-tool   文件: NetUDPServer.java
/**
 * Creates the UDP server and prepares (without binding) a broadcast-enabled NIO datagram
 * bootstrap whose handler is a {@code UDPHandler}.
 *
 * @param data     data manager passed to the superclass and to the handler
 * @param listener listener passed to the handler
 */
public NetUDPServer(DataManager data, MessageReceivedListener listener) {
    super(data, "UDP");
    b = new Bootstrap();

    EventLoopGroup workers = new NioEventLoopGroup();
    b.group(workers);
    b.channel(NioDatagramChannel.class);
    b.option(ChannelOption.SO_BROADCAST, true);
    b.handler(new UDPHandler(data, listener));
}
 
源代码29 项目: mpush   文件: NettyUDPConnector.java
/**
 * Creates the UDP server on a single-threaded NIO event loop group using IPv4 datagram channels.
 *
 * @param listener listener passed through to {@code createServer}
 */
private void createNioServer(Listener listener) {
    final DefaultThreadFactory workerThreads = new DefaultThreadFactory(ThreadNames.T_GATEWAY_WORKER);
    final NioEventLoopGroup loopGroup = new NioEventLoopGroup(1, workerThreads);
    loopGroup.setIoRatio(100);

    // By default the channel is created according to the machine's capabilities; if the machine
    // supports IPv6, an IPv4 address cannot be used to join a multicast group, so IPv4 is forced.
    createServer(listener, loopGroup, () -> new NioDatagramChannel(IPv4));
}
 
/**
 * <p>Creates a new service that listens on the specified {@link InetSocketAddress} and reports
 * every received source log event through the given callback. A JVM shutdown hook is registered
 * that shuts the service down.</p>
 *
 * @param listenAddress
 *         An {@link InetSocketAddress} where the listen service will bind or listen on
 * @param logEventCallback
 *         A {@link Consumer} callback that will be called once a log event has been received
 */
public SourceLogListenService(InetSocketAddress listenAddress, Consumer<SourceLogEntry> logEventCallback) {
    this.listenAddress = listenAddress;

    final Bootstrap listener = new Bootstrap();
    listener.localAddress(this.listenAddress);
    listener.channel(NioDatagramChannel.class);
    listener.group(listenWorkGroup);
    listener.handler(new ChannelInitializer<NioDatagramChannel>() {
        @Override
        protected void initChannel(NioDatagramChannel ch) throws Exception {
            ch.pipeline().addLast(new SourceLogListenHandler(logEventCallback));
        }
    });
    bootstrap = listener;

    // Add shutdown hook
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        try {
            log.debug("Service Interrupted. Shutting down gracefully.");
            shutdown();
        } catch (InterruptedException interrupted) {
            log.error(interrupted.getMessage(), interrupted);
        }
    }));
}
 
 类所在包
 类方法
 同包方法