类 io.netty.handler.codec.http.websocketx.WebSocketFrame 源码实例Demo

下面列出了 io.netty.handler.codec.http.websocketx.WebSocketFrame 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。


/**
 * Calls {@code doThrottle} on a handler whose remote IP lookup is stubbed, using a mocked
 * analytics configuration, channel context and frame, and expects a
 * {@link NumberFormatException} to be thrown.
 */
@Test
public void testDoThrottle1() {
    // Make the static analytics-configuration lookup return a mock with a fixed publisher class.
    PowerMockito.mockStatic(DataPublisherUtil.class);
    APIManagerAnalyticsConfiguration analyticsConfig = Mockito.mock(APIManagerAnalyticsConfiguration.class);
    PowerMockito.when(DataPublisherUtil.getApiManagerAnalyticsConfiguration()).thenReturn(analyticsConfig);
    Mockito.when(analyticsConfig.getPublisherClass()).thenReturn("publisherClass");

    // TODO (carried over from the original, which gave no further detail)
    ChannelHandlerContext handlerContext = Mockito.mock(ChannelHandlerContext.class);
    WebSocketFrame mockedFrame = Mockito.mock(WebSocketFrame.class);

    // Subclass the handler so the remote IP does not depend on a real channel.
    WebsocketInboundHandler handlerUnderTest = new WebsocketInboundHandler() {
        @Override
        protected String getRemoteIP(ChannelHandlerContext ctx) {
            return "localhost";
        }
    };

    try {
        handlerUnderTest.doThrottle(handlerContext, mockedFrame);
        fail("Expected NumberFormatException is not thrown.");
    } catch (Exception thrown) {
        // fail() raises an Error, so it is not caught here.
        Assert.assertTrue(thrown instanceof NumberFormatException);
    }
}
 

/**
 * Dispatches one inbound WebSocket frame by type: close frames go to the handshaker, pings are
 * answered with a pong carrying the same flags and content, text/binary/continuation frames
 * are written back unchanged, and pongs are released.
 *
 * @param ctx   channel context the frame arrived on
 * @param frame the received frame
 * @throws UnsupportedOperationException for any other frame type
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (logger.isLoggable(Level.FINE)) {
        logger.fine(String.format("Channel %s received %s",
                ctx.channel().hashCode(), StringUtil.simpleClassName(frame)));
    }

    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()), ctx.voidPromise());
        return;
    }

    // Data-carrying frames are all echoed the same way.
    boolean echoed = frame instanceof TextWebSocketFrame
            || frame instanceof BinaryWebSocketFrame
            || frame instanceof ContinuationWebSocketFrame;
    if (echoed) {
        ctx.write(frame, ctx.voidPromise());
        return;
    }
    if (frame instanceof PongWebSocketFrame) {
        // Nothing to do with a pong except drop the reference.
        frame.release();
        return;
    }

    throw new UnsupportedOperationException(
            String.format("%s frame types not supported", frame.getClass().getName()));
}
 

/**
 * Records a received binary WebSocket frame on the message context.
 *
 * <p>Looks up the inbound endpoint registered for this handler's {@code port} and
 * {@code tenantDomain}. If none is deployed, an error is logged and nothing is set. Otherwise
 * the "binary frame present" flag and the frame itself are stored as properties on both
 * {@code synCtx} and its underlying Axis2 message context.
 *
 * @param frame  the received frame
 * @param synCtx message context to annotate; cast to {@code Axis2MessageContext} here
 * @throws AxisFault declared by the signature; no statement in this method throws it explicitly
 */
protected void handleWebsocketBinaryFrame(WebSocketFrame frame, MessageContext synCtx) throws AxisFault {
    String endpointName = WebsocketEndpointManager.getInstance().getEndpointName(port, tenantDomain);

    InboundEndpoint endpoint = synCtx.getConfiguration().getInboundEndpoint(endpointName);

    if (endpoint == null) {
        // Separator before "for" was missing, which glued the endpoint name to the next word.
        log.error("Cannot find deployed inbound endpoint " + endpointName + " for process request");
        return;
    }

    // Boolean.TRUE replaces the deprecated Boolean(boolean) constructor.
    synCtx.setProperty(InboundWebsocketConstants.WEBSOCKET_BINARY_FRAME_PRESENT, Boolean.TRUE);
    ((Axis2MessageContext) synCtx).getAxis2MessageContext()
            .setProperty(InboundWebsocketConstants.WEBSOCKET_BINARY_FRAME_PRESENT, Boolean.TRUE);
    synCtx.setProperty(InboundWebsocketConstants.WEBSOCKET_BINARY_FRAME, frame);
    ((Axis2MessageContext) synCtx).getAxis2MessageContext()
            .setProperty(InboundWebsocketConstants.WEBSOCKET_BINARY_FRAME, frame);
}
 

/**
 * Echo-style frame handler: closes on a close frame, answers a ping with a pong carrying the
 * same content, and writes text and binary frames back to the sender. Other frame types are
 * ignored.
 *
 * @param ctx   channel context the frame arrived on
 * @param frame the received frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame || frame instanceof BinaryWebSocketFrame) {
        // Text and binary frames are echoed identically.
        ctx.write(frame.retain());
    }
}
 

/**
 * Handles one frame: closes on a close frame, answers a ping with a pong, and replies to a
 * text frame with the upper-cased text.
 *
 * @param ctx   channel context the frame arrived on
 * @param frame the received frame
 * @throws UnsupportedOperationException for any frame that is not close, ping or text
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame) {
        // Reply with the upper-cased request text.
        String request = ((TextWebSocketFrame) frame).text();
        System.err.printf("%s received %s%n", ctx.channel(), request);
        ctx.channel().write(new TextWebSocketFrame(request.toUpperCase()));
    } else {
        throw new UnsupportedOperationException(
                String.format("%s frame types not supported", frame.getClass().getName()));
    }
}
 

/**
 * Handles control frames locally and forwards data frames down the pipeline.
 *
 * <p>Close frames are given to the handshaker and then also forwarded; pings are answered with
 * a pong; continuation frames are written back; pongs are released; binary and text frames are
 * passed to the next inbound handler.
 *
 * @param ctx   channel context the frame arrived on
 * @param frame the received frame
 * @throws UnsupportedOperationException for any other frame type
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
  if (frame instanceof CloseWebSocketFrame) {
    handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
    // NOTE(review): the same frame is both closed with and forwarded without retain() —
    // confirm reference counting against how this handler is invoked.
    ctx.fireChannelRead(frame);
    return;
  }
  if (frame instanceof PingWebSocketFrame) {
    ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
    return;
  }
  if (frame instanceof ContinuationWebSocketFrame) {
    ctx.write(frame);
    return;
  }
  if (frame instanceof PongWebSocketFrame) {
    frame.release();
    return;
  }
  if (frame instanceof BinaryWebSocketFrame || frame instanceof TextWebSocketFrame) {
    // Data frames are left for the rest of the pipeline.
    ctx.fireChannelRead(frame);
    return;
  }
  throw new UnsupportedOperationException(
      String.format("%s frame types not supported", frame.getClass().getName()));
}
 

/**
 * Handles one frame: closes on a close frame, answers a ping with a flushed pong, and passes
 * text or binary payloads to {@code webSocketEvent} when a listener is set. Other frame types
 * are ignored.
 *
 * @param ctx   channel context the frame arrived on
 * @param frame the received frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
	if (frame instanceof CloseWebSocketFrame) {
		handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
	} else if (frame instanceof PingWebSocketFrame) {
		ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
	} else if (frame instanceof TextWebSocketFrame) {
		// The session wrapper is only created when a listener exists.
		if (webSocketEvent != null) {
			webSocketEvent.onMessageStringEvent(baseServer, new WebSocketSession(ctx.channel()), ((TextWebSocketFrame) frame).text());
		}
	} else if (frame instanceof BinaryWebSocketFrame) {
		if (webSocketEvent != null) {
			webSocketEvent.onMessageBinaryEvent(baseServer, new WebSocketSession(ctx.channel()), ((BinaryWebSocketFrame) frame).content());
		}
	}
}
 

/**
 * Handles ping and pong frames only: a ping is answered with a pong carrying the same content,
 * and a pong is recorded on the channel's {@code PingPong} session if one exists.
 *
 * @param ctx   channel context the frame arrived on
 * @param frame the received frame
 * @throws UnsupportedOperationException for any frame that is neither ping nor pong
 */
protected void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
   if (frame instanceof PingWebSocketFrame) {
      if (logger.isTraceEnabled()) {
         logger.trace("Ping with payload [{}]", ByteBufUtil.hexDump(frame.content()));
      }
      ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
      return;
   }

   if (frame instanceof PongWebSocketFrame) {
      PingPong session = PingPong.get(ctx.channel());
      if (session != null) {
         session.recordPong();
      }
      return;
   }

   throw new UnsupportedOperationException(
         String.format("%s frame types not supported", frame.getClass().getName()));
}
 

/**
 * Handles a WebSocket frame.
 *
 * <p>A close frame closes the handshake and then the context. A text frame is logged; an empty
 * text is answered with an empty text frame (heartbeat reply), and anything else goes to
 * {@code handleMessage}.
 *
 * @param ctx   channel context the frame arrived on
 * @param frame the received frame
 * @throws UnsupportedOperationException for any frame that is neither close nor text
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        ctx.close();
        return;
    }

    // Original author's note: WebSocketServerProtocolHandler is not used, so a
    // PingWebSocketFrame will not be received. The ping reply therefore stays disabled:
    // if (frame instanceof PingWebSocketFrame) {
    // ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
    // return;
    // }

    if (frame instanceof TextWebSocketFrame) {
        String request = ((TextWebSocketFrame) frame).text();
        logger.debug("收到客户端发送的数据:" + request);
        if (request.isEmpty()) {
            // An empty text is treated as a heartbeat and answered in kind.
            ctx.writeAndFlush(new TextWebSocketFrame(""));
        } else {
            this.handleMessage(ctx.channel(), request);
        }
        return;
    }

    throw new UnsupportedOperationException(
            String.format("%s frame types not supported", frame.getClass().getName()));
}
 
源代码10 项目: sctalk   文件: MessageWsServerHandler.java

/**
 * Routes an inbound message by type: HTTP requests go to the handshake handler,
 * {@code IMProtoMessage}s go to the superclass, and everything else is only logged.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof HttpRequest) {
        logger.debug("HttpRequest");
        // Complete the handshake.
        handleHttpRequest(ctx, (HttpRequest) msg);
        return;
    }
    if (msg instanceof IMProtoMessage) {
        super.channelRead(ctx, msg);
        return;
    }
    if (msg instanceof WebSocketFrame) {
        // Original author's note: this branch should never be reached. The direct call
        // websocketService.handleFrame(ctx, (WebSocketFrame) msg) is left disabled.
        logger.debug("WebSocketFrame");
        return;
    }
    logger.debug("other:{}", msg);
}
 

/**
 * Invokes the endpoint's binary-message callback for the given channel, if one is mapped.
 *
 * <p>With exactly one path mapping registered, that mapping is used; otherwise the mapping is
 * looked up by the path stored on the channel. Any throwable raised by the callback is logged.
 *
 * @param channel channel the frame arrived on
 * @param frame   the received frame; cast to {@code BinaryWebSocketFrame} when a callback exists
 */
public void doOnBinary(Channel channel, WebSocketFrame frame) {
    Attribute<String> pathAttribute = channel.attr(PATH_KEY);
    // NOTE(review): the lookup by path can presumably return null for an unknown path,
    // which would fail on the next line — confirm against how paths are registered.
    PojoMethodMapping mapping = pathMethodMappingMap.size() == 1
            ? pathMethodMappingMap.values().iterator().next()
            : pathMethodMappingMap.get(pathAttribute.get());

    if (mapping.getOnBinary() == null) {
        return;
    }

    BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
    Object endpointInstance = channel.attr(POJO_KEY).get();
    try {
        mapping.getOnBinary().invoke(endpointInstance, mapping.getOnBinaryArgs(channel, binaryFrame));
    } catch (Throwable t) {
        logger.error(t);
    }
}
 
源代码12 项目: wind-im   文件: WsServerHandler.java

/**
 * Routes an inbound message: full HTTP requests go to the handshake path, WebSocket frames go
 * to the WebSocket path, and anything else closes the connection.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
	if (msg instanceof FullHttpRequest) {
		// HTTP request: perform the handshake.
		doHttpRequest(ctx, (FullHttpRequest) msg);
		return;
	}
	if (msg instanceof WebSocketFrame) {
		// WebSocket request.
		doWSRequest(ctx, (WebSocketFrame) msg);
		return;
	}
	// Invalid request: close the connection.
	ctx.close();
}
 
源代码13 项目: openzaly   文件: WsServerHandler.java

/**
 * Routes an inbound message: full HTTP requests go to the handshake path, WebSocket frames go
 * to the WebSocket path, and anything else closes the connection.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
	if (msg instanceof FullHttpRequest) {
		// HTTP request: perform the handshake.
		doHttpRequest(ctx, (FullHttpRequest) msg);
		return;
	}
	if (msg instanceof WebSocketFrame) {
		// WebSocket request.
		doWSRequest(ctx, (WebSocketFrame) msg);
		return;
	}
	// Invalid request: close the connection.
	ctx.close();
}
 

/**
 * Echo handler: replies to a text frame with a new text frame holding the same text, writes a
 * binary frame back after retaining it, and closes the context on a close frame.
 *
 * @throws UnsupportedOperationException for any other frame type
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
    if (frame instanceof TextWebSocketFrame) {
        // Echo the same text in a fresh frame.
        ctx.channel().writeAndFlush(new TextWebSocketFrame(((TextWebSocketFrame) frame).text()));
        return;
    }
    if (frame instanceof BinaryWebSocketFrame) {
        ctx.channel().writeAndFlush(frame.retain());
        return;
    }
    if (frame instanceof CloseWebSocketFrame) {
        ctx.close();
        return;
    }
    throw new UnsupportedOperationException("unsupported frame type: " + frame.getClass().getName());
}
 

/**
 * Writes a duplicate of {@code frame} to every channel context registered under the given
 * inbound endpoint name and subscriber path.
 *
 * @param frame          frame to broadcast
 * @param inboundName    inbound endpoint name used for the lookup
 * @param subscriberPath subscriber path used for the lookup
 */
public void broadcastOnSubscriberPath(WebSocketFrame frame, String inboundName, String subscriberPath) {
    List<InboundWebsocketChannelContext> subscribers =
            getSubscriberPathChannelContextList(inboundName, subscriberPath);
    for (InboundWebsocketChannelContext subscriber : subscribers) {
        // NOTE(review): duplicate() is used without retain() — confirm that writeToChannel
        // accounts for the shared reference count when there are several subscribers.
        subscriber.writeToChannel(frame.duplicate());
    }
}
 

/**
 * Routes an inbound message: full HTTP requests go to the handshake handler and WebSocket
 * frames go to the frame handler. Other message types are not handled here.
 */
@SuppressWarnings("unchecked")
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof FullHttpRequest) {
        handleHandshake(ctx, (FullHttpRequest) msg);
        return;
    }
    if (msg instanceof WebSocketFrame) {
        handleWebSocketFrame(ctx, (WebSocketFrame) msg);
    }
}
 

/**
 * Client-side read handler. Until the handshake is complete the message is treated as the
 * handshake response; afterwards text, pong and close frames are reported on standard output,
 * and a close frame also closes the channel.
 *
 * @throws IllegalStateException if a {@code FullHttpResponse} arrives after the handshake
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    Channel channel = ctx.channel();

    if (!handshaker.isHandshakeComplete()) {
        // The first message must be the handshake response; its outcome settles the future.
        try {
            handshaker.finishHandshake(channel, (FullHttpResponse) msg);
            System.out.println("WebSocket Client connected!");
            handshakeFuture.setSuccess();
        } catch (WebSocketHandshakeException handshakeFailure) {
            System.out.println("WebSocket Client failed to connect");
            handshakeFuture.setFailure(handshakeFailure);
        }
        return;
    }

    if (msg instanceof FullHttpResponse) {
        FullHttpResponse unexpected = (FullHttpResponse) msg;
        throw new IllegalStateException(
                "Unexpected FullHttpResponse (getStatus=" + unexpected.status() +
                        ", content=" + unexpected.content().toString(CharsetUtil.UTF_8) + ')');
    }

    WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof TextWebSocketFrame) {
        System.out.println("WebSocket Client received message: " + ((TextWebSocketFrame) frame).text());
        return;
    }
    if (frame instanceof PongWebSocketFrame) {
        System.out.println("WebSocket Client received pong");
        return;
    }
    if (frame instanceof CloseWebSocketFrame) {
        System.out.println("WebSocket Client received closing");
        channel.close();
    }
}
 

/**
 * Closes the wrapped channel using the given frame as the close frame, then removes the
 * wrapped channel context from the subscriber-path registry for this endpoint.
 *
 * @param frame frame to close with; retained and cast to {@code CloseWebSocketFrame}
 * @throws AxisFault declared by the signature; no statement in this method throws it explicitly
 */
public void handleClientWebsocketChannelTermination(WebSocketFrame frame) throws AxisFault {
    handshaker.close(
            wrappedContext.getChannelHandlerContext().channel(),
            (CloseWebSocketFrame) frame.retain());

    WebsocketEndpointManager endpointManager = WebsocketEndpointManager.getInstance();
    String endpointName = endpointManager.getEndpointName(port, tenantDomain);

    WebsocketSubscriberPathManager pathManager = WebsocketSubscriberPathManager.getInstance();
    pathManager.removeChannelContext(endpointName, subscriberPath.getPath(), wrappedContext);
}
 

/**
 * Client-side read handler. Updates the monitor timestamp on every message, finishes the
 * handshake on the first one, and afterwards passes text and decoded binary payloads to
 * {@code service}. A close frame closes the channel.
 *
 * @throws IllegalStateException if a {@code FullHttpResponse} arrives after the handshake
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    Channel channel = ctx.channel();
    moniter.updateTime();

    if (!handshaker.isHandshakeComplete()) {
        handshaker.finishHandshake(channel, (FullHttpResponse) msg);
        System.out.println("WebSocket Client connected!");
        handshakeFuture.setSuccess();
        return;
    }

    if (msg instanceof FullHttpResponse) {
        FullHttpResponse unexpected = (FullHttpResponse) msg;
        throw new IllegalStateException(
                "Unexpected FullHttpResponse (getStatus=" + unexpected.getStatus() +
                        ", content=" + unexpected.content().toString(CharsetUtil.UTF_8) + ')');
    }

    WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof TextWebSocketFrame) {
        service.onReceive(((TextWebSocketFrame) frame).text());
        return;
    }
    if (frame instanceof BinaryWebSocketFrame) {
        service.onReceive(decodeByteBuff(((BinaryWebSocketFrame) frame).content()));
        return;
    }
    if (frame instanceof PongWebSocketFrame) {
        System.out.println("WebSocket Client received pong");
        return;
    }
    if (frame instanceof CloseWebSocketFrame) {
        System.out.println("WebSocket Client received closing");
        channel.close();
    }
}
 
源代码20 项目: netstrap   文件: MyWebSocketFilter.java

/**
 * Example filter that replies "hello world" to every frame and stops further processing.
 *
 * <p>Original author's notes: the system handles CloseFrame and PingFrame by default and text
 * messages follow the text protocol, so only binary messages need handling here. All WebSocket
 * messages pass through this filter, and {@code Filterable} can specify an ordering value.
 *
 * @return always {@code false}; per the original note, {@code true} would let the subsequent
 *         logic run and {@code false} stops it
 */
@Override
public boolean filter(Channel channel, WebSocketContext context, WebSocketFrame frame) {
    TextWebSocketFrame greeting = new TextWebSocketFrame("hello world");
    channel.writeAndFlush(greeting);
    return false;
}
 
源代码21 项目: qonduit   文件: WebSocketIT.java

/**
 * Test-client read handler. Finishes the handshake on the first message, then collects text
 * and binary payloads as byte arrays in {@code responses}, answers pings with an empty pong,
 * and closes the context on a close frame.
 *
 * @throws IllegalStateException if a {@code FullHttpResponse} arrives after the handshake
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    LOG.info("Received msg: {}", msg);

    if (!this.handshaker.isHandshakeComplete()) {
        this.handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
        LOG.info("Client connected.");
        this.connected = true;
        this.handshakeFuture.setSuccess();
        return;
    }
    if (msg instanceof FullHttpResponse) {
        throw new IllegalStateException("Unexpected response: " + msg.toString());
    }

    WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof TextWebSocketFrame) {
        synchronized (responses) {
            responses.add(((TextWebSocketFrame) frame).text().getBytes(StandardCharsets.UTF_8));
        }
        return;
    }
    if (frame instanceof BinaryWebSocketFrame) {
        // Copy the readable bytes out of the frame's buffer.
        ByteBuf payload = frame.content();
        byte[] copy = new byte[payload.readableBytes()];
        payload.readBytes(copy);
        synchronized (responses) {
            responses.add(copy);
        }
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        LOG.info("Returning pong message");
        ctx.writeAndFlush(new PongWebSocketFrame());
        return;
    }
    if (frame instanceof CloseWebSocketFrame) {
        LOG.info("Received message from server to close the channel.");
        ctx.close();
        return;
    }
    LOG.warn("Unhandled frame type received: " + frame.getClass());
}
 

/**
 * Sends the published messages over the delegate's outbound. Each message is trace-logged when
 * tracing is enabled and converted to a frame with {@code toFrame}; the outbound is set to
 * flush on each element.
 *
 * @param messages messages to send
 * @return a {@code Mono} obtained from {@code then()} on the outbound send
 */
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
	Flux<WebSocketMessage> traced = Flux.from(messages).doOnNext(message -> {
		if (logger.isTraceEnabled()) {
			logger.trace(getLogPrefix() + "Sending " + message);
		}
	});
	Flux<WebSocketFrame> frames = traced.map(this::toFrame);

	return getDelegate().getOutbound()
			.options(NettyPipeline.SendOptions::flushOnEach)
			.sendObject(frames)
			.then();
}
 
源代码23 项目: reactor-netty   文件: WebsocketTest.java

/**
 * Starts a WebSocket server that only consumes inbound frames, then connects a client that
 * sends a Ping frame followed by a Close frame and records every frame it receives in
 * {@code incomingData}. The test expects exactly one Pong frame followed by completion within
 * 30 seconds.
 */
@Test
public void testIssue663_3() {
	// Collects the frames received by the client so StepVerifier can inspect them.
	FluxIdentityProcessor<WebSocketFrame> incomingData = Processors.more().multicastNoBackpressure();

	// Server: upgrade to WebSocket and drain whatever the client sends.
	// NOTE(review): the expected Pong presumably comes from the server answering the ping
	// automatically — confirm against the reactor-netty defaults.
	httpServer =
			HttpServer.create()
			          .port(0)
			          .handle((req, resp) -> resp.sendWebsocket((i, o) -> i.receiveFrames().then()))
			          .wiretap(true)
			          .bindNow();

	// Client: send Ping then Close (each element delayed by 100 ms), then feed received
	// frames into incomingData.
	HttpClient.create()
	          .port(httpServer.port())
	          .wiretap(true)
	          .websocket()
	          .uri("/")
	          .handle((in, out) ->
	              out.sendObject(Flux.just(new PingWebSocketFrame(), new CloseWebSocketFrame())
	                                 .delayElements(Duration.ofMillis(100)))
	                 .then(in.receiveFrames()
	                         .subscribeWith(incomingData)
	                         .then()))
	          .subscribe();

	// Exactly one Pong, then completion, within 30 seconds.
	StepVerifier.create(incomingData)
	            .expectNext(new PongWebSocketFrame())
	            .expectComplete()
	            .verify(Duration.ofSeconds(30));
}
 

/**
 * Client-side read handler. Finishes the handshake on the first message and tracks the outcome
 * in {@code running}. Afterwards each text frame is parsed as a JSON list and its first element
 * selects the subscription handler that receives the raw text (falling back to the default
 * handler). A close frame clears {@code running} and closes the channel.
 *
 * @throws IllegalStateException if a {@code FullHttpResponse} arrives after the handshake
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    Channel channel = ctx.channel();

    if (!handshaker.isHandshakeComplete()) {
        try {
            handshaker.finishHandshake(channel, (FullHttpResponse) msg);
            running = true;
            LOG.trace("WebSocket Client connected!");
            handshakeFuture.setSuccess();
        } catch (WebSocketHandshakeException handshakeFailure) {
            LOG.trace("WebSocket Client failed to connect");
            running = false;
            handshakeFuture.setFailure(handshakeFailure);
        }
        return;
    }

    if (msg instanceof FullHttpResponse) {
        FullHttpResponse unexpected = (FullHttpResponse) msg;
        throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + unexpected.status() + ", content="
                + unexpected.content().toString(CharsetUtil.UTF_8) + ')');
    }

    WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof TextWebSocketFrame) {
        String payload = ((TextWebSocketFrame) frame).text();
        LOG.trace("WebSocket Client received message: " + payload);
        // The first list element is the key used to pick the subscription handler.
        List<?> parsed = this.gson.fromJson(payload, List.class);
        this.subscriptions.getOrDefault(parsed.get(0), this.defaultSubscriptionMessageHandler).handle(payload);
        return;
    }
    if (frame instanceof CloseWebSocketFrame) {
        LOG.trace("WebSocket Client received closing");
        running = false;
        channel.close();
    }
}
 
源代码25 项目: quarkus-http   文件: FrameHandler.java

/**
 * Delivers the text of one frame to the registered text handler, buffering fragments when the
 * handler does not accept partial messages.
 *
 * @param frame the frame the text came from; only its final-fragment flag is read
 * @param text  decoded text of the frame
 * @throws IOException declared by the signature; may come from the calls made here
 */
private void onText(WebSocketFrame frame, String text) throws IOException {
    if (session.isSessionClosed()) {
        // Too bad, the channel has already been closed. Messages received after close are
        // ignored because the endpoint is no longer in a valid state to deal with them; this
        // should only happen if a message was on the wire when close() was called.
        session.close();
        return;
    }

    final boolean lastFragment = frame.isFinalFragment();
    // A non-final frame means a text continuation is expected next.
    expectedContinuation = lastFragment ? null : FrameType.TEXT;

    final HandlerWrapper handler = getHandler(FrameType.TEXT);
    if (handler == null) {
        return;
    }

    // Deliver directly to a partial handler, or when a complete message arrives with nothing buffered.
    if (handler.isPartialHandler() || (stringBuffer == null && lastFragment)) {
        invokeTextHandler(text, handler, lastFragment);
        return;
    }

    // Otherwise accumulate fragments and deliver once the final one arrives.
    if (stringBuffer == null) {
        stringBuffer = new StringBuilder();
    }
    stringBuffer.append(text);
    if (lastFragment) {
        invokeTextHandler(stringBuffer.toString(), handler, lastFragment);
        stringBuffer = null;
    }
}
 
源代码26 项目: reactor-netty   文件: WebsocketTest.java

/**
 * Runs an echo round trip against a WebSocket server configured with the given maximum frame
 * payload length and asserts that the strings received by the client equal {@code expectation}.
 *
 * @param maxFramePayloadLength value passed to the server's {@code WebsocketServerSpec}
 * @param input                 strings the client sends
 * @param expectation           strings the client is expected to have received
 * @param count                 number of received frames the client takes before completing
 */
private void doTestServerMaxFramePayloadLength(int maxFramePayloadLength, Flux<String> input, Flux<String> expectation, int count) {
	// Server: aggregate inbound frames, decode each content with the default charset and
	// send it back as a new text frame.
	httpServer =
			HttpServer.create()
			          .port(0)
			          .handle((req, res) -> res.sendWebsocket((in, out) ->
			              out.sendObject(in.aggregateFrames()
			                               .receiveFrames()
			                               .map(WebSocketFrame::content)
			                               .map(byteBuf ->
			                                   byteBuf.readCharSequence(byteBuf.readableBytes(), Charset.defaultCharset()).toString())
			                               .map(TextWebSocketFrame::new)),
			              WebsocketServerSpec.builder().maxFramePayloadLength(maxFramePayloadLength).build()))
			          .wiretap(true)
			          .bindNow();

	// Client: send the input strings, then record up to `count` decoded frames in `output`.
	FluxIdentityProcessor<String> output = Processors.replayAll();
	HttpClient.create()
	          .port(httpServer.port())
	          .websocket()
	          .uri("/")
	          .handle((in, out) -> out.sendString(input)
	                                  .then(in.aggregateFrames()
	                                          .receiveFrames()
	                                          .map(WebSocketFrame::content)
	                                          .map(byteBuf ->
	                                              byteBuf.readCharSequence(byteBuf.readableBytes(), Charset.defaultCharset()).toString())
	                                          .take(count)
	                                          .subscribeWith(output)
	                                          .then()))
	          .blockLast(Duration.ofSeconds(30));

	// Both sides are collected to lists (30 second limit each) and compared.
	assertThat(output.collectList().block(Duration.ofSeconds(30)))
			.isEqualTo(expectation.collectList().block(Duration.ofSeconds(30)));
}
 

/**
 * Handles one frame: closes on a close frame, answers a ping with a pong carrying the same
 * content, and passes a text frame to {@code exec}.
 *
 * @param ctx   channel context the frame arrived on
 * @param frame the received frame
 * @throws UnsupportedOperationException for any frame that is not close, ping or text
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
        return;
    }
    if (frame instanceof TextWebSocketFrame) {
        exec(ctx, frame);
        return;
    }
    throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
}
 

/**
 * Routes an inbound message: full HTTP requests go to the HTTP handler and WebSocket frames go
 * to the frame handler. Other message types are not handled here.
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof FullHttpRequest) {
        handleHttpRequest(ctx, (FullHttpRequest) msg);
        return;
    }
    if (msg instanceof WebSocketFrame) {
        handleWebSocketFrame(ctx, (WebSocketFrame) msg);
    }
}
 
源代码29 项目: reactor-netty   文件: WebsocketTest.java

/**
 * Starts a WebSocket server that sends a Ping frame followed by a Close frame and records
 * every frame it receives in {@code incomingData}, then connects a client built with
 * {@code handlePing(true)} that only receives frames. The test expects {@code incomingData}
 * to complete without emitting anything, within 30 seconds.
 */
@Test
public void testIssue663_2() {
	// Collects the frames received by the server so StepVerifier can inspect them.
	FluxIdentityProcessor<WebSocketFrame> incomingData = Processors.more().multicastNoBackpressure();

	// Server: send Ping then Close (each element delayed by 100 ms), then feed received
	// frames into incomingData.
	httpServer =
			HttpServer.create()
			          .port(0)
			          .handle((req, resp) ->
			              resp.sendWebsocket((i, o) ->
			                  o.sendObject(Flux.just(new PingWebSocketFrame(), new CloseWebSocketFrame())
			                   .delayElements(Duration.ofMillis(100)))
			                   .then(i.receiveFrames()
			                          .subscribeWith(incomingData)
			                          .then())))
			          .wiretap(true)
			          .bindNow();

	// Client: only receives frames.
	// NOTE(review): handlePing(true) presumably hands ping frames to the client handler
	// instead of auto-replying with a Pong — confirm against the WebsocketClientSpec docs.
	HttpClient.create()
	          .port(httpServer.port())
	          .wiretap(true)
	          .websocket(WebsocketClientSpec.builder().handlePing(true).build())
	          .uri("/")
	          .handle((in, out) -> in.receiveFrames())
	          .subscribe();

	// No frames are expected on the server side before completion.
	StepVerifier.create(incomingData)
	            .expectComplete()
	            .verify(Duration.ofSeconds(30));
}
 
源代码30 项目: litchi   文件: RequestPacket.java

/**
 * Builds a {@code RequestPacket} from the binary content of a WebSocket frame.
 *
 * <p>The content is read sequentially as: a short message id, a one-byte route length followed
 * by the route bytes, and a short data length followed by the data bytes. The route is decoded
 * as UTF-8 rather than with the platform-default charset, so the result does not depend on the
 * JVM's locale settings. The data bytes become the single element of {@code args}.
 *
 * @param frame frame whose content holds the encoded request; its reader index is advanced
 * @param uid   user id to record on the packet
 * @return the populated request packet
 */
public static RequestPacket valueOfHandler(WebSocketFrame frame, long uid) {
    RequestPacket request = new RequestPacket();

    // uid
    request.uid = uid;

    // messageId
    ByteBuf message = frame.content();
    request.messageId = message.readShort();

    // route: length-prefixed (one signed byte), decoded with an explicit charset
    byte[] routeBytes = new byte[message.readByte()];
    message.readBytes(routeBytes);
    request.route = new String(routeBytes, java.nio.charset.StandardCharsets.UTF_8);

    // data: length-prefixed (one signed short), passed through as raw bytes
    byte[] data = new byte[message.readShort()];
    message.readBytes(data);
    request.args = new Object[1];
    request.args[0] = data;

    // Packet integrity validation (CRC) is disabled in the original:
//			long crc = message.readLong();
//			byte[] array = message.array();
//			if (crc != CRCUtils.calculateCRC(Parameters.CRC32, array, 0, array.length - 8)) {
//				LOGGER.error("request packet crc error. crc={} array={}", crc, Arrays.toString(array));
//				return;
//			}

    return request;
}
 
 同包方法