下面列出了 io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame #io.netty.handler.codec.http.websocketx.WebSocketFrame 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Checks that {@code doThrottle} raises a {@link NumberFormatException} when it is invoked on a
 * handler whose remote-IP lookup is overridden to return {@code "localhost"} and whose
 * collaborators are mocks.
 */
@Test
public void testDoThrottle1() {
    // Route the static analytics-configuration lookup to a mock that reports a publisher class.
    final String publisherClass = "publisherClass";
    PowerMockito.mockStatic(DataPublisherUtil.class);
    APIManagerAnalyticsConfiguration analyticsConfig =
            Mockito.mock(APIManagerAnalyticsConfiguration.class);
    PowerMockito.when(DataPublisherUtil.getApiManagerAnalyticsConfiguration())
            .thenReturn(analyticsConfig);
    Mockito.when(analyticsConfig.getPublisherClass()).thenReturn(publisherClass);

    // TODO: carried over from the original test; the outstanding work is not described there.
    ChannelHandlerContext handlerContext = Mockito.mock(ChannelHandlerContext.class);
    WebSocketFrame mockedFrame = Mockito.mock(WebSocketFrame.class);

    // Subclass the handler so the remote address does not depend on a real channel.
    WebsocketInboundHandler handlerUnderTest = new WebsocketInboundHandler() {
        @Override
        protected String getRemoteIP(ChannelHandlerContext ctx) {
            return "localhost";
        }
    };

    try {
        handlerUnderTest.doThrottle(handlerContext, mockedFrame);
        fail("Expected NumberFormatException is not thrown.");
    } catch (Exception e) {
        // Any exception other than NumberFormatException fails the assertion below.
        Assert.assertTrue(e instanceof NumberFormatException);
    }
}
/**
 * Dispatches one inbound WebSocket frame.
 *
 * <ul>
 *   <li>Close frames are handed to the handshaker.</li>
 *   <li>Ping frames are answered with a Pong built from the same final-fragment flag, RSV bits
 *       and content.</li>
 *   <li>Text, binary and continuation frames are written back unchanged.</li>
 *   <li>Pong frames are released and otherwise ignored.</li>
 * </ul>
 *
 * @param ctx   the handler context used for writing
 * @param frame the received frame
 * @throws UnsupportedOperationException if the frame is of any other type
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (logger.isLoggable(Level.FINE)) {
        logger.fine(String.format(
                "Channel %s received %s", ctx.channel().hashCode(), StringUtil.simpleClassName(frame)));
    }

    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
        return;
    }

    if (frame instanceof PingWebSocketFrame) {
        ctx.write(
                new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()),
                ctx.voidPromise());
        return;
    }

    // All data-carrying frame types are echoed the same way.
    boolean echoed = frame instanceof TextWebSocketFrame
            || frame instanceof BinaryWebSocketFrame
            || frame instanceof ContinuationWebSocketFrame;
    if (echoed) {
        ctx.write(frame, ctx.voidPromise());
        return;
    }

    if (frame instanceof PongWebSocketFrame) {
        // Nothing to do with a Pong other than dropping this handler's reference.
        frame.release();
        return;
    }

    throw new UnsupportedOperationException(
            String.format("%s frame types not supported", frame.getClass().getName()));
}
/**
 * Marks the given message context as carrying a binary WebSocket frame and attaches the frame.
 *
 * <p>The "binary frame present" flag and the frame itself are stored both on the Synapse
 * message context and on the Axis2 message context it wraps. If no inbound endpoint is
 * registered under the name resolved from this handler's port and tenant domain, an error is
 * logged and the context is left untouched.
 *
 * @param frame  the received frame to attach
 * @param synCtx the Synapse message context to populate; it is cast to
 *               {@code Axis2MessageContext}
 * @throws AxisFault declared by the signature; nothing in this body throws it directly
 */
protected void handleWebsocketBinaryFrame(WebSocketFrame frame, MessageContext synCtx) throws AxisFault {
    String endpointName = WebsocketEndpointManager.getInstance().getEndpointName(port, tenantDomain);
    InboundEndpoint endpoint = synCtx.getConfiguration().getInboundEndpoint(endpointName);
    if (endpoint == null) {
        // A space is required before "for", otherwise the endpoint name runs into the next word.
        log.error("Cannot find deployed inbound endpoint " + endpointName + " for process request");
        return;
    }

    // Boolean.TRUE replaces the deprecated Boolean(boolean) constructor.
    synCtx.setProperty(InboundWebsocketConstants.WEBSOCKET_BINARY_FRAME_PRESENT, Boolean.TRUE);
    ((Axis2MessageContext) synCtx).getAxis2MessageContext()
            .setProperty(InboundWebsocketConstants.WEBSOCKET_BINARY_FRAME_PRESENT, Boolean.TRUE);

    synCtx.setProperty(InboundWebsocketConstants.WEBSOCKET_BINARY_FRAME, frame);
    ((Axis2MessageContext) synCtx).getAxis2MessageContext()
            .setProperty(InboundWebsocketConstants.WEBSOCKET_BINARY_FRAME, frame);
}
/**
 * Echo-style frame handler.
 *
 * <p>Close frames go to the handshaker, Ping frames are answered with a Pong carrying the same
 * content, and text or binary frames are written back. Each outgoing reference is retained
 * before it is passed on. Any other frame type is silently ignored.
 *
 * @param ctx   the handler context used for writing
 * @param frame the received frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame || frame instanceof BinaryWebSocketFrame) {
        // Text and binary frames are echoed identically.
        ctx.write(frame.retain());
    }
}
/**
 * Handles a frame for an "upper-case echo" endpoint.
 *
 * <p>Close frames go to the handshaker, Ping frames are answered with a Pong carrying the same
 * content, and text frames are logged to standard error and answered with their text in
 * upper case.
 *
 * @param ctx   the handler context whose channel is written to
 * @param frame the received frame
 * @throws UnsupportedOperationException if the frame is not a close, ping or text frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame) {
        String received = ((TextWebSocketFrame) frame).text();
        System.err.printf("%s received %s%n", ctx.channel(), received);
        // Reply with the upper-cased request.
        ctx.channel().write(new TextWebSocketFrame(received.toUpperCase()));
    } else {
        throw new UnsupportedOperationException(
                String.format("%s frame types not supported", frame.getClass().getName()));
    }
}
/**
 * Handles control frames locally and forwards data frames down the pipeline.
 *
 * <ul>
 *   <li>Close: passed to the handshaker, then also forwarded to the next handler.</li>
 *   <li>Ping: answered with a Pong built from the same final-fragment flag, RSV bits and
 *       content.</li>
 *   <li>Continuation: written back as-is.</li>
 *   <li>Pong: released.</li>
 *   <li>Binary / text: forwarded to the next handler.</li>
 * </ul>
 *
 * @param ctx   the handler context
 * @param frame the received frame
 * @throws UnsupportedOperationException if the frame is of any other type
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
        // Downstream handlers also get to see the close frame.
        ctx.fireChannelRead(frame);
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
        return;
    }
    if (frame instanceof ContinuationWebSocketFrame) {
        ctx.write(frame);
        return;
    }
    if (frame instanceof PongWebSocketFrame) {
        frame.release();
        return;
    }
    if (frame instanceof BinaryWebSocketFrame || frame instanceof TextWebSocketFrame) {
        // Data frames are left for the rest of the pipeline.
        ctx.fireChannelRead(frame);
        return;
    }
    throw new UnsupportedOperationException(
            String.format("%s frame types not supported", frame.getClass().getName()));
}
/**
 * Handles a frame by answering control frames and forwarding data frames to
 * {@code webSocketEvent}, when one is set.
 *
 * <p>Close frames go to the handshaker and Ping frames are answered (and flushed) with a Pong
 * carrying the same content. Text frames are delivered as a string and binary frames as their
 * content buffer, each with a new {@code WebSocketSession} wrapping the channel. Data frames are
 * dropped when no event listener is set; other frame types are ignored.
 *
 * @param ctx   the handler context
 * @param frame the received frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame) {
        if (webSocketEvent != null) {
            WebSocketSession session = new WebSocketSession(ctx.channel());
            String text = ((TextWebSocketFrame) frame).text();
            webSocketEvent.onMessageStringEvent(baseServer, session, text);
        }
    } else if (frame instanceof BinaryWebSocketFrame) {
        if (webSocketEvent != null) {
            WebSocketSession session = new WebSocketSession(ctx.channel());
            webSocketEvent.onMessageBinaryEvent(
                    baseServer, session, ((BinaryWebSocketFrame) frame).content());
        }
    }
}
/**
 * Handles ping/pong control frames.
 *
 * <p>A Ping is answered (and flushed) with a Pong carrying the same content; its payload is
 * hex-dumped at trace level. A Pong is recorded on the {@code PingPong} instance looked up for
 * the channel, if there is one.
 *
 * @param ctx   the handler context
 * @param frame the received frame
 * @throws UnsupportedOperationException if the frame is neither a ping nor a pong
 */
protected void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof PingWebSocketFrame) {
        if (logger.isTraceEnabled()) {
            logger.trace("Ping with payload [{}]", ByteBufUtil.hexDump(frame.content()));
        }
        ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
        return;
    }

    if (!(frame instanceof PongWebSocketFrame)) {
        throw new UnsupportedOperationException(
                String.format("%s frame types not supported", frame.getClass().getName()));
    }

    PingPong tracker = PingPong.get(ctx.channel());
    if (tracker != null) {
        tracker.recordPong();
    }
}
/**
 * Handles an inbound WebSocket frame.
 *
 * <p>A close frame completes the closing handshake and closes the context. A text frame is
 * logged at debug level; an empty text is answered with an empty text frame (used as a
 * heartbeat reply), anything else is passed to {@code handleMessage}.
 *
 * @param ctx   the handler context
 * @param frame the received frame
 * @throws UnsupportedOperationException if the frame is neither a close nor a text frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        ctx.close();
        return;
    }

    // Original author's note: WebSocketServerProtocolHandler is not used, so no
    // PingWebSocketFrame will be received here.
    // if (frame instanceof PingWebSocketFrame) {
    // ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
    // return;
    // }

    if (frame instanceof TextWebSocketFrame) {
        String request = ((TextWebSocketFrame) frame).text();
        logger.debug("收到客户端发送的数据:" + request);
        if (request.isEmpty()) {
            // Heartbeat: reply with an empty text frame.
            ctx.writeAndFlush(new TextWebSocketFrame(""));
        } else {
            this.handleMessage(ctx.channel(), request);
        }
        return;
    }

    throw new UnsupportedOperationException(
            String.format("%s frame types not supported", frame.getClass().getName()));
}
/**
 * Routes an inbound message by type.
 *
 * <p>HTTP requests are passed to {@code handleHttpRequest} (the handshake step),
 * {@code IMProtoMessage} instances are delegated to the superclass, and everything else is
 * only logged at debug level.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws Exception propagated from the delegated handlers
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof HttpRequest) {
        logger.debug("HttpRequest");
        // Complete the handshake.
        handleHttpRequest(ctx, (HttpRequest) msg);
        return;
    }
    if (msg instanceof IMProtoMessage) {
        super.channelRead(ctx, msg);
        return;
    }
    if (msg instanceof WebSocketFrame) {
        // websocketService.handleFrame(ctx, (WebSocketFrame) msg);
        // Original author's note: this branch should not be reached.
        logger.debug("WebSocketFrame");
        return;
    }
    logger.debug("other:{}", msg);
}
/**
 * Invokes the endpoint's binary-message callback for the given frame, if one is mapped.
 *
 * <p>When exactly one path mapping exists it is used directly; otherwise the mapping is looked
 * up by the path stored on the channel under {@code PATH_KEY}. Anything thrown while resolving
 * arguments or invoking the callback is logged and not rethrown.
 *
 * @param channel the channel the frame arrived on
 * @param frame   the received frame; cast to {@code BinaryWebSocketFrame} when a callback exists
 */
public void doOnBinary(Channel channel, WebSocketFrame frame) {
    Attribute<String> pathAttribute = channel.attr(PATH_KEY);
    PojoMethodMapping mapping = pathMethodMappingMap.size() == 1
            ? pathMethodMappingMap.values().iterator().next()
            : pathMethodMappingMap.get(pathAttribute.get());

    if (mapping.getOnBinary() == null) {
        return;
    }

    BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
    Object endpointInstance = channel.attr(POJO_KEY).get();
    try {
        mapping.getOnBinary().invoke(endpointInstance, mapping.getOnBinaryArgs(channel, binaryFrame));
    } catch (Throwable t) {
        logger.error(t);
    }
}
/**
 * Routes an inbound message: full HTTP requests go to the handshake handler, WebSocket frames
 * go to the frame handler, and anything else closes the connection.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws Exception propagated from the delegated handlers
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof FullHttpRequest) {
        // HTTP request: performs the handshake.
        doHttpRequest(ctx, (FullHttpRequest) msg);
        return;
    }
    if (msg instanceof WebSocketFrame) {
        // WebSocket traffic.
        doWSRequest(ctx, (WebSocketFrame) msg);
        return;
    }
    // Unrecognised message: close the connection.
    ctx.close();
}
/**
 * Dispatches an inbound message by type.
 *
 * <p>A {@code FullHttpRequest} is treated as the handshake request, a {@code WebSocketFrame} as
 * WebSocket traffic; any other message is considered invalid and the connection is closed.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws Exception propagated from the delegated handlers
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    final boolean isHandshakeRequest = msg instanceof FullHttpRequest;
    final boolean isFrame = msg instanceof WebSocketFrame;

    if (isHandshakeRequest) {
        doHttpRequest(ctx, (FullHttpRequest) msg);
    } else if (isFrame) {
        doWSRequest(ctx, (WebSocketFrame) msg);
    } else {
        // Invalid request: close the connection.
        ctx.close();
    }
}
/**
 * Echoes text and binary frames back on the channel and closes the context on a close frame.
 *
 * <p>Text is echoed as a newly created text frame; a binary frame is retained and written back
 * itself.
 *
 * @param ctx   the handler context
 * @param frame the received frame
 * @throws UnsupportedOperationException if the frame is of any other type
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
    if (frame instanceof TextWebSocketFrame) {
        String echoedText = ((TextWebSocketFrame) frame).text();
        ctx.channel().writeAndFlush(new TextWebSocketFrame(echoedText));
        return;
    }
    if (frame instanceof BinaryWebSocketFrame) {
        ctx.channel().writeAndFlush(frame.retain());
        return;
    }
    if (frame instanceof CloseWebSocketFrame) {
        ctx.close();
        return;
    }
    throw new UnsupportedOperationException("unsupported frame type: " + frame.getClass().getName());
}
/**
 * Writes a duplicate of the given frame to every channel context registered for the given
 * inbound endpoint name and subscriber path.
 *
 * @param frame          the frame to broadcast; each recipient gets {@code frame.duplicate()}
 * @param inboundName    inbound endpoint name used for the lookup
 * @param subscriberPath subscriber path used for the lookup
 */
public void broadcastOnSubscriberPath(WebSocketFrame frame, String inboundName, String subscriberPath) {
    // NOTE(review): duplicate() is not followed by retain() here; confirm that writeToChannel
    // or the caller accounts for reference counts when several subscribers are present.
    for (InboundWebsocketChannelContext subscriber
            : getSubscriberPathChannelContextList(inboundName, subscriberPath)) {
        subscriber.writeToChannel(frame.duplicate());
    }
}
/**
 * Routes an inbound message: full HTTP requests go to the handshake handler and WebSocket
 * frames to the frame handler. Messages of any other type are not processed here.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws Exception propagated from the delegated handlers
 */
@SuppressWarnings("unchecked")
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof FullHttpRequest) {
        handleHandshake(ctx, (FullHttpRequest) msg);
        return;
    }
    if (msg instanceof WebSocketFrame) {
        handleWebSocketFrame(ctx, (WebSocketFrame) msg);
    }
}
/**
 * Client-side read handler.
 *
 * <p>Until the handshake is complete, the message is treated as the handshake response and the
 * handshake future is completed or failed accordingly. After that, an HTTP response is an
 * error, and text, pong and close frames are reported on standard output; a close frame also
 * closes the channel.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws IllegalStateException if a {@code FullHttpResponse} arrives after the handshake
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    Channel channel = ctx.channel();

    if (!handshaker.isHandshakeComplete()) {
        try {
            handshaker.finishHandshake(channel, (FullHttpResponse) msg);
            System.out.println("WebSocket Client connected!");
            handshakeFuture.setSuccess();
        } catch (WebSocketHandshakeException e) {
            System.out.println("WebSocket Client failed to connect");
            handshakeFuture.setFailure(e);
        }
        return;
    }

    if (msg instanceof FullHttpResponse) {
        FullHttpResponse unexpected = (FullHttpResponse) msg;
        throw new IllegalStateException(
                "Unexpected FullHttpResponse (getStatus=" + unexpected.status() +
                        ", content=" + unexpected.content().toString(CharsetUtil.UTF_8) + ')');
    }

    WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof CloseWebSocketFrame) {
        System.out.println("WebSocket Client received closing");
        channel.close();
    } else if (frame instanceof PongWebSocketFrame) {
        System.out.println("WebSocket Client received pong");
    } else if (frame instanceof TextWebSocketFrame) {
        String text = ((TextWebSocketFrame) frame).text();
        System.out.println("WebSocket Client received message: " + text);
    }
}
/**
 * Completes the closing handshake on the wrapped channel and unregisters the wrapped context
 * from the subscriber-path manager.
 *
 * @param frame the received frame; it is retained and cast to {@code CloseWebSocketFrame}
 * @throws AxisFault declared by the signature; nothing in this body throws it directly
 */
public void handleClientWebsocketChannelTermination(WebSocketFrame frame) throws AxisFault {
    handshaker.close(
            wrappedContext.getChannelHandlerContext().channel(),
            (CloseWebSocketFrame) frame.retain());

    String inboundName = WebsocketEndpointManager.getInstance().getEndpointName(port, tenantDomain);
    WebsocketSubscriberPathManager pathManager = WebsocketSubscriberPathManager.getInstance();
    pathManager.removeChannelContext(inboundName, subscriberPath.getPath(), wrappedContext);
}
/**
 * Client-side read handler that forwards received payloads to {@code service}.
 *
 * <p>Every call first updates the monitor's timestamp. Until the handshake is complete, the
 * message is treated as the handshake response. Afterwards an HTTP response is an error; text
 * frames are forwarded as text, binary frames are decoded with {@code decodeByteBuff} and
 * forwarded, pong frames are reported on standard output, and a close frame closes the channel.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws IllegalStateException if a {@code FullHttpResponse} arrives after the handshake
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    Channel channel = ctx.channel();
    moniter.updateTime();

    if (!handshaker.isHandshakeComplete()) {
        handshaker.finishHandshake(channel, (FullHttpResponse) msg);
        System.out.println("WebSocket Client connected!");
        handshakeFuture.setSuccess();
        return;
    }

    if (msg instanceof FullHttpResponse) {
        FullHttpResponse unexpected = (FullHttpResponse) msg;
        throw new IllegalStateException(
                "Unexpected FullHttpResponse (getStatus=" + unexpected.getStatus() +
                        ", content=" + unexpected.content().toString(CharsetUtil.UTF_8) + ')');
    }

    WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof TextWebSocketFrame) {
        service.onReceive(((TextWebSocketFrame) frame).text());
        return;
    }
    if (frame instanceof BinaryWebSocketFrame) {
        service.onReceive(decodeByteBuff(((BinaryWebSocketFrame) frame).content()));
        return;
    }
    if (frame instanceof PongWebSocketFrame) {
        System.out.println("WebSocket Client received pong");
        return;
    }
    if (frame instanceof CloseWebSocketFrame) {
        System.out.println("WebSocket Client received closing");
        channel.close();
    }
}
/**
 * Example filter that replies to every frame with the text "hello world" and stops further
 * processing.
 *
 * <p>Original author's notes: close and ping frames are handled by the framework by default and
 * text messages follow the text protocol, so only binary messages need handling here; every
 * WebSocket message passes through this filter, and {@code Filterable} can be used to set an
 * ordering value.
 *
 * @param channel the channel to reply on
 * @param context the WebSocket context (unused here)
 * @param frame   the received frame (unused here)
 * @return {@code false}, which per the original author's note means the remaining logic is
 *         skipped; {@code true} would let processing continue
 */
@Override
public boolean filter(Channel channel, WebSocketContext context, WebSocketFrame frame) {
    TextWebSocketFrame greeting = new TextWebSocketFrame("hello world");
    channel.writeAndFlush(greeting);

    final boolean continueProcessing = false;
    return continueProcessing;
}
/**
 * Client-side read handler that collects received payloads into {@code responses}.
 *
 * <p>Until the handshake is complete, the message is treated as the handshake response. After
 * that:
 * <ul>
 *   <li>text frames are stored as their UTF-8 bytes;</li>
 *   <li>binary frames are stored as a copy of their readable bytes;</li>
 *   <li>ping frames are answered with a Pong that carries the Ping's payload;</li>
 *   <li>close frames close the context;</li>
 *   <li>any other frame type is logged as a warning.</li>
 * </ul>
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws IllegalStateException if a {@code FullHttpResponse} arrives after the handshake
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    LOG.info("Received msg: {}", msg);
    if (!this.handshaker.isHandshakeComplete()) {
        this.handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
        LOG.info("Client connected.");
        this.connected = true;
        this.handshakeFuture.setSuccess();
        return;
    }
    if (msg instanceof FullHttpResponse) {
        throw new IllegalStateException("Unexpected response: " + msg.toString());
    }
    WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof TextWebSocketFrame) {
        synchronized (responses) {
            responses.add(((TextWebSocketFrame) frame).text().getBytes(StandardCharsets.UTF_8));
        }
    } else if (frame instanceof BinaryWebSocketFrame) {
        ByteBuf buf = frame.content();
        byte[] b = new byte[buf.readableBytes()];
        buf.readBytes(b);
        synchronized (responses) {
            responses.add(b);
        }
    } else if (frame instanceof PingWebSocketFrame) {
        LOG.info("Returning pong message");
        // RFC 6455 section 5.5.3: a Pong must carry the same application data as the Ping it
        // answers, so echo the payload instead of sending an empty Pong. The content is
        // retained because the outgoing write needs its own reference.
        // NOTE(review): assumes the enclosing handler releases msg after channelRead0 returns
        // (the usual SimpleChannelInboundHandler behaviour) - confirm.
        ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof CloseWebSocketFrame) {
        LOG.info("Received message from server to close the channel.");
        ctx.close();
    } else {
        LOG.warn("Unhandled frame type received: " + frame.getClass());
    }
}
/**
 * Sends the given messages over the delegate's outbound, flushing after each one.
 *
 * <p>Each message is logged at trace level (when enabled) and converted with {@code toFrame}
 * before being written.
 *
 * @param messages the messages to send
 * @return a {@code Mono} obtained from the outbound's {@code then()}
 */
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
    Flux<WebSocketFrame> outgoingFrames = Flux.from(messages)
            .doOnNext(outgoing -> {
                if (!logger.isTraceEnabled()) {
                    return;
                }
                logger.trace(getLogPrefix() + "Sending " + outgoing);
            })
            .map(this::toFrame);

    return getDelegate()
            .getOutbound()
            .options(NettyPipeline.SendOptions::flushOnEach)
            .sendObject(outgoingFrames)
            .then();
}
/**
 * Client sends a Ping followed by a Close; the test expects the client to receive exactly one
 * Pong frame and then completion.
 *
 * <p>The server handler only consumes inbound frames. Frames received by the client are
 * published into {@code incomingData}, which the {@code StepVerifier} checks with a 30-second
 * timeout.
 */
@Test
public void testIssue663_3() {
// Sink into which the client publishes every frame it receives.
FluxIdentityProcessor<WebSocketFrame> incomingData = Processors.more().multicastNoBackpressure();
// Server: upgrades to WebSocket and just drains inbound frames.
httpServer =
HttpServer.create()
.port(0)
.handle((req, resp) -> resp.sendWebsocket((i, o) -> i.receiveFrames().then()))
.wiretap(true)
.bindNow();
// Client: sends Ping then Close (each element delayed by 100 ms), then subscribes
// incomingData to the received frames.
HttpClient.create()
.port(httpServer.port())
.wiretap(true)
.websocket()
.uri("/")
.handle((in, out) ->
out.sendObject(Flux.just(new PingWebSocketFrame(), new CloseWebSocketFrame())
.delayElements(Duration.ofMillis(100)))
.then(in.receiveFrames()
.subscribeWith(incomingData)
.then()))
.subscribe();
// Expect a single frame equal to an empty Pong, followed by completion.
StepVerifier.create(incomingData)
.expectNext(new PongWebSocketFrame())
.expectComplete()
.verify(Duration.ofSeconds(30));
}
/**
 * Client-side read handler that dispatches JSON text messages to subscription handlers.
 *
 * <p>Until the handshake is complete, the message is treated as the handshake response;
 * {@code running} and the handshake future are updated according to the outcome. Afterwards an
 * HTTP response is an error. A text frame is parsed as a JSON list whose first element selects
 * the handler from {@code subscriptions} (falling back to the default handler), and the raw text
 * is passed to it. A close frame clears {@code running} and closes the channel.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws IllegalStateException if a {@code FullHttpResponse} arrives after the handshake
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    Channel channel = ctx.channel();

    if (!handshaker.isHandshakeComplete()) {
        try {
            handshaker.finishHandshake(channel, (FullHttpResponse) msg);
            running = true;
            LOG.trace("WebSocket Client connected!");
            handshakeFuture.setSuccess();
        } catch (WebSocketHandshakeException e) {
            LOG.trace("WebSocket Client failed to connect");
            running = false;
            handshakeFuture.setFailure(e);
        }
        return;
    }

    if (msg instanceof FullHttpResponse) {
        FullHttpResponse unexpected = (FullHttpResponse) msg;
        throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + unexpected.status() + ", content="
                + unexpected.content().toString(CharsetUtil.UTF_8) + ')');
    }

    WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof CloseWebSocketFrame) {
        LOG.trace("WebSocket Client received closing");
        running = false;
        channel.close();
        return;
    }
    if (!(frame instanceof TextWebSocketFrame)) {
        return;
    }

    // Decode the payload once and reuse it for logging, parsing and dispatch.
    String payload = ((TextWebSocketFrame) frame).text();
    LOG.trace("WebSocket Client received message: " + payload);
    List<?> results = this.gson.fromJson(payload, List.class);
    this.subscriptions.getOrDefault(results.get(0), this.defaultSubscriptionMessageHandler).handle(payload);
}
/**
 * Processes one text fragment.
 *
 * <p>If the session is already closed, the fragment is dropped and {@code session.close()} is
 * called. Otherwise the expected continuation type is updated from the frame's final-fragment
 * flag, and the text is delivered to the registered text handler. It is delivered immediately
 * when the handler accepts partial messages, or when this is a complete unfragmented message;
 * otherwise it is buffered and delivered once the final fragment arrives.
 *
 * @param frame the frame the text came from; only its final-fragment flag is read
 * @param text  the decoded text of the fragment
 * @throws IOException propagated from the calls made here
 */
private void onText(WebSocketFrame frame, String text) throws IOException {
    if (session.isSessionClosed()) {
        // The endpoint is no longer in a valid state to deal with messages, so anything
        // arriving after close is ignored.
        session.close();
        return;
    }

    final boolean lastFragment = frame.isFinalFragment();
    expectedContinuation = lastFragment ? null : FrameType.TEXT;

    final HandlerWrapper handler = getHandler(FrameType.TEXT);
    if (handler == null) {
        return;
    }

    if (handler.isPartialHandler() || (stringBuffer == null && lastFragment)) {
        invokeTextHandler(text, handler, lastFragment);
        return;
    }

    // Whole-message handler with a fragmented message: accumulate until the last fragment.
    if (stringBuffer == null) {
        stringBuffer = new StringBuilder();
    }
    stringBuffer.append(text);
    if (lastFragment) {
        invokeTextHandler(stringBuffer.toString(), handler, lastFragment);
        stringBuffer = null;
    }
}
/**
 * Starts an echo WebSocket server configured with the given maximum frame payload length,
 * sends {@code input} from a client, and asserts that the strings the client receives equal
 * {@code expectation}.
 *
 * @param maxFramePayloadLength value passed to the server's {@code WebsocketServerSpec}
 * @param input                 strings the client sends
 * @param expectation           strings the client is expected to receive, in order
 * @param count                 number of received frames the client takes before completing
 */
private void doTestServerMaxFramePayloadLength(int maxFramePayloadLength, Flux<String> input, Flux<String> expectation, int count) {
// Server: aggregates inbound frames, decodes each frame's content with the default charset,
// and sends it back as a text frame.
httpServer =
HttpServer.create()
.port(0)
.handle((req, res) -> res.sendWebsocket((in, out) ->
out.sendObject(in.aggregateFrames()
.receiveFrames()
.map(WebSocketFrame::content)
.map(byteBuf ->
byteBuf.readCharSequence(byteBuf.readableBytes(), Charset.defaultCharset()).toString())
.map(TextWebSocketFrame::new)),
WebsocketServerSpec.builder().maxFramePayloadLength(maxFramePayloadLength).build()))
.wiretap(true)
.bindNow();
// Collects everything the client receives so it can be compared after the exchange.
FluxIdentityProcessor<String> output = Processors.replayAll();
// Client: sends the input, then reads `count` aggregated frames as strings into `output`.
HttpClient.create()
.port(httpServer.port())
.websocket()
.uri("/")
.handle((in, out) -> out.sendString(input)
.then(in.aggregateFrames()
.receiveFrames()
.map(WebSocketFrame::content)
.map(byteBuf ->
byteBuf.readCharSequence(byteBuf.readableBytes(), Charset.defaultCharset()).toString())
.take(count)
.subscribeWith(output)
.then()))
.blockLast(Duration.ofSeconds(30));
// Both sides are collected to lists (30-second timeout each) and compared for equality.
assertThat(output.collectList().block(Duration.ofSeconds(30)))
.isEqualTo(expectation.collectList().block(Duration.ofSeconds(30)));
}
/**
 * Handles one frame: close frames go to the handshaker, ping frames are answered with a Pong
 * carrying the same content, and text frames are passed to {@code exec}.
 *
 * @param ctx   the handler context
 * @param frame the received frame
 * @throws UnsupportedOperationException if the frame is not a close, ping or text frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
        return;
    }
    if (frame instanceof TextWebSocketFrame) {
        exec(ctx, frame);
        return;
    }
    throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
}
/**
 * Routes an inbound message: full HTTP requests go to {@code handleHttpRequest} and WebSocket
 * frames to {@code handleWebSocketFrame}. Other message types are not processed here.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws Exception propagated from the delegated handlers
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof FullHttpRequest) {
        handleHttpRequest(ctx, (FullHttpRequest) msg);
        return;
    }
    if (msg instanceof WebSocketFrame) {
        handleWebSocketFrame(ctx, (WebSocketFrame) msg);
    }
}
/**
 * Server sends a Ping followed by a Close to a client built with {@code handlePing(true)}; the
 * test expects the server to receive no frames before its inbound completes.
 *
 * <p>Frames received by the server are published into {@code incomingData}, which the
 * {@code StepVerifier} checks with a 30-second timeout.
 */
@Test
public void testIssue663_2() {
// Sink into which the server publishes every frame it receives.
FluxIdentityProcessor<WebSocketFrame> incomingData = Processors.more().multicastNoBackpressure();
// Server: sends Ping then Close (each element delayed by 100 ms), then subscribes
// incomingData to the received frames.
httpServer =
HttpServer.create()
.port(0)
.handle((req, resp) ->
resp.sendWebsocket((i, o) ->
o.sendObject(Flux.just(new PingWebSocketFrame(), new CloseWebSocketFrame())
.delayElements(Duration.ofMillis(100)))
.then(i.receiveFrames()
.subscribeWith(incomingData)
.then())))
.wiretap(true)
.bindNow();
// Client: only receives frames.
// NOTE(review): handlePing(true) presumably hands ping frames to the handler instead of
// auto-replying with a Pong - confirm against the WebsocketClientSpec documentation.
HttpClient.create()
.port(httpServer.port())
.wiretap(true)
.websocket(WebsocketClientSpec.builder().handlePing(true).build())
.uri("/")
.handle((in, out) -> in.receiveFrames())
.subscribe();
// Expect completion with no frames emitted.
StepVerifier.create(incomingData)
.expectComplete()
.verify(Duration.ofSeconds(30));
}
/**
 * Decodes a {@code RequestPacket} from the content of a WebSocket frame.
 *
 * <p>Layout read from the buffer, in order: a 2-byte message id, a 1-byte route length, the
 * route bytes, a 2-byte data length, then the data bytes. The data is stored as the single
 * element of {@code args}.
 *
 * <p>Both length prefixes are read as unsigned values. Read as signed, a route of 128 or more
 * bytes or a payload of 32768 or more bytes yields a negative length and fails with
 * {@code NegativeArraySizeException}. Smaller lengths decode exactly as before.
 *
 * @param frame the frame whose content holds the encoded request
 * @param uid   the user id to store on the packet
 * @return the decoded request packet
 */
public static RequestPacket valueOfHandler(WebSocketFrame frame, long uid) {
    RequestPacket request = new RequestPacket();
    // uid
    request.uid = uid;

    // messageId
    ByteBuf message = frame.content();
    request.messageId = message.readShort();

    // route: 1-byte unsigned length prefix followed by the route bytes
    byte[] routeBytes = new byte[message.readUnsignedByte()];
    message.readBytes(routeBytes);
    // NOTE(review): decodes with the platform default charset; confirm what the sender uses
    // before pinning an explicit charset here.
    request.route = new String(routeBytes);

    // data: 2-byte unsigned length prefix followed by the payload
    byte[] data = new byte[message.readUnsignedShort()];
    message.readBytes(data);
    request.args = new Object[] {data};

    // CRC verification of the packet is currently disabled:
    // long crc = message.readLong();
    // byte[] array = message.array();
    // // packet integrity check
    // if (crc != CRCUtils.calculateCRC(Parameters.CRC32, array, 0, array.length - 8)) {
    // LOGGER.error("request packet crc error. crc={} array={}", crc, Arrays.toString(array));
    // return;
    // }
    return request;
}