下面列出了怎么用 io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame 的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Routes an incoming WebSocket frame to the handler that matches its concrete type.
 *
 * <p>Close and pong frames go to their dedicated callbacks, a ping is answered with a pong
 * carrying the same payload, and text/binary frames go to {@code onText}/{@code onBinary}.
 * A continuation frame is treated as text or binary depending on {@code expectedContinuation};
 * if that is neither {@code BYTE} nor {@code TEXT} the frame is dropped without any action.
 *
 * @param msg the frame to process
 * @throws IOException declared for the handlers invoked from here
 */
private void processFrame(WebSocketFrame msg) throws IOException {
    if (msg instanceof CloseWebSocketFrame) {
        onCloseFrame((CloseWebSocketFrame) msg);
        return;
    }
    if (msg instanceof PongWebSocketFrame) {
        onPongMessage((PongWebSocketFrame) msg);
        return;
    }
    if (msg instanceof PingWebSocketFrame) {
        // Drain the ping payload into a byte array and echo it back as the pong body.
        final byte[] payload = new byte[msg.content().readableBytes()];
        msg.content().readBytes(payload);
        session.getAsyncRemote().sendPong(ByteBuffer.wrap(payload));
        return;
    }
    if (msg instanceof TextWebSocketFrame) {
        onText(msg, ((TextWebSocketFrame) msg).text());
        return;
    }
    if (msg instanceof BinaryWebSocketFrame) {
        onBinary(msg);
        return;
    }
    if (msg instanceof ContinuationWebSocketFrame) {
        // The continuation inherits the type of the message it belongs to.
        if (expectedContinuation == FrameType.BYTE) {
            onBinary(msg);
        } else if (expectedContinuation == FrameType.TEXT) {
            onText(msg, ((ContinuationWebSocketFrame) msg).text());
        }
    }
}
/**
 * Sends one part of a (possibly fragmented) text message and blocks until the write finishes.
 *
 * <p>The first part of a message is sent as a text frame; subsequent parts are sent as
 * continuation frames until a part with {@code isLast == true} has been sent.
 *
 * @param partialMessage the text of this fragment; must not be {@code null}
 * @param isLast         {@code true} if this fragment completes the message
 * @throws IOException if the write fails or the calling thread is interrupted while waiting;
 *                     in the latter case the thread's interrupt flag is restored
 */
@Override
public void sendText(final String partialMessage, final boolean isLast) throws IOException {
    if (partialMessage == null) {
        throw JsrWebSocketMessages.MESSAGES.messageInNull();
    }
    if (inBinaryFragment) {
        throw JsrWebSocketMessages.MESSAGES.cannotSendInMiddleOfFragmentedMessage();
    }
    // Remember whether we were already inside a fragmented message before updating the state.
    boolean fragmented = inTextFragment;
    inTextFragment = !isLast;
    try {
        if (fragmented) {
            undertowSession.getChannel().writeAndFlush(new ContinuationWebSocketFrame(isLast, 0, partialMessage)).get();
        } else {
            undertowSession.getChannel().writeAndFlush(new TextWebSocketFrame(isLast, 0, partialMessage)).get();
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IOException(e);
    } catch (ExecutionException e) {
        throw new IOException(e);
    }
}
/**
 * Sends one part of a (possibly fragmented) binary message and blocks until the write finishes.
 *
 * <p>The first part of a message is sent as a binary frame; subsequent parts are sent as
 * continuation frames until a part with {@code isLast == true} has been sent. The buffer's
 * contents are copied before sending, and the buffer is cleared after a successful write.
 *
 * @param partialByte the bytes of this fragment; must not be {@code null}
 * @param isLast      {@code true} if this fragment completes the message
 * @throws IOException if the write fails or the calling thread is interrupted while waiting;
 *                     in the latter case the thread's interrupt flag is restored
 */
@Override
public void sendBinary(final ByteBuffer partialByte, final boolean isLast) throws IOException {
    if (partialByte == null) {
        throw JsrWebSocketMessages.MESSAGES.messageInNull();
    }
    if (inTextFragment) {
        throw JsrWebSocketMessages.MESSAGES.cannotSendInMiddleOfFragmentedMessage();
    }
    // Remember whether we were already inside a fragmented message before updating the state.
    boolean fragmented = inBinaryFragment;
    inBinaryFragment = !isLast;
    try {
        if (fragmented) {
            undertowSession.getChannel().writeAndFlush(new ContinuationWebSocketFrame(isLast, 0, Unpooled.copiedBuffer(partialByte))).get();
        } else {
            undertowSession.getChannel().writeAndFlush(new BinaryWebSocketFrame(isLast, 0, Unpooled.copiedBuffer(partialByte))).get();
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IOException(e);
    } catch (ExecutionException e) {
        throw new IOException(e);
    }
    partialByte.clear();
}
/**
 * Sends a JSON string over the channel, splitting it into WebSocket fragments of at most
 * {@code deviceModel.getBuffersize()} characters each.
 *
 * <p>The first fragment is a text frame and every following fragment is a continuation
 * frame; the last fragment carries the final-fragment flag. An empty string is sent as a
 * single, final, empty text frame.
 *
 * @param json the message to send
 * @throws IllegalStateException if the channel is not connected, or if the configured buffer
 *                               size is not positive (which would otherwise never make progress)
 */
private void sendMessage(String json) {
    if (!isConnected()) {
        throw new IllegalStateException("Cannot send message because not connected");
    }
    final int buffersize = deviceModel.getBuffersize();
    if (buffersize <= 0) {
        // A zero-sized chunk never advances through the message and a negative one is
        // meaningless; fail fast instead of looping forever or throwing an index error.
        throw new IllegalStateException("Cannot send message because buffer size is not positive: " + buffersize);
    }
    final int total = json.length();
    int startPos = 0;
    do {
        // Work with the remaining length rather than startPos + buffersize to avoid int overflow.
        final int remaining = total - startPos;
        final int chunkLength = Math.min(remaining, buffersize);
        final boolean last = chunkLength == remaining;
        final String chunk = json.substring(startPos, startPos + chunkLength);
        if (startPos == 0) {
            channel.writeAndFlush(new TextWebSocketFrame(last, 0, chunk));
        } else {
            channel.writeAndFlush(new ContinuationWebSocketFrame(last, 0, chunk));
        }
        startPos += chunkLength;
    } while (startPos < total);
}
/**
 * Handles a single inbound WebSocket frame: completes the closing handshake on a close
 * frame, answers a ping with a pong that mirrors its flags and payload, echoes text, binary
 * and continuation frames back to the peer, and releases pong frames.
 *
 * @param ctx   the handler context used for writing responses
 * @param frame the frame that was received
 * @throws UnsupportedOperationException if the frame is of any other type
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (logger.isLoggable(Level.FINE)) {
        final int channelHash = ctx.channel().hashCode();
        final String frameType = StringUtil.simpleClassName(frame);
        logger.fine(String.format("Channel %s received %s", channelHash, frameType));
    }

    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
        return;
    }

    final boolean dataFrame = frame instanceof TextWebSocketFrame
            || frame instanceof BinaryWebSocketFrame
            || frame instanceof ContinuationWebSocketFrame;
    if (dataFrame) {
        // Echo data frames back unchanged.
        ctx.write(frame);
        return;
    }
    if (frame instanceof PongWebSocketFrame) {
        // Pongs need no reply; just drop our reference to the frame.
        frame.release();
        return;
    }

    final String unsupportedType = frame.getClass().getName();
    throw new UnsupportedOperationException(String.format("%s frame types not supported", unsupportedType));
}
/**
 * Handles a single inbound WebSocket frame, writing every response with the context's void
 * promise: completes the closing handshake on a close frame, answers a ping with a pong that
 * mirrors its flags and payload, echoes text, binary and continuation frames back to the
 * peer, and releases pong frames.
 *
 * @param ctx   the handler context used for writing responses
 * @param frame the frame that was received
 * @throws UnsupportedOperationException if the frame is of any other type
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (logger.isLoggable(Level.FINE)) {
        final int channelHash = ctx.channel().hashCode();
        final String frameType = StringUtil.simpleClassName(frame);
        logger.fine(String.format("Channel %s received %s", channelHash, frameType));
    }

    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()), ctx.voidPromise());
        return;
    }

    final boolean dataFrame = frame instanceof TextWebSocketFrame
            || frame instanceof BinaryWebSocketFrame
            || frame instanceof ContinuationWebSocketFrame;
    if (dataFrame) {
        // All three data frame types are echoed back in the same way.
        ctx.write(frame, ctx.voidPromise());
        return;
    }
    if (frame instanceof PongWebSocketFrame) {
        // Pongs need no reply; just drop our reference to the frame.
        frame.release();
        return;
    }

    final String unsupportedType = frame.getClass().getName();
    throw new UnsupportedOperationException(String.format("%s frame types not supported", unsupportedType));
}
/**
 * Writes the readable bytes of {@code byteBuf} as one WebSocket message, split into fragments
 * of at most {@code maxFramePayloadLength} bytes: a leading binary frame followed by
 * continuation frames. Each fragment is copied into its own buffer and flushed immediately.
 *
 * <p>The caller's {@code promise} is attached only to the final fragment. A promise can be
 * completed just once, so attaching it to every fragment (as was done previously) makes the
 * later writes fail or log "already done" warnings as soon as the first write completes.
 *
 * @param ctx     the context to write the frames to
 * @param byteBuf the source bytes; its reader index is advanced past everything written
 * @param promise notified with the outcome of the final fragment's write
 */
private void writeContinuationFrame(ChannelHandlerContext ctx, ByteBuf byteBuf, ChannelPromise promise) {
    // NOTE(review): assumes maxFramePayloadLength > 0; a non-positive value would never
    // consume any bytes from a non-empty buffer — confirm where the field is validated.
    int count = byteBuf.readableBytes();
    int length = Math.min(count, maxFramePayloadLength);
    boolean finalFragment = length == count;
    ByteBuf fragment = Unpooled.buffer(length);
    byteBuf.readBytes(fragment, length);
    if (finalFragment) {
        ctx.writeAndFlush(new BinaryWebSocketFrame(true, 0, fragment), promise);
    } else {
        ctx.writeAndFlush(new BinaryWebSocketFrame(false, 0, fragment));
    }
    while ((count = byteBuf.readableBytes()) > 0) {
        length = Math.min(count, maxFramePayloadLength);
        finalFragment = length == count;
        fragment = Unpooled.buffer(length);
        byteBuf.readBytes(fragment, length);
        if (finalFragment) {
            ctx.writeAndFlush(new ContinuationWebSocketFrame(true, 0, fragment), promise);
        } else {
            ctx.writeAndFlush(new ContinuationWebSocketFrame(false, 0, fragment));
        }
    }
}
/**
 * Intercepts outbound writes. When acting as a WebSocket server, {@link ByteBuf} messages
 * are wrapped in WebSocket frames; everything else is passed through untouched.
 *
 * <p>With fragmented writes enabled the buffer is split in half: the first half is copied
 * into a non-final binary frame that is written and flushed immediately, and the remainder
 * is sent as a final continuation frame tied to {@code promise}. Otherwise the whole buffer
 * goes out as a single binary frame.
 *
 * @param ctx     the handler context
 * @param msg     the outbound message
 * @param promise the promise for the (last) write
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    LOG.trace("NettyServerHandler: Channel write: {}", msg);

    final boolean wrapInFrame = isWebSocketServer() && msg instanceof ByteBuf;
    if (!wrapInFrame) {
        ctx.write(msg, promise);
        return;
    }

    final ByteBuf payload = (ByteBuf) msg;
    if (!isFragmentWrites()) {
        ctx.write(new BinaryWebSocketFrame(payload), promise);
        return;
    }

    final int start = payload.readerIndex();
    final int half = payload.readableBytes() / 2;

    // First half is copied out; the original buffer is then advanced to hold only the rest.
    final ByteBuf head = payload.copy(start, half);
    LOG.trace("NettyServerHandler: Part1: {}", head);
    payload.readerIndex(start + half);
    LOG.trace("NettyServerHandler: Part2: {}", payload);

    ctx.writeAndFlush(new BinaryWebSocketFrame(false, 0, head));
    ctx.write(new ContinuationWebSocketFrame(true, 0, payload), promise);
}
/**
 * Handles a single inbound WebSocket frame. Close frames complete the closing handshake and
 * are then forwarded down the pipeline; pings are answered with a pong mirroring their flags
 * and payload; continuation frames are written back; pongs are released; binary and text
 * frames are forwarded to the next inbound handler.
 *
 * @param ctx   the handler context
 * @param frame the frame that was received
 * @throws UnsupportedOperationException if the frame is of any other type
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        // NOTE(review): the same frame instance is handed to handshaker.close() and then
        // forwarded without an extra retain() — confirm reference-count ownership.
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
        ctx.fireChannelRead(frame);
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
        return;
    }
    if (frame instanceof ContinuationWebSocketFrame) {
        ctx.write(frame);
        return;
    }
    if (frame instanceof PongWebSocketFrame) {
        frame.release();
        return;
    }

    final boolean payloadFrame = frame instanceof BinaryWebSocketFrame || frame instanceof TextWebSocketFrame;
    if (!payloadFrame) {
        final String unsupportedType = frame.getClass().getName();
        throw new UnsupportedOperationException(String.format("%s frame types not supported", unsupportedType));
    }
    // Binary and text frames are left for the rest of the pipeline to process.
    ctx.fireChannelRead(frame);
}
/**
 * Handles a single inbound WebSocket frame and reassembles fragmented text messages into
 * {@code jsonBuffer}.
 *
 * <ul>
 *   <li>Close: delivers any partially assembled message, performs the closing handshake and
 *       destroys the session.</li>
 *   <li>Ping: always answered with a pong carrying the same payload.</li>
 *   <li>Text: starts a new message buffer.</li>
 *   <li>Continuation: appended to the current buffer; ignored with a warning when no
 *       message is in progress.</li>
 * </ul>
 * When a text or continuation frame is the final fragment, the assembled message is passed
 * to {@code handleMessageCompleted} and the buffer is reset.
 *
 * @param ctx   the handler context
 * @param frame the frame that was received
 * @throws UnsupportedOperationException if the frame is of any other type
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    // Check for closing frame
    if (frame instanceof CloseWebSocketFrame) {
        // jsonBuffer is null whenever no message is in progress; only flush if there is one.
        if (jsonBuffer != null) {
            handleMessageCompleted(ctx, jsonBuffer.toString());
        }
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        SessionRegistry.destroySession(session);
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        // The pong must be sent regardless of the logging level.
        ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
        return;
    }
    if (frame instanceof TextWebSocketFrame) {
        jsonBuffer = new StringBuffer();
        jsonBuffer.append(((TextWebSocketFrame) frame).text());
    } else if (frame instanceof ContinuationWebSocketFrame) {
        if (jsonBuffer == null) {
            // Nothing to append to and nothing to complete: bail out before the final-fragment
            // check below would dereference the missing buffer.
            comlog.warn("Continuation frame received without initial frame.");
            return;
        }
        jsonBuffer.append(((ContinuationWebSocketFrame) frame).text());
    } else {
        throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass()
                .getName()));
    }
    // Check if Text or Continuation Frame is final fragment and handle if needed.
    if (frame.isFinalFragment()) {
        handleMessageCompleted(ctx, jsonBuffer.toString());
        jsonBuffer = null;
    }
}
/**
 * Handles a single inbound WebSocket frame and reassembles fragmented text messages into
 * {@code jsonBuffer}.
 *
 * <p>A close frame delivers any partially assembled message, terminates the socket session
 * and closes the WebSocket. A text frame starts a new buffer; a continuation frame is
 * appended to it, or ignored with a warning when no message is in progress. Any other frame
 * type is delegated to the superclass. When a text or continuation frame is the final
 * fragment, the assembled message is passed to {@code handleMessageCompleted} and the buffer
 * is reset.
 *
 * @param ctx   the handler context
 * @param frame the frame that was received
 */
@Override
protected void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (logger.isTraceEnabled()) {
        logger.trace("Received incoming frame [{}]", frame.getClass().getName());
    }
    // Check for closing frame
    if (frame instanceof CloseWebSocketFrame) {
        if (jsonBuffer != null) {
            handleMessageCompleted(ctx, jsonBuffer.toString());
        }
        terminateSocketSessionWithExtremePrejudice(ctx.channel());
        closeWebSocket(ctx, (CloseWebSocketFrame) frame.retain());
        metrics.incSessionDestroyedCounter();
        return;
    }
    if (frame instanceof TextWebSocketFrame) {
        metrics.incFramesReceivedCounter();
        jsonBuffer = new StringBuilder();
        jsonBuffer.append(((TextWebSocketFrame) frame).text());
    } else if (frame instanceof ContinuationWebSocketFrame) {
        metrics.incFramesReceivedCounter();
        if (jsonBuffer == null) {
            // Nothing to append to and nothing to complete: bail out before the final-fragment
            // check below would dereference the missing buffer.
            comlog.warn("Continuation frame received without initial frame.");
            return;
        }
        jsonBuffer.append(((ContinuationWebSocketFrame) frame).text());
    } else {
        super.handleWebSocketFrame(ctx, frame);
        return;
    }
    // Check if Text or Continuation Frame is final fragment and handle if needed.
    if (frame.isFinalFragment()) {
        handleMessageCompleted(ctx, jsonBuffer.toString());
        jsonBuffer = null;
    }
}
/**
 * Decides whether this handler should process an inbound message.
 *
 * <p>Text and binary frames are accepted when their RSV1 bit is set. Continuation frames
 * are accepted only while {@code compressing} is {@code true}. Everything else is rejected.
 *
 * @param msg the inbound message
 * @return {@code true} if the message should be handled here
 */
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
    final boolean startsMessage = msg instanceof TextWebSocketFrame || msg instanceof BinaryWebSocketFrame;
    if (startsMessage) {
        final int rsv = ((WebSocketFrame) msg).rsv();
        return (rsv & WebSocketExtension.RSV1) > 0;
    }
    if (msg instanceof ContinuationWebSocketFrame) {
        return compressing;
    }
    return false;
}
/**
 * Decides whether this handler should process an inbound message.
 *
 * <p>Only text, binary and continuation frames whose RSV1 bit is set are accepted.
 *
 * @param msg the inbound message
 * @return {@code true} if the message should be handled here
 */
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
    final boolean dataFrame = msg instanceof TextWebSocketFrame
            || msg instanceof BinaryWebSocketFrame
            || msg instanceof ContinuationWebSocketFrame;
    if (!dataFrame) {
        return false;
    }
    final int rsv = ((WebSocketFrame) msg).rsv();
    return (rsv & WebSocketExtension.RSV1) > 0;
}
/**
 * Decides whether this handler should process an outbound message.
 *
 * <p>Text and binary frames are accepted when their RSV1 bit is not yet set. Continuation
 * frames are accepted only while {@code compressing} is {@code true}. Everything else is
 * rejected.
 *
 * @param msg the outbound message
 * @return {@code true} if the message should be handled here
 */
@Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
    final boolean startsMessage = msg instanceof TextWebSocketFrame || msg instanceof BinaryWebSocketFrame;
    if (startsMessage) {
        final int rsv = ((WebSocketFrame) msg).rsv();
        return (rsv & WebSocketExtension.RSV1) == 0;
    }
    if (msg instanceof ContinuationWebSocketFrame) {
        return compressing;
    }
    return false;
}
/**
 * Decides whether this handler should process an outbound message.
 *
 * <p>Only text, binary and continuation frames that have a non-empty payload and whose RSV1
 * bit is not yet set are accepted.
 *
 * @param msg the outbound message
 * @return {@code true} if the message should be handled here
 */
@Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
    final boolean dataFrame = msg instanceof TextWebSocketFrame
            || msg instanceof BinaryWebSocketFrame
            || msg instanceof ContinuationWebSocketFrame;
    if (!dataFrame) {
        return false;
    }
    final WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame.content().readableBytes() <= 0) {
        return false;
    }
    return (frame.rsv() & WebSocketExtension.RSV1) == 0;
}
/**
 * Handles a single inbound WebSocket frame and reassembles fragmented text messages into
 * {@code frameBuffer}.
 *
 * <p>A close frame delivers any partially assembled message and performs the closing
 * handshake. A ping is answered with a pong carrying the same payload; a pong is only
 * logged. A text frame starts a new buffer; a continuation frame is appended to it, or
 * ignored with a warning when no message is in progress. When a text or continuation frame
 * is the final fragment, the assembled message is passed to {@code handleMessageCompleted}
 * and the buffer is reset.
 *
 * @param ctx   the handler context
 * @param frame the frame that was received
 * @throws UnsupportedOperationException if the frame is of any other type
 */
protected void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    logger.debug("Received incoming frame [{}]", frame.getClass().getName());
    // Check for closing frame
    if (frame instanceof CloseWebSocketFrame) {
        if (frameBuffer != null) {
            handleMessageCompleted(ctx, frameBuffer.toString());
        }
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
        return;
    }
    if (frame instanceof PongWebSocketFrame) {
        logger.info("Pong frame received");
        return;
    }
    if (frame instanceof TextWebSocketFrame) {
        frameBuffer = new StringBuilder();
        frameBuffer.append(((TextWebSocketFrame) frame).text());
    } else if (frame instanceof ContinuationWebSocketFrame) {
        if (frameBuffer == null) {
            // Nothing to append to and nothing to complete: bail out before the final-fragment
            // check below would dereference the missing buffer.
            logger.warn("Continuation frame received without initial frame.");
            return;
        }
        frameBuffer.append(((ContinuationWebSocketFrame) frame).text());
    } else {
        throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
    }
    // Check if Text or Continuation Frame is final fragment and handle if needed.
    if (frame.isFinalFragment()) {
        handleMessageCompleted(ctx, frameBuffer.toString());
        frameBuffer = null;
    }
}
/**
 * Handles WebSocket control frames and tells the caller whether the frame still carries
 * data to process.
 *
 * <p>A close frame triggers the closing handshake and a ping is answered with a pong
 * carrying the same payload; both are fully handled here.
 *
 * @param ctx   the handler context
 * @param frame the frame that was received
 * @return {@code false} if the frame was a close or ping frame handled here, {@code true}
 *         if it is a text, binary or continuation frame
 * @throws UnsupportedOperationException if the frame is of any other type
 */
private boolean handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        final CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
        this.handshaker.close(ctx.channel(), closeFrame.retain());
        return false;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
        return false;
    }

    final boolean dataFrame = frame instanceof TextWebSocketFrame
            || frame instanceof BinaryWebSocketFrame
            || frame instanceof ContinuationWebSocketFrame;
    if (!dataFrame) {
        throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
    }
    return true;
}
/**
 * Checks that reading a continuation frame results in exactly one {@code fireChannelRead}
 * call with a {@code ByteBuf} on the context, and in no other interaction with the spied
 * handler or the context.
 */
@Test
public void testRead0HandleContinuationFrame() throws Exception {
    final ChannelHandlerContext context = mock(ChannelHandlerContext.class);
    final Object continuation = new ContinuationWebSocketFrame();

    spy.channelRead0(context, continuation); //test

    verify(spy).channelRead0(context, continuation);
    verify(context).fireChannelRead(any(ByteBuf.class));
    verifyNoMoreInteractions(spy, context);
}
/**
 * Unwraps a continuation frame: forwards only its payload buffer to the next inbound
 * handler instead of the frame itself.
 *
 * @param ctx the handler context used to forward the payload
 * @param msg the continuation frame that was received
 */
@Override
protected void channelRead0(final ChannelHandlerContext ctx, final ContinuationWebSocketFrame msg) throws Exception {
// The payload's reference count is incremented before it is passed on.
// NOTE(review): presumably needed because the enclosing handler releases msg after this
// method returns — confirm against the base class.
ctx.fireChannelRead(msg.content().retain());
}
/**
 * Processes one inbound message on the client WebSocket connection.
 *
 * <p>Until the handshake is complete the message is treated as the HTTP handshake response.
 * Afterwards, binary frames start a new message in {@code websocketFrameBuf} and
 * continuation frames are appended to it; once the final fragment has arrived the buffer is
 * handed to {@code decodeHubFrame}. Pings are answered with a pong, pongs are only logged,
 * and a close frame closes the channel. A continuation frame that arrives while the buffer
 * is empty closes the channel and is not decoded.
 *
 * @param ctx the handler context; the call is a no-op when {@code null}
 * @param msg the inbound message; the call is a no-op when {@code null}
 */
@Override
protected void channelRead0(@Nullable ChannelHandlerContext ctx, @Nullable Object msg) throws Exception {
    if (ctx == null || msg == null) {
        return;
    }
    lastPlatformMsg = System.nanoTime();
    Channel ch = ctx.channel();
    if (!handshaker.isHandshakeComplete()) {
        handshaker.finishHandshake(ch, (FullHttpResponse) msg);
        connected = true;
        handshakeFuture.setSuccess();
        return;
    }
    if (msg instanceof FullHttpResponse) {
        log.warn("unxpected full http response: {}", msg);
        ctx.close();
        return;
    }
    WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof BinaryWebSocketFrame) {
        // A binary frame always starts a fresh message.
        websocketFrameBuf.clear();
        websocketFrameBuf.writeBytes(frame.content());
    } else if (frame instanceof ContinuationWebSocketFrame) {
        if (!websocketFrameBuf.isReadable()) {
            log.warn("continuation frame received without initial frame.");
            ctx.close();
            // Stop here: there is no message to decode and the channel is being closed.
            return;
        }
        websocketFrameBuf.writeBytes(frame.content());
    } else if (frame instanceof PingWebSocketFrame) {
        log.trace("received websocket ping request from platform");
        ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
        lastHubMsg = System.nanoTime();
        return;
    } else if (frame instanceof PongWebSocketFrame) {
        log.trace("received websocket pong response from platform");
        return;
    } else if (frame instanceof CloseWebSocketFrame) {
        log.warn("received websocket close request");
        ctx.close();
        return;
    }
    // NOTE(review): any other frame type (e.g. a text frame) also reaches this point and
    // would trigger a decode of whatever the buffer currently holds — confirm this is intended.
    if (frame.isFinalFragment()) {
        decodeHubFrame(ctx, websocketFrameBuf);
    }
}
/**
 * Runs a three-fragment binary message (one binary frame plus two continuation frames, each
 * with 100 random payload bytes and RSV3 set) through a {@code PerMessageDeflateEncoder} and
 * checks that: RSV1 is added to the first frame only, RSV3 and the final-fragment flags are
 * preserved, and each encoded payload inflates back to the original bytes.
 */
@Test
public void testFramementedFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(new PerMessageDeflateEncoder(9, 15, false));
// A raw zlib decoder (no wrapper) is used to verify the encoder's output independently.
EmbeddedChannel decoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibDecoder(ZlibWrapper.NONE));
// initialize: three independent random payloads, one per fragment
byte[] payload1 = new byte[100];
random.nextBytes(payload1);
byte[] payload2 = new byte[100];
random.nextBytes(payload2);
byte[] payload3 = new byte[100];
random.nextBytes(payload3);
BinaryWebSocketFrame frame1 = new BinaryWebSocketFrame(false,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload1));
ContinuationWebSocketFrame frame2 = new ContinuationWebSocketFrame(false,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload2));
ContinuationWebSocketFrame frame3 = new ContinuationWebSocketFrame(true,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload3));
// execute: encode the fragments in order and read back the encoder's output
encoderChannel.writeOutbound(frame1);
encoderChannel.writeOutbound(frame2);
encoderChannel.writeOutbound(frame3);
BinaryWebSocketFrame compressedFrame1 = encoderChannel.readOutbound();
ContinuationWebSocketFrame compressedFrame2 = encoderChannel.readOutbound();
ContinuationWebSocketFrame compressedFrame3 = encoderChannel.readOutbound();
// test
assertNotNull(compressedFrame1);
assertNotNull(compressedFrame2);
assertNotNull(compressedFrame3);
// Only the first frame of the message is expected to carry RSV1; RSV3 passes through.
assertEquals(WebSocketExtension.RSV1 | WebSocketExtension.RSV3, compressedFrame1.rsv());
assertEquals(WebSocketExtension.RSV3, compressedFrame2.rsv());
assertEquals(WebSocketExtension.RSV3, compressedFrame3.rsv());
assertFalse(compressedFrame1.isFinalFragment());
assertFalse(compressedFrame2.isFinalFragment());
assertTrue(compressedFrame3.isFinalFragment());
// Inflate each fragment in sequence through the same decoder and compare with the input.
decoderChannel.writeInbound(compressedFrame1.content());
ByteBuf uncompressedPayload1 = decoderChannel.readInbound();
byte[] finalPayload1 = new byte[100];
uncompressedPayload1.readBytes(finalPayload1);
assertTrue(Arrays.equals(finalPayload1, payload1));
uncompressedPayload1.release();
decoderChannel.writeInbound(compressedFrame2.content());
ByteBuf uncompressedPayload2 = decoderChannel.readInbound();
byte[] finalPayload2 = new byte[100];
uncompressedPayload2.readBytes(finalPayload2);
assertTrue(Arrays.equals(finalPayload2, payload2));
uncompressedPayload2.release();
decoderChannel.writeInbound(compressedFrame3.content());
// NOTE(review): FRAME_TAIL is fed to the decoder only after the last fragment — presumably
// restoring a tail the encoder strips from the end of the message; confirm.
decoderChannel.writeInbound(DeflateDecoder.FRAME_TAIL);
ByteBuf uncompressedPayload3 = decoderChannel.readInbound();
byte[] finalPayload3 = new byte[100];
uncompressedPayload3.readBytes(finalPayload3);
assertTrue(Arrays.equals(finalPayload3, payload3));
uncompressedPayload3.release();
}
/**
 * Compresses 300 random bytes with a raw zlib encoder, splits the result into three
 * WebSocket fragments (a binary frame with RSV1|RSV3 and two continuation frames with RSV3)
 * and checks that a {@code PerMessageDeflateDecoder} emits three frames with only RSV3 set
 * whose concatenated content equals the original payload.
 */
@Test
public void testFramementedFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibEncoder(ZlibWrapper.NONE, 9, 15, 8));
EmbeddedChannel decoderChannel = new EmbeddedChannel(new PerMessageDeflateDecoder(false));
// initialize
byte[] payload = new byte[300];
random.nextBytes(payload);
encoderChannel.writeOutbound(Unpooled.wrappedBuffer(payload));
ByteBuf compressedPayload = encoderChannel.readOutbound();
// Drop the last 4 bytes of the compressed data.
// NOTE(review): presumably the deflate tail that is omitted on the wire — confirm.
compressedPayload = compressedPayload.slice(0, compressedPayload.readableBytes() - 4);
// Split into three slices; the third takes whatever remains after two equal thirds.
int oneThird = compressedPayload.readableBytes() / 3;
BinaryWebSocketFrame compressedFrame1 = new BinaryWebSocketFrame(false,
WebSocketExtension.RSV1 | WebSocketExtension.RSV3,
compressedPayload.slice(0, oneThird));
ContinuationWebSocketFrame compressedFrame2 = new ContinuationWebSocketFrame(false,
WebSocketExtension.RSV3, compressedPayload.slice(oneThird, oneThird));
ContinuationWebSocketFrame compressedFrame3 = new ContinuationWebSocketFrame(true,
WebSocketExtension.RSV3, compressedPayload.slice(oneThird * 2,
compressedPayload.readableBytes() - oneThird * 2));
// execute
// NOTE(review): the first two frames are retained before being written, the third is not —
// presumably because all three are slices of one buffer; confirm the reference counting.
decoderChannel.writeInbound(compressedFrame1.retain());
decoderChannel.writeInbound(compressedFrame2.retain());
decoderChannel.writeInbound(compressedFrame3);
BinaryWebSocketFrame uncompressedFrame1 = decoderChannel.readInbound();
ContinuationWebSocketFrame uncompressedFrame2 = decoderChannel.readInbound();
ContinuationWebSocketFrame uncompressedFrame3 = decoderChannel.readInbound();
// test
assertNotNull(uncompressedFrame1);
assertNotNull(uncompressedFrame2);
assertNotNull(uncompressedFrame3);
// RSV1 must be gone from the first frame after decoding; RSV3 passes through on all three.
assertEquals(WebSocketExtension.RSV3, uncompressedFrame1.rsv());
assertEquals(WebSocketExtension.RSV3, uncompressedFrame2.rsv());
assertEquals(WebSocketExtension.RSV3, uncompressedFrame3.rsv());
// Stitch the three decoded payloads together and compare with the original bytes.
ByteBuf finalPayloadWrapped = Unpooled.wrappedBuffer(uncompressedFrame1.content(),
uncompressedFrame2.content(), uncompressedFrame3.content());
assertEquals(300, finalPayloadWrapped.readableBytes());
byte[] finalPayload = new byte[300];
finalPayloadWrapped.readBytes(finalPayload);
assertTrue(Arrays.equals(finalPayload, payload));
finalPayloadWrapped.release();
}
/**
 * Runs a three-fragment binary message (one binary frame plus two continuation frames, each
 * with 100 random payload bytes and RSV3 set) through a {@code PerFrameDeflateEncoder} and
 * checks that: RSV1 is added to every frame, RSV3 and the final-fragment flags are
 * preserved, and each encoded payload inflates back to the original bytes.
 */
@Test
public void testFramementedFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(new PerFrameDeflateEncoder(9, 15, false));
// A raw zlib decoder (no wrapper) is used to verify the encoder's output independently.
EmbeddedChannel decoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibDecoder(ZlibWrapper.NONE));
// initialize: three independent random payloads, one per fragment
byte[] payload1 = new byte[100];
random.nextBytes(payload1);
byte[] payload2 = new byte[100];
random.nextBytes(payload2);
byte[] payload3 = new byte[100];
random.nextBytes(payload3);
BinaryWebSocketFrame frame1 = new BinaryWebSocketFrame(false,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload1));
ContinuationWebSocketFrame frame2 = new ContinuationWebSocketFrame(false,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload2));
ContinuationWebSocketFrame frame3 = new ContinuationWebSocketFrame(true,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload3));
// execute: encode the fragments in order and read back the encoder's output
encoderChannel.writeOutbound(frame1);
encoderChannel.writeOutbound(frame2);
encoderChannel.writeOutbound(frame3);
BinaryWebSocketFrame compressedFrame1 = encoderChannel.readOutbound();
ContinuationWebSocketFrame compressedFrame2 = encoderChannel.readOutbound();
ContinuationWebSocketFrame compressedFrame3 = encoderChannel.readOutbound();
// test
assertNotNull(compressedFrame1);
assertNotNull(compressedFrame2);
assertNotNull(compressedFrame3);
// With per-frame compression every frame, including continuations, carries RSV1.
assertEquals(WebSocketExtension.RSV1 | WebSocketExtension.RSV3, compressedFrame1.rsv());
assertEquals(WebSocketExtension.RSV1 | WebSocketExtension.RSV3, compressedFrame2.rsv());
assertEquals(WebSocketExtension.RSV1 | WebSocketExtension.RSV3, compressedFrame3.rsv());
assertFalse(compressedFrame1.isFinalFragment());
assertFalse(compressedFrame2.isFinalFragment());
assertTrue(compressedFrame3.isFinalFragment());
// Inflate each frame separately; FRAME_TAIL is fed to the decoder after every frame.
// NOTE(review): presumably restoring a tail the encoder strips from each frame — confirm.
decoderChannel.writeInbound(compressedFrame1.content());
decoderChannel.writeInbound(Unpooled.wrappedBuffer(DeflateDecoder.FRAME_TAIL));
ByteBuf uncompressedPayload1 = decoderChannel.readInbound();
byte[] finalPayload1 = new byte[100];
uncompressedPayload1.readBytes(finalPayload1);
assertTrue(Arrays.equals(finalPayload1, payload1));
uncompressedPayload1.release();
decoderChannel.writeInbound(compressedFrame2.content());
decoderChannel.writeInbound(Unpooled.wrappedBuffer(DeflateDecoder.FRAME_TAIL));
ByteBuf uncompressedPayload2 = decoderChannel.readInbound();
byte[] finalPayload2 = new byte[100];
uncompressedPayload2.readBytes(finalPayload2);
assertTrue(Arrays.equals(finalPayload2, payload2));
uncompressedPayload2.release();
decoderChannel.writeInbound(compressedFrame3.content());
decoderChannel.writeInbound(Unpooled.wrappedBuffer(DeflateDecoder.FRAME_TAIL));
ByteBuf uncompressedPayload3 = decoderChannel.readInbound();
byte[] finalPayload3 = new byte[100];
uncompressedPayload3.readBytes(finalPayload3);
assertTrue(Arrays.equals(finalPayload3, payload3));
uncompressedPayload3.release();
}
/**
 * Processes one inbound message on the client WebSocket transport.
 *
 * <p>Until the handshake is complete the message is treated as the HTTP handshake response
 * and the transport is marked connected. Afterwards, the content of binary and continuation
 * frames is handed to {@code listener.onData}, text frames are reported as an error, pings
 * are answered with a pong and a close frame closes the channel.
 *
 * @param ctx     the handler context
 * @param message the inbound message
 * @throws IllegalStateException if a full HTTP response arrives after the handshake
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
    LOG.trace("New data read: incoming: {}", message);
    Channel ch = ctx.channel();
    if (!handshaker.isHandshakeComplete()) {
        handshaker.finishHandshake(ch, (FullHttpResponse) message);
        LOG.trace("WebSocket Client connected! {}", ctx.channel());
        // Now trigger super processing as we are really connected.
        NettyWSTransport.super.handleConnected(ch);
        return;
    }
    // We shouldn't get this since we handle the handshake previously.
    if (message instanceof FullHttpResponse) {
        FullHttpResponse response = (FullHttpResponse) message;
        throw new IllegalStateException(
            "Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')');
    }
    WebSocketFrame frame = (WebSocketFrame) message;
    if (frame instanceof TextWebSocketFrame) {
        TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
        LOG.warn("WebSocket Client received message: " + textFrame.text());
        ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
    } else if (frame instanceof BinaryWebSocketFrame) {
        BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
        LOG.trace("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
        listener.onData(binaryFrame.content());
    } else if (frame instanceof ContinuationWebSocketFrame) {
        ContinuationWebSocketFrame continuationFrame = (ContinuationWebSocketFrame) frame;
        LOG.trace("WebSocket Client received data continuation: {} bytes", continuationFrame.content().readableBytes());
        listener.onData(continuationFrame.content());
    } else if (frame instanceof PingWebSocketFrame) {
        LOG.trace("WebSocket Client received ping, response with pong");
        // The pong shares the ping's payload buffer, so take an extra reference for the
        // outbound write: no branch here releases the inbound frame, which implies the
        // enclosing handler releases it once this method returns. Flush so the pong is
        // actually sent rather than waiting for the next unrelated flush.
        ch.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof CloseWebSocketFrame) {
        LOG.trace("WebSocket Client received closing");
        ch.close();
    }
}
/**
 * Processes one inbound message on the client WebSocket connection.
 *
 * <p>Until the handshake is complete the message is treated as the HTTP handshake response.
 * Afterwards, complete text frames are delivered via {@code receivedTextMessage}; fragmented
 * text is accumulated in {@code partialText} and delivered when the final continuation
 * frame arrives. Close frames are logged, and every other frame type is logged and ignored.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws IllegalStateException if a full HTTP response arrives after the handshake
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    final Channel channel = ctx.channel();

    if (!handshaker.isHandshakeComplete()) {
        handshaker.finishHandshake(channel, (FullHttpResponse) msg);
        log.debug("{} WebSocket Client connected!", label);
        handshakeFuture.setSuccess();
        return;
    }

    if (msg instanceof FullHttpResponse) {
        final FullHttpResponse httpResponse = (FullHttpResponse) msg;
        throw new IllegalStateException(
                "Unexpected FullHttpResponse (getStatus=" + httpResponse.status() + ", content="
                        + httpResponse.content().toString(CharsetUtil.UTF_8) + ')');
    }

    final WebSocketFrame frame = (WebSocketFrame) msg;

    if (frame instanceof TextWebSocketFrame) {
        final TextWebSocketFrame text = (TextWebSocketFrame) frame;
        if (!text.isFinalFragment()) {
            // First fragment of a longer message: keep it until the rest arrives.
            partialText.append(text.text());
            return;
        }
        receivedTextMessage(text.text());
        return;
    }

    if (frame instanceof ContinuationWebSocketFrame) {
        final ContinuationWebSocketFrame continuation = (ContinuationWebSocketFrame) frame;
        partialText.append(continuation.text());
        if (continuation.isFinalFragment()) {
            // Message complete: deliver it and reset the accumulator for the next one.
            receivedTextMessage(partialText.toString());
            partialText.setLength(0);
        }
        return;
    }

    if (frame instanceof CloseWebSocketFrame) {
        final CloseWebSocketFrame close = (CloseWebSocketFrame) frame;
        log.info("{} Received close frame from server. Will close client! Reason: {}", label,
                close.reasonText());
        return;
    }

    log.warn("{} Received frame of type {}. Will be ignored", label,
            frame.getClass().getSimpleName());
}
/**
 * Processes one inbound message on the client WebSocket transport.
 *
 * <p>Until the handshake is complete the message is treated as the HTTP handshake response;
 * the transport is marked connected only if the handshake timeout task could still be
 * cancelled. Afterwards, the content of binary and continuation frames is handed to
 * {@code listener.onData}, text frames are reported as an error, pings are answered with a
 * pong and a close frame closes the channel.
 *
 * @param ctx     the handler context
 * @param message the inbound message
 * @throws IllegalStateException if a full HTTP response arrives after the handshake
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
    LOG.trace("New data read: incoming: {}", message);
    Channel ch = ctx.channel();
    if (!handshaker.isHandshakeComplete()) {
        handshaker.finishHandshake(ch, (FullHttpResponse) message);
        LOG.trace("WebSocket Client connected! {}", ctx.channel());
        // Now trigger super processing as we are really connected.
        if (handshakeTimeoutFuture.cancel(false)) {
            NettyWsTransport.super.handleConnected(ch);
        }
        return;
    }
    // We shouldn't get this since we handle the handshake previously.
    if (message instanceof FullHttpResponse) {
        FullHttpResponse response = (FullHttpResponse) message;
        throw new IllegalStateException(
            "Unexpected FullHttpResponse (getStatus=" + response.status() +
            ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')');
    }
    WebSocketFrame frame = (WebSocketFrame) message;
    if (frame instanceof TextWebSocketFrame) {
        TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
        LOG.warn("WebSocket Client received message: " + textFrame.text());
        ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
    } else if (frame instanceof BinaryWebSocketFrame) {
        BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
        LOG.trace("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
        listener.onData(binaryFrame.content());
    } else if (frame instanceof ContinuationWebSocketFrame) {
        ContinuationWebSocketFrame continuationFrame = (ContinuationWebSocketFrame) frame;
        LOG.trace("WebSocket Client received data continuation: {} bytes", continuationFrame.content().readableBytes());
        listener.onData(continuationFrame.content());
    } else if (frame instanceof PingWebSocketFrame) {
        LOG.trace("WebSocket Client received ping, response with pong");
        // The pong shares the ping's payload buffer, so take an extra reference for the
        // outbound write: no branch here releases the inbound frame, which implies the
        // enclosing handler releases it once this method returns. Flush so the pong is
        // actually sent rather than waiting for the next unrelated flush.
        ch.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof CloseWebSocketFrame) {
        LOG.trace("WebSocket Client received closing");
        ch.close();
    }
}