下面列出了 io.netty.channel.ChannelHandlerContext#fireChannelRead() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Dispatches an inbound HTTP/2 frame.
 *
 * <p>Stream frames are handed to {@code onHttp2StreamFrame} together with the channel of the
 * stream they belong to and are not forwarded. Every other frame is forwarded with
 * {@code ctx.fireChannelRead}; before that, GOAWAY frames are passed to
 * {@code onHttp2GoAwayFrame}, and SETTINGS frames that carry an initial window size update
 * {@code initialOutboundStreamWindow}.
 *
 * @param ctx   context used to forward non-stream frames
 * @param frame the frame that was read
 */
@Override
final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
    if (frame instanceof Http2StreamFrame) {
        Http2StreamFrame streamFrame = (Http2StreamFrame) frame;
        Http2MultiplexCodecStream stream = (Http2MultiplexCodecStream) streamFrame.stream();
        onHttp2StreamFrame(stream.channel, streamFrame);
        return;
    }

    if (frame instanceof Http2GoAwayFrame) {
        onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
    } else if (frame instanceof Http2SettingsFrame) {
        Integer windowSize = ((Http2SettingsFrame) frame).settings().initialWindowSize();
        if (windowSize != null) {
            initialOutboundStreamWindow = windowSize;
        }
    }

    // GOAWAY, SETTINGS and every remaining frame type are all sent down the pipeline so that
    // other handlers can act on them.
    ctx.fireChannelRead(frame);
}
/**
 * Splits the inbound bytes into major frames and forwards each one as a {@code FrameMajor}.
 *
 * <p>Each frame consists of two marker bytes, four single-byte header fields (group id,
 * message id, device id, backup byte), an unsigned 16-bit payload length, and the payload.
 * Processing stops when fewer than {@code FrameSetting.FRAME_HEAD_LENGTH} bytes remain, when
 * the marker bytes do not match, or when the announced payload is not fully readable.
 *
 * @param ctx     context used to allocate payload buffers and forward decoded frames
 * @param byteBuf inbound bytes; only consumed here, never retained
 * @throws Exception declared by the overridden method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
    // NOTE(review): the header reads below consume 8 bytes; this assumes
    // FrameSetting.FRAME_HEAD_LENGTH is at least 8 - confirm against its definition.
    while (byteBuf.readableBytes() >= FrameSetting.FRAME_HEAD_LENGTH) {
        if (byteBuf.readByte() != FrameSetting.MAJOR_FRAME_HEAD_1
                || byteBuf.readByte() != FrameSetting.MAJOR_FRAME_HEAD_2) {
            logger.warn("数据接收异常「帧头不匹配」");
            return;
        }

        int groupId = byteBuf.readByte() & 0xFF;
        int msgId = byteBuf.readByte() & 0xFF;
        int deviceId = byteBuf.readByte() & 0xFF;
        int backupMsg = byteBuf.readByte() & 0xFF;
        int dataLength = byteBuf.readShort() & 0xFFFF;

        // Verify the payload is complete BEFORE allocating: reading a truncated payload would
        // throw IndexOutOfBoundsException and leak the freshly allocated buffer.
        int available = byteBuf.readableBytes();
        if (available < dataLength) {
            logger.warn("Incomplete frame payload: expected " + dataLength
                    + " bytes but only " + available + " are readable");
            return;
        }

        FrameMajorHeader headMsg = new FrameMajorHeader(msgId, groupId, deviceId, dataLength, backupMsg);
        // The payload is copied into its own buffer, which is handed to the next handler.
        ByteBuf subBuf = ctx.alloc().buffer(dataLength);
        byteBuf.readBytes(subBuf, dataLength);
        ctx.fireChannelRead(new FrameMajor(headMsg, subBuf));
    }
}
/**
 * Applies CORS handling to inbound HTTP requests.
 *
 * <p>When CORS support is enabled and the message is an {@code HttpRequest}, the request is
 * stored in the {@code request} field. Preflight requests are answered through
 * {@code handlePreflight}; when short-circuiting is configured and the origin does not
 * validate, {@code forbidden} is invoked. In both cases {@code releaseRequest()} runs
 * afterwards and the message is not forwarded. Everything else is passed to the next handler.
 *
 * @param ctx context of this handler
 * @param msg the inbound message
 * @throws Exception propagated from the helpers invoked here
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!config.isCorsSupportEnabled() || !(msg instanceof HttpRequest)) {
        ctx.fireChannelRead(msg);
        return;
    }

    request = (HttpRequest) msg;

    if (isPreflightRequest(request)) {
        try {
            handlePreflight(ctx, request);
        } finally {
            releaseRequest();
        }
        return;
    }

    if (config.isShortCircuit() && !validateOrigin()) {
        try {
            forbidden(ctx, request);
        } finally {
            releaseRequest();
        }
        return;
    }

    ctx.fireChannelRead(msg);
}
/**
 * Forwards the message and, for HTTP responses, completes the span stored on the channel.
 *
 * <p>If the channel attribute {@code CLIENT_ATTRIBUTE_KEY} holds a span and the message is an
 * {@code HttpResponse}, the span is activated and tagged with the response status code before
 * the message is forwarded. Once forwarding returns (normally or with an exception), the
 * scope is closed and the span finished.
 *
 * @param handlerContext context of this handler
 * @param message        the inbound message
 */
@Override
public void channelRead(final ChannelHandlerContext handlerContext, final Object message) {
    final Span span = handlerContext.channel().attr(CLIENT_ATTRIBUTE_KEY).get();

    Scope scope = null;
    if (span != null && message instanceof HttpResponse) {
        scope = GlobalTracer.get().activateSpan(span);
        final HttpResponse response = (HttpResponse) message;
        span.setTag(Tags.HTTP_STATUS, response.status().code());
    }

    try {
        handlerContext.fireChannelRead(message);
    } finally {
        // A scope is only ever created above when a span is present.
        if (scope != null) {
            scope.close();
            span.finish();
        }
    }
}
/**
 * Records the inbound HTTP request and forwards the message unchanged.
 *
 * <p>The helper {@code runIf} is not visible here; presumably it runs the given action only
 * when {@code message} is an instance of the given class - confirm against its definition.
 *
 * @param context context of this handler
 * @param message the inbound message
 */
@Override
public void channelRead(
final ChannelHandlerContext context,
final Object message) {
// For an HttpRequest: wrap it in a Request and hand it to logbook.process, keeping the stage.
runIf(message, HttpRequest.class, httpRequest -> {
this.request = new Request(context, REMOTE, httpRequest);
this.requestStage = logbook.process(request);
});
// request::buffer is a bound method reference, so the current value of the request field is
// captured when this statement is evaluated.
runIf(message, HttpContent.class, request::buffer);
// For the last content chunk: store a runnable wrapping requestStage::write at index 0 of sequence.
runIf(message, LastHttpContent.class, content ->
sequence.set(0, throwingRunnable(requestStage::write)));
// The message is always passed on to the next handler.
context.fireChannelRead(message);
}
/**
 * Applies service frequency control before forwarding the message.
 *
 * <p>If {@code processServiceFreqControl()} reports that the request is allowed, the message
 * is forwarded. If the check itself fails with any {@code Throwable}, the failure is logged
 * and the request is treated as allowed. If the check rejects the request, a
 * {@code SoaException} with {@code SoaCode.FreqLimited} is thrown.
 *
 * @param ctx context of this handler
 * @param msg the inbound message
 * @throws Exception a {@code SoaException} when the request is rate limited
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean allowed = true;
    try {
        allowed = processServiceFreqControl();
    } catch (Throwable e) {
        // A failing frequency check must not block traffic: log it and let the request pass.
        LOGGER.error(SoaCode.FreqControlError.toString(), e);
    }

    // The decision is taken after the try/catch rather than inside a finally block, so that
    // neither the throw nor the forward can interfere with an exception already in flight.
    if (!allowed) {
        throw new SoaException(SoaCode.FreqLimited, "当前服务在一定时间内请求次数过多,被限流");
    }
    ctx.fireChannelRead(msg);
}
/**
 * Intercepts heartbeat messages and forwards everything else.
 *
 * <p>Messages that are not an {@code RpcMessage}, or that have no header, are forwarded
 * untouched. A heartbeat PONG is only logged, a heartbeat PING is answered through
 * {@code sendPong}; neither is forwarded. All other message types go to the next handler.
 *
 * @param channelHandlerContext context of this handler
 * @param msg                   the inbound message
 * @throws Exception propagated from {@code sendPong}
 */
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
    // instanceof is false for null, so a null message also takes the pass-through path.
    RpcMessage rpc = msg instanceof RpcMessage ? (RpcMessage) msg : null;
    if (rpc == null || rpc.getMessageHeader() == null) {
        channelHandlerContext.fireChannelRead(msg);
        return;
    }

    if (rpc.getMessageHeader().getType() == Constants.MESSAGE_TYPE_HEARTBEAT_PONG) {
        logger.info("ClientHeartbeatHandler.channelRead0 ,pong data is:{}", rpc.getMessageBody());
        return;
    }

    if (rpc.getMessageHeader().getType() == Constants.MESSAGE_TYPE_HEARTBEAT_PING) {
        this.sendPong(channelHandlerContext);
        return;
    }

    channelHandlerContext.fireChannelRead(msg);
}
/**
 * Flushes any bytes still held by this handler when it is removed from the pipeline.
 *
 * <p>Readable bytes left in the internal buffer are copied out with {@code readBytes} and
 * forwarded to the next handler; the internal buffer is released either way. The handler state
 * ({@code cumulation}, {@code numReads}) is then reset, a read-complete event is fired and
 * {@code handlerRemoved0} is invoked.
 *
 * @param ctx context of this handler
 * @throws Exception propagated from {@code handlerRemoved0}
 */
@Override
public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    ByteBuf cumulated = internalBuffer();
    int pending = cumulated.readableBytes();

    // Copy out the remaining bytes (if any) before the internal buffer is released.
    ByteBuf leftover = pending > 0 ? cumulated.readBytes(pending) : null;
    cumulated.release();
    if (leftover != null) {
        ctx.fireChannelRead(leftover);
    }

    cumulation = null;
    numReads = 0;
    ctx.fireChannelReadComplete();
    handlerRemoved0(ctx);
}
/**
 * Handles CANCEL commands and forwards every other command type.
 *
 * <p>A CANCEL command clears the job registered under the command's unique name; no response
 * is sent for it.
 *
 * @param ctx     context of this handler
 * @param command the received command
 * @throws Exception declared by the overridden method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, OlapMessage.Command command) throws Exception {
    if (command.getType() == OlapMessage.Command.Type.CANCEL) {
        // Cancellation needs no response.
        jobRegistry.clear(command.getUniqueName());
    } else {
        ctx.fireChannelRead(command);
    }
}
/**
 * Decodes an inbound datagram and forwards the decoded result.
 *
 * <p>The original message is released once decoding and forwarding are done, whether or not
 * they succeeded.
 *
 * @param ctx context of this handler
 * @param msg the inbound message; cast to {@code DatagramPacket}
 * @throws Exception propagated from {@code decode}
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        DatagramPacket packet = (DatagramPacket) msg;
        Object decoded = decode(ctx, packet);
        ctx.fireChannelRead(decoded);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
/**
 * Sends a raw packet to the server.
 *
 * <p>The packet is copied into a new buffer prefixed with
 * {@code PacketWrapper.PASSTHROUGH_ID} and fired as an inbound read, starting from the context
 * returned by {@code PipelineUtil.getPreviousContext} for the injector's decoder name, or from
 * the pipeline itself when no such context is found. The given {@code packet} is always
 * released.
 *
 * @param packet        Raw packet to be sent
 * @param currentThread If {@code true} executes immediately, {@code false} submits a task to EventLoop
 */
public void sendRawPacketToServer(ByteBuf packet, boolean currentThread) {
    ByteBuf buf = packet.alloc().buffer();
    try {
        try {
            Type.VAR_INT.writePrimitive(buf, PacketWrapper.PASSTHROUGH_ID);
        } catch (Exception e) {
            // Should not happen
            Via.getPlatform().getLogger().warning("Type.VAR_INT.write thrown an exception: " + e);
        }
        buf.writeBytes(packet);

        ChannelHandlerContext context = PipelineUtil
                .getPreviousContext(Via.getManager().getInjector().getDecoderName(), channel.pipeline());

        // Single definition of the forward step, used both inline and on the event loop.
        Runnable fireRead = () -> {
            if (context == null) {
                channel.pipeline().fireChannelRead(buf);
            } else {
                context.fireChannelRead(buf);
            }
        };

        if (currentThread) {
            fireRead.run();
            return;
        }

        try {
            channel.eventLoop().submit(fireRead);
        } catch (Throwable t) {
            // Couldn't schedule: nobody will consume the copy, so free it here.
            buf.release();
            throw t;
        }
    } finally {
        packet.release();
    }
}
/**
 * Delivers an inbound message to the listener of the connection bound to this channel.
 *
 * <p>Once the message reaches the business thread, be aware that {@code msg} may already have
 * been released. The message is released here only after the listener call returned normally;
 * in every other case it is passed on to the next handler.
 *
 * @param ctx context of this handler
 * @param msg the inbound message; ignored when {@code null}
 * @throws Exception declared by the overridden method
 */
@Override
public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg == null) {
        return;
    }

    boolean delivered = false;
    try {
        @SuppressWarnings("unchecked")
        M typed = (M) msg;
        JConnection connection = findConnection(ctx.channel());
        if (connection == null) {
            log.error(ctx.channel() + ":not found NettyConnection Created.");
            // Let the next handler deal with the message.
            ctx.fireChannelRead(msg);
        } else {
            listener.messageArrived(connection, typed);
            delivered = true;
        }
    } catch (Exception e) {
        log.error(e.getMessage(), e);
        if (!delivered) {
            // Failed before the message was consumed: let the next handler deal with it.
            ctx.fireChannelRead(msg);
        }
    } finally {
        if (delivered) {
            ReferenceCountUtil.release(msg);
        }
    }
}
/**
 * Routes inbound messages by type.
 *
 * <p>HTTP requests go to {@code handleHttpRequest}; WebSocket frames go to
 * {@code handleWebSocketFrame} after {@code initHandlerWrapper()} has run. Any other message
 * is retained and forwarded to the next handler.
 *
 * @param ctx context of this handler
 * @param msg the inbound message
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
        handleHttpRequest(ctx, (HttpRequest) msg);
        return;
    }

    if (msg instanceof WebSocketFrame) {
        initHandlerWrapper();
        handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        return;
    }

    // Retain before forwarding so the message stays valid for the next handler.
    ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
}
/**
 * Records read statistics for the channel's local port and forwards the message.
 *
 * <p>The collector for the local port is incremented by the number of readable bytes of the
 * message and by one message.
 *
 * @param ctx context of this handler
 * @param msg the inbound message; cast to {@code ByteBuf}
 * @throws Exception declared by the overridden method
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    int localPort = ((InetSocketAddress) ctx.channel().localAddress()).getPort();
    TrafficCollector collector = TrafficCollector.getCollector(localPort);

    int readable = ((ByteBuf) msg).readableBytes();
    collector.incrementReadBytes(readable);
    collector.incrementReadMsgs(1);

    ctx.fireChannelRead(msg);
}
/**
 * Forwards received HTTP/2 settings down the pipeline when propagation is enabled.
 *
 * @param ctx      context of this handler
 * @param settings the settings that were read
 * @throws Http2Exception declared by the overridden method
 */
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
    if (!propagateSettings) {
        return;
    }
    // Provide an interface for non-listeners to capture settings
    ctx.fireChannelRead(settings);
}
/**
 * Handles negotiation heartbeats on the non-client side and forwards all other messages.
 *
 * <p>A message is passed to {@code dealNegotiate} when this is not the client side and the
 * message is both a request and a heartbeat. Otherwise it is retained and forwarded.
 *
 * @param ctx context of this handler
 * @param msg the received protocol message
 * @throws Exception propagated from {@code dealNegotiate}
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Protocol msg) throws Exception {
    boolean negotiation = !ChannelUtil.clientSide(ctx) && msg.request() && msg.heartbeat();
    if (negotiation) {
        dealNegotiate(ctx, msg);
    } else {
        // no sense to Protocol in fact
        ReferenceCountUtil.retain(msg);
        ctx.fireChannelRead(msg);
    }
}
/**
 * Processes a syslog request: forwards its data down the pipeline, then writes and flushes the
 * request's OK reply.
 *
 * @param ctx context of this handler
 * @param msg the syslog request
 */
protected void handleSyslog(final ChannelHandlerContext ctx, final SyslogRequest msg) {
    logger.debug("Process syslog command: {}", msg);

    final Object payload = msg.getData();
    ctx.fireChannelRead(payload);

    final Object reply = msg.replyOk();
    ctx.writeAndFlush(reply);
}
/**
 * Unwraps a binary WebSocket frame and forwards its content to the next handler.
 *
 * @param ctx context of this handler
 * @param msg the binary frame whose content is forwarded
 * @throws Exception declared by the overridden method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception {
    // The content shares the frame's reference count. Retain it before forwarding so it remains
    // valid downstream once the frame is released after this method returns.
    // NOTE(review): assumes the enclosing handler auto-releases its messages (the
    // SimpleChannelInboundHandler default) - confirm against the class declaration.
    ctx.fireChannelRead(msg.content().retain());
}
/**
 * Records the time of this read in {@code lastReadTime} and forwards the message unchanged.
 *
 * @param ctx context of this handler
 * @param msg the inbound message
 * @throws Exception declared by the overridden method
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    final long now = System.nanoTime();
    lastReadTime = now;
    ctx.fireChannelRead(msg);
}
/**
 * Handles the initial HTTP request of a potential WebSocket upgrade.
 *
 * <p>Requests that failed decoding are answered with 400. Requests that are not a GET carrying
 * both a {@code Connection: upgrade} header and a {@code Sec-WebSocket-Version} header are
 * passed to the rest of the pipeline. For upgrade requests, {@code factory} is asked for a
 * message consumer for the request URI; if there is none, 400 is returned. Otherwise the
 * WebSocket handshake is started and, on success, the consumer is stored on the channel under
 * {@code key} if nothing is stored there yet.
 *
 * @param ctx context of this handler
 * @param req the inbound HTTP request
 */
private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
    // Handle a bad request.
    if (!req.decoderResult().isSuccess()) {
        sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST, ctx.alloc().buffer(0)));
        return;
    }

    // Only the initial GET upgrade request is handled here; let the rest of the pipeline
    // handle everything else.
    if (!GET.equals(req.method())
            || !req.headers().contains("Connection", "upgrade", true)
            || !req.headers().contains("Sec-WebSocket-Version")) {
        ctx.fireChannelRead(req);
        return;
    }

    // Is this something we should try and handle?
    Optional<Consumer<Message>> maybeHandler = factory.apply(
            req.uri(),
            outbound -> {
                ctx.channel().writeAndFlush(Require.nonNull("Message to send", outbound));
            });
    if (!maybeHandler.isPresent()) {
        sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST, ctx.alloc().buffer(0)));
        return;
    }

    // Handshake
    WebSocketServerHandshakerFactory handshakerFactory = new WebSocketServerHandshakerFactory(
            getWebSocketLocation(req), null, false, Integer.MAX_VALUE);
    handshaker = handshakerFactory.newHandshaker(req);
    if (handshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }

    ChannelFuture future = handshaker.handshake(ctx.channel(), req);
    future.addListener((ChannelFutureListener) completed -> {
        if (future.isSuccess()) {
            ctx.channel().attr(key).setIfAbsent(maybeHandler.get());
        } else {
            ctx.fireExceptionCaught(future.cause());
        }
    });
}