下面列出了 io.netty.channel.socket.DatagramPacket#release() 的实例代码。您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Receives an inbound datagram and dispatches it to {@code handleResponse} when its
 * payload begins with the bytes {@code HTTP/}, otherwise to {@code handleRequest}.
 *
 * <p>Any {@link Throwable} raised while processing is logged at debug level and not
 * rethrown. Whatever happens, every reference count still held on the packet is
 * dropped before this method returns.
 *
 * @param ctx the handler context, passed through to the response/request handlers
 * @param msg the inbound message; cast to {@link DatagramPacket}, ignored when {@code null}
 */
@Override
public void channelRead(@Nullable ChannelHandlerContext ctx, @Nullable Object msg) throws Exception {
    DatagramPacket packet = (DatagramPacket) msg;
    if (packet == null) {
        return;
    }
    try {
        ByteBuf payload = packet.content();
        if (log.isTraceEnabled()) {
            log.trace("recv upnp message: {}", payload.toString(StandardCharsets.UTF_8));
        }

        // The prefix is inspected with absolute getters so the reader index is left
        // untouched for the handlers. More than five readable bytes are required.
        boolean httpResponse = false;
        if (payload.readableBytes() > 5) {
            httpResponse = payload.getByte(payload.readerIndex()) == 'H'
                    && payload.getByte(payload.readerIndex() + 1) == 'T'
                    && payload.getByte(payload.readerIndex() + 2) == 'T'
                    && payload.getByte(payload.readerIndex() + 3) == 'P'
                    && payload.getByte(payload.readerIndex() + 4) == '/';
        }

        if (httpResponse) {
            handleResponse(ctx, packet);
        } else {
            handleRequest(ctx, packet);
        }
    } catch (Throwable th) {
        log.debug("error processing upnp packet:", th);
    } finally {
        // NOTE(review): this drops ALL outstanding references, not only the one this
        // handler owns - confirm that neither handler keeps the packet for later use.
        if (packet.refCnt() > 0) {
            packet.release(packet.refCnt());
        }
    }
}
/**
 * Encodes five single-question DNS queries through a {@code DatagramDnsQueryEncoder}
 * and checks that each one produces exactly one readable outbound datagram.
 */
@Test
public void writeQueryTest() throws Exception {
    final InetSocketAddress dnsServer = SocketUtils.socketAddress("8.8.8.8", 53);
    final EmbeddedChannel encoderChannel = new EmbeddedChannel(new DatagramDnsQueryEncoder());

    // Question name / record type pairs, one query per pair.
    final String[] questionNames = {
        "1.0.0.127.in-addr.arpa",
        "www.example.com",
        "example.com",
        "example.com",
        "example.com",
    };
    final DnsRecordType[] questionTypes = {
        DnsRecordType.PTR,
        DnsRecordType.A,
        DnsRecordType.AAAA,
        DnsRecordType.MX,
        DnsRecordType.CNAME,
    };

    final List<DnsQuery> queries = new ArrayList<DnsQuery>(5);
    for (int i = 0; i < questionNames.length; i++) {
        queries.add(new DatagramDnsQuery(null, dnsServer, 1).setRecord(
                DnsSection.QUESTION,
                new DefaultDnsQuestion(questionNames[i], questionTypes[i])));
    }

    for (DnsQuery query : queries) {
        // Each query carries only its question section.
        assertThat(query.count(DnsSection.QUESTION), is(1));
        assertThat(query.count(DnsSection.ANSWER), is(0));
        assertThat(query.count(DnsSection.AUTHORITY), is(0));
        assertThat(query.count(DnsSection.ADDITIONAL), is(0));

        encoderChannel.writeOutbound(query);

        DatagramPacket encoded = encoderChannel.readOutbound();
        Assert.assertTrue(encoded.content().isReadable());
        encoded.release();

        // Nothing else may be pending after the single encoded datagram.
        Assert.assertNull(encoderChannel.readOutbound());
    }
}
/**
 * Writes a string envelope to the channel and checks that the resulting datagram
 * carries the same text, recipient and sender.
 */
@Test
public void testEncode() {
    final InetSocketAddress recipient = SocketUtils.socketAddress("127.0.0.1", 10000);
    final InetSocketAddress sender = SocketUtils.socketAddress("127.0.0.1", 20000);

    final DefaultAddressedEnvelope<String, InetSocketAddress> envelope =
            new DefaultAddressedEnvelope<String, InetSocketAddress>("netty", recipient, sender);
    assertTrue(channel.writeOutbound(envelope));

    final DatagramPacket encoded = channel.readOutbound();
    try {
        assertEquals("netty", encoded.content().toString(CharsetUtil.UTF_8));
        assertEquals(recipient, encoded.recipient());
        assertEquals(sender, encoded.sender());
    } finally {
        // Release even when an assertion fails.
        encoded.release();
    }
}
/**
 * Releases every datagram still waiting in {@code queueRecv}, then releases every
 * {@code Kcp} instance held in {@code mapKcp}.
 */
private void release(){
    // Drain the receive queue, releasing each pending datagram.
    while (!queueRecv.isEmpty()) {
        queueRecv.poll().release();
    }
    // Release each Kcp instance in the map.
    for (final Kcp kcp : mapKcp.values()) {
        kcp.release();
    }
}
/**
 * Queues an incoming datagram while {@code onLoop} is set.
 *
 * @param pack the received datagram; released here when it is not queued
 * @return {@code 0} when the datagram was queued, {@code 1} when it was dropped
 */
public int onReceive(DatagramPacket pack){
    // offer() is only attempted while onLoop is true.
    final boolean queued = onLoop && queueRecv.offer(pack);
    if (!queued) {
        pack.release();
        return 1;
    }
    return 0;
}
/**
 * Processes one raw datagram: an ACK packet is passed to {@code _procAck}, a DATA
 * packet is offered to the receive window and acknowledged, anything else is dropped.
 *
 * <p>The datagram is released here unless {@code _recvWndPack} accepted it. The
 * release sits in a {@code finally} block so that a truncated or malformed packet,
 * which makes one of the buffer reads throw, no longer leaks the datagram. The
 * exception itself still propagates to the caller.
 *
 * @param pack the received datagram
 * @return {@code 1} when the connection is closed (the packet is dropped), otherwise {@code 0}
 */
public int onReceiveRaw(DatagramPacket pack){
    if (closed) {
        pack.release();
        return 1;
    }
    _tmLastRecv = _tmNow;

    // True once _recvWndPack has accepted the packet; presumably it then keeps the
    // reference - TODO confirm against _recvWndPack.
    boolean handedOff = false;
    try {
        final ByteBuf buf = pack.content();
        final byte act = buf.readByte();
        if (act == ACT_ACK) { // ack packet
            _procAck(buf.readInt(), buf.readInt());
        } else if (act == ACT_DATA) { // data packet
            final int packId = buf.readInt();
            if (packId < recvWndR) { // valid id
                // A false result means the packet is a duplicate and must be released.
                handedOff = _recvWndPack(packId, pack);
                // send ack
                // flag(1byte) + connId(4byte) + act(1byte) + packId(4byte) + recvWndL(4byte)
                final ByteBuf bufAck = PooledByteBufAllocator.DEFAULT.ioBuffer(14);
                bufAck.writeByte(FLAG).writeInt(_connId).writeByte(ACT_ACK)
                        .writeInt(packId).writeInt(recvWndL);
                final DatagramPacket packAck = new DatagramPacket(bufAck, remoteAddr);
                listener.onOutput(packAck); // hand the ack to the listener for output
            } else { // invalid pack id
                buf.readLong();
                final String strTmp = (String) buf.readCharSequence(buf.readableBytes(), Charset.forName("utf-8"));
                System.out.println("Invalid packId="+packId+", recvWndR="+recvWndR
                        +", data="+strTmp);
            }
        }
        // Any other action byte is an invalid packet and is simply dropped.
        return 0;
    } finally {
        if (!handedOff) {
            pack.release();
        }
    }
}
/**
 * Decodes an inbound datagram into a {@code Packet} and passes it to the receiver.
 *
 * <p>This handler is the last consumer of the datagram, so it releases the
 * reference. The release is in a {@code finally} block so the buffer is not leaked
 * when decoding or the receiver throws.
 *
 * @param ctx the handler context (unused)
 * @param msg the inbound message, cast to {@link DatagramPacket}
 * @throws Exception whatever decoding or the receiver throws
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    DatagramPacket datagramPacket = (DatagramPacket) msg;
    try {
        Packet packet = PacketDecoder.decodeFrame(datagramPacket);
        receiver.onReceive(packet, connection);
    } finally {
        // The last consumer must release the reference.
        datagramPacket.release();
    }
}
/**
 * Handles a datagram for the discovery system: updates discovery data when the
 * packet is a successfully decoded unconnected pong, then passes the raw buffer to
 * every registered listener.
 *
 * <p>The datagram is released in a {@code finally} block, so it is also freed when
 * decoding or a listener throws. In that case {@code causeAddress} keeps the
 * sender's address. As a side effect of the shared release, a datagram from a
 * blocked address now also goes through the release logging.
 *
 * @param ctx the handler context (unused)
 * @param msg the inbound message; anything that is not a {@link DatagramPacket} is ignored
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (!(msg instanceof DatagramPacket)) {
        return;
    }
    // Get packet and sender data
    DatagramPacket datagram = (DatagramPacket) msg;
    try {
        InetSocketAddress sender = datagram.sender();
        RakNetPacket packet = new RakNetPacket(datagram);
        // If an exception happens it's because of this address
        this.causeAddress = sender;
        // Check if the address is blocked
        if (blocked.contains(sender.getAddress())) {
            return; // Address blocked; the datagram is released in finally
        }
        // Handle the packet
        if (packet.getId() == RakNetPacket.ID_UNCONNECTED_PONG) {
            UnconnectedPong pong = new UnconnectedPong(packet);
            pong.decode();
            if (!pong.failed()) {
                Discovery.updateDiscoveryData(sender, pong);
                logger.trace("Sent unconnected pong to discovery system");
            }
        }
        Discovery.callEvent(listener -> {
            datagram.content().readerIndex(0); // Reset index
            listener.handleNettyMessage(sender, datagram.content());
        });
    } finally {
        // No longer needed, whether or not handling succeeded
        if (datagram.release()) {
            logger.trace("Released datagram");
        } else {
            logger.error("Memory leak: Failed to deallocate datagram when releasing it");
        }
    }
    // No exceptions occurred, release the suspect
    this.causeAddress = null;
}
/**
 * Passes an inbound datagram to the client and then to every registered listener.
 *
 * <p>The datagram is released in a {@code finally} block, so it is also freed when
 * the client or a listener throws. In that case {@code causeAddress} keeps the
 * sender's address.
 *
 * @param ctx the handler context (unused)
 * @param msg the inbound message; anything that is not a {@link DatagramPacket} is ignored
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (!(msg instanceof DatagramPacket)) {
        return;
    }
    // Get packet and sender data
    DatagramPacket datagram = (DatagramPacket) msg;
    try {
        InetSocketAddress sender = datagram.sender();
        RakNetPacket packet = new RakNetPacket(datagram);
        // If an exception happens it's because of this address
        this.causeAddress = sender;
        // Handle the packet
        client.handleMessage(sender, packet);
        logger.trace("Sent packet to client and reset datagram buffer read position");
        client.callEvent(listener -> {
            datagram.content().readerIndex(0); // Reset position
            listener.handleNettyMessage(client, sender, datagram.content());
        });
    } finally {
        // No longer needed, whether or not handling succeeded
        if (datagram.release()) {
            logger.trace("Released datagram");
        } else {
            logger.error("Memory leak: Failed to deallocate datagram when releasing it");
        }
    }
    // No exceptions occurred, release the suspect
    this.causeAddress = null;
}
/**
 * Passes an inbound datagram to the server and then to every registered listener,
 * unless the sender's address is still blocked.
 *
 * <p>The datagram is released in a {@code finally} block, so it is also freed when
 * the server or a listener throws. In that case {@code causeAddress} keeps the
 * sender's address. As a side effect of the shared release, a datagram from a
 * still-blocked address now also goes through the release logging.
 *
 * @param ctx the handler context (unused)
 * @param msg the inbound message; anything that is not a {@link DatagramPacket} is ignored
 * @throws Exception whatever the server or a listener throws
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof DatagramPacket)) {
        return;
    }
    // Get packet and sender data
    DatagramPacket datagram = (DatagramPacket) msg;
    try {
        InetSocketAddress sender = datagram.sender();
        RakNetPacket packet = new RakNetPacket(datagram);
        // If an exception happens it's because of this address
        this.causeAddress = sender;
        // Check if address is blocked
        if (this.isAddressBlocked(sender.getAddress())) {
            BlockedAddress status = blocked.get(sender.getAddress());
            if (!status.shouldUnblock()) {
                return; // Address still blocked; the datagram is released in finally
            }
            this.unblockAddress(sender.getAddress());
        }
        // Handle the packet
        server.handleMessage(sender, packet);
        logger.debug("Sent packet to server and reset datagram buffer read position");
        server.callEvent(listener -> {
            datagram.content().readerIndex(0); // Reset index
            listener.handleNettyMessage(server, sender, datagram.content());
        });
    } finally {
        // No longer needed, whether or not handling succeeded
        if (datagram.release()) {
            logger.trace("Released datagram");
        } else {
            logger.error("Memory leak: Failed to deallocate datagram when releasing it");
        }
    }
    // No exceptions occurred, release the suspect
    this.causeAddress = null;
}
/**
 * Dispatches a received datagram to one of the workers, chosen from the hash of the
 * sender's address so that the same sender always reaches the same worker. When
 * this instance is not running, the datagram is released instead.
 *
 * @param dp the received datagram
 */
private void onReceive(DatagramPacket dp) {
    if (!this.running) {
        dp.release();
        return;
    }
    InetSocketAddress sender = dp.sender();
    // The remainder is taken before the absolute value: negating a hash of
    // Integer.MIN_VALUE would stay negative and index outside the array.
    // All other hashes select the same worker as |hash| % length.
    int index = Math.abs(sender.hashCode() % workers.length);
    this.workers[index].input(dp);
}
/**
 * Handles a message received from the server: feeds its content to the KCP
 * instance and notifies a thread waiting on {@code waitLock}. When there is no KCP
 * instance or this client is not running, the datagram is released instead.
 *
 * @param dp the received datagram
 */
private void onReceive(DatagramPacket dp) {
    final boolean active = this.kcp != null && this.running;
    if (!active) {
        dp.release();
        return;
    }
    this.kcp.input(dp.content());
    // Notify one thread waiting on waitLock.
    synchronized (this.waitLock) {
        this.waitLock.notify();
    }
}
/**
 * Accepts an incoming datagram: adds it to {@code inputs} and notifies a thread
 * waiting on {@code lock}. When not running, the datagram is released instead.
 *
 * @param dp the incoming datagram
 */
void input(DatagramPacket dp) {
    if (!this.running) {
        dp.release();
        return;
    }
    this.inputs.add(dp);
    // Notify one thread waiting on the lock.
    synchronized (this.lock) {
        lock.notify();
    }
}
/**
 * Releases all held memory: every pending input datagram and every KCP instance
 * that is not already closed. Both collections are cleared afterwards.
 */
private void release() {
    // Release the datagrams still held in the inputs collection.
    for (DatagramPacket pending : this.inputs) {
        pending.release();
    }
    this.inputs.clear();

    // Release every KCP instance that is still open.
    for (KcpOnUdp session : this.kcps.values()) {
        if (session.isClosed()) {
            continue;
        }
        session.release();
    }
    this.kcps.clear();
}
/**
 * Retains the incoming datagram and offers it to the queue, updating the queue
 * gauges. When the queue rejects it, the drop counter is updated and the extra
 * reference taken here is released again.
 *
 * @param ctx    the handler context (unused)
 * @param packet the received datagram
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
    // Take an extra reference before queueing; presumably the base handler releases
    // its own reference after this method returns - TODO confirm.
    packet.retain();

    if (!queue.offer(packet)) {
        gaugeMap.put(GAUGE_NUM_DROPPED_PACKETS, droppedPacketCount.incrementAndGet());
        // allow Netty to collect the buffer
        packet.release();
        return;
    }

    gaugeMap.put(GAUGE_NUM_QUEUED_PACKETS, queuedPacketCount.incrementAndGet());
    gaugeMap.put(GAUGE_PACKET_QUEUE_SIZE, queue.size());
}
/**
 * Reads one datagram from the socket and dispatches it by its first byte (the packet id).
 *
 * <p>The datagram is always released before this method returns.
 *
 * @return {@code true} when the datagram came from a blocked source, was empty, was
 *         handled by a session, or was streamed as raw data; {@code false} when no
 *         datagram was available, or it was an unconnected pong or unconnected ping
 * @throws Exception whatever reading or packet handling throws
 */
private boolean receivePacket() throws Exception {
    final DatagramPacket datagram = this.socket.readPacket();
    if (datagram == null) {
        return false;
    }
    try {
        // Check this early
        final String source = datagram.sender().getHostString();
        currentSource = source; //in order to block address
        if (this.block.containsKey(source)) {
            return true;
        }

        // Count this packet against its source address.
        final int seen = this.ipSec.containsKey(source) ? this.ipSec.get(source) + 1 : 1;
        this.ipSec.put(source, seen);

        final ByteBuf payload = datagram.content();
        final int len = payload.readableBytes();
        if (len == 0) {
            // Exit early to process another packet
            return true;
        }
        final byte[] buffer = new byte[len];
        payload.readBytes(buffer);
        final int port = datagram.sender().getPort();
        this.receiveBytes += len;

        final byte pid = buffer[0];
        if (pid == UNCONNECTED_PONG.ID) {
            return false;
        }

        Packet packet = this.getPacketFromPool(pid);
        if (packet != null) {
            packet.buffer = buffer;
            this.getSession(source, port).handlePacket(packet);
            return true;
        }

        if (pid == UNCONNECTED_PING.ID) {
            // Answer the ping with a pong carrying this server's id and name.
            packet = new UNCONNECTED_PING();
            packet.buffer = buffer;
            packet.decode();
            final UNCONNECTED_PONG pong = new UNCONNECTED_PONG();
            pong.serverID = this.getID();
            pong.pingID = ((UNCONNECTED_PING) packet).pingID;
            pong.serverName = this.getName();
            this.sendPacket(pong, source, port);
            return false;
        }

        if (buffer.length != 0) {
            this.streamRAW(source, port, buffer);
            return true;
        }
        return false;
    } finally {
        datagram.release();
    }
}
/**
 * Reads one datagram from the socket and dispatches it by its first byte (the packet id).
 *
 * <p>The datagram is always released before this method returns.
 *
 * @return {@code true} when the datagram came from a blocked source, was empty, was
 *         handled by a session, or was streamed as raw data; {@code false} when no
 *         datagram was available, or it was an unconnected pong or unconnected ping
 * @throws Exception whatever reading or packet handling throws
 */
private boolean receivePacket() throws Exception {
    final DatagramPacket datagram = this.socket.readPacket();
    if (datagram == null) {
        return false;
    }
    try {
        // Check this early
        final String source = datagram.sender().getHostString();
        currentSource = source; //in order to block address
        if (this.block.containsKey(source)) {
            return true;
        }

        // Count this packet against its source address.
        final int seen = this.ipSec.containsKey(source) ? this.ipSec.get(source) + 1 : 1;
        this.ipSec.put(source, seen);

        final ByteBuf payload = datagram.content();
        final int len = payload.readableBytes();
        if (len == 0) {
            // Exit early to process another packet
            return true;
        }
        final byte[] buffer = new byte[len];
        payload.readBytes(buffer);
        final int port = datagram.sender().getPort();
        this.receiveBytes += len;

        final byte pid = buffer[0];
        if (pid == UNCONNECTED_PONG.ID) {
            return false;
        }

        Packet packet = this.getPacketFromPool(pid);
        if (packet != null) {
            packet.buffer = buffer;
            this.getSession(source, port).handlePacket(packet);
            return true;
        }

        if (pid == UNCONNECTED_PING.ID) {
            // Answer the ping with a pong carrying this server's id and name.
            packet = new UNCONNECTED_PING();
            packet.buffer = buffer;
            packet.decode();
            final UNCONNECTED_PONG pong = new UNCONNECTED_PONG();
            pong.serverID = this.getID();
            pong.pingID = ((UNCONNECTED_PING) packet).pingID;
            pong.serverName = this.getName();
            this.sendPacket(pong, source, port);
            return false;
        }

        if (buffer.length != 0) {
            this.streamRAW(source, port, buffer);
            return true;
        }
        return false;
    } finally {
        datagram.release();
    }
}
/**
 * Reads one datagram from the socket and dispatches it by its first byte (the packet id).
 *
 * <p>The payload and sender details are copied out first and the datagram is then
 * released in a {@code finally} block, before any packet handling runs. This
 * guarantees the buffer is freed even when copying throws, and no datagram
 * accessor is called after the release.
 *
 * @return {@code true} when the datagram came from a blocked source, was empty, was
 *         handled by a session, or was streamed as raw data; {@code false} when no
 *         datagram was available, or it was an unconnected pong or unconnected ping
 * @throws Exception whatever reading or packet handling throws
 */
private boolean receivePacket() throws Exception {
    DatagramPacket datagramPacket = this.socket.readPacket();
    if (datagramPacket == null) {
        return false;
    }

    String source;
    int port;
    byte[] buffer;
    try {
        // Check this early
        source = datagramPacket.sender().getHostString();
        currentSource = source; //in order to block address
        if (this.block.containsKey(source)) {
            return true;
        }
        // Count this packet against its source address.
        if (this.ipSec.containsKey(source)) {
            this.ipSec.put(source, this.ipSec.get(source) + 1);
        } else {
            this.ipSec.put(source, 1);
        }
        port = datagramPacket.sender().getPort();
        ByteBuf byteBuf = datagramPacket.content();
        buffer = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(buffer);
    } finally {
        // Everything needed has been copied; free the buffer on every path.
        datagramPacket.release();
    }

    int len = buffer.length;
    if (len == 0) {
        // Nothing to process; move on to another packet.
        return true;
    }
    this.receiveBytes += len;

    byte pid = buffer[0];
    if (pid == UNCONNECTED_PONG.ID) {
        return false;
    }

    Packet packet = this.getPacketFromPool(pid);
    if (packet != null) {
        packet.buffer = buffer;
        this.getSession(source, port).handlePacket(packet);
        return true;
    }

    if (pid == UNCONNECTED_PING.ID) {
        // Answer the ping with a pong carrying this server's id and name.
        packet = new UNCONNECTED_PING();
        packet.buffer = buffer;
        packet.decode();
        UNCONNECTED_PONG pk = new UNCONNECTED_PONG();
        pk.serverID = this.getID();
        pk.pingID = ((UNCONNECTED_PING) packet).pingID;
        pk.serverName = this.getName();
        this.sendPacket(pk, source, port);
        return false;
    }

    // The buffer is known to be non-empty here, so it is always streamed as raw data.
    this.streamRAW(source, port, buffer);
    return true;
}
/**
 * Reads one datagram from the socket and dispatches it by its first byte (the packet id).
 *
 * <p>The datagram is always released before this method returns.
 *
 * @return {@code true} when the datagram came from a blocked source, was empty, was
 *         handled by a session, or was streamed as raw data; {@code false} when no
 *         datagram was available, or it was an unconnected pong or unconnected ping
 * @throws Exception whatever reading or packet handling throws
 */
private boolean receivePacket() throws Exception {
    final DatagramPacket datagram = this.socket.readPacket();
    if (datagram == null) {
        return false;
    }
    try {
        // Check this early
        final String source = datagram.sender().getHostString();
        currentSource = source; //in order to block address
        if (this.block.containsKey(source)) {
            return true;
        }

        // Count this packet against its source address.
        final int seen = this.ipSec.containsKey(source) ? this.ipSec.get(source) + 1 : 1;
        this.ipSec.put(source, seen);

        final ByteBuf payload = datagram.content();
        final int len = payload.readableBytes();
        if (len == 0) {
            // Exit early to process another packet
            return true;
        }
        final byte[] buffer = new byte[len];
        payload.readBytes(buffer);
        final int port = datagram.sender().getPort();
        this.receiveBytes += len;

        final byte pid = buffer[0];
        if (pid == UNCONNECTED_PONG.ID) {
            return false;
        }

        Packet packet = this.getPacketFromPool(pid);
        if (packet != null) {
            packet.buffer = buffer;
            this.getSession(source, port).handlePacket(packet);
            return true;
        }

        if (pid == UNCONNECTED_PING.ID) {
            // Answer the ping with a pong carrying this server's id and name.
            packet = new UNCONNECTED_PING();
            packet.buffer = buffer;
            packet.decode();
            final UNCONNECTED_PONG pong = new UNCONNECTED_PONG();
            pong.serverID = this.getID();
            pong.pingID = ((UNCONNECTED_PING) packet).pingID;
            pong.serverName = this.getName();
            this.sendPacket(pong, source, port);
            return false;
        }

        if (buffer.length != 0) {
            this.streamRAW(source, port, buffer);
            return true;
        }
        return false;
    } finally {
        datagram.release();
    }
}