下面列出了 io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame #io.netty.handler.codec.http.websocketx.CloseWebSocketFrame 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Print {@link WebSocketFrame} information.
*
* @param log {@link Logger} object of the relevant class
* @param frame {@link WebSocketFrame} frame
* @param channelContextId {@link ChannelHandlerContext} context id as a String
* @param customMsg Log message which needs to be appended to the frame information,
* if it is not required provide null
* @param isInbound true if the frame is inbound, false if it is outbound
*/
private static void printWebSocketFrame(Logger log, WebSocketFrame frame, String channelContextId,
String customMsg, boolean isInbound) {
String logStatement = getDirectionString(isInbound) + channelContextId;
if (frame instanceof PingWebSocketFrame) {
logStatement += " Ping frame";
} else if (frame instanceof PongWebSocketFrame) {
logStatement += " Pong frame";
} else if (frame instanceof CloseWebSocketFrame) {
logStatement += " Close frame";
} else if (frame instanceof BinaryWebSocketFrame) {
logStatement += " Binary frame";
} else if (frame instanceof TextWebSocketFrame) {
logStatement += " " + ((TextWebSocketFrame) frame).text();
}
//specifically for logging close websocket frames with error status
if (customMsg != null) {
logStatement += " " + customMsg;
}
log.debug(logStatement);
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// Check for closing frame
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass()
.getName()));
}
// Send the uppercase string back.
String request = ((TextWebSocketFrame) frame).text();
System.err.printf("%s received %s%n", ctx.channel(), request);
ctx.channel().write(new TextWebSocketFrame(request.toUpperCase()));
}
/**
* Print {@link WebSocketFrame} information.
*
* @param log {@link Log} object of the relevant class
* @param frame {@link WebSocketFrame} frame
* @param channelContextId {@link ChannelHandlerContext} context id as a String
* @param customMsg Log message which needs to be appended to the frame information,
* if it is not required provide null
* @param isInbound true if the frame is inbound, false if it is outbound
*/
private static void printWebSocketFrame(
Log log, WebSocketFrame frame, String channelContextId,
String customMsg, boolean isInbound) {
String logStatement = getDirectionString(isInbound) + channelContextId;
if (frame instanceof PingWebSocketFrame) {
logStatement += " Ping frame";
} else if (frame instanceof PongWebSocketFrame) {
logStatement += " Pong frame";
} else if (frame instanceof CloseWebSocketFrame) {
logStatement += " Close frame";
} else if (frame instanceof BinaryWebSocketFrame) {
logStatement += " Binary frame";
} else if (frame instanceof TextWebSocketFrame) {
logStatement += " " + ((TextWebSocketFrame) frame).text();
}
//specifically for logging close websocket frames with error status
if (customMsg != null) {
logStatement += " " + customMsg;
}
log.debug(logStatement);
}
void onCloseFrame(final CloseWebSocketFrame message) {
if (session.isSessionClosed()) {
//we have already handled this when we sent the close frame
return;
}
String reason = message.reasonText();
int code = message.statusCode() == -1 ? CloseReason.CloseCodes.NORMAL_CLOSURE.getCode() : message.statusCode();
session.getContainer().invokeEndpointMethod(executor, new Runnable() {
@Override
public void run() {
try {
session.closeInternal(new CloseReason(CloseReason.CloseCodes.getCloseCode(code), reason));
} catch (IOException e) {
invokeOnError(e);
}
}
});
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) {
ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (frame instanceof TextWebSocketFrame) {
if (webSocketEvent != null) {
webSocketEvent.onMessageStringEvent(baseServer, new WebSocketSession(ctx.channel()), ((TextWebSocketFrame) frame).text());
}
return;
}
if (frame instanceof BinaryWebSocketFrame) {
if (webSocketEvent != null) {
webSocketEvent.onMessageBinaryEvent(baseServer, new WebSocketSession(ctx.channel()), ((BinaryWebSocketFrame)frame).content());
}
}
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// Check for closing frame
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (frame instanceof TextWebSocketFrame) {
// Echo the frame
ctx.write(frame.retain());
return;
}
if (frame instanceof BinaryWebSocketFrame) {
// Echo the frame
ctx.write(frame.retain());
return;
}
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// Check for closing frame
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (frame instanceof TextWebSocketFrame) {
// Echo the frame
ctx.write(frame.retain());
return;
}
if (frame instanceof BinaryWebSocketFrame) {
// Echo the frame
ctx.write(frame.retain());
}
}
/**
* 处理WebSocket请求
*
* @param ctx
* @param frame
*/
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
ctx.close();
return;
}
// 没有使用WebSocketServerProtocolHandler,所以不会接收到PingWebSocketFrame。
// if (frame instanceof PingWebSocketFrame) {
// ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
// return;
// }
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException(
String.format("%s frame types not supported", frame.getClass().getName()));
}
String request = ((TextWebSocketFrame) frame).text();
logger.debug("收到客户端发送的数据:" + request);
// 回复心跳
if (request.length() == 0) {
ctx.writeAndFlush(new TextWebSocketFrame(""));
return;
}
this.handleMessage(ctx.channel(), request);
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (logger.isLoggable(Level.FINE)) {
logger.fine(String.format(
"Channel %s received %s", ctx.channel().hashCode(), StringUtil.simpleClassName(frame)));
}
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
} else if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()), ctx.voidPromise());
} else if (frame instanceof TextWebSocketFrame) {
ctx.write(frame, ctx.voidPromise());
} else if (frame instanceof BinaryWebSocketFrame) {
ctx.write(frame, ctx.voidPromise());
} else if (frame instanceof ContinuationWebSocketFrame) {
ctx.write(frame, ctx.voidPromise());
} else if (frame instanceof PongWebSocketFrame) {
frame.release();
// Ignore
} else {
throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass()
.getName()));
}
}
@Override
public void channelRead0(ChannelHandlerContext context, Object message) {
Channel channel = context.channel();
if (message instanceof FullHttpResponse) {
checkState(!handshaker.isHandshakeComplete());
try {
handshaker.finishHandshake(channel, (FullHttpResponse) message);
delegate.onOpen();
} catch (WebSocketHandshakeException e) {
delegate.onError(e);
}
} else if (message instanceof TextWebSocketFrame) {
delegate.onMessage(((TextWebSocketFrame) message).text());
} else {
checkState(message instanceof CloseWebSocketFrame);
delegate.onClose();
}
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// Check for closing frame
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (frame instanceof TextWebSocketFrame) {
// Echo the frame
ctx.write(frame.retain());
return;
}
if (frame instanceof BinaryWebSocketFrame) {
// Echo the frame
ctx.write(frame.retain());
}
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
if (log.isDebugEnabled())
log.debug("Received {} WebSocketFrame: {} from channel: {}", getTransportType().getName(), msg, ctx.channel());
if (msg instanceof CloseWebSocketFrame) {
sessionIdByChannel.remove(ctx.channel());
ChannelFuture f = ctx.writeAndFlush(msg);
f.addListener(ChannelFutureListener.CLOSE);
} else if (msg instanceof PingWebSocketFrame) {
ctx.writeAndFlush(new PongWebSocketFrame(msg.content()));
} else if (msg instanceof TextWebSocketFrame || msg instanceof BinaryWebSocketFrame){
Packet packet = PacketDecoder.decodePacket(msg.content());
packet.setTransportType(getTransportType());
String sessionId = sessionIdByChannel.get(ctx.channel());
packet.setSessionId(sessionId);
msg.release();
ctx.fireChannelRead(packet);
} else {
msg.release();
log.warn("{} frame type is not supported", msg.getClass().getName());
}
}
@Test
public void testCreateSubscriptionWithMissingSessionId() throws Exception {
decoder = new WebSocketRequestDecoder(config.getSecurity());
// @formatter:off
String request = "{ "+
"\"operation\" : \"create\", " +
"\"subscriptionId\" : \"1234\"" +
" }";
// @formatter:on
TextWebSocketFrame frame = new TextWebSocketFrame();
frame.content().writeBytes(request.getBytes(StandardCharsets.UTF_8));
decoder.decode(ctx, frame, results);
Assert.assertNotNull(ctx.msg);
Assert.assertEquals(CloseWebSocketFrame.class, ctx.msg.getClass());
Assert.assertEquals(1008, ((CloseWebSocketFrame) ctx.msg).statusCode());
Assert.assertEquals("User must log in", ((CloseWebSocketFrame) ctx.msg).reasonText());
}
@Test
public void testCreateSubscriptionWithInvalidSessionIdAndNonAnonymousAccess() throws Exception {
ctx.channel().attr(SubscriptionRegistry.SESSION_ID_ATTR)
.set(URLEncoder.encode(UUID.randomUUID().toString(), StandardCharsets.UTF_8.name()));
decoder = new WebSocketRequestDecoder(config.getSecurity());
// @formatter:off
String request = "{ "+
"\"operation\" : \"create\", " +
"\"subscriptionId\" : \"1234\"" +
" }";
// @formatter:on
TextWebSocketFrame frame = new TextWebSocketFrame();
frame.content().writeBytes(request.getBytes(StandardCharsets.UTF_8));
decoder.decode(ctx, frame, results);
Assert.assertNotNull(ctx.msg);
Assert.assertEquals(CloseWebSocketFrame.class, ctx.msg.getClass());
Assert.assertEquals(1008, ((CloseWebSocketFrame) ctx.msg).statusCode());
Assert.assertEquals("User must log in", ((CloseWebSocketFrame) ctx.msg).reasonText());
}
private byte[] decodeWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// Check for closing frame
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return null;
}
if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.content().retain()));
return null;
}
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
return parseMessage(textFrame.content());
}
if (frame instanceof BinaryWebSocketFrame) {
BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame) frame;
return parseMessage(binFrame.content());
}
logger.warn("Message format error: " + frame);
return null;
}
Mono<Void> sendClose(CloseWebSocketFrame frame) {
if (CLOSE_SENT.get(this) == 0) {
//commented for now as we assume the close is always scheduled (deferFuture runs)
//onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame));
return FutureMono.deferFuture(() -> {
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
discard();
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
return channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
}
frame.release();
return channel().newSucceededFuture();
}).doOnCancel(() -> ReactorNetty.safeRelease(frame));
}
frame.release();
return Mono.empty();
}
@SuppressWarnings("FutureReturnValueIgnored")
void sendCloseNow(@Nullable CloseWebSocketFrame frame) {
if (frame != null && !frame.isFinalFragment()) {
//"FutureReturnValueIgnored" this is deliberate
channel().writeAndFlush(frame);
return;
}
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
if (frame != null) {
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
} else {
onCloseState.onNext(new WebSocketCloseStatus(-1, ""));
channel().writeAndFlush(new CloseWebSocketFrame())
.addListener(ChannelFutureListener.CLOSE);
}
}
else if (frame != null) {
frame.release();
}
}
Mono<Void> sendClose(CloseWebSocketFrame frame) {
if (CLOSE_SENT.get(this) == 0) {
//commented for now as we assume the close is always scheduled (deferFuture runs)
//onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame));
return FutureMono.deferFuture(() -> {
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
discard();
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
return channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
}
frame.release();
return channel().newSucceededFuture();
}).doOnCancel(() -> ReactorNetty.safeRelease(frame));
}
frame.release();
return Mono.empty();
}
@SuppressWarnings("FutureReturnValueIgnored")
void sendCloseNow(@Nullable CloseWebSocketFrame frame, ChannelFutureListener listener) {
if (frame != null && !frame.isFinalFragment()) {
//"FutureReturnValueIgnored" this is deliberate
channel().writeAndFlush(frame);
return;
}
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
if (frame != null) {
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
channel().writeAndFlush(frame)
.addListener(listener);
} else {
onCloseState.onNext(new WebSocketCloseStatus(-1, ""));
channel().writeAndFlush(new CloseWebSocketFrame())
.addListener(listener);
}
}
else if (frame != null) {
frame.release();
}
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
// Pass on to the rest of the channel
ctx.fireChannelRead(frame);
} else if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
} else if (frame instanceof ContinuationWebSocketFrame) {
ctx.write(frame);
} else if (frame instanceof PongWebSocketFrame) {
frame.release();
} else if (frame instanceof BinaryWebSocketFrame || frame instanceof TextWebSocketFrame) {
// Allow the rest of the pipeline to deal with this.
ctx.fireChannelRead(frame);
} else {
throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass()
.getName()));
}
}
@Test
public void testCloseWebSocketFrameSentByClient() {
httpServer =
HttpServer.create()
.port(0)
.handle((req, res) ->
res.sendWebsocket((in, out) -> out.sendString(Mono.just("echo"))
.sendObject(new CloseWebSocketFrame())))
.wiretap(true)
.bindNow();
Mono<Void> response =
HttpClient.create()
.port(httpServer.port())
.websocket()
.uri("/")
.handle((in, out) -> out.sendObject(in.receiveFrames()
.doOnNext(WebSocketFrame::retain)
.then()))
.next();
StepVerifier.create(response)
.expectComplete()
.verify(Duration.ofSeconds(30));
}
@Test
public void testIssue444() {
doTestIssue444((in, out) ->
out.sendObject(Flux.error(new Throwable())
.onErrorResume(ex -> out.sendClose(1001, "Going Away"))
.cast(WebSocketFrame.class)));
doTestIssue444((in, out) ->
out.send(Flux.range(0, 10)
.map(i -> {
if (i == 5) {
out.sendClose(1001, "Going Away").subscribe();
}
return Unpooled.copiedBuffer((i + "").getBytes(Charset.defaultCharset()));
})));
doTestIssue444((in, out) ->
out.sendObject(Flux.error(new Throwable())
.onErrorResume(ex -> Flux.empty())
.cast(WebSocketFrame.class))
.then(Mono.defer(() -> out.sendObject(
new CloseWebSocketFrame(1001, "Going Away")).then())));
}
private void doWSRequest(ChannelHandlerContext ctx, WebSocketFrame wsFrame) {
InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = socketAddress.getAddress().getHostAddress();
ChannelSession channelSession = ctx.channel().attr(ChannelConst.CHANNELSESSION).get();
Command command = new Command();
command.setSiteUserId(channelSession.getUserId());
command.setClientIp(clientIp);
command.setStartTime(System.currentTimeMillis());
if (wsFrame instanceof TextWebSocketFrame) {
TextWebSocketFrame textWsFrame = (TextWebSocketFrame) wsFrame;
String webText = textWsFrame.text();
try {
command.setParams(webText.getBytes(CharsetCoding.UTF_8));
} catch (UnsupportedEncodingException e) {
logger.error("web message text=" + webText + " Charset code error");
}
TextWebSocketFrame resFrame = new TextWebSocketFrame(textWsFrame.text());
ctx.channel().writeAndFlush(resFrame);
executor.execute("WS-ACTION", command);
} else if (wsFrame instanceof PingWebSocketFrame) {
// ping/pong
ctx.channel().writeAndFlush(new PongWebSocketFrame(wsFrame.content().retain()));
logger.info("ws client siteUserId={} ping to server", command.getSiteUserId());
} else if (wsFrame instanceof CloseWebSocketFrame) {
// close channel
wsHandshaker.close(ctx.channel(), (CloseWebSocketFrame) wsFrame.retain());
WebChannelManager.delChannelSession(command.getSiteUserId());
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (!(msg instanceof Message)) {
super.write(ctx, msg, promise);
return;
}
Message seMessage = (Message) msg;
if (seMessage instanceof CloseMessage) {
ctx.writeAndFlush(new CloseWebSocketFrame(true, 0));
} else if (seMessage instanceof BinaryMessage) {
ctx.writeAndFlush(
new BinaryWebSocketFrame(
true,
0,
Unpooled.copiedBuffer(((BinaryMessage) seMessage).data())));
} else if (seMessage instanceof TextMessage) {
ctx.writeAndFlush(
new TextWebSocketFrame(
true,
0,
((TextMessage) seMessage).text()));
} else {
LOG.warning(String.format("Unable to handle %s", msg));
super.write(ctx, msg, promise);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
if (frame instanceof TextWebSocketFrame) {
// Echos the same text
String text = ((TextWebSocketFrame) frame).text();
ctx.channel().writeAndFlush(new TextWebSocketFrame(text));
} else if (frame instanceof BinaryWebSocketFrame) {
ctx.channel().writeAndFlush(frame.retain());
} else if (frame instanceof CloseWebSocketFrame) {
ctx.close();
} else {
String message = "unsupported frame type: " + frame.getClass().getName();
throw new UnsupportedOperationException(message);
}
}
public void addCloseListener(final ChannelHandlerContext targetCtx) {
ChannelFuture closeFuture = ctx.channel().closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
if (targetCtx.channel().isActive()) {
targetCtx.channel().write(new CloseWebSocketFrame()).addListener(ChannelFutureListener.CLOSE);
}
}
}
});
}
public void handleClientWebsocketChannelTermination(WebSocketFrame frame) throws AxisFault {
handshaker.close(wrappedContext.getChannelHandlerContext().channel(), (CloseWebSocketFrame) frame.retain());
String endpointName = WebsocketEndpointManager.getInstance().getEndpointName(port, tenantDomain);
WebsocketSubscriberPathManager.getInstance()
.removeChannelContext(endpointName, subscriberPath.getPath(), wrappedContext);
}
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ch, (FullHttpResponse) msg);
System.out.println("WebSocket Client connected!");
handshakeFuture.setSuccess();
return;
}
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
}
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
System.out.println("WebSocket Client received message: " + textFrame.text());
} else if (frame instanceof PongWebSocketFrame) {
System.out.println("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
System.out.println("WebSocket Client received closing");
ch.close();
}
}
@Override
protected void decode(final ChannelHandlerContext channelHandlerContext, final CloseWebSocketFrame frame, final List<Object> objects) throws Exception {
final ByteBuf messageBytes = frame.content();
final byte len = messageBytes.readByte();
if (len <= 0) {
objects.add(RequestMessage.INVALID);
return;
}
final ByteBuf contentTypeBytes = channelHandlerContext.alloc().buffer(len);
try {
messageBytes.readBytes(contentTypeBytes);
final String contentType = contentTypeBytes.toString(UTF8);
final MessageSerializer serializer = select(contentType, ServerSerializers.DEFAULT_BINARY_SERIALIZER);
// it's important to re-initialize these channel attributes as they apply globally to the channel. in
// other words, the next request to this channel might not come with the same configuration and mixed
// state can carry through from one request to the next
channelHandlerContext.channel().attr(StateKey.SESSION).set(null);
channelHandlerContext.channel().attr(StateKey.SERIALIZER).set(serializer);
channelHandlerContext.channel().attr(StateKey.USE_BINARY).set(true);
try {
objects.add(serializer.deserializeRequest(messageBytes.discardReadBytes()));
} catch (SerializationException se) {
objects.add(RequestMessage.INVALID);
}
} finally {
contentTypeBytes.release();
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
if (frame instanceof TextWebSocketFrame) {
// Echos the same text
String text = ((TextWebSocketFrame) frame).text();
ctx.channel().writeAndFlush(new TextWebSocketFrame(text));
} else if (frame instanceof BinaryWebSocketFrame) {
ctx.channel().writeAndFlush(frame.retain());
} else if (frame instanceof CloseWebSocketFrame) {
ctx.close();
} else {
String message = "unsupported frame type: " + frame.getClass().getName();
throw new UnsupportedOperationException(message);
}
}