下面列出了 io.netty.handler.codec.DecoderResult#isFailure() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Extracts the decoding failure carried by an HTTP object, if there is one.
 *
 * @param httpObject the HTTP object to inspect; may be {@code null}
 * @return the cause reported by the object's decoder result when that result is a failure;
 *         {@code null} when the object is {@code null}, when it has no decoder result,
 *         or when the decoder result is not a failure
 */
Throwable getDecoderFailure(HttpObject httpObject) {
    DecoderResult outcome = (httpObject == null) ? null : httpObject.decoderResult();
    boolean failed = outcome != null && outcome.isFailure();
    return failed ? outcome.cause() : null;
}
/**
 * Maps a decoder result to a one-character status code.
 *
 * <ul>
 *   <li>{@code "S"} - the result is a success</li>
 *   <li>{@code "X"} - the result is a failure</li>
 *   <li>{@code "F"} - the result is finished but is neither a success nor a failure</li>
 *   <li>{@code "-"} - none of the above (e.g. decoding is still unfinished)</li>
 * </ul>
 *
 * @param result the decoder result to classify; must not be {@code null}
 * @return the one-character code described above
 */
private String getShortResult(DecoderResult result) {
    if (result.isSuccess()) {
        return "S";
    }
    // Failure has to be tested before isFinished(): Netty's DecoderResult reports a failed
    // result as finished too, so the previous order (isFinished() first) made the "X"
    // branch unreachable and labelled every failure "F".
    if (result.isFailure()) {
        return "X";
    }
    if (result.isFinished()) {
        return "F";
    }
    return "-";
}
/**
 * Screens inbound {@link FullHttpRequest} messages before they are processed further.
 *
 * <p>Messages that are not full HTTP requests are forwarded down the pipeline untouched.
 * A request whose decoder result is a failure is released, answered with the response
 * produced by {@code HttpResponses.badRequest()}, and the channel is closed. Otherwise a
 * 100-continue response is written when the request expects one, and the request is
 * re-wrapped in a {@link DefaultFullHttpRequest} that is handed to {@code channelRead}.
 *
 * @param ctx the handler context of this handler
 * @param msg the inbound message
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // Anything that is not a full HTTP request is simply passed along.
    if (!FullHttpRequest.class.isAssignableFrom(msg.getClass())) {
        ctx.fireChannelRead(msg);
        return;
    }

    FullHttpRequest request = (FullHttpRequest) msg;
    DecoderResult decoding = request.decoderResult();
    if (decoding.isFailure()) {
        if (log.isWarnEnabled()) {
            log.warn("http decoder failure", decoding.cause());
        }
        // The broken request is dropped here, so this handler releases it.
        ReferenceCountUtil.release(msg);
        ctx.writeAndFlush(HttpResponses.badRequest());
        ctx.channel().close();
        return;
    }

    if (HttpUtil.is100ContinueExpected(request)) {
        ctx.writeAndFlush(new DefaultFullHttpResponse(request.protocolVersion(), CONTINUE));
    }

    // The wrapper reuses the original content buffer, headers and trailing headers as-is
    // (no defensive copy of the content is made).
    FullHttpRequest safeReq = new DefaultFullHttpRequest(
            request.protocolVersion(),
            request.method(),
            request.uri(),
            request.content(),
            request.headers(),
            request.trailingHeaders());
    // NOTE(review): presumably this resolves to a more specific overload declared elsewhere;
    // if it resolves to this very method it would recurse - confirm against the full class.
    channelRead(ctx, safeReq);
}
/**
 * Dispatches an inbound message to the {@link ChannelOperations} attached to the channel.
 *
 * <p>{@code null} messages and empty buffers are ignored. When no operations instance is
 * attached, the message is released and, at debug level, reported as dropped (including the
 * decoder failure cause when the message carries one). Any error raised while handling the
 * message releases it, is logged, closes the context and is passed to
 * {@code exceptionCaught}.
 *
 * @param ctx the handler context of this handler
 * @param msg the inbound message
 */
@Override
@SuppressWarnings("FutureReturnValueIgnored")
final public void channelRead(ChannelHandlerContext ctx, Object msg) {
    boolean nothingToRead = msg == null || msg == Unpooled.EMPTY_BUFFER || msg instanceof EmptyByteBuf;
    if (nothingToRead) {
        return;
    }

    try {
        ChannelOperations<?, ?> operations = ChannelOperations.get(ctx.channel());
        if (operations == null) {
            // Nobody is listening on this channel: report (debug only) and drop the message.
            if (log.isDebugEnabled()) {
                if (msg instanceof DecoderResultProvider) {
                    DecoderResult decoding = ((DecoderResultProvider) msg).decoderResult();
                    if (decoding.isFailure()) {
                        log.debug(format(ctx.channel(), "Decoding failed: " + msg + " : "),
                                decoding.cause());
                    }
                }
                log.debug(format(ctx.channel(), "No ChannelOperation attached. Dropping: {}"),
                        toPrettyHexDump(msg));
            }
            ReferenceCountUtil.release(msg);
        }
        else {
            operations.onInboundNext(ctx, msg);
        }
    }
    catch (Throwable err) {
        safeRelease(msg);
        log.error(format(ctx.channel(), "Error was received while reading the incoming data." +
                " The connection will be closed."), err);
        // The future returned by close() is deliberately ignored ("FutureReturnValueIgnored").
        ctx.close();
        exceptionCaught(ctx, err);
    }
}
/**
 * Records a complete HTTP response into {@code httpReceive} and closes the connection.
 *
 * <p>Only {@link FullHttpResponse} messages are handled; any other message is ignored.
 * The status code and text, the headers (only when {@code httpSend} asks for them and the
 * response has any) and the body decoded with the charset from {@code httpSend} are stored.
 * The exchange is flagged as an error when the decoder result is a failure or when the
 * status code is not 200. Finally the exchange is marked done and the context is closed.
 *
 * @param ctx the handler context of this handler
 * @param msg the inbound HTTP object
 */
@Override
public void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) {
    if (!(msg instanceof FullHttpResponse)) {
        return;
    }
    final FullHttpResponse response = (FullHttpResponse) msg;
    final int statusCode = response.status().code();
    httpReceive.setStatusCode(statusCode)
            .setStatusText(response.status().codeAsText().toString());

    final HttpHeaders headers = response.headers();
    if (httpSend.getNeedReceiveHeaders() && !headers.isEmpty()) {
        final Map<String, String> responseHeaderMap = Maps.newHashMapWithExpectedSize(headers.size());
        headers.forEach(one -> responseHeaderMap.put(one.getKey(), one.getValue()));
        httpReceive.setResponseHeader(responseHeaderMap);
    }

    // The previous chunked / non-chunked branches logged exactly the same line, so the
    // transfer-encoding check was dead code and has been removed.
    final boolean debugEnabled = log.isDebugEnabled();
    if (debugEnabled) {
        log.debug("#HTTP 内容开始{");
    }
    final String responseBody = response.content().toString(httpSend.getCharset());
    httpReceive.setResponseBody(responseBody);
    if (debugEnabled) {
        log.debug(responseBody);
        log.debug("}EOF#");
    }

    final DecoderResult decoderResult = response.decoderResult();
    if (decoderResult.isFailure()) {
        final Throwable cause = decoderResult.cause();
        if (log.isErrorEnabled()) {
            log.error(ToolFormat.toException(cause), cause);
        }
        httpReceive.setHaveError(true)
                .setErrMsg(cause.getMessage())
                .setThrowable(cause);
    } else if (statusCode != 200) {
        // Any status other than 200 is reported as an error by this client.
        httpReceive.setHaveError(true)
                .setErrMsg("本次请求响应码不是200,是" + statusCode);
    }

    httpReceive.setIsDone(true);
    ctx.close();
}
/**
 * Processes a packet received from the remote MQTT server.
 *
 * <p>Native Netty MQTT messages are handled directly; CONNACK, PUBLISH and SUBACK are first
 * translated to their Vert.x counterparts. A message that is not a Netty MQTT message, whose
 * decoding failed or did not finish, or whose type has no case below is reported through
 * {@code fireExceptionCaught} on the pipeline.
 *
 * @param chctx the channel handler context the packet arrived on
 * @param msg Incoming Packet
 */
private void handleMessage(ChannelHandlerContext chctx, Object msg) {
    if (!(msg instanceof io.netty.handler.codec.mqtt.MqttMessage)) {
        chctx.pipeline().fireExceptionCaught(new Exception("Wrong message type"));
        return;
    }

    io.netty.handler.codec.mqtt.MqttMessage packet = (io.netty.handler.codec.mqtt.MqttMessage) msg;

    // Reject packets the decoder could not fully and successfully decode.
    DecoderResult decoding = packet.decoderResult();
    if (decoding.isFailure()) {
        chctx.pipeline().fireExceptionCaught(decoding.cause());
        return;
    }
    if (!decoding.isFinished()) {
        chctx.pipeline().fireExceptionCaught(new Exception("Unfinished message"));
        return;
    }

    log.debug(String.format("Incoming packet %s", msg));

    switch (packet.fixedHeader().messageType()) {

        case CONNACK: {
            io.netty.handler.codec.mqtt.MqttConnAckMessage nettyConnAck =
                    (io.netty.handler.codec.mqtt.MqttConnAckMessage) packet;
            handleConnack(MqttConnAckMessage.create(
                    nettyConnAck.variableHeader().connectReturnCode(),
                    nettyConnAck.variableHeader().isSessionPresent()));
            break;
        }

        case PUBLISH: {
            io.netty.handler.codec.mqtt.MqttPublishMessage nettyPublish =
                    (io.netty.handler.codec.mqtt.MqttPublishMessage) packet;
            ByteBuf payload = VertxHandler.safeBuffer(nettyPublish.payload(), chctx.alloc());
            handlePublish(MqttPublishMessage.create(
                    nettyPublish.variableHeader().packetId(),
                    nettyPublish.fixedHeader().qosLevel(),
                    nettyPublish.fixedHeader().isDup(),
                    nettyPublish.fixedHeader().isRetain(),
                    nettyPublish.variableHeader().topicName(),
                    payload));
            break;
        }

        // The acknowledgement packets below only carry a message id.
        case PUBACK:
            handlePuback(((MqttMessageIdVariableHeader) packet.variableHeader()).messageId());
            break;

        case PUBREC:
            handlePubrec(((MqttMessageIdVariableHeader) packet.variableHeader()).messageId());
            break;

        case PUBREL:
            handlePubrel(((MqttMessageIdVariableHeader) packet.variableHeader()).messageId());
            break;

        case PUBCOMP:
            handlePubcomp(((MqttMessageIdVariableHeader) packet.variableHeader()).messageId());
            break;

        case SUBACK: {
            io.netty.handler.codec.mqtt.MqttSubAckMessage nettySubAck =
                    (io.netty.handler.codec.mqtt.MqttSubAckMessage) packet;
            handleSuback(MqttSubAckMessage.create(
                    nettySubAck.variableHeader().messageId(),
                    nettySubAck.payload().grantedQoSLevels()));
            break;
        }

        case UNSUBACK:
            handleUnsuback(((MqttMessageIdVariableHeader) packet.variableHeader()).messageId());
            break;

        case PINGRESP:
            handlePingresp();
            break;

        default:
            chctx.pipeline().fireExceptionCaught(new Exception("Wrong message type " + msg.getClass().getName()));
            break;
    }
}