下面列出了io.netty.channel.socket.SocketChannel#pipeline ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
pipeline.addLast(new HttpRequestDecoder());
pipeline.addLast(new HttpResponseEncoder());
// Remove the following line if you don't want automatic content compression.
pipeline.addLast(new HttpContentCompressor());
pipeline.addLast(new HttpUploadServerHandler());
}
private ChannelInitializer<SocketChannel> newChannelInitializer(final NegotiateConfig config,
final ExchangeChannelGroup channelGroup, final EventExecutorGroup executorGroup) {
return new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ch.attr(ChannelAttrKeys.maxIdleTimeout).set(config.maxIdleTimeout());
ch.attr(ChannelAttrKeys.channelGroup).set(channelGroup);
ch.attr(ChannelAttrKeys.clientSide).set(true);
ch.attr(OneTime.awaitNegotiate).set(new CountDownLatch(1));
ch.attr(OneTime.channelConfig).set(config);
// TODO should increase ioRatio when every ChannelHandler bind to executorGroup?
pipeline.addLast(executorGroup,
RemotingEncoder.INSTANCE,
new RemotingDecoder(),
new IdleStateHandler(config.idleTimeout(), 0, 0),
HeartbeatChannelHandler.INSTANCE,
NegotiateChannelHandler.INSTANCE,
ConcreteRequestHandler.INSTANCE);
}
};
}
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline
// log
.addLast("logging", new LoggingHandler(LogLevel.INFO))
// 心跳检测
// .addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS))
// .addLast(new HeartBeatHandler())
// 链路管理
.addLast(new ChannelManagerHandler())
;
// 拓展
extHandler(socketChannel.pipeline());
pipeline.addLast(new MqHandler())
// 异常管理
.addLast(new ExceptionHandler())
;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
pipeline.addLast(new HttpResponseEncoder());
pipeline.addLast(new HttpRequestDecoder());
// Uncomment the following line if you don't want to handle HttpChunks.
//pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
//p.addLast(new HttpObjectAggregator(1048576));
// Remove the following line if you don't want automatic content compression.
//pipeline.addLast(new HttpContentCompressor());
// Uncomment the following line if you don't want to handle HttpContents.
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(READ_TIMEOUT));
pipeline.addLast("myHandler", new MyHandler());
pipeline.addLast("handler", new HttpServerHandler(listener));
}
@Override
protected void initChannel(final SocketChannel socketChannel) throws Exception {
final ChannelPipeline pipeline = socketChannel.pipeline();
final Optional<SslContext> sslCtx;
if (supportsSsl()) {
try {
sslCtx = Optional.of(cluster.createSSLContext());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
} else {
sslCtx = Optional.empty();
}
if (sslCtx.isPresent()) {
pipeline.addLast(sslCtx.get().newHandler(socketChannel.alloc(), connection.getUri().getHost(), connection.getUri().getPort()));
}
configure(pipeline);
pipeline.addLast(PIPELINE_GREMLIN_SASL_HANDLER, new Handler.GremlinSaslAuthenticationHandler(cluster.authProperties()));
pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new Handler.GremlinResponseHandler(pending));
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc(), TelnetClient.HOST, TelnetClient.PORT));
}
// Add the text line codec combination first,
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// and then business logic.
pipeline.addLast(CLIENT_HANDLER);
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Add SSL handler first to encrypt and decrypt everything.
// In this example, we use a bogus certificate in the server side
// and accept any invalid certificates in the client side.
// You will need something more complicated to identify both
// and server in the real world.
pipeline.addLast(createSslHandler(getClientSSLContext()));
// On top of the SSL handler, add the text line codec.
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// and then business logic.
pipeline.addLast(new SecureChatClientHandler());
}
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
BaseConfig baseConfig = ConfigFactory.getConfig(BaseConfig.class);
pipeline.addLast(new IdleStateChecker(baseConfig.readerIdleTime()));
pipeline.addLast(new PacketCodec());
pipeline.addLast(new HealthyChecker(client, baseConfig.pingInterval()));
pipeline.addLast(new ClientHandler());
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
final ChannelPipeline p = ch.pipeline();
initOutboundHandlers(p);
initInboundHandlers(p);
initIdleStateHandlers(p);
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("frameDecoder",new LengthFieldBasedFrameDecoder(1683226630, 2, 4, 0, 0));
pipeline.addLast("gameDecoder", new MusNetworkDecoder());
pipeline.addLast("gameEncoder", new MusNetworkEncoder());
pipeline.addLast("handler", new MusConnectionHandler(this.musServer));
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// WebSocket协议本身是基于HTTP协议的,所以要使用HTTP解编码器
pipeline.addLast(new HttpServerCodec());
// 以块的方式来写的处理器
pipeline.addLast(new ChunkedWriteHandler());
// Netty是基于分段请求的,HttpObjectAggregator的作用是将请求分段再聚合,参数是聚合字节的最大长度
pipeline.addLast(new HttpObjectAggregator(8192));
// 文本消息处理器
pipeline.addLast(eventExecutorGroup, new TextWebSocketFrameHandler());
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new SimpleChatClientHandler());
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LoggingHandler());
pipeline.addLast(HTTP_HANDLER_NAME, new HttpResponseEncoderAggregate());
pipeline.addLast(new HttpRequestDecoder());
pipeline.addLast(new HttpObjectAggregator(MAX_POST));
pipeline.addLast(blockingExecutorGroup, new AccessoryHandler(homekit));
allChannels.add(ch);
}
public static void main(String[] args) {
TcpProtocolClient client = new TcpProtocolClient("127.0.0.1", 9008) {
@Override
protected ChannelHandler newChannelInitializer() {
return new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline cp = ch.pipeline();
cp.addLast("frame", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0, 2));
cp.addLast("prepender", FRAME_PREPENDER);
cp.addLast("decoder", GPB_DECODER_HANDLER);
cp.addLast("encoder", GPB_ENCODER_HANDLER);
// handler
cp.addLast("handler", new SimpleChannelInboundHandler<Response>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Response msg) throws Exception {
System.out.println("msg:" + msg.getId() + ", ");
}
});
// cp.addLast("handler", new ServerHandler());
}
};
}
};
client.start();
AtomicInteger ID = new AtomicInteger(0);
Channel client1 = client.client();
client1.writeAndFlush(Request.newBuilder()
.setId(ID.incrementAndGet())
.setApi(1)
.build());
}
public void initChannel(SocketChannel socket){
ChannelPipeline pipeline = socket.pipeline();
if (isSslEnabled()) {
pipeline.addLast(SSL_HANDLER, sslHandlerProvider.sslHandlerForChannel(socket));
}
pipeline.addLast(idleExecutorGroup, IDLESTATE_HANDLER,
new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds));
pipeline.addLast(BEATS_ACKER, new AckEncoder());
pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler());
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(localMessageListener));
}
@Override
protected void initChannel(final SocketChannel socketChannel) {
final ChannelPipeline pipeline = socketChannel.pipeline();
NettyPipelineInit.serializePipeline(serializeProtocolEnum, pipeline);
pipeline.addLast("timeout", new IdleStateHandler(txConfig.getHeartTime(), txConfig.getHeartTime(), txConfig.getHeartTime(), TimeUnit.SECONDS));
pipeline.addLast(nettyClientMessageHandler);
}
@Override
public void initChannel(SocketChannel ch) {
CorsConfig corsConfig = CorsConfig.withAnyOrigin().build();
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpResponseEncoder());
p.addLast(new HttpRequestDecoder());
p.addLast(new HttpObjectAggregator(65536));
p.addLast(new ChunkedWriteHandler());
p.addLast(new CorsHandler(corsConfig));
p.addLast(new HttpHandler());
}
public static void init(String[] args) throws Exception {
Map<String, BiConsumer<ChannelHandlerContext, HttpRequest>> mapping = new HashMap<>();
mapping.put(HEALTH, (channelHandlerContext, request) -> {
ExportResult result = SyncerHealth.export();
String json = result.getJson();
Health.HealthStatus overall = result.getOverall();
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
overall == Health.HealthStatus.GREEN ? HttpResponseStatus.OK : HttpResponseStatus.INTERNAL_SERVER_ERROR,
Unpooled.wrappedBuffer(json.getBytes()));
response.headers().set(CONTENT_TYPE, TEXT_PLAIN);
response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
channelHandlerContext.write(response);
});
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new HttpServerCodec());
// add body support if we need in future
// p.addLast(new HttpObjectAggregator(Short.MAX_VALUE));
// add dispatch handler
p.addLast(new DispatchHandler(mapping));
}
};
// choose port logic
int port = PORT;
HashMap<String, String> kvMap = ArgUtil.toMap(args);
String cmdPort = kvMap.get("port");
if (cmdPort != null) {
port = Integer.parseUnsignedInt(cmdPort);
}
NettyServer.startAndSync(initializer, port);
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// ----配置Protobuf处理器----
// 用于decode前解决半包和粘包问题(利用包头中的包含数组长度来识别半包粘包)
// pipeline.addLast(new ProtobufVarint32FrameDecoder());
//配置Protobuf解码处理器,消息接收到了就会自动解码,ProtobufDecoder是netty自带的,Message是自己定义的Protobuf类
// pipeline.addLast(new ProtobufDecoder(MessageClass.Message.getDefaultInstance()));
// 用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。
// pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
// 配置Protobuf编码器,发送的消息会先经过编码
// pipeline.addLast(new ProtobufEncoder());
// ----Protobuf处理器END----
// pipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
// pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
// pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
// pipeline.addLast(new SimpleChannelInboundHandler<MessageClass.Message>() {
// @Override
// public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// Channel incoming = ctx.channel();
// System.out.println("[Client] - " + incoming.remoteAddress() + " 连接过来");
// //incoming.writeAndFlush("123\r\n456\r789\nabcde\r\n");
// }
// @Override
// protected void channelRead0(ChannelHandlerContext ctx, MessageClass.Message msg) throws Exception {
// //System.out.println("收到消息:" + msg.getId());
// }
// });
pipeline.addLast("decoder", MqttEncoder.INSTANCE);
pipeline.addLast("encoder", new MqttDecoder());
pipeline.addLast(new SimpleChannelInboundHandler<MqttMessage>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MqttMessage mqttMessage) throws Exception {
System.out.println("mqtt消息:" + mqttMessage.toString());
switch (mqttMessage.fixedHeader().messageType()) {
case PUBLISH:
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
ByteBuf payload = mqttPublishMessage.payload();
byte[] bytes = ByteBufUtil.getBytes(payload);
System.out.println("payload:" + new String(bytes));
break;
default:
break;
}
}
});
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// HttpServerCodec is a combination of HttpRequestDecoder and HttpResponseEncoder
p.addLast(new HttpServerCodec());
// add gizp compressor for http response content
p.addLast(new HttpContentCompressor());
p.addLast(new HttpObjectAggregator(1048576));
p.addLast(new ChunkedWriteHandler());
p.addLast(new ServerHandler());
}