下面列出了怎么用 io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame 的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Print {@link WebSocketFrame} information.
*
* @param log {@link Log} object of the relevant class
* @param frame {@link WebSocketFrame} frame
* @param channelContextId {@link ChannelHandlerContext} context id as a String
* @param customMsg Log message which needs to be appended to the frame information,
* if it is not required provide null
* @param isInbound true if the frame is inbound, false if it is outbound
*/
private static void printWebSocketFrame(
Log log, WebSocketFrame frame, String channelContextId,
String customMsg, boolean isInbound) {
String logStatement = getDirectionString(isInbound) + channelContextId;
if (frame instanceof PingWebSocketFrame) {
logStatement += " Ping frame";
} else if (frame instanceof PongWebSocketFrame) {
logStatement += " Pong frame";
} else if (frame instanceof CloseWebSocketFrame) {
logStatement += " Close frame";
} else if (frame instanceof BinaryWebSocketFrame) {
logStatement += " Binary frame";
} else if (frame instanceof TextWebSocketFrame) {
logStatement += " " + ((TextWebSocketFrame) frame).text();
}
//specifically for logging close websocket frames with error status
if (customMsg != null) {
logStatement += " " + customMsg;
}
log.debug(logStatement);
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) {
ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (frame instanceof TextWebSocketFrame) {
if (webSocketEvent != null) {
webSocketEvent.onMessageStringEvent(baseServer, new WebSocketSession(ctx.channel()), ((TextWebSocketFrame) frame).text());
}
return;
}
if (frame instanceof BinaryWebSocketFrame) {
if (webSocketEvent != null) {
webSocketEvent.onMessageBinaryEvent(baseServer, new WebSocketSession(ctx.channel()), ((BinaryWebSocketFrame)frame).content());
}
}
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// Check for closing frame
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (frame instanceof TextWebSocketFrame) {
// Echo the frame
ctx.write(frame.retain());
return;
}
if (frame instanceof BinaryWebSocketFrame) {
// Echo the frame
ctx.write(frame.retain());
return;
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
LOG.trace("NettyServerHandler: Channel write: {}", msg);
if (isWebSocketServer() && msg instanceof ByteBuf) {
if (isFragmentWrites()) {
ByteBuf orig = (ByteBuf) msg;
int origIndex = orig.readerIndex();
int split = orig.readableBytes()/2;
ByteBuf part1 = orig.copy(origIndex, split);
LOG.trace("NettyServerHandler: Part1: {}", part1);
orig.readerIndex(origIndex + split);
LOG.trace("NettyServerHandler: Part2: {}", orig);
BinaryWebSocketFrame frame1 = new BinaryWebSocketFrame(false, 0, part1);
ctx.writeAndFlush(frame1);
ContinuationWebSocketFrame frame2 = new ContinuationWebSocketFrame(true, 0, orig);
ctx.write(frame2, promise);
} else {
BinaryWebSocketFrame frame = new BinaryWebSocketFrame((ByteBuf) msg);
ctx.write(frame, promise);
}
} else {
ctx.write(msg, promise);
}
}
void sendMetrics(@Nullable JsonObject metrics) {
ChannelHandlerContext c = ctx;
if (metrics == null || !connected) {
return;
}
try {
String spayload = JSON.toJson(metrics);
byte[] payload = spayload.getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = c.alloc().ioBuffer();
OutputStream out = new ByteBufOutputStream(buffer);
hubSerializer.serialize(HubMessage.createMetrics(payload), out);
IOUtils.closeQuietly(out);
BinaryWebSocketFrame frame = new BinaryWebSocketFrame(buffer);
c.writeAndFlush(frame);
lastHubMsg = System.nanoTime();
} catch (IOException ex) {
log.warn("metrics serialization failed, dropping message", ex);
}
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (logger.isLoggable(Level.FINE)) {
logger.fine(String.format(
"Channel %s received %s", ctx.channel().hashCode(), StringUtil.simpleClassName(frame)));
}
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
} else if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()), ctx.voidPromise());
} else if (frame instanceof TextWebSocketFrame) {
ctx.write(frame, ctx.voidPromise());
} else if (frame instanceof BinaryWebSocketFrame) {
ctx.write(frame, ctx.voidPromise());
} else if (frame instanceof ContinuationWebSocketFrame) {
ctx.write(frame, ctx.voidPromise());
} else if (frame instanceof PongWebSocketFrame) {
frame.release();
// Ignore
} else {
throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass()
.getName()));
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("In webapp socket encoder {}", msg);
if (msg instanceof MessageBase) {
MessageBase message = (MessageBase) msg;
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
out.writeByte(message.command);
out.writeShort(message.id);
if (message instanceof ResponseMessage) {
out.writeInt(((ResponseMessage) message).code);
} else {
byte[] body = message.getBytes();
if (body.length > 0) {
out.writeBytes(body);
}
}
super.write(ctx, new BinaryWebSocketFrame(out), promise);
} else {
super.write(ctx, msg, promise);
}
}
@Override
protected void encode(ChannelHandlerContext ctx, Frame frame, List<Object> out) throws Exception {
ByteBuf in = ByteBufAllocator.DEFAULT.buffer();
// check the frame
if (!checkFrame(frame)) {
throw new RuntimeException("checkFrame failed!");
}
// get frame content bytes
byte[] content = serializer.serialize(frame);
// do encode
in.writeByte(frame.getMagic());
in.writeByte(frame.getType());
in.writeInt(content.length);
in.writeBytes(content);
out.add(new BinaryWebSocketFrame(in));
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
// Pass on to the rest of the channel
ctx.fireChannelRead(frame);
} else if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
} else if (frame instanceof ContinuationWebSocketFrame) {
ctx.write(frame);
} else if (frame instanceof PongWebSocketFrame) {
frame.release();
} else if (frame instanceof BinaryWebSocketFrame || frame instanceof TextWebSocketFrame) {
// Allow the rest of the pipeline to deal with this.
ctx.fireChannelRead(frame);
} else {
throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass()
.getName()));
}
}
public void doOnBinary(Channel channel, WebSocketFrame frame) {
Attribute<String> attrPath = channel.attr(PATH_KEY);
PojoMethodMapping methodMapping = null;
if (pathMethodMappingMap.size() == 1) {
methodMapping = pathMethodMappingMap.values().iterator().next();
} else {
String path = attrPath.get();
methodMapping = pathMethodMappingMap.get(path);
}
if (methodMapping.getOnBinary() != null) {
BinaryWebSocketFrame binaryWebSocketFrame = (BinaryWebSocketFrame) frame;
Object implement = channel.attr(POJO_KEY).get();
try {
methodMapping.getOnBinary().invoke(implement, methodMapping.getOnBinaryArgs(channel, binaryWebSocketFrame));
} catch (Throwable t) {
logger.error(t);
}
}
}
private RequestData<String> parse(WebSocketFrame msg) throws IOException {
if (msg instanceof TextWebSocketFrame) {
String text = ((TextWebSocketFrame) msg).text();
return encryption.decrypt(text);
} else if (msg instanceof BinaryWebSocketFrame) {
ByteBuf content = msg.content();
byte[] data = new byte[content.readableBytes()];
content.readBytes(data);
return encryption.decrypt(new String(data, Charsets.UTF_8));
} else {
return null;
}
}
@Override
protected void encode(ChannelHandlerContext ctx, Packet packet, List out) throws Exception {
ByteBuf buf = ctx.alloc().buffer().order(ByteOrder.LITTLE_ENDIAN);
int packetId = PacketRegistry.CLIENTBOUND.getPacketId(packet.getClass());
if (packetId == -1) {
throw new IllegalArgumentException("Provided packet is not registered as a clientbound packet!");
}
buf.writeByte(packetId);
packet.writeData(buf);
out.add(new BinaryWebSocketFrame(buf));
OgarServer.log.finest("Sent packet ID " + packetId + " (" + packet.getClass().getSimpleName() + ") to " + ctx.channel().remoteAddress());
}
@Override
protected void decode(ChannelHandlerContext chc, BinaryWebSocketFrame frame, List<Object> out)
throws Exception {
//convert the frame to a ByteBuf
ByteBuf bb = frame.content();
//System.out.println("WebSocketFrameToByteBufDecoder decode - " + ByteBufUtil.hexDump(bb));
bb.retain();
out.add(bb);
}
@Override
public Future<Void> sendBinary(final ByteBuffer data) {
if (data == null) {
throw JsrWebSocketMessages.MESSAGES.messageInNull();
}
return undertowSession.getChannel().writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(data)));
}
@Override
public void sendBinary(final ByteBuffer data, final SendHandler completion) {
if (completion == null) {
throw JsrWebSocketMessages.MESSAGES.handlerIsNull();
}
if (data == null) {
throw JsrWebSocketMessages.MESSAGES.messageInNull();
}
undertowSession.getChannel().writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(data))).addListener(new SendHandlerAdapter(completion));
}
@Override
public void sendBinary(final ByteBuffer data) throws IOException {
if (data == null) {
throw JsrWebSocketMessages.MESSAGES.messageInNull();
}
assertNotInFragment();
try {
undertowSession.getChannel().writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(data))).get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}
data.clear(); //for some reason the TCK expects this, might as well just match the RI behaviour
}
@Test
public void testEncodingAndDecodingBinary() throws Exception {
final byte[] payload = "hello".getBytes();
final CompletableFuture latch = new CompletableFuture();
WebSocketTestClient client = new WebSocketTestClient(new URI("ws://" + DefaultServer.getHostAddress("default") + ":" + DefaultServer.getHostPort("default") + "/ws/encoding/Stuart"));
client.connect();
client.send(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(payload)), new FrameChecker(TextWebSocketFrame.class, "hello Stuart".getBytes(), latch));
latch.get();
client.destroy();
}
@org.junit.Test
public void testBinaryWithByteBuffer() throws Exception {
final byte[] payload = "payload".getBytes();
final AtomicReference<Throwable> cause = new AtomicReference<>();
final AtomicBoolean connected = new AtomicBoolean(false);
final CompletableFuture latch = new CompletableFuture<>();
class TestEndPoint extends Endpoint {
@Override
public void onOpen(final Session session, EndpointConfig config) {
connected.set(true);
session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
@Override
public void onMessage(ByteBuffer message) {
ByteBuffer buf = ByteBuffer.allocate(message.remaining());
buf.put(message);
buf.flip();
session.getAsyncRemote().sendBinary(buf);
}
});
}
}
ServerWebSocketContainer builder = new ServerWebSocketContainer(TestClassIntrospector.INSTANCE, DefaultServer.getEventLoopSupplier(), Collections.EMPTY_LIST, false, false);
builder.addEndpoint(ServerEndpointConfig.Builder.create(TestEndPoint.class, "/").configurator(new InstanceConfigurator(new TestEndPoint())).build());
deployServlet(builder);
WebSocketTestClient client = new WebSocketTestClient(new URI("ws://" + NetworkUtils.formatPossibleIpv6Address(DefaultServer.getHostAddress("default")) + ":" + DefaultServer.getHostPort("default") + "/"));
client.connect();
client.send(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(payload)), new FrameChecker(BinaryWebSocketFrame.class, payload, latch));
latch.get();
Assert.assertNull(cause.get());
client.destroy();
}
@org.junit.Test
public void testBinaryWithByteArray() throws Exception {
final byte[] payload = "payload".getBytes();
final AtomicReference<Throwable> cause = new AtomicReference<>();
final AtomicBoolean connected = new AtomicBoolean(false);
final CompletableFuture<?> latch = new CompletableFuture<>();
class TestEndPoint extends Endpoint {
@Override
public void onOpen(final Session session, EndpointConfig config) {
connected.set(true);
session.addMessageHandler(new MessageHandler.Whole<byte[]>() {
@Override
public void onMessage(byte[] message) {
session.getAsyncRemote().sendBinary(ByteBuffer.wrap(message.clone()));
}
});
}
}
ServerWebSocketContainer builder = new ServerWebSocketContainer(TestClassIntrospector.INSTANCE, DefaultServer.getEventLoopSupplier(), Collections.EMPTY_LIST, false, false);
builder.addEndpoint(ServerEndpointConfig.Builder.create(TestEndPoint.class, "/").configurator(new InstanceConfigurator(new TestEndPoint())).build());
deployServlet(builder);
WebSocketTestClient client = new WebSocketTestClient(new URI("ws://" + DefaultServer.getHostAddress("default") + ":" + DefaultServer.getHostPort("default") + "/"));
client.connect();
client.send(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(payload)), new FrameChecker(BinaryWebSocketFrame.class, payload, latch));
latch.get();
Assert.assertNull(cause.get());
client.destroy();
}
@org.junit.Test
public void testBinaryWithByteBufferByFuture() throws Exception {
final byte[] payload = "payload".getBytes();
final AtomicReference<Future<Void>> sendResult = new AtomicReference<>();
final AtomicBoolean connected = new AtomicBoolean(false);
final CompletableFuture<?> latch = new CompletableFuture<>();
class TestEndPoint extends Endpoint {
@Override
public void onOpen(final Session session, EndpointConfig config) {
connected.set(true);
session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
@Override
public void onMessage(ByteBuffer message) {
ByteBuffer buf = ByteBuffer.allocate(message.remaining());
buf.put(message);
buf.flip();
sendResult.set(session.getAsyncRemote().sendBinary(buf));
}
});
}
}
ServerWebSocketContainer builder = new ServerWebSocketContainer(TestClassIntrospector.INSTANCE, DefaultServer.getEventLoopSupplier(), Collections.EMPTY_LIST, false, false);
builder.addEndpoint(ServerEndpointConfig.Builder.create(TestEndPoint.class, "/").configurator(new InstanceConfigurator(new TestEndPoint())).build());
deployServlet(builder);
WebSocketTestClient client = new WebSocketTestClient(new URI("ws://" + DefaultServer.getHostAddress("default") + ":" + DefaultServer.getHostPort("default") + "/"));
client.connect();
client.send(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(payload)), new FrameChecker(BinaryWebSocketFrame.class, payload, latch));
latch.get();
Future<Void> result = sendResult.get();
client.destroy();
}
@org.junit.Test
public void testBinaryWithByteBufferAsync() throws Exception {
final byte[] payload = "payload".getBytes();
final AtomicReference<Throwable> cause = new AtomicReference<>();
final AtomicBoolean connected = new AtomicBoolean(false);
final CompletableFuture<?> latch = new CompletableFuture<>();
class TestEndPoint extends Endpoint {
@Override
public void onOpen(final Session session, EndpointConfig config) {
connected.set(true);
session.addMessageHandler(new MessageHandler.Partial<ByteBuffer>() {
@Override
public void onMessage(ByteBuffer message, boolean last) {
Assert.assertTrue(last);
ByteBuffer buf = ByteBuffer.allocate(message.remaining());
buf.put(message);
buf.flip();
session.getAsyncRemote().sendBinary(buf);
}
});
}
}
ServerWebSocketContainer builder = new ServerWebSocketContainer(TestClassIntrospector.INSTANCE, DefaultServer.getEventLoopSupplier(), Collections.EMPTY_LIST, false, false);
builder.addEndpoint(ServerEndpointConfig.Builder.create(TestEndPoint.class, "/").configurator(new InstanceConfigurator(new TestEndPoint())).build());
deployServlet(builder);
WebSocketTestClient client = new WebSocketTestClient(new URI("ws://" + DefaultServer.getHostAddress("default") + ":" + DefaultServer.getHostPort("default") + "/"));
client.connect();
client.send(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(payload)), new FrameChecker(BinaryWebSocketFrame.class, payload, latch));
latch.get();
Assert.assertNull(cause.get());
client.destroy();
}
@Override
protected void encode(ChannelHandlerContext ctx, Packet<ServerPacketHandler> msg, List<Object> out) throws Exception {
ByteBuf buf = ctx.alloc().buffer();
buf.writeByte(Packets.getServerPacketId(msg));
msg.write(new ByteBufPacketStream(buf));
out.add(new BinaryWebSocketFrame(buf));
}
@Override
protected void decode(ChannelHandlerContext chc, BinaryWebSocketFrame frame, List<Object> out)
throws Exception {
// convert the frame to a ByteBuf
ByteBuf bb = frame.content();
bb.retain();
out.add(bb);
}
@Override
protected void encode(ChannelHandlerContext chc, ByteBuf bb, List<Object> out) throws Exception {
// convert the ByteBuf to a WebSocketFrame
BinaryWebSocketFrame result = new BinaryWebSocketFrame();
result.content().writeBytes(bb);
out.add(result);
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
System.out.println("TextWebSocketFrame:" + textFrame.text());
} else if (frame instanceof BinaryWebSocketFrame) {
BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame) frame;
System.out.println("BinaryWebSocketFrame:" + binFrame.toString());
} else if (frame instanceof PongWebSocketFrame) {
System.out.println("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
System.out.println("receive close frame");
ctx.channel().close();
}
}
/**
* 将webSocket消息转换为bytebuf类型,以适配后面的解码器
*/
@Override
protected void decode(ChannelHandlerContext paramChannelHandlerContext,
WebSocketFrame paramINBOUND_IN, List<Object> paramList)
throws Exception {
if(paramINBOUND_IN instanceof BinaryWebSocketFrame)
{
BinaryWebSocketFrame msg=(BinaryWebSocketFrame)paramINBOUND_IN;
ByteBuf data = msg.content();
paramList.add(data);
data.retain();
}
}
@Test
public void test_unwrap_byte_buffer() throws Exception {
final ByteBuf expected = Unpooled.buffer();
final BinaryWebSocketFrame frame = new BinaryWebSocketFrame(expected);
channel.writeInbound(frame);
final Object object = channel.readInbound();
final ByteBuf result = (ByteBuf) object;
assertEquals(expected, result);
assertEquals(1, result.refCnt());
}
public boolean send(PlatformMessage msg, boolean checkAuth) {
Address address = msg.getDestination();
if (address.isHubAddress() && !address.isBroadcast()) {
return true;
}
ChannelHandlerContext c = ctx;
if (c == null || c.channel() == null || (checkAuth && !authorized.get())) {
return false;
}
try {
ByteBuf buffer = ctx.alloc().ioBuffer();
byte[] payload = platformSerializer.serialize(msg);
ByteBufOutputStream out = new ByteBufOutputStream(buffer);
hubSerializer.serialize(HubMessage.createPlatform(payload), out);
IOUtils.closeQuietly(out);
BinaryWebSocketFrame frame = new BinaryWebSocketFrame(buffer);
c.writeAndFlush(frame);
lastHubMsg = System.nanoTime();
return true;
} catch (IOException ex) {
log.warn("gateway serialization failed, dropping message: {}", msg);
return true;
}
}
void sendLogs(BlockingQueue<JsonObject> logs) {
ChannelHandlerContext c = ctx;
if (c == null || logs.isEmpty() || !connected) {
return;
}
JsonArray lgs = new JsonArray();
for (int i = 0; i < 1024; ++i) {
JsonObject next = logs.poll();
if (next == null) {
break;
}
lgs.add(next);
}
try {
String spayload = JSON.toJson(lgs);
byte[] payload = spayload.getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = c.alloc().ioBuffer();
ByteBufOutputStream out = new ByteBufOutputStream(buffer);
hubSerializer.serialize(HubMessage.createLog(payload), out);
IOUtils.closeQuietly(out);
BinaryWebSocketFrame frame = new BinaryWebSocketFrame(buffer);
c.writeAndFlush(frame);
lastHubMsg = System.nanoTime();
} catch (IOException ex) {
log.warn("log serialization failed, dropping message", ex);
}
}
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
moniter.updateTime();
if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ch, (FullHttpResponse) msg);
System.out.println("WebSocket Client connected!");
handshakeFuture.setSuccess();
return;
}
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
}
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
service.onReceive(textFrame.text());
} else if (frame instanceof BinaryWebSocketFrame) {
BinaryWebSocketFrame binaryFrame=(BinaryWebSocketFrame)frame;
service.onReceive(decodeByteBuff(binaryFrame.content()));
}else if (frame instanceof PongWebSocketFrame) {
System.out.println("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
System.out.println("WebSocket Client received closing");
ch.close();
}
}