类 io.netty.handler.codec.LineBasedFrameDecoder 源码实例Demo

下面列出了怎么用 io.netty.handler.codec.LineBasedFrameDecoder 的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: netty-4.1.22   文件: RxtxClient.java

/**
 * Connects an Rxtx channel to the device address built from {@code PORT} and
 * blocks until that channel is closed.
 *
 * <p>Inbound bytes are framed by a {@link LineBasedFrameDecoder} (max length 32768)
 * before string decoding; the event loop group is shut down on exit.
 *
 * @param args ignored
 * @throws Exception if connecting or waiting on the channel fails
 */
public static void main(String[] args) throws Exception {
    EventLoopGroup group = new OioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                .channel(RxtxChannel.class)
                .handler(new ChannelInitializer<RxtxChannel>() {
                    @Override
                    public void initChannel(RxtxChannel ch) throws Exception {
                        ch.pipeline().addLast(
                                new LineBasedFrameDecoder(32768),
                                new StringEncoder(),
                                new StringDecoder(),
                                new RxtxClientHandler());
                    }
                });

        // Wait for the connect to complete, then park until the channel closes.
        bootstrap.connect(new RxtxDeviceAddress(PORT))
                 .sync()
                 .channel()
                 .closeFuture()
                 .sync();
    } finally {
        group.shutdownGracefully();
    }
}
 
源代码2 项目: reactor-netty   文件: ConnectionTest.java

/**
 * With only the {@code ReactiveBridge} handler registered, adds "decoder" via
 * {@code addHandlerLast} and "decoder$extract" via {@code addHandlerFirst}, then
 * checks that both sit ahead of the bridge in the pipeline.
 */
@Test
public void addByteDecoderWhenNoLeft() {

	channel.pipeline()
	       .addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
	       });
	ChannelHandler decoder = new LineBasedFrameDecoder(12);

	testContext.addHandlerLast("decoder", decoder)
	           .addHandlerFirst("decoder$extract",
			           NettyPipeline.inboundHandler(ADD_EXTRACTOR));

	// assertEquals takes (expected, actual); this order keeps failure messages accurate.
	assertEquals(Arrays.asList("decoder$extract",
					"decoder",
					NettyPipeline.ReactiveBridge,
					"DefaultChannelPipeline$TailContext#0"),
			channel.pipeline()
			       .names());
}
 
源代码3 项目: reactor-netty   文件: ConnectionTest.java

/**
 * With only a handler named {@code HttpCodec} registered, adds "decoder" via
 * {@code addHandlerLast} and "decoder$extract" via {@code addHandlerFirst}, then
 * checks that both land after the codec.
 */
@Test
public void addByteDecoderWhenNoRight() {

	channel.pipeline()
	       .addLast(NettyPipeline.HttpCodec, new ChannelHandlerAdapter() {
	       });
	ChannelHandler decoder = new LineBasedFrameDecoder(12);

	testContext.addHandlerLast("decoder", decoder)
	           .addHandlerFirst("decoder$extract",
			           NettyPipeline.inboundHandler(ADD_EXTRACTOR));

	// assertEquals takes (expected, actual); this order keeps failure messages accurate.
	assertEquals(Arrays.asList(NettyPipeline.HttpCodec,
					"decoder$extract",
					"decoder",
					"DefaultChannelPipeline$TailContext#0"),
			channel.pipeline()
			       .names());
}
 
源代码4 项目: reactor-netty   文件: ConnectionTest.java

/**
 * With {@code HttpCodec}, {@code HttpTrafficHandler} and {@code ReactiveBridge}
 * registered, adds "decoder" via {@code addHandlerLast} and "decoder$extract" via
 * {@code addHandlerFirst}, then checks that both end up between the traffic
 * handler and the bridge.
 */
@Test
public void addByteDecoderWhenFullReactorPipeline() {

	channel.pipeline()
	       .addLast(NettyPipeline.HttpCodec, new HttpServerCodec())
	       .addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler())
	       .addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
	       });
	ChannelHandler decoder = new LineBasedFrameDecoder(12);

	testContext.addHandlerLast("decoder", decoder)
	           .addHandlerFirst("decoder$extract",
			           NettyPipeline.inboundHandler(ADD_EXTRACTOR));

	// assertEquals takes (expected, actual); this order keeps failure messages accurate.
	assertEquals(Arrays.asList(NettyPipeline.HttpCodec,
					NettyPipeline.HttpTrafficHandler,
					"decoder$extract",
					"decoder",
					NettyPipeline.ReactiveBridge,
					"DefaultChannelPipeline$TailContext#0"),
			channel.pipeline()
			       .names());
}
 
源代码5 项目: reactor-netty   文件: ConnectionTest.java

/**
 * With only the {@code ReactiveBridge} handler registered, adds a handler named
 * "encoder" via {@code addHandlerFirst} and checks that it is placed ahead of the bridge.
 *
 * <p>The handler instance is a {@link LineBasedFrameDecoder}; only its position
 * in the pipeline is asserted.
 */
@Test
public void addByteEncoderWhenNoLeft() {

	channel.pipeline()
	       .addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
	       });
	ChannelHandler encoder = new LineBasedFrameDecoder(12);

	testContext.addHandlerFirst("encoder", encoder);

	// assertEquals takes (expected, actual); this order keeps failure messages accurate.
	assertEquals(Arrays.asList("encoder",
					NettyPipeline.ReactiveBridge,
					"DefaultChannelPipeline$TailContext#0"),
			channel.pipeline()
			       .names());
}
 
源代码6 项目: reactor-netty   文件: ConnectionTest.java

/**
 * With only a handler named {@code HttpCodec} registered, adds a handler named
 * "encoder" via {@code addHandlerFirst} and checks that it is placed after the codec.
 *
 * <p>The handler instance is a {@link LineBasedFrameDecoder}; only its position
 * in the pipeline is asserted.
 */
@Test
public void addByteEncoderWhenNoRight() {

	channel.pipeline()
	       .addLast(NettyPipeline.HttpCodec, new ChannelHandlerAdapter() {
	       });
	ChannelHandler encoder = new LineBasedFrameDecoder(12);

	testContext.addHandlerFirst("encoder", encoder);

	// assertEquals takes (expected, actual); this order keeps failure messages accurate.
	assertEquals(Arrays.asList(NettyPipeline.HttpCodec,
					"encoder",
					"DefaultChannelPipeline$TailContext#0"),
			channel.pipeline()
			       .names());
}
 
源代码7 项目: reactor-netty   文件: ConnectionTest.java

/**
 * With {@code HttpCodec}, {@code HttpTrafficHandler} and {@code ReactiveBridge}
 * registered, adds a handler named "encoder" via {@code addHandlerFirst} and
 * checks that it ends up between the traffic handler and the bridge.
 */
@Test
public void addByteEncoderWhenFullReactorPipeline() {

	channel.pipeline()
	       .addLast(NettyPipeline.HttpCodec, new HttpServerCodec())
	       .addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler())
	       .addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
	       });
	ChannelHandler encoder = new LineBasedFrameDecoder(12);

	testContext.addHandlerFirst("encoder", encoder);

	// assertEquals takes (expected, actual); this order keeps failure messages accurate.
	assertEquals(Arrays.asList(NettyPipeline.HttpCodec,
					NettyPipeline.HttpTrafficHandler,
					"encoder",
					NettyPipeline.ReactiveBridge,
					"DefaultChannelPipeline$TailContext#0"),
			channel.pipeline()
			       .names());
}
 
源代码8 项目: reactor-netty   文件: ConnectionTest.java

/**
 * Adds two handlers ("encoder1", then "encoder2") via {@code addHandlerFirst} to a
 * pipeline holding {@code HttpCodec}, {@code HttpTrafficHandler} and
 * {@code ReactiveBridge}, and checks that the later addition ("encoder2") is
 * placed in front of the earlier one.
 */
@Test
public void addSeveralByteEncodersWhenCodec() {
	ChannelHandler encoder1 = new LineBasedFrameDecoder(12);
	ChannelHandler encoder2 = new LineBasedFrameDecoder(13);

	channel.pipeline()
	       .addLast(NettyPipeline.HttpCodec, new HttpServerCodec())
	       .addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler())
	       .addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
	       });

	testContext.addHandlerFirst("encoder1", encoder1)
	           .addHandlerFirst("encoder2", encoder2);

	// assertEquals takes (expected, actual); this order keeps failure messages accurate.
	assertEquals(Arrays.asList(NettyPipeline.HttpCodec,
					NettyPipeline.HttpTrafficHandler,
					"encoder2",
					"encoder1",
					NettyPipeline.ReactiveBridge,
					"DefaultChannelPipeline$TailContext#0"),
			channel.pipeline()
			       .names());
}
 
源代码9 项目: reactor-netty   文件: TcpClientTests.java

/**
 * Runs the shared line-feed scenario against a client built from
 * {@code ConnectionProvider.newConnection()}, with a "codec"
 * {@link LineBasedFrameDecoder} (max 8 KiB) installed on connect.
 */
@Test
	public void tcpClientHandlesLineFeedDataFixedPool() throws InterruptedException {
		// NOTE: despite the test name, no fixed pool is used. The original author left
		// ConnectionProvider.fixed("tcpClientHandlesLineFeedDataFixedPool", 1)
		// commented out and substituted ConnectionProvider.newConnection().
		ConnectionProvider provider = ConnectionProvider.newConnection();

		tcpClientHandlesLineFeedData(
				TcpClient.create(provider)
				         .host("localhost")
				         .port(echoServerPort)
				         .doOnConnected(conn -> conn.addHandler("codec",
				                                                new LineBasedFrameDecoder(8 * 1024))));
	}
 
源代码10 项目: nomulus   文件: WhoisProtocolModule.java

/**
 * Supplies the handler providers bound under {@code @WhoisProtocol}, in the
 * order they are listed here.
 *
 * @return an immutable list of the seven injected providers, in declaration order
 */
@Provides
@WhoisProtocol
static ImmutableList<Provider<? extends ChannelHandler>> provideHandlerProviders(
    Provider<ProxyProtocolHandler> proxyProtocolHandlerProvider,
    @WhoisProtocol Provider<ReadTimeoutHandler> readTimeoutHandlerProvider,
    Provider<LineBasedFrameDecoder> lineBasedFrameDecoderProvider,
    Provider<WhoisServiceHandler> whoisServiceHandlerProvider,
    Provider<FrontendMetricsHandler> frontendMetricsHandlerProvider,
    Provider<WhoisQuotaHandler> whoisQuotaHandlerProvider,
    Provider<FullHttpRequestRelayHandler> relayHandlerProvider) {
  return ImmutableList.<Provider<? extends ChannelHandler>>builder()
      .add(proxyProtocolHandlerProvider)
      .add(readTimeoutHandlerProvider)
      .add(lineBasedFrameDecoderProvider)
      .add(whoisServiceHandlerProvider)
      .add(frontendMetricsHandlerProvider)
      .add(whoisQuotaHandlerProvider)
      .add(relayHandlerProvider)
      .build();
}
 
源代码11 项目: netty4.0.27Learn   文件: RxtxClient.java

/**
 * Entry point: opens an Rxtx channel to the device address built from {@code PORT},
 * installs line-based framing plus string codecs and the client handler, and
 * waits for the channel to close before shutting the event loop group down.
 *
 * @param args ignored
 * @throws Exception if connecting or waiting on the channel fails
 */
public static void main(String[] args) throws Exception {
    EventLoopGroup group = new OioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(RxtxChannel.class);
        bootstrap.handler(new ChannelInitializer<RxtxChannel>() {
            @Override
            public void initChannel(RxtxChannel ch) throws Exception {
                ch.pipeline().addLast(
                        new LineBasedFrameDecoder(32768),
                        new StringEncoder(),
                        new StringDecoder(),
                        new RxtxClientHandler());
            }
        });

        ChannelFuture connected = bootstrap.connect(new RxtxDeviceAddress(PORT)).sync();

        // Block the main thread until the channel is closed.
        connected.channel().closeFuture().sync();
    } finally {
        group.shutdownGracefully();
    }
}
 
源代码12 项目: dubbo-2.6.5   文件: QosProcessHandler.java

/**
 * Inspects the first readable byte to choose between the HTTP and the telnet
 * handler chain, appends the chosen handlers to the pipeline, and then removes
 * this handler from it.
 *
 * @param ctx handler context whose pipeline is extended
 * @param in  inbound buffer; only its first readable byte is examined
 * @param out unused
 */
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    // Nothing to inspect yet.
    if (in.readableBytes() < 1) {
        return;
    }

    // First readable byte, read by absolute index, used to guess the protocol.
    final int firstByte = in.getByte(in.readerIndex());

    ChannelPipeline pipeline = ctx.pipeline();
    pipeline.addLast(new LocalHostPermitHandler(acceptForeignIp));

    if (!isHttp(firstByte)) {
        // Telnet: line framing, UTF-8 string codecs, idle detection, command processing.
        pipeline.addLast(new LineBasedFrameDecoder(2048));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new IdleStateHandler(0, 0, 5 * 60));
        pipeline.addLast(new TelnetProcessHandler());
    } else {
        // HTTP clients get no welcome output, so cancel it if it is still pending.
        if (welcomeFuture != null && welcomeFuture.isCancellable()) {
            welcomeFuture.cancel(false);
        }
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(1048576));
        pipeline.addLast(new HttpProcessHandler());
    }

    // Either way this handler's job is done once the chain is in place.
    pipeline.remove(this);
}
 
源代码13 项目: ext-opensource-netty   文件: SimpleServer.java

/**
 * Extends the parent channel setup with line-based framing (max 1024),
 * UTF-8 string encoder/decoder, the log dispatch handler and the server handler,
 * appended in that order.
 *
 * @param ch the socket channel being initialised
 */
@Override
protected void initSocketChannel(SocketChannel ch) {
	super.initSocketChannel(ch);

	ch.pipeline()
	  .addLast(new LineBasedFrameDecoder(1024))
	  .addLast(new StringEncoder(Charset.forName("UTF-8")))
	  .addLast(new StringDecoder(Charset.forName("UTF-8")))
	  .addLast(new LogDispatchHandler())
	  .addLast(new SimpleServerHandler());
}
 
源代码14 项目: ext-opensource-netty   文件: SimpleClient.java

/**
 * Extends the parent channel setup with line-based framing (max 1024),
 * UTF-8 string encoder/decoder, the log dispatch handler and the client handler,
 * appended in that order.
 *
 * @param ch the socket channel being initialised
 */
@Override
protected void initSocketChannel(SocketChannel ch) {
	super.initSocketChannel(ch);

	// Disabled by the original author: a DelimiterBasedFrameDecoder(1024,
	// Delimiters.lineDelimiter()) framing stage and a ClientSimpleHandlerX handler.
	ch.pipeline()
	  .addLast(new LineBasedFrameDecoder(1024))
	  .addLast(new StringEncoder(Charset.forName("UTF-8")))
	  .addLast(new StringDecoder(Charset.forName("UTF-8")))
	  .addLast(new LogDispatchHandler())
	  .addLast(new ClientSimpleHandler());
}
 
源代码15 项目: netty-4.1.22   文件: ProxyHandlerTest.java

/**
 * Success scenario: connects to {@code destination} through {@code clientHandlers},
 * waits up to 10 seconds for the channel to close, and then expects the lines
 * "0".."3", no recorded exceptions, the expected event count, and a timely close.
 */
@Override
protected void test() throws Exception {
    final SuccessTestHandler testHandler = new SuccessTestHandler();

    Bootstrap b = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.AUTO_READ, ThreadLocalRandom.current().nextBoolean())
            .resolver(NoopAddressResolverGroup.INSTANCE)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(clientHandlers);
                    pipeline.addLast(new LineBasedFrameDecoder(64));
                    pipeline.addLast(testHandler);
                }
            });

    boolean finished = b.connect(destination)
                        .channel()
                        .closeFuture()
                        .await(10, TimeUnit.SECONDS);

    logger.debug("Received messages: {}", testHandler.received);

    if (!testHandler.exceptions.isEmpty()) {
        for (Throwable t : testHandler.exceptions) {
            logger.debug("Recorded exception on the client side: {}", t);
        }
    } else {
        logger.debug("No recorded exceptions on the client side.");
    }

    assertProxyHandlers(true);

    assertThat(testHandler.received.toArray(), is(new Object[] { "0", "1", "2", "3" }));
    assertThat(testHandler.exceptions.toArray(), is(EmptyArrays.EMPTY_OBJECTS));
    assertThat(testHandler.eventCount, is(expectedEventCount));
    assertThat(finished, is(true));
}
 
源代码16 项目: netty-4.1.22   文件: ProxyHandlerTest.java

/**
 * Failure scenario: connects to {@code destination} through {@code clientHandlers}
 * and expects exactly one recorded {@code ProxyConnectException} whose string form
 * contains {@code expectedMessage}, with both the channel close and the handler
 * latch completing within 10 seconds each.
 */
@Override
protected void test() throws Exception {
    final FailureTestHandler testHandler = new FailureTestHandler();

    Bootstrap b = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .resolver(NoopAddressResolverGroup.INSTANCE)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(clientHandlers);
                    pipeline.addLast(new LineBasedFrameDecoder(64));
                    pipeline.addLast(testHandler);
                }
            });

    // Both waits always run; the non-short-circuit '&' mirrors that.
    boolean channelClosed = b.connect(destination).channel().closeFuture().await(10, TimeUnit.SECONDS);
    boolean latchReleased = testHandler.latch.await(10, TimeUnit.SECONDS);
    boolean finished = channelClosed & latchReleased;

    logger.debug("Recorded exceptions: {}", testHandler.exceptions);

    assertProxyHandlers(false);

    assertThat(testHandler.exceptions.size(), is(1));
    Throwable recorded = testHandler.exceptions.poll();
    assertThat(recorded, is(instanceOf(ProxyConnectException.class)));
    assertThat(String.valueOf(recorded), containsString(expectedMessage));
    assertThat(finished, is(true));
}
 
源代码17 项目: netty-4.1.22   文件: ProxyHandlerTest.java

/**
 * Timeout scenario: sets a 2000 ms connect timeout on every {@code ProxyHandler}
 * in {@code clientHandlers}, connects to {@code DESTINATION}, and expects exactly
 * one recorded {@code ProxyConnectException} mentioning "timeout", with the
 * channel close and handler latch each completing within twice the timeout.
 */
@Override
protected void test() throws Exception {
    final long timeoutMillis = 2000;
    for (ChannelHandler handler : clientHandlers) {
        if (handler instanceof ProxyHandler) {
            ((ProxyHandler) handler).setConnectTimeoutMillis(timeoutMillis);
        }
    }

    final FailureTestHandler testHandler = new FailureTestHandler();

    Bootstrap b = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .resolver(NoopAddressResolverGroup.INSTANCE)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(clientHandlers);
                    pipeline.addLast(new LineBasedFrameDecoder(64));
                    pipeline.addLast(testHandler);
                }
            });

    ChannelFuture closeFuture = b.connect(DESTINATION).channel().closeFuture();

    // Both waits always run; the non-short-circuit '&' mirrors that.
    boolean channelClosed = closeFuture.await(timeoutMillis * 2, TimeUnit.MILLISECONDS);
    boolean latchReleased = testHandler.latch.await(timeoutMillis * 2, TimeUnit.MILLISECONDS);
    boolean finished = channelClosed & latchReleased;

    logger.debug("Recorded exceptions: {}", testHandler.exceptions);

    assertProxyHandlers(false);

    assertThat(testHandler.exceptions.size(), is(1));
    Throwable recorded = testHandler.exceptions.poll();
    assertThat(recorded, is(instanceOf(ProxyConnectException.class)));
    assertThat(String.valueOf(recorded), containsString("timeout"));
    assertThat(finished, is(true));
}
 
源代码18 项目: netty-4.1.22   文件: HttpProxyServer.java

/**
 * Reconfigures the pipeline for a CONNECT request and checks its
 * {@code Proxy-Authorization} header against the configured credentials.
 *
 * <p>Unless running in {@code INTERMEDIARY} mode, a "lineDecoder" is inserted
 * before this handler. The HTTP aggregator is removed and the inbound half of
 * the HTTP codec is detached.
 *
 * @param ctx context of this handler
 * @param req the request; asserted to use the CONNECT method
 * @return {@code true} if no username is configured, or if the header carries
 *         {@code Basic} credentials matching {@code username:password};
 *         {@code false} otherwise, including when the header is absent or has
 *         no scheme/credentials separator
 */
private boolean authenticate(ChannelHandlerContext ctx, FullHttpRequest req) {
    assertThat(req.method(), is(HttpMethod.CONNECT));

    if (testMode != TestMode.INTERMEDIARY) {
        ctx.pipeline().addBefore(ctx.name(), "lineDecoder", new LineBasedFrameDecoder(64, false, true));
    }

    ctx.pipeline().remove(HttpObjectAggregator.class);
    ctx.pipeline().get(HttpServerCodec.class).removeInboundHandler();

    if (username == null) {
        return true;
    }

    CharSequence authz = req.headers().get(HttpHeaderNames.PROXY_AUTHORIZATION);
    if (authz == null) {
        return false;
    }

    // A header without a space yields a single part; treat it as a failed
    // authentication instead of indexing past the end of the array.
    String[] authzParts = authz.toString().split(" ", 2);
    if (authzParts.length != 2 || !"Basic".equals(authzParts[0])) {
        return false;
    }

    String expectedAuthz = username + ':' + password;
    ByteBuf authzBuf64 = Unpooled.copiedBuffer(authzParts[1], CharsetUtil.US_ASCII);
    try {
        ByteBuf authzBuf = Base64.decode(authzBuf64);
        try {
            return expectedAuthz.equals(authzBuf.toString(CharsetUtil.US_ASCII));
        } finally {
            authzBuf.release();
        }
    } finally {
        // Released even if decoding throws, so the buffer cannot leak.
        authzBuf64.release();
    }
}
 
源代码19 项目: netty-4.1.22   文件: Socks4ProxyServer.java

/**
 * Checks a SOCKS4 CONNECT request against the configured username.
 *
 * <p>Unless running in {@code INTERMEDIARY} mode, a "lineDecoder" is inserted
 * before this handler as a side effect.
 *
 * @param ctx context of this handler
 * @param req the request; asserted to be of type CONNECT
 * @return {@code true} if no username is configured or it equals the request's user id
 */
private boolean authenticate(ChannelHandlerContext ctx, Socks4CommandRequest req) {
    assertThat(req.type(), is(Socks4CommandType.CONNECT));

    if (testMode != TestMode.INTERMEDIARY) {
        ctx.pipeline().addBefore(ctx.name(), "lineDecoder", new LineBasedFrameDecoder(64, false, true));
    }

    // With no configured username every request is accepted.
    return username == null || username.equals(req.userId());
}
 
源代码20 项目: java-study   文件: NettyServerDemo2.java

/**
 * Starts the server on {@code port} and blocks until the server channel closes.
 *
 * <p>Each accepted connection gets line-based framing, a string decoder and the
 * demo handler. Both event loop groups are shut down when this method exits,
 * whether it ends normally, by interruption, or with an error.
 */
public void start() {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap sbs = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(port))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();

                        // Line-based frame decoder; 2048 is the maximum number of bytes per line.
                        // Handles messages that arrive split across reads.
                        p.addLast(new LineBasedFrameDecoder(2048));
                        // Alternatives left disabled by the original author:
                        //   p.addLast(new FixedLengthFrameDecoder(100));
                        //       fixed-length framing, one frame every 100 bytes
                        //   p.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("~_~".getBytes())));
                        //       frames delimited by "~_~"; must be added before StringDecoder
                        p.addLast(new StringDecoder());            // bytes -> String
                        p.addLast(new NettyServerHandlerDemo2());  // application handler
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        // Bind the port and start accepting incoming connections.
        ChannelFuture future = sbs.bind(port).sync();

        System.out.println("服务端启动成功,端口为 :" + port );
        future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        // Preserve the interrupt for callers instead of silently dropping it.
        Thread.currentThread().interrupt();
    } catch (Exception e) {
        // Previously swallowed without a trace; report it so a failed bind is visible.
        System.err.println("Netty server on port " + port + " stopped with error: " + e);
    } finally {
        // Release the event loop threads on every exit path, not only on failure.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
 
源代码21 项目: hermes   文件: ShutdownRequestMonitor.java

/**
 * Binds a server on the configured shutdown-request port and logs whether the
 * bind succeeded. Does not block: the outcome is reported from a listener on
 * the bind future.
 *
 * <p>Accepted connections get line-based framing (max length 10), UTF-8 string
 * decoding, an idle-state handler and the shutdown request handler.
 */
public void start() {
	ServerBootstrap b = new ServerBootstrap();

	b.group(m_bossGroup, m_workerGroup)//
	      .channel(NioServerSocketChannel.class)//
	      .childHandler(new ChannelInitializer<SocketChannel>() {
		      @Override
		      public void initChannel(SocketChannel ch) throws Exception {
			      ch.pipeline().addLast(new LineBasedFrameDecoder(10),//
			            new StringDecoder(Charsets.UTF_8),//
			            new IdleStateHandler(0, 0, MAX_IDLE_SECONDS),//
			            new ShutdownRequestInboundHandler());
		      }
	      }).option(ChannelOption.SO_BACKLOG, 128) // TODO set tcp options
	      .childOption(ChannelOption.SO_KEEPALIVE, true);

	// Bind and start to accept incoming connections.
	ChannelFuture f = b.bind(m_config.getShutdownRequestPort());

	f.addListener(new ChannelFutureListener() {

		@Override
		public void operationComplete(ChannelFuture future) throws Exception {
			if (future.isSuccess()) {
				log.info("Broker shutdown port is {}.", m_config.getShutdownRequestPort());
			} else {
				// Pass the failure cause as the trailing argument so the log
				// records why the bind failed, not just that it did.
				log.error("Failed to listen shutdown port {}.", m_config.getShutdownRequestPort(),
				      future.cause());
			}

		}
	});
}
 

/**
 * Configures the channel pipeline.
 * Also registers the handler that processes client communication, using the beans
 * registered in Netty.Server.Configuration.NettyServerConfiguration.
 * Actual user requests are handled in Netty.Server.Handler.JsonHandler.
 *
 * <p>The handler chain depends on {@code transferType}: {@code "websocket"} installs
 * the HTTP/WebSocket stack ending in {@code websocketHandler}; {@code "tcp"} and any
 * other value install line-based framing, string codecs and {@code jsonHandler}.
 *
 * @param channel the channel whose pipeline is configured
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(Channel channel) throws Exception {

	ChannelPipeline channelPipeline = channel.pipeline();

	switch (transferType) {

		case "websocket":

			channelPipeline
					.addLast(new HttpServerCodec())
					.addLast(new HttpObjectAggregator(65536))
					.addLast(new WebSocketServerCompressionHandler())
					.addLast(new WebSocketServerProtocolHandler(transferWebsocketPath, transferWebsocketSubProtocol, transferWebsocketAllowExtensions))
					.addLast(new LoggingHandler(LogLevel.valueOf(logLevelPipeline)))
					.addLast(websocketHandler);
			// Without this break the websocket case fell through and also appended
			// the TCP framing, string codecs and jsonHandler behind websocketHandler.
			break;

		case "tcp":
		default:

			// NOTE(review): Integer.MAX_VALUE effectively disables the line-length
			// limit; consider a bounded value if peers are untrusted.
			channelPipeline
					.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE))
					.addLast(STRING_DECODER)
					.addLast(STRING_ENCODER)
					.addLast(new LoggingHandler(LogLevel.valueOf(logLevelPipeline)))
					.addLast(jsonHandler);
			break;

	}

}
 
源代码23 项目: reactor-netty   文件: ConnectionTest.java

/**
 * Without registering any handler beforehand, adds "decoder" via
 * {@code addHandlerLast} and "decoder$extract" via {@code addHandlerFirst}, then
 * checks that the extractor precedes the decoder, followed only by the tail context.
 */
@Test
public void addByteDecoderWhenEmptyPipeline() {

	ChannelHandler decoder = new LineBasedFrameDecoder(12);

	testContext.addHandlerLast("decoder", decoder)
	           .addHandlerFirst("decoder$extract",
			           NettyPipeline.inboundHandler(ADD_EXTRACTOR));

	// assertEquals takes (expected, actual); this order keeps failure messages accurate.
	assertEquals(Arrays.asList("decoder$extract",
					"decoder",
					"DefaultChannelPipeline$TailContext#0"),
			channel.pipeline()
			       .names());
}
 
源代码24 项目: reactor-netty   文件: ConnectionTest.java

/**
 * Adds two decoder/extractor pairs via {@code addHandlerLast} to a pipeline holding
 * {@code HttpCodec}, {@code HttpTrafficHandler} and {@code ReactiveBridge}, and
 * checks that the pairs keep their insertion order between the traffic handler
 * and the bridge.
 */
@Test
public void addSeveralByteDecodersWhenCodec() {
	ChannelHandler decoder1 = new LineBasedFrameDecoder(12);
	ChannelHandler decoder2 = new LineBasedFrameDecoder(13);

	channel.pipeline()
	       .addLast(NettyPipeline.HttpCodec, new HttpServerCodec())
	       .addLast(NettyPipeline.HttpTrafficHandler, new ChannelDuplexHandler())
	       .addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
	       });

	testContext.addHandlerLast("decoder1$extract",
			NettyPipeline.inboundHandler(ADD_EXTRACTOR))
	           .addHandlerLast("decoder1", decoder1)

	           .addHandlerLast("decoder2$extract",
			           NettyPipeline.inboundHandler(ADD_EXTRACTOR))
	           .addHandlerLast("decoder2", decoder2);

	// assertEquals takes (expected, actual); this order keeps failure messages accurate.
	assertEquals(Arrays.asList(NettyPipeline.HttpCodec,
					NettyPipeline.HttpTrafficHandler,
					"decoder1$extract",
					"decoder1",
					"decoder2$extract",
					"decoder2",
					NettyPipeline.ReactiveBridge,
					"DefaultChannelPipeline$TailContext#0"),
			channel.pipeline()
			       .names());
}
 
源代码25 项目: reactor-netty   文件: ConnectionTest.java

/**
 * Without registering any handler beforehand, adds a handler named "encoder" via
 * {@code addHandlerFirst} and checks that it is followed only by the tail context.
 */
@Test
public void addByteEncoderWhenEmptyPipeline() {

	ChannelHandler encoder = new LineBasedFrameDecoder(12);

	testContext.addHandlerFirst("encoder", encoder);

	// assertEquals takes (expected, actual); this order keeps failure messages accurate.
	assertEquals(Arrays.asList("encoder", "DefaultChannelPipeline$TailContext#0"),
			channel.pipeline()
			       .names());
}
 
源代码26 项目: reactor-netty   文件: ConnectionTest.java

/**
 * Registers a {@code Utf8FrameValidator} as "foo", then tries to add a
 * {@link LineBasedFrameDecoder} under the same name; the pipeline is expected to
 * keep the original handler and gain no new entry.
 */
@Test
public void addDecoderSkipsIfExist() {
	channel.pipeline()
	       .addFirst("foo", new Utf8FrameValidator());

	// NOTE(review): this uses addHandlerFirst, exactly like addEncoderSkipsIfExist;
	// the other decoder tests in this class use addHandlerLast. Confirm which was intended.
	testContext.addHandlerFirst("foo", new LineBasedFrameDecoder(10));

	// assertEquals takes (expected, actual); this order keeps failure messages accurate.
	assertEquals(Arrays.asList("foo", "DefaultChannelPipeline$TailContext#0"),
			channel.pipeline()
			       .names());
	MatcherAssert.assertThat(channel.pipeline()
	                  .get("foo"), is(instanceOf(Utf8FrameValidator.class)));
}
 
源代码27 项目: reactor-netty   文件: ConnectionTest.java

/**
 * Registers a {@code Utf8FrameValidator} as "foo", then tries to add another
 * handler under the same name via {@code addHandlerFirst}; the pipeline is
 * expected to keep the original handler and gain no new entry.
 */
@Test
public void addEncoderSkipsIfExist() {
	channel.pipeline()
	       .addFirst("foo", new Utf8FrameValidator());

	testContext.addHandlerFirst("foo", new LineBasedFrameDecoder(10));

	// assertEquals takes (expected, actual); this order keeps failure messages accurate.
	assertEquals(Arrays.asList("foo", "DefaultChannelPipeline$TailContext#0"),
			channel.pipeline()
			       .names());
	MatcherAssert.assertThat(channel.pipeline()
	                  .get("foo"), is(instanceOf(Utf8FrameValidator.class)));
}
 
源代码28 项目: reactor-netty   文件: TcpClientTests.java

/**
 * Sends {@code messages} newline-terminated strings to the server on
 * {@code echoServerPort} and expects to read back the same number of lines
 * through a "codec" {@link LineBasedFrameDecoder} (max 8 KiB) within 15 seconds.
 *
 * @throws InterruptedException if interrupted while waiting for the responses
 */
@Test
public void tcpClientHandlesLineFeedData() throws InterruptedException {
	final int messages = 100;
	final CountDownLatch latch = new CountDownLatch(messages);
	final List<String> strings = new ArrayList<>();

	Connection client =
			TcpClient.create()
			         .host("localhost")
			         .port(echoServerPort)
			         .doOnConnected(c -> c.addHandlerLast("codec",
					                                 new LineBasedFrameDecoder(8 * 1024)))
			         .handle((in, out) ->
				        out.sendString(Flux.range(1, messages)
				                            .map(i -> "Hello World!" + i + "\n")
				                            .subscribeOn(Schedulers.parallel()))
				            .then( in.receive()
				                     .asString()
				                     .take(messages)
				                     .flatMapIterable(s -> Arrays.asList(s.split("\\n")))
				                     .doOnNext(s -> {
					                     strings.add(s);
					                     latch.countDown();
				                     }).then())
			         )
			         .wiretap(true)
			         .connectNow(Duration.ofSeconds(15));

	try {
		// Wait before building the failure message: it used to be assembled ahead
		// of the await, so it reported a stale size and read the list while the
		// receiving side could still be appending to it.
		boolean allReceived = latch.await(15, TimeUnit.SECONDS);

		assertTrue("Expected messages not received. Received " + strings.size() + " messages: " + strings,
				allReceived);

		assertEquals(messages, strings.size());
	}
	finally {
		// Release the connection even when an assertion above fails.
		client.disposeNow();
	}
}
 
源代码29 项目: reactor-netty   文件: TcpClientTests.java

/**
 * Runs the shared line-feed scenario against a client whose connection provider
 * is created with the name "tcpClientHandlesLineFeedDataElasticPool" and a limit
 * of {@code Integer.MAX_VALUE}, with a "codec" {@link LineBasedFrameDecoder}
 * (max 8 KiB) installed on connect.
 */
@Test
public void tcpClientHandlesLineFeedDataElasticPool() throws InterruptedException {
	ConnectionProvider provider =
			ConnectionProvider.create("tcpClientHandlesLineFeedDataElasticPool", Integer.MAX_VALUE);

	tcpClientHandlesLineFeedData(
			TcpClient.create(provider)
			         .host("localhost")
			         .port(echoServerPort)
			         .doOnConnected(conn -> conn.addHandler("codec",
			                                                new LineBasedFrameDecoder(8 * 1024))));
}
 
源代码30 项目: reactor-netty   文件: HttpServerTests.java

/**
 * Serves 100 zero-padded, 10-character numbers, each followed by a newline, and
 * checks that a client framing the response with {@code LineBasedFrameDecoder(10)}
 * receives exactly those values and then completes, all within 30 seconds.
 */
@Test
public void flushOnComplete() {

	Flux<String> numbers = Flux.range(0, 100)
	                           .map(n -> String.format("%010d", n));

	// Materialise the expected sequence up front.
	List<String> expected = numbers.collectList()
	                               .block();
	assertThat(expected).isNotNull();

	disposableServer =
			HttpServer.create()
			          .port(0)
			          .handle((req, resp) -> resp.sendString(numbers.map(s -> s + "\n")))
			          .wiretap(true)
			          .bindNow();

	Flux<String> received =
			HttpClient.create()
			          .port(disposableServer.port())
			          .wiretap(true)
			          .doOnConnected(conn -> conn.addHandler(new LineBasedFrameDecoder(10)))
			          .get()
			          .uri("/")
			          .responseContent()
			          .asString();

	StepVerifier.create(received)
	            .expectNextSequence(expected)
	            .expectComplete()
	            .verify(Duration.ofSeconds(30));
}
 
 类所在包
 类方法
 同包方法