下面列出了 io.netty.channel.socket.DatagramPacket 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Builds a {@link MasterServerResponse} from the received datagram and hands it to
 * {@code responseCallback}. Nothing is added to {@code out}; the callback is the only
 * consumer of the decoded response.
 *
 * @param ctx the handler context (used only for logging the channel address and id)
 * @param msg the received datagram
 * @param out the decoder output list (left untouched)
 * @throws IllegalStateException if the builder could not construct a packet from the datagram content
 */
@Override
protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out) throws Exception {
    // Create our response packet from the datagram we received
    final MasterServerResponsePacket packet = builder.construct(msg.content());
    if (packet == null) {
        throw new IllegalStateException("No response packet found for the incoming datagram");
    }

    // A freshly constructed object can never be null, so no further null check is needed.
    final MasterServerResponse response = new MasterServerResponse();
    response.setSender(msg.sender());
    response.setRecipient(msg.recipient());
    response.setResponsePacket(packet);
    log.debug("Receiving Data '{}' from '{}' using Channel Id: {}", response.getClass().getSimpleName(), ctx.channel().remoteAddress(), ctx.channel().id());

    // Pass the message back to the messenger
    responseCallback.accept(response, null);
}
/**
 * Copies the datagram payload and decodes it on {@code pool}, dispatching to
 * {@code onQuery} or {@code onResponse} according to the message's "y" entry.
 * Messages that cannot be decoded, or whose "y" entry is missing or not a byte
 * string, are dropped silently.
 *
 * @param ctx    the handler context (unused)
 * @param packet the received datagram
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
    // Copy the payload before leaving this method; the decode work runs later on the pool.
    byte[] buff = new byte[packet.content().readableBytes()];
    packet.content().readBytes(buff);
    pool.execute(() -> {
        Map<String, ?> map = BencodingUtils.decode(buff);
        if (map == null) {
            return;
        }
        // The payload comes from the network: do not assume "y" is a byte string.
        Object type = map.get("y");
        if (!(type instanceof byte[])) {
            return;
        }
        // Explicit charset so the result does not depend on the platform default.
        String y = new String((byte[]) type, java.nio.charset.StandardCharsets.UTF_8);
        if ("q".equals(y)) { // Queries
            onQuery(map, packet.sender());
        } else if ("r".equals(y)) { // Responses
            onResponse(map, packet.sender());
        }
    });
}
/**
 * Rebuilds a {@link UDPMessage} from its serialized form.
 *
 * <p>Fields are read in this order: version, type, received timestamp, sender host and port,
 * receiver host and port, payload length, payload bytes.
 *
 * @param buffer the serialized message
 * @return the deserialized message, wrapping the payload in a datagram addressed from sender to receiver
 * @throws IOException if the version is unsupported, the payload length is invalid, or the
 *                     buffer is truncated or otherwise unreadable
 */
public UDPMessage deserialize(byte[] buffer) throws IOException {
    // try-with-resources closes the stream on every path, including the error ones.
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buffer))) {
        int version = ois.readInt();
        if (version != UDPConstants.UDP_MESSAGE_VERSION) {
            throw new IOException(Utils.format("Unsupported version '{}'", version));
        }
        int type = ois.readInt();
        long received = ois.readLong();

        String address = ois.readUTF();
        int port = ois.readInt();
        InetSocketAddress sender = new InetSocketAddress(address, port);

        address = ois.readUTF();
        port = ois.readInt();
        InetSocketAddress receiver = new InetSocketAddress(address, port);

        int dataLen = ois.readInt();
        // The payload is contained in 'buffer', so a negative or larger length is corrupt input.
        // Reject it before allocating instead of failing with NegativeArraySizeException / OOM.
        if (dataLen < 0 || dataLen > buffer.length) {
            throw new IOException(Utils.format("Invalid data length '{}'", dataLen));
        }
        byte[] data = new byte[dataLen];
        ois.readFully(data);

        ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
        DatagramPacket datagram = new DatagramPacket(byteBuf, receiver, sender);
        return new UDPMessage(type, received, datagram);
    }
}
/**
 * Parses the datagram payload as an HTTP response using a throwaway embedded channel and
 * dispatches on the status code: 200 goes to {@code handleUpnpMsearchResponse}, anything
 * else is logged at debug level.
 *
 * @param ctx    the handler context, may be null (not used here)
 * @param packet the received datagram
 */
private void handleResponse(@Nullable ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
    EmbeddedChannel decoderChannel = new EmbeddedChannel(new HttpResponseDecoder());
    try {
        // Wrap as unreleasable so the embedded pipeline cannot release the packet's own buffer.
        decoderChannel.writeInbound(Unpooled.unreleasableBuffer(packet.content()));
        decoderChannel.finish();

        Object decoded;
        while ((decoded = decoderChannel.readInbound()) != null) {
            if (!(decoded instanceof HttpResponse)) {
                continue;
            }
            HttpResponse res = (HttpResponse) decoded;
            int code = res.getStatus().code();
            if (code == 200) {
                handleUpnpMsearchResponse(packet, res);
            } else {
                log.debug("unknown upnp response: {}", code);
            }
        }
    } finally {
        decoderChannel.finishAndReleaseAll();
    }
}
/**
 * Handles a successful M-SEARCH response: validates the USN header, reads the LOCATION
 * and cache-control headers, optionally traces the details, and forwards everything to
 * {@code IrisUpnpService.poke}.
 *
 * @param packet   the datagram the response arrived in (its sender is forwarded)
 * @param response the decoded HTTP response
 */
private void handleUpnpMsearchResponse(DatagramPacket packet, HttpResponse response) {
    HttpHeaders headers = response.headers();
    boolean usnOk = parseUsnHeader(headers);
    if (!usnOk) {
        log.trace("dropping upnp m-search response with bad usn");
        return;
    }

    String location = headers.get("LOCATION");
    // NOTE(review): converted from nanoseconds for the trace output below, so this is
    // presumably a nanosecond value - confirm against parseCacheControlHeader.
    long cacheAge = parseCacheControlHeader(headers);

    if (log.isTraceEnabled()) {
        long cacheSeconds = TimeUnit.NANOSECONDS.toSeconds(cacheAge);
        log.trace("upnp msearch response: cache={}s, sender={}, uuid={}, class={}, namespace={}, type={}, version={}\n{}",
            cacheSeconds,
            packet.sender(),
            usn.deviceUuid,
            usn.clazz,
            usn.namespace,
            usn.type,
            usn.version, response);
    }

    IrisUpnpService.poke(packet.sender(), usn, cacheAge, location, headers);
}
/**
 * Routes a datagram to the per-sender {@code UdpChannel}, creating a new one when none
 * exists for the sender or the existing one is no longer open. The payload is retained
 * and queued on the channel; new channels are announced through the server pipeline,
 * existing registered ones are asked to read.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket p) throws Exception {
    UdpChannel channel = userChannels.compute(p.sender(), (addr, existing) ->
        (existing != null && existing.isOpen()) ? existing : new UdpChannel(UdpServerChannel.this, addr));

    // Retain: the buffer is consumed later from the channel's queue.
    channel.buffers.add(p.content().retain());

    if (channel.getIsNew()) {
        ChannelPipeline serverPipeline = UdpServerChannel.this.pipeline();
        serverPipeline.fireChannelRead(channel);
        serverPipeline.fireChannelReadComplete();
        return;
    }
    if (channel.isRegistered()) {
        channel.read();
    }
}
/**
 * Reads the message id and flags word from {@code buf} and creates a {@link DnsResponse}
 * with the header fields populated from the flag bits.
 *
 * @param packet the datagram the response arrived in (supplies sender and recipient)
 * @param buf    the buffer positioned at the start of the DNS header
 * @return the response with header flags set
 * @throws CorruptedFrameException if the top flag bit is not set
 */
private static DnsResponse newResponse(DatagramPacket packet, ByteBuf buf) {
    final int id = buf.readUnsignedShort();
    final int flags = buf.readUnsignedShort();

    // Top bit of the flags word must be set for a response.
    if ((flags >> 15) == 0) {
        throw new CorruptedFrameException("not a response");
    }

    final DnsOpCode opCode = DnsOpCode.valueOf((byte) (flags >> 11 & 0xf));
    final DnsResponseCode responseCode = DnsResponseCode.valueOf((byte) (flags & 0xf));
    final DnsResponse response =
        new DatagramDnsResponse(packet.sender(), packet.recipient(), id, opCode, responseCode);

    // Single-bit flags, highest bit position first.
    response.setAuthoritativeAnswer((flags >> 10 & 1) == 1);
    response.setTruncated((flags >> 9 & 1) == 1);
    response.setRecursionDesired((flags >> 8 & 1) == 1);
    response.setRecursionAvailable((flags >> 7 & 1) == 1);
    response.setZ(flags >> 4 & 0x7);
    return response;
}
/**
 * Encodes a DNS query envelope into a {@link DatagramPacket} addressed to the envelope's
 * recipient. The header, the questions and the ADDITIONAL section are written; the buffer
 * is released if any of those steps throws.
 */
@Override
protected void encode(
        ChannelHandlerContext ctx,
        AddressedEnvelope<DnsQuery, InetSocketAddress> in, List<Object> out) throws Exception {
    final InetSocketAddress target = in.recipient();
    final DnsQuery dnsQuery = in.content();
    final ByteBuf payload = allocateBuffer(ctx, in);

    boolean encoded = false;
    try {
        encodeHeader(dnsQuery, payload);
        encodeQuestions(dnsQuery, payload);
        encodeRecords(dnsQuery, DnsSection.ADDITIONAL, payload);
        encoded = true;
    } finally {
        // Only release on failure; on success ownership moves to the outgoing packet.
        if (!encoded) {
            payload.release();
        }
    }

    out.add(new DatagramPacket(payload, target, null));
}
/**
 * Writes an ASCII payload through a {@code GelfMessageUdpEncoder} and verifies that the
 * outbound datagram carries the same bytes and the configured recipient.
 */
@Test
public void testEncode() throws Exception {
    final InetSocketAddress target = new InetSocketAddress("127.0.0.1", 12201);
    final EmbeddedChannel embedded = new EmbeddedChannel(new GelfMessageUdpEncoder(target));

    // Write side.
    final boolean written = embedded.writeOutbound(Unpooled.wrappedBuffer("test".getBytes(StandardCharsets.US_ASCII)));
    assertTrue(written);
    assertTrue(embedded.finish());

    // Read side.
    final DatagramPacket outbound = (DatagramPacket) embedded.readOutbound();
    final byte[] payload = new byte[outbound.content().readableBytes()];
    outbound.content().getBytes(0, payload);

    assertEquals(target, outbound.recipient());
    assertEquals("test", new String(payload, StandardCharsets.US_ASCII));
}
/**
 * Creates a UDP server on {@code port} that answers every received datagram with the
 * welcome message, sent back to the datagram's sender after {@code delay} milliseconds.
 *
 * @return the created server
 */
public UdpServer<DatagramPacket, DatagramPacket> createServer() {
    final ConnectionHandler<DatagramPacket, DatagramPacket> welcomeHandler =
        new ConnectionHandler<DatagramPacket, DatagramPacket>() {
            @Override
            public Observable<Void> handle(final ObservableConnection<DatagramPacket, DatagramPacket> connection) {
                return connection.getInput().flatMap(new Func1<DatagramPacket, Observable<Void>>() {
                    @Override
                    public Observable<Void> call(final DatagramPacket incoming) {
                        // interval(...).take(1) emits once after the delay.
                        return Observable.interval(delay, TimeUnit.MILLISECONDS).take(1).flatMap(new Func1<Long, Observable<Void>>() {
                            @Override
                            public Observable<Void> call(Long tick) {
                                InetSocketAddress replyTo = incoming.sender();
                                System.out.println("Received datagram. Sender: " + replyTo);
                                ByteBuf welcome = connection.getChannel().alloc().buffer(WELCOME_MSG_BYTES.length);
                                welcome.writeBytes(WELCOME_MSG_BYTES);
                                return connection.writeAndFlush(new DatagramPacket(welcome, replyTo));
                            }
                        });
                    }
                });
            }
        };

    UdpServer<DatagramPacket, DatagramPacket> udpServer = RxNetty.createUdpServer(port, welcomeHandler);
    System.out.println("UDP hello server started at port: " + port);
    return udpServer;
}
/**
 * Decodes a UDP datagram into a {@link UdpEvent} carrying the parsed message and the
 * sender address. Datagrams whose length is at most 1 or at least {@code MAXSIZE} are
 * logged and dropped; parse failures are logged and the datagram is dropped.
 *
 * @param ctx    the handler context (unused)
 * @param packet the received datagram
 * @param out    receives the decoded {@link UdpEvent} on success
 */
@Override
public void decode(ChannelHandlerContext ctx, DatagramPacket packet, List<Object> out)
    throws Exception {
    ByteBuf buf = packet.content();
    int length = buf.readableBytes();
    if (length <= 1 || length >= MAXSIZE) {
        // Log the datagram's own sender: the channel's remoteAddress() is null for an
        // unconnected datagram channel, which made this message useless for diagnosis.
        logger
            .error("UDP rcv bad packet, from {} length = {}", packet.sender(), length);
        return;
    }
    byte[] encoded = new byte[length];
    buf.readBytes(encoded);
    try {
        UdpEvent event = new UdpEvent(Message.parse(encoded), packet.sender());
        out.add(event);
    } catch (Exception e) {
        // Pass the exception as the last argument so the cause is not lost.
        logger.error("Parse msg failed, type {}, len {}, address {}", encoded[0], encoded.length,
            packet.sender(), e);
    }
}
/**
 * Attempts to append the given {@link DatagramPacket} to this batch.
 *
 * @return {@code false} when the batch is already full or the native packet slot could
 *         not be initialised; {@code true} otherwise. A packet with no readable bytes is
 *         reported as success without occupying a slot.
 */
boolean add(DatagramPacket packet) {
    if (count == packets.length) {
        return false;
    }

    ByteBuf payload = packet.content();
    if (payload.readableBytes() == 0) {
        // Nothing to send: treat as added, but do not consume a slot.
        return true;
    }

    NativeDatagramPacket slot = packets[count];
    if (slot.init(payload, packet.recipient())) {
        count++;
        return true;
    }
    return false;
}
/**
 * Classifies an incoming datagram. A datagram that starts with {@code Kcp.FLAG}, is longer
 * than five bytes and carries a positive connection id is retained and queued on
 * {@code queueRecv}; any other non-empty datagram is logged as a plain UDP packet; an empty
 * one is logged as invalid.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket pack) throws Exception {
    final ByteBuf payload = pack.content();
    final int size = payload.readableBytes();

    if (size <= 0) {
        log.E("Invalid pack, len=0, sender="+pack.sender().toString());
        return;
    }

    // The flag byte is consumed here; the connection id is peeked without advancing further.
    final byte flag = payload.readByte();
    final boolean hasKcpHead = flag == Kcp.FLAG && size > 1 + 4;
    final int connId = hasKcpHead ? payload.getInt(payload.readerIndex()) : 0;

    if (connId > 0) {
        // Valid KCP packet: retain it because it is handed off to the receive queue.
        pack.retain();
        queueRecv.offer(pack);
    } else {
        // Plain UDP packet.
        log.I("Recv udp pack, sender="+pack.sender().toString());
    }
}
/**
 * Sends the reply to a find_node query.
 * Nothing is sent when the selected channel is not writable.
 *
 * @param messageId the message id passed to the response
 * @param address   the address the reply is sent to
 * @param nodeId    the node id passed to the response
 * @param nodeList  the nodes to include, serialized with {@code Node.toBytes}
 * @param num       index into {@code channels} selecting the channel to write on
 */
public void findNodeReceive(String messageId,InetSocketAddress address, String nodeId, List<Node> nodeList,int num) {
    if (!channels.get(num).isWritable()) {
        return;
    }
    // ISO-8859-1 maps every byte to exactly one char, so the raw node bytes survive the String round trip.
    String nodes = new String(Node.toBytes(nodeList), CharsetUtil.ISO_8859_1);
    FindNodeResponse reply = new FindNodeResponse(messageId, nodeId, nodes);
    DatagramPacket datagram =
        new DatagramPacket(Unpooled.copiedBuffer(bencode.encode(DHTUtil.beanToMap(reply))), address);
    channels.get(num).writeAndFlush(datagram);
}
/**
 * Sends the reply to a get_peers query.
 * Nothing is sent when the selected channel is not writable.
 *
 * @param messageId the message id passed to the response
 * @param address   the address the reply is sent to
 * @param nodeId    the node id passed to the response
 * @param token     the token passed to the response
 * @param nodeList  the nodes to include, serialized with {@code Node.toBytes}
 * @param num       index into {@code channels} selecting the channel to write on
 */
public void getPeersReceive(String messageId,InetSocketAddress address, String nodeId, String token, List<Node> nodeList,int num) {
    if (!channels.get(num).isWritable()) {
        return;
    }
    // ISO-8859-1 maps every byte to exactly one char, so the raw node bytes survive the String round trip.
    String nodes = new String(Node.toBytes(nodeList), CharsetUtil.ISO_8859_1);
    GetPeersResponse reply = new GetPeersResponse(messageId, nodeId, token, nodes);
    DatagramPacket datagram =
        new DatagramPacket(Unpooled.copiedBuffer(bencode.encode(DHTUtil.beanToMap(reply))), address);
    channels.get(num).writeAndFlush(datagram);
}
/**
 * Sends the reply to an announce_peer query.
 * Nothing is sent when the selected channel is not writable.
 *
 * @param messageId the message id passed to the response
 * @param address   the address the reply is sent to
 * @param nodeId    the node id passed to the response
 * @param num       index into {@code channels} selecting the channel to write on
 */
public void announcePeerReceive(String messageId,InetSocketAddress address, String nodeId,int num) {
    if (!channels.get(num).isWritable()) {
        return;
    }
    AnnouncePeersResponse reply = new AnnouncePeersResponse(messageId, nodeId);
    DatagramPacket datagram =
        new DatagramPacket(Unpooled.copiedBuffer(bencode.encode(DHTUtil.beanToMap(reply))), address);
    channels.get(num).writeAndFlush(datagram);
}
/**
 * Sends an announce_peer request.
 * Nothing is sent when the selected channel is not writable.
 *
 * @param id          passed through to the request
 * @param impliedPort passed through to the request
 * @param infoHash    passed through to the request
 * @param port        passed through to the request
 * @param token       passed through to the request
 * @param address     the address the request is sent to
 * @param num         index into {@code channels} selecting the channel to write on
 */
public void announcePeer(String id,Integer impliedPort,String infoHash,Integer port,String token,InetSocketAddress address,int num) {
    if (!channels.get(num).isWritable()) {
        return;
    }
    AnnouncePeersRequest request = new AnnouncePeersRequest(id, impliedPort, infoHash, port, token);
    DatagramPacket datagram =
        new DatagramPacket(Unpooled.copiedBuffer(bencode.encode(DHTUtil.beanToMap(request))), address);
    channels.get(num).writeAndFlush(datagram);
}
/**
 * Decodes the datagram payload as UTF-8 text and forwards it to the dispatch factory.
 * Empty messages are logged and dropped; any exception is logged and not propagated.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket datagramPacket) throws Exception {
    try {
        // Netty wraps UDP traffic, so what arrives here is a DatagramPacket object.
        final String text = datagramPacket.content().toString(CharsetUtil.UTF_8);
        if (!ToolsKit.isEmpty(text)) {
            // Hand the received message over to the dispatch factory for processing.
            RobotUtil.channelReadToDispatchFactory(ctx.channel(), clientEntries, text);
        } else {
            LOG.error("UPD SERVER接收到的报文内容不能为空");
        }
    } catch (Exception e) {
        LOG.error(e.getMessage(), e);
    }
}
/**
 * Prints the received datagram and its UTF-8 content, then writes an
 * "I'm alive at &lt;date&gt;" reply addressed to the sender.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
    System.err.println(packet);
    System.out.println(packet.content().toString(CharsetUtil.UTF_8));

    // NOTE(review): write() does not flush; presumably a flush happens elsewhere
    // (e.g. in channelReadComplete) - confirm.
    String reply = "I'm alive at "+new Date();
    DatagramPacket response = new DatagramPacket(Unpooled.copiedBuffer(reply, CharsetUtil.UTF_8), packet.sender());
    ctx.write(response);
}
/**
 * Replies to a find_node query. Because this is a simulated DHT node, the reply simply
 * carries an empty node set.
 * Response = {"t":"aa", "y":"r", "r": {"id":"0123456789abcdefghij", "nodes": "def456..."}}
 *
 * @param t      the transaction id passed to {@code createPacket}
 * @param nid    the id used together with the own node id to derive the reply "id"
 * @param sender the address the reply is sent to
 */
private void responseFindNode(byte[] t, byte[] nid, InetSocketAddress sender) {
    HashMap<String, Object> body = new HashMap<>();
    body.put("nodes", new byte[]{});
    body.put("id", NodeIdUtil.getNeighbor(DHTServer.SELF_NODE_ID, nid));
    dhtServer.sendKRPC(createPacket(t, "r", body, sender));
}
/**
 * Replies to a get_peers query. A reply is required, otherwise no announce_peer request
 * will be received. No reply is sent when the info hash is already in the bloom filter.
 * Response with closest nodes = {"t":"aa", "y":"r", "r": {"id":"abcdefghij0123456789", "token":"aoeusnth", "nodes": "def456..."}}
 *
 * @param t         the transaction id passed to {@code createPacket}
 * @param info_hash the queried info hash; its first two bytes are used as the token
 * @param sender    the address the reply is sent to
 */
private void responseGetPeers(byte[] t, byte[] info_hash, InetSocketAddress sender) {
    // Already recorded in the bloom filter: skip the reply.
    boolean alreadySeen = dhtServer.bloomFilter.check(ByteUtil.byteArrayToHex(info_hash));
    if (alreadySeen) {
        return;
    }

    byte[] token = new byte[]{info_hash[0], info_hash[1]};
    HashMap<String, Object> body = new HashMap<>();
    body.put("token", token);
    body.put("nodes", new byte[]{});
    body.put("id", NodeIdUtil.getNeighbor(DHTServer.SELF_NODE_ID, info_hash));

    dhtServer.sendKRPC(createPacket(t, "r", body, sender));
}
/**
 * Binds a broadcast-enabled NIO datagram channel on the given port.
 *
 * @param loopGroup the event loop group that drives the channel
 * @param handler   the handler installed on the channel
 * @param port      the local port to bind
 * @return the future of the bind operation
 */
public static ChannelFuture newBootstrapUDP(EventLoopGroup loopGroup, SimpleChannelInboundHandler<DatagramPacket> handler, int port){
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(loopGroup)
        .channel(NioDatagramChannel.class)
        .option(ChannelOption.SO_BROADCAST, true)
        .handler(handler);
    return bootstrap.bind(port);
}
/**
 * Extracts a log line from a datagram. The datagram must have more than six readable
 * bytes and start with a little-endian int of -1; the remaining bytes minus the last two
 * are decoded as UTF-8 and passed, with the sender, to {@code logEventCallback} when one
 * is set.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
    ByteBuf payload = msg.content();
    // Short-circuit keeps the header int unread when the datagram is too short.
    if (payload.readableBytes() <= 6 || payload.readIntLE() != -1) {
        return;
    }

    byte[] body = new byte[payload.readableBytes() - 2];
    payload.readBytes(body);
    // Skip the two trailing bytes that are excluded from the body.
    payload.skipBytes(2);

    // Pass to the callback
    if (logEventCallback != null) {
        String line = new String(body, Charsets.UTF_8);
        logEventCallback.accept(new SourceLogEntry(line, msg.sender()));
    }
}
/**
 * Handles accepted messages via {@code messageReceived} and releases them afterwards;
 * messages that are not accepted are passed down the pipeline without being released here.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    Gdx.app.debug(TAG, "channelRead");
    boolean consumedHere = true;
    try {
        if (!accept(msg)) {
            // Not ours: ownership moves to the next handler, so skip the release below.
            consumedHere = false;
            ctx.fireChannelRead(msg);
            return;
        }
        messageReceived(ctx, (DatagramPacket) msg);
    } finally {
        if (consumedHere) {
            ReferenceCountUtil.release(msg);
        }
    }
}
/**
 * Framing a UDP packet is much simpler than for a stream based protocol
 * like TCP. We just assume that everything is correct and therefore all that is
 * needed is to read the first line, which is assumed to be a SIP initial
 * line, then read all headers as one big block and whatever is left had better
 * be the payload (if there is one).
 *
 * Of course, things do go wrong. If e.g. the UDP packet is fragmented, then
 * we may end up with a partial SIP message but the user can either decide
 * to double check things by calling {@link SipMessage#verify()} or the user
 * will eventually notice when trying to access partial headers etc.
 *
 * @param ctx the handler context; its channel is used to build the connection
 * @param msg the received datagram
 * @param out receives one {@link SipMessageEvent} per framed message; nothing is added
 *            for datagrams shorter than 20 bytes
 */
@Override
protected void decode(final ChannelHandlerContext ctx, final DatagramPacket msg, final List<Object> out)
        throws Exception {
    final long arrivalTime = this.clock.getCurrentTimeMillis();
    final ByteBuf content = msg.content();

    // some clients are sending various types of pings even over
    // UDP, such as linphone which is sending "jaK\n\r".
    // According to RFC5626, the only valid ping over UDP
    // is to use a STUN request and since such a request is
    // at least 20 bytes we will simply ignore anything less
    // than that. And yes, there is no way that an actual
    // SIP message ever could be less than 20 bytes.
    if (content.readableBytes() < 20) {
        return;
    }

    final byte[] b = new byte[content.readableBytes()];
    // Copy from the reader index, not from absolute index 0: readableBytes() is relative
    // to the reader index, so the two must agree or the wrong bytes are copied.
    content.getBytes(content.readerIndex(), b);

    final Buffer buffer = Buffers.wrap(b);
    SipParser.consumeSWS(buffer);
    final SipMessage sipMessage = SipParser.frame(buffer);

    final Connection connection = new UdpConnection(ctx.channel(), msg.sender());
    final SipMessageEvent event = new DefaultSipMessageEvent(connection, sipMessage, arrivalTime);
    out.add(event);
}
/**
 * Decodes the datagram payload into a new message instance using {@code schema} and
 * adds it to {@code out}.
 */
@Override
protected void decode(ChannelHandlerContext ctx, DatagramPacket dgram, List<Object> out) throws Exception {
    ByteBuf payload = dgram.content();
    T decoded = schema.newMessage();
    schema.mergeFrom(new ByteBufferInput(payload.nioBuffer(), protostuffEncoded), decoded);
    out.add(decoded);
}
/**
 * Sends an HTTP-over-UDP "200 OK" discovery response describing this device.
 *
 * @param to   the address the response is sent to
 * @param ch   the channel to write on
 * @param date the value for the DATE header
 * @return the future of the write-and-flush operation
 */
private ChannelFuture respondDevice(InetSocketAddress to, Channel ch, String date) {
    String message =
        "HTTP/1.1 200 OK\r\n" +
        "CACHE-CONTROL: max-age=1800\r\n" +
        "DATE: " + date + "\r\n" +
        "EXT:\r\n" +
        "LOCATION: http://" + addr + ":" + HttpServer.PORT + "/upnp/device.xml\r\n" +
        "ST: uuid:" + IrisUpnpService.uuid + "\r\n" +
        "USN: uuid:" + IrisUpnpService.uuid + "\r\n" +
        "SERVER: Iris OS/2.0 UPnP/1.0 Iris/2.0\r\n\r\n";

    ByteBuf payload = Unpooled.buffer();
    ByteBufUtil.writeUtf8(payload, message);
    return ch.writeAndFlush(new DatagramPacket(payload, to));
}
/**
 * Binds a broadcast-enabled NIO datagram channel on the given port.
 *
 * @param loopGroup the event loop group that drives the channel
 * @param handler   the handler installed on the channel
 * @param port      the local port to bind
 * @return the future of the bind operation
 */
public static ChannelFuture newBootstrapUDP(EventLoopGroup loopGroup, SimpleChannelInboundHandler<DatagramPacket> handler, int port){
    Bootstrap udpBootstrap = new Bootstrap();
    udpBootstrap.group(loopGroup)
        .channel(NioDatagramChannel.class)
        .option(ChannelOption.SO_BROADCAST, true)
        .handler(handler);
    return udpBootstrap.bind(port);
}
/**
 * Prints the received datagram and, when its UTF-8 content is exactly "QOTM?", writes a
 * "QOTM: &lt;quote&gt;" reply addressed to the sender.
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
    System.err.println(packet);

    String request = packet.content().toString(CharsetUtil.UTF_8);
    if (!"QOTM?".equals(request)) {
        return;
    }

    // NOTE(review): write() does not flush; presumably a flush happens elsewhere - confirm.
    DatagramPacket reply = new DatagramPacket(
        Unpooled.copiedBuffer("QOTM: " + nextQuote(), CharsetUtil.UTF_8), packet.sender());
    ctx.write(reply);
}
/**
 * Sets up the client: enables debug logging, creates the event loop group, connects a
 * datagram channel to {@code localhost:TestServer.PORT} with a {@code ReliableEndpoint}
 * installed in its pipeline, and then sends a packet. Any failure is logged and the
 * application is asked to exit.
 */
@Override
public void create() {
    Gdx.app.setLogLevel(Application.LOG_DEBUG);
    group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap()
            .group(group)
            .channel(NioDatagramChannel.class)
            .handler(new ChannelInitializer<DatagramChannel>() {
                @Override
                protected void initChannel(DatagramChannel ch) {
                    UnicastEndpoint<DatagramPacket> endpoint = new ReliableEndpoint(ch, TestClient.this);
                    // Publish the endpoint on the outer instance as well.
                    TestClient.this.endpoint = endpoint;
                    ch.pipeline()
                        .addLast(new EndpointedChannelHandler<>(DatagramPacket.class, endpoint))
                    ;
                }
            });

        // Block until the connect completes; the returned future is not needed afterwards.
        b.connect("localhost", TestServer.PORT).sync();
        sendPacket();
    } catch (Throwable t) {
        Gdx.app.error(TAG, t.getMessage(), t);
        Gdx.app.exit();
    }
}