io.netty.channel.epoll.EpollChannelOption#io.netty.channel.WriteBufferWaterMark源码实例Demo

下面列出了io.netty.channel.epoll.EpollChannelOption#io.netty.channel.WriteBufferWaterMark 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: grpc-nebula-java   文件: UtilsTest.java
private static InternalChannelz.SocketOptions setAndValidateGeneric(Channel channel) {
  channel.config().setOption(ChannelOption.SO_LINGER, 3);
  // only applicable for OIO channels:
  channel.config().setOption(ChannelOption.SO_TIMEOUT, 250);
  // Test some arbitrarily chosen options with a non numeric values
  channel.config().setOption(ChannelOption.SO_KEEPALIVE, true);
  WriteBufferWaterMark writeBufWaterMark = new WriteBufferWaterMark(10, 20);
  channel.config().setOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufWaterMark);

  InternalChannelz.SocketOptions socketOptions = Utils.getSocketOptions(channel);
  assertEquals(3, (int) socketOptions.lingerSeconds);
  assertEquals("true", socketOptions.others.get("SO_KEEPALIVE"));
  assertEquals(
      writeBufWaterMark.toString(),
      socketOptions.others.get(ChannelOption.WRITE_BUFFER_WATER_MARK.toString()));
  return socketOptions;
}
 
源代码2 项目: mantis   文件: TestSseServerFactory.java
public static int newServerWithInitialData(final int port, final String data) {
    final HttpServer<String, ServerSentEvent> server = RxNetty.newHttpServerBuilder(
            port,
            new RequestHandler<String, ServerSentEvent>() {
                @Override
                public Observable<Void> handle(HttpServerRequest<String> req, HttpServerResponse<ServerSentEvent> resp) {
                    final ByteBuf byteBuf = resp.getAllocator().buffer().writeBytes(data.getBytes());
                    resp.writeAndFlush(new ServerSentEvent(byteBuf));
                    return Observable.empty();
                }
            })
            .pipelineConfigurator(PipelineConfigurators.<String>serveSseConfigurator())
            .channelOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 5 * 1024 * 1024))
            
            .build();
    server.start();
    synchronized (servers) {
        servers.add(server);
    }
    return port;
}
 
源代码3 项目: spring-boot-protocol   文件: NettyTcpServer.java
@Override
protected void config(ServerBootstrap bootstrap) throws Exception{
    super.config(bootstrap);
    if(SystemPropertyUtil.get("io.netty.leakDetectionLevel") == null &&
            SystemPropertyUtil.get("io.netty.leakDetection.level") == null){
        ResourceLeakDetector.setLevel(properties.getResourceLeakDetectorLevel());
    }

    if(SystemPropertyUtil.get("io.netty.maxDirectMemory") == null){
        long maxDirectMemory = -1;
        System.setProperty("io.netty.maxDirectMemory", String.valueOf(maxDirectMemory));
    }
    bootstrap.childOption(ChannelOption.WRITE_SPIN_COUNT,Integer.MAX_VALUE);
    bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024,Integer.MAX_VALUE));
    bootstrap.childOption(ChannelOption.AUTO_CLOSE,true);

    bootstrap.childOption(ChannelOption.TCP_NODELAY, properties.isTcpNodelay());
    for (ServerListener serverListener : serverListeners) {
        serverListener.config(bootstrap);
    }
}
 
源代码4 项目: cantor   文件: Bootstraps.java
public static ServerBootstrap serverBootstrap(EventLoopGroup acceptors,
                                              EventLoopGroup workers) {

    ServerBootstrap bootstrap = new ServerBootstrap().group(acceptors, workers)
                                                     .childOption(ChannelOption.TCP_NODELAY,
                                                                  true)
                                                     .childOption(ChannelOption.ALLOCATOR,
                                                                  PooledDirectByteBufAllocator.INSTANCE)
                                                     .childOption(ChannelOption.SO_KEEPALIVE,
                                                                  false)
                                                     .option(ChannelOption.SO_REUSEADDR,
                                                             true);

    bootstrap.channel(NioServerSocketChannel.class);
    bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
                     new WriteBufferWaterMark(32 * 1024, 128 * 1024));

    return bootstrap;
}
 
源代码5 项目: sofa-bolt   文件: RpcServer.java
/**
 * init netty write buffer water mark
 */
private void initWriteBufferWaterMark() {
    int lowWaterMark = this.netty_buffer_low_watermark();
    int highWaterMark = this.netty_buffer_high_watermark();
    if (lowWaterMark > highWaterMark) {
        throw new IllegalArgumentException(
            String
                .format(
                    "[server side] bolt netty high water mark {%s} should not be smaller than low water mark {%s} bytes)",
                    highWaterMark, lowWaterMark));
    } else {
        logger.warn(
            "[server side] bolt netty low water mark is {} bytes, high water mark is {} bytes",
            lowWaterMark, highWaterMark);
    }
    this.bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
        lowWaterMark, highWaterMark));
}
 
源代码6 项目: sofa-bolt   文件: AbstractConnectionFactory.java
/**
 * init netty write buffer water mark
 */
private void initWriteBufferWaterMark() {
    int lowWaterMark = this.confInstance.netty_buffer_low_watermark();
    int highWaterMark = this.confInstance.netty_buffer_high_watermark();
    if (lowWaterMark > highWaterMark) {
        throw new IllegalArgumentException(
            String
                .format(
                    "[client side] bolt netty high water mark {%s} should not be smaller than low water mark {%s} bytes)",
                    highWaterMark, lowWaterMark));
    } else {
        logger.warn(
            "[client side] bolt netty low water mark is {} bytes, high water mark is {} bytes",
            lowWaterMark, highWaterMark);
    }
    this.bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
        lowWaterMark, highWaterMark));
}
 
源代码7 项目: rpc-benchmark   文件: NettyClientConnector.java
private void doConnect(EventLoopGroup loupGroup, Class<? extends SocketChannel> serverChannelClass, boolean isEpoll)
		throws InterruptedException {

	final Bootstrap bootstrap = new Bootstrap();

	if (isEpoll) {
		bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
	}

	bootstrap.option(ChannelOption.SO_REUSEADDR, true);
	bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
	bootstrap.option(ChannelOption.SO_SNDBUF, 256 * 1024);
	bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, //
			new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));

	bootstrap.group(loupGroup);
	bootstrap.channel(serverChannelClass);
	bootstrap.handler(new BenchmarkChannelInitializer(futureContainer));

	for (int i = 0; i < CONNECT_COUNT; i++) {
		channels[i] = bootstrap.connect(host, port).sync().channel();
		queues[i] = new MpscAtomicArrayQueue<>(4 * 1024);
	}
}
 
private Bootstrap newBootstrap() {
	Bootstrap boot = new Bootstrap();
	boot.channel(NettyPlatformIndependent.channelClass());
	boot.option(ChannelOption.TCP_NODELAY, true);
	// replace by heart beat
	boot.option(ChannelOption.SO_KEEPALIVE, false);
	// default is pooled direct
	// ByteBuf(io.netty.util.internal.PlatformDependent.DIRECT_BUFFER_PREFERRED)
	boot.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
	// 32kb(for massive long connections, See
	// http://www.infoq.com/cn/articles/netty-million-level-push-service-design-points)
	// 64kb(RocketMq remoting default value)
	boot.option(ChannelOption.SO_SNDBUF, 32 * 1024);
	boot.option(ChannelOption.SO_RCVBUF, 32 * 1024);
	// temporary settings, need more tests
	boot.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024));
	//default is true, reduce thread context switching
	boot.option(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, true);
	return boot;
}
 
源代码9 项目: sailfish   文件: DefaultServer.java
private ServerBootstrap newServerBootstrap() {
	ServerBootstrap serverBoot = new ServerBootstrap();
	serverBoot.channel(NettyPlatformIndependent.serverChannelClass());
	// connections wait for accept
	serverBoot.option(ChannelOption.SO_BACKLOG, 1024);
	serverBoot.option(ChannelOption.SO_REUSEADDR, true);
	// replace by heart beat
	serverBoot.childOption(ChannelOption.SO_KEEPALIVE, false);
	serverBoot.childOption(ChannelOption.TCP_NODELAY, true);
	serverBoot.childOption(ChannelOption.SO_SNDBUF, 32 * 1024);
	serverBoot.childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
	// temporary settings, need more tests
	serverBoot.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024));
	serverBoot.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
	//default is true, reduce thread context switching
	serverBoot.childOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, true);
	return serverBoot;
}
 
源代码10 项目: Jupiter   文件: NettyDomainConnector.java
@Override
protected void setOptions() {
    super.setOptions();

    Bootstrap boot = bootstrap();

    // child options
    NettyConfig.NettyDomainConfigGroup.ChildConfig child = childConfig;

    WriteBufferWaterMark waterMark =
            createWriteBufferWaterMark(child.getWriteBufferLowWaterMark(), child.getWriteBufferHighWaterMark());

    boot.option(ChannelOption.WRITE_BUFFER_WATER_MARK, waterMark);

    if (child.getConnectTimeoutMillis() > 0) {
        boot.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, child.getConnectTimeoutMillis());
    }
}
 
源代码11 项目: atomix   文件: NettyMessagingService.java
/**
 * Bootstraps a server.
 *
 * @return a future to be completed once the server has been bound to all interfaces
 */
private CompletableFuture<Void> bootstrapServer() {
  ServerBootstrap b = new ServerBootstrap();
  b.option(ChannelOption.SO_REUSEADDR, true);
  b.option(ChannelOption.SO_BACKLOG, 128);
  b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
      new WriteBufferWaterMark(8 * 1024, 32 * 1024));
  b.childOption(ChannelOption.SO_RCVBUF, 1024 * 1024);
  b.childOption(ChannelOption.SO_SNDBUF, 1024 * 1024);
  b.childOption(ChannelOption.SO_KEEPALIVE, true);
  b.childOption(ChannelOption.TCP_NODELAY, true);
  b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  b.group(serverGroup, clientGroup);
  b.channel(serverChannelClass);
  if (enableNettyTls) {
    try {
      b.childHandler(new SslServerChannelInitializer());
    } catch (SSLException e) {
      return Futures.exceptionalFuture(e);
    }
  } else {
    b.childHandler(new BasicServerChannelInitializer());
  }
  return bind(b);
}
 
源代码12 项目: pinpoint   文件: DefaultChannelFactory.java
private void setupClientOption(final NettyChannelBuilder channelBuilder) {
    channelBuilder.keepAliveTime(clientOption.getKeepAliveTime(), TimeUnit.MILLISECONDS);
    channelBuilder.keepAliveTimeout(clientOption.getKeepAliveTimeout(), TimeUnit.MILLISECONDS);
    channelBuilder.keepAliveWithoutCalls(clientOption.isKeepAliveWithoutCalls());
    channelBuilder.maxHeaderListSize(clientOption.getMaxHeaderListSize());
    channelBuilder.maxInboundMessageSize(clientOption.getMaxInboundMessageSize());
    channelBuilder.flowControlWindow(clientOption.getFlowControlWindow());
    channelBuilder.idleTimeout(clientOption.getIdleTimeoutMillis(), TimeUnit.MILLISECONDS);

    // ChannelOption
    channelBuilder.withOption(ChannelOption.TCP_NODELAY, true);
    channelBuilder.withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientOption.getConnectTimeout());

    final WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(clientOption.getWriteBufferLowWaterMark(), clientOption.getWriteBufferHighWaterMark());
    channelBuilder.withOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark);
    if (logger.isInfoEnabled()) {
        logger.info("Set clientOption {}. name={}", clientOption, factoryName);
    }
}
 
源代码13 项目: grpc-java   文件: UtilsTest.java
private static InternalChannelz.SocketOptions setAndValidateGeneric(Channel channel) {
  channel.config().setOption(ChannelOption.SO_LINGER, 3);
  // only applicable for OIO channels:
  channel.config().setOption(ChannelOption.SO_TIMEOUT, 250);
  // Test some arbitrarily chosen options with a non numeric values
  channel.config().setOption(ChannelOption.SO_KEEPALIVE, true);
  WriteBufferWaterMark writeBufWaterMark = new WriteBufferWaterMark(10, 20);
  channel.config().setOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufWaterMark);

  InternalChannelz.SocketOptions socketOptions = Utils.getSocketOptions(channel);
  assertEquals(3, (int) socketOptions.lingerSeconds);
  assertEquals("true", socketOptions.others.get("SO_KEEPALIVE"));
  assertEquals(
      writeBufWaterMark.toString(),
      socketOptions.others.get(ChannelOption.WRITE_BUFFER_WATER_MARK.toString()));
  return socketOptions;
}
 
源代码14 项目: turbo-rpc   文件: NettyRpcServer.java
public void start() throws InterruptedException {
	InetSocketAddress inet = new InetSocketAddress(hostPort.host, hostPort.port);

	ServerBootstrap bootstrap = new ServerBootstrap();
	bootstrap.group(eventLoopGroup);

	bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
	bootstrap.option(ChannelOption.SO_REUSEADDR, true);
	bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);

	if (eventLoopGroup instanceof EpollEventLoopGroup) {
		bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
		bootstrap.channel(EpollServerSocketChannel.class);
	} else if (eventLoopGroup instanceof NioEventLoopGroup) {
		bootstrap.channel(NioServerSocketChannel.class);
	}

	bootstrap.childHandler(new NettyRpcChannelInitializer(invokerFactory, serializer, filters));

	bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
	bootstrap.childOption(ChannelOption.SO_RCVBUF, 256 * 1024);
	bootstrap.childOption(ChannelOption.SO_SNDBUF, 256 * 1024);
	bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, //
			new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));

	channel = bootstrap.bind(inet).sync().channel();

	System.out.println("TurboRpcServer started. Listening on: " + hostPort);
}
 
源代码15 项目: turbo-rpc   文件: NettyRestServer.java
public void start() throws InterruptedException {
	InetSocketAddress inet = new InetSocketAddress(hostPort.host, hostPort.port);

	ServerBootstrap bootstrap = new ServerBootstrap();
	bootstrap.group(eventLoopGroup);

	bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
	bootstrap.option(ChannelOption.SO_REUSEADDR, true);
	bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);

	if (eventLoopGroup instanceof EpollEventLoopGroup) {
		bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
		bootstrap.channel(EpollServerSocketChannel.class);
	} else if (eventLoopGroup instanceof NioEventLoopGroup) {
		bootstrap.channel(NioServerSocketChannel.class);
	}

	bootstrap.childHandler(new NettyRestChannelInitializer(invokerFactory, jsonMapper, filters));

	bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
	bootstrap.childOption(ChannelOption.SO_RCVBUF, 256 * 1024);
	bootstrap.childOption(ChannelOption.SO_SNDBUF, 256 * 1024);
	bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, //
			new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));

	channel = bootstrap.bind(inet).sync().channel();

	System.out.println("NettyRestServer started. Listening on: " + hostPort);
}
 
源代码16 项目: servicetalk   文件: SocketOptionUtils.java
/**
 * Convert and add the given {@link SocketOption} and value to the {@link ChannelOption}s {@link Map}.
 *
 * @param channelOpts the {@link Map} into which add the converted {@link SocketOption}
 * @param option the {@link SocketOption} to convert and add
 * @param value the value to add
 * @param <T> the type of the {@link SocketOption} value
 * @throws IllegalArgumentException if the specified {@link SocketOption} is not supported
 */
@SuppressWarnings("rawtypes")
public static <T> void addOption(final Map<ChannelOption, Object> channelOpts, final SocketOption<T> option,
                                 final Object value) {
    if (option == StandardSocketOptions.IP_MULTICAST_IF) {
        channelOpts.put(ChannelOption.IP_MULTICAST_IF, value);
    } else if (option == StandardSocketOptions.IP_MULTICAST_LOOP) {
        channelOpts.put(ChannelOption.IP_MULTICAST_LOOP_DISABLED, !(Boolean) value);
    } else if (option == StandardSocketOptions.IP_MULTICAST_TTL) {
        channelOpts.put(ChannelOption.IP_MULTICAST_TTL, value);
    } else if (option == StandardSocketOptions.IP_TOS) {
        channelOpts.put(ChannelOption.IP_TOS, value);
    } else if (option == StandardSocketOptions.SO_BROADCAST) {
        channelOpts.put(ChannelOption.SO_BROADCAST, value);
    } else if (option == StandardSocketOptions.SO_KEEPALIVE) {
        channelOpts.put(ChannelOption.SO_KEEPALIVE, value);
    } else if (option == StandardSocketOptions.SO_LINGER) {
        channelOpts.put(ChannelOption.SO_LINGER, value);
    } else if (option == StandardSocketOptions.SO_RCVBUF) {
        channelOpts.put(ChannelOption.SO_RCVBUF, value);
    } else if (option == StandardSocketOptions.SO_REUSEADDR) {
        channelOpts.put(ChannelOption.SO_REUSEADDR, value);
    } else if (option == StandardSocketOptions.SO_SNDBUF) {
        channelOpts.put(ChannelOption.SO_SNDBUF, value);
    } else if (option == StandardSocketOptions.TCP_NODELAY) {
        channelOpts.put(ChannelOption.TCP_NODELAY, value);
    } else if (option == ServiceTalkSocketOptions.CONNECT_TIMEOUT) {
        channelOpts.put(ChannelOption.CONNECT_TIMEOUT_MILLIS, value);
    } else if (option == ServiceTalkSocketOptions.WRITE_BUFFER_THRESHOLD) {
        final int writeBufferThreshold = (Integer) value;
        channelOpts.put(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(writeBufferThreshold >>> 1,
                writeBufferThreshold));
    } else {
        throw unsupported(option);
    }
}
 
源代码17 项目: xian   文件: RpcNettyServer.java
private void start() throws Exception {
    if (Node.RPC_PORT < 0) {
        LOG.error("No rpc port is specified, rpc server starting failed.");
        return;
    }
    final SslContext sslCtx;
    if (SSL) {
        SelfSignedCertificate ssc = new SelfSignedCertificate();
        sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
    } else {
        sslCtx = null;
    }
    // Boss thread pool below for handling incoming connections.
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    // Worker thread pool below for handling channel read/write. Here we only allow 1 thread to make sure the message order.
    EventLoopGroup workerGroup = new NioEventLoopGroup(1);
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
            .option(ChannelOption.WRITE_BUFFER_WATER_MARK,
                    new WriteBufferWaterMark(
                            10 * 1024 * 1024, //10m
                            20 * 1024 * 1024 //20m
                    )
            )
            .channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new RpcServerInitializer(sslCtx));
    parentChannel = b.bind(Node.RPC_PORT).sync().channel();
    parentChannel.closeFuture().addListener(future -> {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
        LOG.info("The EventLoopGroup has been terminated completely and all Channels that belong to the group have been closed.");
    });
}
 
源代码18 项目: rpc-benchmark   文件: Server.java
private static void doRun(EventLoopGroup loupGroup, Class<? extends ServerChannel> serverChannelClass,
		boolean isEpoll) throws InterruptedException {
	try {
		InetSocketAddress inet = new InetSocketAddress(port);

		ServerBootstrap b = new ServerBootstrap();

		if (isEpoll) {
			b.option(EpollChannelOption.SO_REUSEPORT, true);
		}

		b.option(ChannelOption.SO_BACKLOG, 1024 * 8);
		b.option(ChannelOption.SO_REUSEADDR, true);
		b.option(ChannelOption.SO_RCVBUF, 256 * 1024);
		b.group(loupGroup).channel(serverChannelClass).childHandler(new BenchmarkChannelInitializer());
		b.childOption(ChannelOption.SO_REUSEADDR, true);

		b.childOption(ChannelOption.SO_REUSEADDR, true);
		b.childOption(ChannelOption.SO_RCVBUF, 256 * 1024);
		b.childOption(ChannelOption.SO_SNDBUF, 256 * 1024);
		b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, //
				new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));

		Channel ch = b.bind(inet).sync().channel();

		System.out.printf("Httpd started. Listening on: %s%n", inet.toString());

		ch.closeFuture().sync();
	} finally {
		loupGroup.shutdownGracefully().sync();
	}
}
 
源代码19 项目: mpush   文件: ConnectionServer.java
@Override
protected void initOptions(ServerBootstrap b) {
    super.initOptions(b);

    b.option(ChannelOption.SO_BACKLOG, 1024);

    /**
     * TCP层面的接收和发送缓冲区大小设置,
     * 在Netty中分别对应ChannelOption的SO_SNDBUF和SO_RCVBUF,
     * 需要根据推送消息的大小,合理设置,对于海量长连接,通常32K是个不错的选择。
     */
    if (snd_buf.connect_server > 0) b.childOption(ChannelOption.SO_SNDBUF, snd_buf.connect_server);
    if (rcv_buf.connect_server > 0) b.childOption(ChannelOption.SO_RCVBUF, rcv_buf.connect_server);

    /**
     * 这个坑其实也不算坑,只是因为懒,该做的事情没做。一般来讲我们的业务如果比较小的时候我们用同步处理,等业务到一定规模的时候,一个优化手段就是异步化。
     * 异步化是提高吞吐量的一个很好的手段。但是,与异步相比,同步有天然的负反馈机制,也就是如果后端慢了,前面也会跟着慢起来,可以自动的调节。
     * 但是异步就不同了,异步就像决堤的大坝一样,洪水是畅通无阻。如果这个时候没有进行有效的限流措施就很容易把后端冲垮。
     * 如果一下子把后端冲垮倒也不是最坏的情况,就怕把后端冲的要死不活。
     * 这个时候,后端就会变得特别缓慢,如果这个时候前面的应用使用了一些无界的资源等,就有可能把自己弄死。
     * 那么现在要介绍的这个坑就是关于Netty里的ChannelOutboundBuffer这个东西的。
     * 这个buffer是用在netty向channel write数据的时候,有个buffer缓冲,这样可以提高网络的吞吐量(每个channel有一个这样的buffer)。
     * 初始大小是32(32个元素,不是指字节),但是如果超过32就会翻倍,一直增长。
     * 大部分时候是没有什么问题的,但是在碰到对端非常慢(对端慢指的是对端处理TCP包的速度变慢,比如对端负载特别高的时候就有可能是这个情况)的时候就有问题了,
     * 这个时候如果还是不断地写数据,这个buffer就会不断地增长,最后就有可能出问题了(我们的情况是开始吃swap,最后进程被linux killer干掉了)。
     * 为什么说这个地方是坑呢,因为大部分时候我们往一个channel写数据会判断channel是否active,但是往往忽略了这种慢的情况。
     *
     * 那这个问题怎么解决呢?其实ChannelOutboundBuffer虽然无界,但是可以给它配置一个高水位线和低水位线,
     * 当buffer的大小超过高水位线的时候对应channel的isWritable就会变成false,
     * 当buffer的大小低于低水位线的时候,isWritable就会变成true。所以应用应该判断isWritable,如果是false就不要再写数据了。
     * 高水位线和低水位线是字节数,默认高水位是64K,低水位是32K,我们可以根据我们的应用需要支持多少连接数和系统资源进行合理规划。
     */
    b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
            connect_server_low, connect_server_high
    ));
}
 
源代码20 项目: lannister   文件: MqttServer.java
private void executeBootstrap(ScheduledExecutor scheduledExecutor, int port, boolean useWebSocket, boolean useSsl)
		throws InterruptedException {
	ServerBootstrap bootstrap = new ServerBootstrap();

	Class<? extends ServerChannel> serverChannelClass;

	if (Literals.NETTY_EPOLL.equals(Settings.INSTANCE.nettyTransportMode())) {
		serverChannelClass = EpollServerSocketChannel.class;
	}
	else {
		serverChannelClass = NioServerSocketChannel.class;
	}

	bootstrap = bootstrap.group(bossGroup, workerGroup).channel(serverChannelClass);
	bootstrap.option(ChannelOption.TCP_NODELAY, true);

	if (scheduledExecutor != null) {
		bootstrap.handler(scheduledExecutor);
	}

	bootstrap.childHandler(new MqttChannelInitializer(useWebSocket, useSsl));

	bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
			// setting buffer size can improve I/O
			.childOption(ChannelOption.SO_SNDBUF, 1048576).childOption(ChannelOption.SO_RCVBUF, 1048576)
			// recommended in
			// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#11.0
			.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
			.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

	bootstrap.bind(port).sync();
}
 
源代码21 项目: shardingsphere   文件: ShardingSphereProxy.java
private void groupsEpoll(final ServerBootstrap bootstrap) {
    workerGroup = new EpollEventLoopGroup();
    bootstrap.group(bossGroup, workerGroup)
            .channel(EpollServerSocketChannel.class)
            .option(EpollChannelOption.SO_BACKLOG, 128)
            .option(EpollChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
            .option(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(EpollChannelOption.TCP_NODELAY, true)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ServerHandlerInitializer());
}
 
源代码22 项目: shardingsphere   文件: ShardingSphereProxy.java
private void groupsNio(final ServerBootstrap bootstrap) {
    workerGroup = new NioEventLoopGroup();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 128)
            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ServerHandlerInitializer());
}
 
源代码23 项目: spring-boot-netty   文件: ChannelOptionsTest.java
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testOptionsHaveCorrectTypes() throws Exception {
    final ServerBootstrap bootstrap = new ServerBootstrap();
    final ChannelOptions options = new ChannelOptions();

    options.setAllocator(new PooledByteBufAllocator());
    options.setRecvBufAllocator(new AdaptiveRecvByteBufAllocator());
    options.setConnectTimeout(1);
    options.setWriteSpinCount(1);
    options.setWriteBufferWaterMark(new WriteBufferWaterMark(8192, 32768));
    options.setAllowHalfClosure(true);
    options.setAutoRead(true);
    options.setSoBroadcast(true);
    options.setSoKeepAlive(true);
    options.setSoReuseAddr(true);
    options.setSoSndBuf(8192);
    options.setSoRcvBuf(8192);
    options.setSoLinger(0);
    options.setSoBacklog(0);
    options.setSoTimeout(0);
    options.setIpTos(0);
    options.setIpMulticastAddr(getLoopbackAddress());
    options.setIpMulticastIf(getNetworkInterfaces().nextElement());
    options.setIpMulticastTtl(300);
    options.setIpMulticastLoopDisabled(true);
    options.setTcpNodelay(true);

    final Map<ChannelOption, Object> channelOptionMap = options.get();
    channelOptionMap.forEach((key, value) -> {
        bootstrap.option(key, value);
        bootstrap.childOption(key, value);
    });
}
 
源代码24 项目: Jupiter   文件: NettyDomainAcceptor.java
@Override
protected void setOptions() {
    super.setOptions();

    ServerBootstrap boot = bootstrap();

    // child options
    NettyConfig.NettyDomainConfigGroup.ChildConfig child = configGroup.child();

    WriteBufferWaterMark waterMark =
            createWriteBufferWaterMark(child.getWriteBufferLowWaterMark(), child.getWriteBufferHighWaterMark());

    boot.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, waterMark);
}
 
源代码25 项目: tchannel-java   文件: TChannel.java
private @NotNull Bootstrap bootstrap(@NotNull TChannel topChannel) {
    return new Bootstrap()
        .group(this.childGroup)
        .channel(useEpoll ? EpollSocketChannel.class : NioSocketChannel.class)
        .handler(this.channelInitializer(false, topChannel))
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .option(ChannelOption.WRITE_BUFFER_WATER_MARK,
                new WriteBufferWaterMark(WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_HIGH_WATER_MARK))
        .validate();
}
 
源代码26 项目: tchannel-java   文件: TChannel.java
private @NotNull ServerBootstrap serverBootstrap(@NotNull TChannel topChannel) {
    return new ServerBootstrap()
        .group(this.bossGroup, this.childGroup)
        .channel(useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
        .handler(new LoggingHandler(LogLevel.INFO))
        .option(ChannelOption.SO_BACKLOG, 128)
        .childHandler(this.channelInitializer(true, topChannel))
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childOption(
            ChannelOption.WRITE_BUFFER_WATER_MARK,
            new WriteBufferWaterMark(WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_HIGH_WATER_MARK)
        )
        .validate();
}
 
源代码27 项目: atomix   文件: NettyMessagingService.java
/**
 * Bootstraps a new channel to the given address.
 *
 * @param address the address to which to connect
 * @return a future to be completed with the connected channel
 */
private CompletableFuture<Channel> bootstrapClient(Address address) {
  CompletableFuture<Channel> future = new OrderedFuture<>();
  final InetAddress resolvedAddress = address.address(true);
  if (resolvedAddress == null) {
    future.completeExceptionally(new IllegalStateException("Failed to bootstrap client (address "
        + address.toString() + " cannot be resolved)"));
    return future;
  }

  Bootstrap bootstrap = new Bootstrap();
  bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
      new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
  bootstrap.option(ChannelOption.SO_RCVBUF, 1024 * 1024);
  bootstrap.option(ChannelOption.SO_SNDBUF, 1024 * 1024);
  bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
  bootstrap.option(ChannelOption.TCP_NODELAY, true);
  bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
  bootstrap.group(clientGroup);
  // TODO: Make this faster:
  // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
  bootstrap.channel(clientChannelClass);
  bootstrap.remoteAddress(resolvedAddress, address.port());
  if (enableNettyTls) {
    try {
      bootstrap.handler(new SslClientChannelInitializer(future, address));
    } catch (SSLException e) {
      return Futures.exceptionalFuture(e);
    }
  } else {
    bootstrap.handler(new BasicClientChannelInitializer(future));
  }
  bootstrap.connect().addListener(f -> {
    if (!f.isSuccess()) {
      future.completeExceptionally(f.cause());
    }
  });
  return future;
}
 
源代码28 项目: xio   文件: Connector.java
private Bootstrap buildBootstrap() {
  Bootstrap bootstrap = new Bootstrap();
  // TODO(CK): move all of these constants out into Config
  bootstrap
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500)
      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
      .option(
          ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
      .option(ChannelOption.AUTO_READ, true)
      .group(group())
      .channel(channel());
  return configure(bootstrap);
}
 
源代码29 项目: xio   文件: XioClientBootstrap.java
public Bootstrap buildBootstrap(ChannelConfiguration channelConfig) {
  return new Bootstrap()
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500)
      .option(ChannelOption.SO_REUSEADDR, true)
      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
      .option(
          ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
      .option(ChannelOption.TCP_NODELAY, true)
      .group(channelConfig.workerGroup())
      .channel(channelConfig.channel());
}
 
源代码30 项目: pinpoint   文件: ServerFactory.java
private void setupServerOption(final NettyServerBuilder builder) {
    // TODO @see PinpointServerAcceptor
    builder.withChildOption(ChannelOption.TCP_NODELAY, true);
    builder.withChildOption(ChannelOption.SO_REUSEADDR, true);
    builder.withChildOption(ChannelOption.SO_RCVBUF, this.serverOption.getReceiveBufferSize());
    final WriteBufferWaterMark disabledWriteBufferWaterMark = new WriteBufferWaterMark(0, Integer.MAX_VALUE);
    builder.withChildOption(ChannelOption.WRITE_BUFFER_WATER_MARK, disabledWriteBufferWaterMark);

    builder.handshakeTimeout(this.serverOption.getHandshakeTimeout(), TimeUnit.MILLISECONDS);
    builder.flowControlWindow(this.serverOption.getFlowControlWindow());

    builder.maxInboundMessageSize(this.serverOption.getMaxInboundMessageSize());
    builder.maxHeaderListSize(this.serverOption.getMaxHeaderListSize());

    builder.keepAliveTime(this.serverOption.getKeepAliveTime(), TimeUnit.MILLISECONDS);
    builder.keepAliveTimeout(this.serverOption.getKeepAliveTimeout(), TimeUnit.MILLISECONDS);
    builder.permitKeepAliveTime(this.serverOption.getPermitKeepAliveTime(), TimeUnit.MILLISECONDS);
    builder.permitKeepAliveWithoutCalls(this.serverOption.isPermitKeepAliveWithoutCalls());

    builder.maxConnectionIdle(this.serverOption.getMaxConnectionIdle(), TimeUnit.MILLISECONDS);
    builder.maxConnectionAge(this.serverOption.getMaxConnectionAge(), TimeUnit.MILLISECONDS);
    builder.maxConnectionAgeGrace(this.serverOption.getMaxConnectionAgeGrace(), TimeUnit.MILLISECONDS);
    builder.maxConcurrentCallsPerConnection(this.serverOption.getMaxConcurrentCallsPerConnection());
    if (logger.isInfoEnabled()) {
        logger.info("Set serverOption {}. name={}, hostname={}, port={}", serverOption, name, hostname, port);
    }
}