io.netty.channel.socket.SocketChannel#pipeline() 源码实例 Demo

下面列出了 io.netty.channel.socket.SocketChannel#pipeline() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

/**
 * Assembles the HTTP upload server pipeline for a newly created channel.
 *
 * <p>Handlers are appended in this order: an optional TLS handler (only when
 * {@code sslCtx} is non-null), the HTTP request decoder, the HTTP response
 * encoder, the automatic content compressor and finally the upload handler.
 *
 * @param ch the channel whose pipeline is being populated
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline handlers = ch.pipeline();

    // The TLS handler is appended first so that it precedes every codec.
    if (sslCtx != null) {
        handlers.addLast(sslCtx.newHandler(ch.alloc()));
    }

    handlers.addLast(new HttpRequestDecoder())
            .addLast(new HttpResponseEncoder())
            // Drop the next entry if automatic content compression is not wanted.
            .addLast(new HttpContentCompressor())
            .addLast(new HttpUploadServerHandler());
}
 
/**
 * Builds the initializer used for client-side channels.
 *
 * <p>For every new channel the initializer first stores the negotiation state
 * as channel attributes (max idle timeout, owning channel group, the
 * client-side flag, a one-shot negotiate latch and the config itself) and then
 * appends the codec, idle detection, heartbeat, negotiation and request
 * handlers, all bound to {@code executorGroup}.
 *
 * @param config        negotiation settings supplying the idle timeouts
 * @param channelGroup  group recorded on the channel as an attribute
 * @param executorGroup executor group every appended handler is bound to
 * @return a new initializer capturing the three arguments
 */
private ChannelInitializer<SocketChannel> newChannelInitializer(final NegotiateConfig config,
		final ExchangeChannelGroup channelGroup, final EventExecutorGroup executorGroup) {
	final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			final ChannelPipeline handlers = ch.pipeline();

			// Long-lived per-channel attributes.
			ch.attr(ChannelAttrKeys.maxIdleTimeout).set(config.maxIdleTimeout());
			ch.attr(ChannelAttrKeys.channelGroup).set(channelGroup);
			ch.attr(ChannelAttrKeys.clientSide).set(true);

			// Attributes stored under the OneTime keys.
			ch.attr(OneTime.awaitNegotiate).set(new CountDownLatch(1));
			ch.attr(OneTime.channelConfig).set(config);

			// TODO should increase ioRatio when every ChannelHandler bind to executorGroup?
			handlers.addLast(executorGroup,
					RemotingEncoder.INSTANCE,
					new RemotingDecoder(),
					new IdleStateHandler(config.idleTimeout(), 0, 0),
					HeartbeatChannelHandler.INSTANCE,
					NegotiateChannelHandler.INSTANCE,
					ConcreteRequestHandler.INSTANCE);
		}
	};
	return initializer;
}
 
源代码3 项目: iot-dc   文件: PortListenerAbstract.java
/**
 * Populates the pipeline of a newly accepted channel.
 *
 * <p>Order of installation: INFO-level logging, channel (link) management,
 * whatever {@code extHandler} contributes, the MQ handler and finally the
 * exception handler.
 *
 * @param socketChannel the channel being initialized
 */
@Override
protected void initChannel(SocketChannel socketChannel) {
    ChannelPipeline handlers = socketChannel.pipeline();

    // Logging.
    handlers.addLast("logging", new LoggingHandler(LogLevel.INFO));

    // Heartbeat detection (currently disabled):
    // handlers.addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
    // handlers.addLast(new HeartBeatHandler());

    // Link management.
    handlers.addLast(new ChannelManagerHandler());

    // Extension hook: lets extHandler add its own handlers at this position.
    extHandler(socketChannel.pipeline());

    handlers.addLast(new MqHandler());

    // Exception management goes last.
    handlers.addLast(new ExceptionHandler());
}
 
源代码4 项目: SI   文件: HttpServerInitializer.java
/**
 * Populates the HTTP server pipeline of a newly accepted channel.
 *
 * <p>Installed handlers, in order: optional TLS handler (only when
 * {@code sslCtx} is non-null), HTTP response encoder, HTTP request decoder,
 * a 65536-byte {@link HttpObjectAggregator}, the named read-timeout handler,
 * {@code MyHandler} and the {@code HttpServerHandler} bound to {@code listener}.
 *
 * <p>{@code ChunkedWriteHandler} and {@code HttpContentCompressor} are not
 * installed; their lines were commented out in this initializer.
 *
 * @param ch the channel whose pipeline is being populated
 */
@Override
public void initChannel(SocketChannel ch) {
	final ChannelPipeline chain = ch.pipeline();

	if (sslCtx != null) {
		chain.addLast(sslCtx.newHandler(ch.alloc()));
	}

	chain.addLast(new HttpResponseEncoder())
		.addLast(new HttpRequestDecoder())
		// Aggregates message parts; remove it to handle individual HttpContent chunks instead.
		.addLast(new HttpObjectAggregator(65536))
		.addLast("readTimeoutHandler", new ReadTimeoutHandler(READ_TIMEOUT))
		.addLast("myHandler", new MyHandler())
		.addLast("handler", new HttpServerHandler(listener));
}
 
源代码5 项目: tinkerpop   文件: Channelizer.java
/**
 * Populates the client pipeline of a new channel.
 *
 * <p>When {@code supportsSsl()} is true an SSL context is created from the
 * cluster and its handler, targeted at the connection URI's host and port, is
 * appended first. Any exception raised while creating the context is rethrown
 * wrapped in a {@link RuntimeException}. After that {@code configure} adds its
 * handlers, followed by the SASL authentication handler and the response handler.
 *
 * @param socketChannel the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(final SocketChannel socketChannel) throws Exception {
    final ChannelPipeline pipeline = socketChannel.pipeline();

    // Start from "no SSL" and only replace it when SSL is supported.
    Optional<SslContext> maybeSsl = Optional.empty();
    if (supportsSsl()) {
        try {
            maybeSsl = Optional.of(cluster.createSSLContext());
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    if (maybeSsl.isPresent()) {
        final SslContext ssl = maybeSsl.get();
        pipeline.addLast(ssl.newHandler(
                socketChannel.alloc(),
                connection.getUri().getHost(),
                connection.getUri().getPort()));
    }

    configure(pipeline);
    pipeline.addLast(PIPELINE_GREMLIN_SASL_HANDLER,
            new Handler.GremlinSaslAuthenticationHandler(cluster.authProperties()));
    pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new Handler.GremlinResponseHandler(pending));
}
 
源代码6 项目: tools-journey   文件: TelnetClientInitializer.java
/**
 * Populates the telnet client pipeline of a new channel.
 *
 * <p>An optional TLS handler (only when {@code sslCtx} is non-null, targeted
 * at {@code TelnetClient.HOST}/{@code TelnetClient.PORT}) comes first, then the
 * line-based text codec, then the business-logic handler.
 *
 * @param ch the channel whose pipeline is being populated
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline chain = ch.pipeline();

    if (sslCtx != null) {
        chain.addLast(sslCtx.newHandler(ch.alloc(), TelnetClient.HOST, TelnetClient.PORT));
    }

    chain
            // Text line codec: line-delimited framing (max 8192 bytes), then decoder and encoder.
            .addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
            .addLast(DECODER)
            .addLast(ENCODER)
            // Business logic comes after the codec.
            .addLast(CLIENT_HANDLER);
}
 
源代码7 项目: x-pipe   文件: SecureChatClientInitializer.java
/**
 * Populates the secure chat client pipeline of a new channel.
 *
 * <p>The SSL handler is appended first so that all traffic is encrypted and
 * decrypted by it; the line-based text codec and the chat handler follow.
 *
 * @param ch the channel whose pipeline is being populated
 * @throws Exception if creating the SSL handler or any other step fails
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline chain = ch.pipeline();

    chain
            // SSL goes first to encrypt and decrypt everything.
            // In this example, a bogus certificate is used on the server side
            // and any invalid certificate is accepted on the client side.
            // A real deployment needs something more elaborate to identify
            // both client and server.
            .addLast(createSslHandler(getClientSSLContext()))
            // On top of the SSL handler: the text line codec.
            .addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
            .addLast(new StringDecoder())
            .addLast(new StringEncoder())
            // Business logic comes last.
            .addLast(new SecureChatClientHandler());
}
 
源代码8 项目: bitchat   文件: ClientInitializer.java
/**
 * Populates the client pipeline of a new channel.
 *
 * <p>Installs, in order: an idle-state checker using the configured reader idle
 * time, the packet codec, a health checker using the configured ping interval,
 * and the client handler.
 *
 * @param channel the channel whose pipeline is being populated
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel channel) throws Exception {
    final ChannelPipeline chain = channel.pipeline();
    final BaseConfig settings = ConfigFactory.getConfig(BaseConfig.class);

    chain.addLast(new IdleStateChecker(settings.readerIdleTime()))
            .addLast(new PacketCodec())
            .addLast(new HealthyChecker(client, settings.pingInterval()))
            .addLast(new ClientHandler());
}
 
源代码9 项目: xian   文件: DefaultInitializer.java
/**
 * Populates the pipeline of a new channel by delegating to the three
 * {@code init*Handlers} hooks: outbound handlers first, then inbound handlers,
 * then idle-state handlers.
 *
 * @param ch the channel whose pipeline is being populated
 * @throws Exception if any of the hooks fails
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline pipeline = ch.pipeline();

    // The three hooks run in this fixed order against the same pipeline.
    initOutboundHandlers(pipeline);
    initInboundHandlers(pipeline);
    initIdleStateHandlers(pipeline);
}
 
源代码10 项目: Kepler   文件: MusChannelInitializer.java
/**
 * Populates the MUS server pipeline of a new channel: a length-field based
 * frame decoder, the MUS decoder and encoder, and the connection handler bound
 * to {@code this.musServer}.
 *
 * @param socketChannel the channel whose pipeline is being populated
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
    // Frame layout passed to LengthFieldBasedFrameDecoder, in constructor order.
    final int maxFrameLength = 1683226630;
    final int lengthFieldOffset = 2;
    final int lengthFieldLength = 4;
    final int lengthAdjustment = 0;
    final int initialBytesToStrip = 0;

    socketChannel.pipeline()
            .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                    maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip))
            .addLast("gameDecoder", new MusNetworkDecoder())
            .addLast("gameEncoder", new MusNetworkEncoder())
            .addLast("handler", new MusConnectionHandler(this.musServer));
}
 
/**
 * Populates the WebSocket server pipeline of a new channel: HTTP codec,
 * chunked writer, HTTP object aggregator and the text frame handler, the last
 * of which is bound to {@code eventExecutorGroup}.
 *
 * @param ch the channel whose pipeline is being populated
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            // WebSocket is built on top of HTTP, so the HTTP codec is required.
            .addLast(new HttpServerCodec())
            // Handler that writes data in chunks.
            .addLast(new ChunkedWriteHandler())
            // HTTP messages arrive in parts; HttpObjectAggregator merges them back
            // together. The argument is the maximum aggregated length in bytes.
            .addLast(new HttpObjectAggregator(8192))
            // Text message handler.
            .addLast(eventExecutorGroup, new TextWebSocketFrameHandler());
}
 
/**
 * Populates the chat client pipeline of a new channel with a line-delimited
 * framer (max 8192 bytes), string decoder, string encoder and the chat client
 * handler, each registered under an explicit name.
 *
 * @param ch the channel whose pipeline is being populated
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
            .addLast("decoder", new StringDecoder())
            .addLast("encoder", new StringEncoder())
            .addLast("handler", new SimpleChatClientHandler());
}
 
源代码13 项目: HAP-Java   文件: ServerInitializer.java
/**
 * Populates the server pipeline of a new channel and then adds the channel to
 * {@code allChannels}.
 *
 * <p>Installed handlers, in order: logging, the response encoder registered
 * under {@code HTTP_HANDLER_NAME}, the HTTP request decoder, an aggregator
 * limited to {@code MAX_POST}, and the accessory handler bound to
 * {@code blockingExecutorGroup}.
 *
 * @param ch the channel whose pipeline is being populated
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
  ch.pipeline()
      .addLast(new LoggingHandler())
      .addLast(HTTP_HANDLER_NAME, new HttpResponseEncoderAggregate())
      .addLast(new HttpRequestDecoder())
      .addLast(new HttpObjectAggregator(MAX_POST))
      .addLast(blockingExecutorGroup, new AccessoryHandler(homekit));

  // The channel is added to allChannels only after its pipeline is complete.
  allChannels.add(ch);
}
 
源代码14 项目: Okra   文件: Client.java
/**
 * Demo entry point: starts a TCP client against 127.0.0.1:9008 and sends a
 * single request with a freshly incremented id and api {@code 1}.
 *
 * <p>The client's pipeline consists of a length-field frame decoder, the frame
 * prepender, the decoder/encoder handlers and an inline handler that prints the
 * id of each received {@code Response}.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    final TcpProtocolClient tcpClient = new TcpProtocolClient("127.0.0.1", 9008) {
        @Override
        protected ChannelHandler newChannelInitializer() {
            return new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            // Arguments: max frame length, length field offset 0, length 2,
                            // adjustment 0, initial bytes to strip 2.
                            .addLast("frame", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0, 2))
                            .addLast("prepender", FRAME_PREPENDER)
                            .addLast("decoder", GPB_DECODER_HANDLER)
                            .addLast("encoder", GPB_ENCODER_HANDLER)
                            // Terminal handler: prints the id of every response.
                            .addLast("handler", new SimpleChannelInboundHandler<Response>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, Response msg) throws Exception {
                                    System.out.println("msg:" + msg.getId() + ", ");
                                }
                            });
                }
            };
        }
    };
    tcpClient.start();

    final AtomicInteger idSequence = new AtomicInteger(0);
    final Channel channel = tcpClient.client();
    channel.writeAndFlush(Request.newBuilder()
            .setId(idSequence.incrementAndGet())
            .setApi(1)
            .build());
}
 
源代码15 项目: logstash-input-beats   文件: Server.java
/**
 * Populates the Beats server pipeline of a new channel.
 *
 * <p>Installed handlers, in order: optional SSL handler (only when
 * {@code isSslEnabled()}), an idle-state handler bound to
 * {@code idleExecutorGroup}, the ack encoder, the connection handler, and the
 * Beats parser plus Beats handler bound to {@code beatsHandlerExecutorGroup}.
 *
 * @param socket the channel whose pipeline is being populated
 */
public void initChannel(SocketChannel socket){
    final ChannelPipeline chain = socket.pipeline();

    if (isSslEnabled()) {
        chain.addLast(SSL_HANDLER, sslHandlerProvider.sslHandlerForChannel(socket));
    }

    chain
            // Idle times in seconds: reader, writer, all. The client inactivity timeout
            // is used for both the reader and the all-idle limit.
            .addLast(idleExecutorGroup, IDLESTATE_HANDLER,
                    new IdleStateHandler(localClientInactivityTimeoutSeconds,
                            IDLESTATE_WRITER_IDLE_TIME_SECONDS,
                            localClientInactivityTimeoutSeconds))
            .addLast(BEATS_ACKER, new AckEncoder())
            .addLast(CONNECTION_HANDLER, new ConnectionHandler())
            .addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(localMessageListener));
}
 
源代码16 项目: Raincat   文件: NettyClientHandlerInitializer.java
/**
 * Populates the client pipeline of a new channel: serialization handlers chosen
 * by {@code serializeProtocolEnum}, an idle-state handler named
 * {@code "timeout"} and the client message handler.
 *
 * @param socketChannel the channel whose pipeline is being populated
 */
@Override
protected void initChannel(final SocketChannel socketChannel) {
    final ChannelPipeline chain = socketChannel.pipeline();

    NettyPipelineInit.serializePipeline(serializeProtocolEnum, chain);

    chain
            // Reader, writer and all-idle limits all use the configured heart time, in seconds.
            .addLast("timeout", new IdleStateHandler(
                    txConfig.getHeartTime(),
                    txConfig.getHeartTime(),
                    txConfig.getHeartTime(),
                    TimeUnit.SECONDS))
            .addLast(nettyClientMessageHandler);
}
 
源代码17 项目: JgFramework   文件: AuthConnector.java
/**
 * Populates the HTTP pipeline of a new channel: response encoder, request
 * decoder, a 65536-byte aggregator, chunked writer, a CORS handler configured
 * to allow any origin, and the HTTP handler.
 *
 * @param ch the channel whose pipeline is being populated
 */
@Override
public void initChannel(SocketChannel ch) {
    final CorsConfig anyOriginCors = CorsConfig.withAnyOrigin().build();

    ch.pipeline()
            .addLast(new HttpResponseEncoder())
            .addLast(new HttpRequestDecoder())
            .addLast(new HttpObjectAggregator(65536))
            .addLast(new ChunkedWriteHandler())
            .addLast(new CorsHandler(anyOriginCors))
            .addLast(new HttpHandler());
}
 
源代码18 项目: syncer   文件: ExportServer.java
/**
 * Starts the export HTTP server.
 *
 * <p>Registers a single route, {@code HEALTH}, whose handler exports the
 * current health report as a response body. The status is 200 when the overall
 * health is {@code GREEN} and 500 otherwise. The listening port defaults to
 * {@code PORT} and can be overridden with a {@code port} entry in
 * {@code args}.
 *
 * @param args command-line arguments, converted to key/value pairs by
 *             {@code ArgUtil.toMap}
 * @throws NumberFormatException if the {@code port} value is not an unsigned integer
 * @throws Exception if starting the server fails
 */
public static void init(String[] args) throws Exception {
  Map<String, BiConsumer<ChannelHandlerContext, HttpRequest>> mapping = new HashMap<>();
  mapping.put(HEALTH, (channelHandlerContext, request) -> {
    ExportResult result = SyncerHealth.export();
    String json = result.getJson();
    Health.HealthStatus overall = result.getOverall();

    // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default
    // charset, which would make the response bytes depend on the host's locale.
    FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
        overall == Health.HealthStatus.GREEN ? HttpResponseStatus.OK : HttpResponseStatus.INTERNAL_SERVER_ERROR,
        Unpooled.wrappedBuffer(json.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
    response.headers().set(CONTENT_TYPE, TEXT_PLAIN);
    response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
    // NOTE(review): write() without flush; presumably DispatchHandler flushes
    // afterwards. Confirm against DispatchHandler.
    channelHandlerContext.write(response);
  });

  ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
      ChannelPipeline p = ch.pipeline();
      p.addLast(new LoggingHandler(LogLevel.INFO));
      p.addLast(new HttpServerCodec());
      // add body support if we need in future
      // p.addLast(new HttpObjectAggregator(Short.MAX_VALUE));
      // add dispatch handler
      p.addLast(new DispatchHandler(mapping));
    }
  };

  // choose port logic: the command-line value, when present, overrides the default
  int port = PORT;
  HashMap<String, String> kvMap = ArgUtil.toMap(args);
  String cmdPort = kvMap.get("port");
  if (cmdPort != null) {
    port = Integer.parseUnsignedInt(cmdPort);
  }

  NettyServer.startAndSync(initializer, port);
}
 
源代码19 项目: getty   文件: GimServerInitializer.java
/**
 * Populates the MQTT server pipeline of a new channel.
 *
 * <p>Installs the MQTT encoder and decoder under the names {@code "encoder"}
 * and {@code "decoder"}, followed by an inline handler that prints every
 * received MQTT message and, for PUBLISH messages, the payload decoded as
 * UTF-8 text.
 *
 * <p>Alternative Protobuf and line-delimited String codecs, plus a
 * Protobuf-based handler, were prototyped here as commented-out code; MQTT is
 * the codec in use.
 *
 * @param ch the channel whose pipeline is being populated
 * @throws Exception declared by the overridden method
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();

    // Each codec is registered under the name that matches its role; previously
    // the two names were swapped ("decoder" held the encoder and vice versa).
    pipeline.addLast("encoder", MqttEncoder.INSTANCE);
    pipeline.addLast("decoder", new MqttDecoder());
    pipeline.addLast(new SimpleChannelInboundHandler<MqttMessage>() {

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, MqttMessage mqttMessage) throws Exception {
            System.out.println("mqtt消息:" + mqttMessage.toString());

            switch (mqttMessage.fixedHeader().messageType()) {
                case PUBLISH:
                    MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
                    ByteBuf payload = mqttPublishMessage.payload();
                    byte[] bytes = ByteBufUtil.getBytes(payload);
                    // Decode with an explicit charset instead of the platform default.
                    System.out.println("payload:" + new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
                    break;
                default:
                    // Other message types are only logged by the line above.
                    break;
            }
        }
    });
}
 
源代码20 项目: netty-restful-server   文件: ServerInitializer.java
/**
 * Populates the RESTful HTTP server pipeline of a new channel: HTTP server
 * codec, content compressor, a 1048576-byte aggregator, chunked writer and the
 * server handler.
 *
 * @param ch the channel whose pipeline is being populated
 */
@Override
public void initChannel(SocketChannel ch) {
    ch.pipeline()
            // HttpServerCodec combines HttpRequestDecoder and HttpResponseEncoder.
            .addLast(new HttpServerCodec())
            // gzip compressor for HTTP response content.
            .addLast(new HttpContentCompressor())
            .addLast(new HttpObjectAggregator(1048576))
            .addLast(new ChunkedWriteHandler())
            .addLast(new ServerHandler());
}