下面列出了 java.nio.channels.DatagramChannel#receive() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Binds a non-blocking {@link DatagramChannel} to UDP port 9898 and prints the payload of
 * every datagram it receives, using a {@link Selector} to wait for readability.
 *
 * <p>The loop keeps running until {@code select()} reports no ready keys or an I/O error
 * is thrown, so this test is intended to be run manually.
 *
 * @throws IOException if the channel or selector cannot be opened, bound, registered or read
 */
@Test
public void receive() throws IOException {
    // try-with-resources releases the socket and the selector even when an exception escapes.
    try (DatagramChannel dc = DatagramChannel.open();
         Selector selector = Selector.open()) {
        dc.configureBlocking(false);
        dc.bind(new InetSocketAddress(9898));
        dc.register(selector, SelectionKey.OP_READ);
        while (selector.select() > 0) {
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey sk = it.next();
                // Selected keys are never removed automatically; drop each one as soon as
                // it is taken so it is not seen again on the next select().
                it.remove();
                if (!sk.isReadable()) {
                    continue;
                }
                ByteBuffer buf = ByteBuffer.allocate(1024);
                // In non-blocking mode receive() returns null when no datagram is available.
                if (dc.receive(buf) == null) {
                    continue;
                }
                buf.flip();
                System.out.println(new String(buf.array(), 0, buf.limit()));
            }
        }
    }
}
/**
 * Joins the multicast group 239.239.239.88 on UDP port 9998, receives one datagram and
 * prints its payload.
 *
 * @throws Exception if the channel cannot be opened, bound, joined to the group or read
 */
@Test
public void TestServer() throws Exception {
    // Both the receiving and the sending side of a multicast exchange have to join the group address.
    String host = "239.239.239.88"; // multicast group address
    int port = 9998;
    InetAddress group = InetAddress.getByName(host);
    // try-with-resources so the socket is closed once the datagram has been printed.
    try (DatagramChannel channel = DatagramChannel.open(StandardProtocolFamily.INET)) {
        channel.bind(new InetSocketAddress(port));
        channel.join(group, Utils.getLocalNetworkInterface());
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // The sender address is not needed here, only the payload.
        channel.receive(buffer);
        buffer.flip();
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);
        System.out.println(new String(data));
    }
}
/**
 * Tries to read one datagram from the underlying NIO channel into a freshly allocated
 * buffer and, on success, appends it to {@code buf} as a {@code DatagramPacket}.
 *
 * @param buf list that receives the packet that was read
 * @return {@code 1} if a datagram was read and added to {@code buf}; {@code 0} if
 *         {@code receive} returned no sender address
 * @throws Exception any failure raised while reading, passed on via
 *         {@code PlatformDependent.throwException}
 */
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    final DatagramChannel channel = javaChannel();
    final DatagramChannelConfig cfg = config();
    final RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
    final ByteBuf payload = handle.allocate(cfg.getAllocator());
    handle.attemptedBytesRead(payload.writableBytes());

    // Becomes true only once the buffer is owned by a packet in buf; otherwise the
    // finally block releases it.
    boolean handedOff = false;
    try {
        final ByteBuffer target = payload.internalNioBuffer(payload.writerIndex(), payload.writableBytes());
        final int startPosition = target.position();
        final InetSocketAddress sender = (InetSocketAddress) channel.receive(target);
        if (sender != null) {
            // Bytes read = how far receive() advanced the NIO buffer.
            handle.lastBytesRead(target.position() - startPosition);
            final ByteBuf filled = payload.writerIndex(payload.writerIndex() + handle.lastBytesRead());
            buf.add(new DatagramPacket(filled, localAddress(), sender));
            handedOff = true;
            return 1;
        }
        return 0;
    } catch (Throwable cause) {
        PlatformDependent.throwException(cause);
        // Only reached if throwException returns normally.
        return -1;
    } finally {
        if (!handedOff) {
            payload.release();
        }
    }
}
/**
 * Receives UDP datagrams from the key's channel into the given buffer. A receive is only
 * attempted while the buffer has more than {@code MAX_UDP_PACKET_SIZE} bytes remaining
 * and the key is valid and readable.
 *
 * @param key selection key whose channel is read
 * @param buffer buffer to fill
 * @return number of bytes added to the buffer
 * @throws IOException if receiving from the channel fails
 */
@Override
protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
    final DatagramChannel channel = (DatagramChannel) key.channel();
    final int startPosition = buffer.position();
    boolean keepReading = true;
    while (keepReading
            && buffer.remaining() > MAX_UDP_PACKET_SIZE
            && key.isValid()
            && key.isReadable()) {
        // Stop when receive() yields no datagram, or after the first one when
        // readSingleDatagram is set.
        keepReading = channel.receive(buffer) != null && !readSingleDatagram;
    }
    return buffer.position() - startPosition;
}
/**
 * Binds a non-blocking {@link DatagramChannel} to UDP port 9898 and prints the payload of
 * every datagram it receives, using a {@link Selector} to wait for readability.
 *
 * <p>The loop keeps running until {@code select()} reports no ready keys or an I/O error
 * is thrown, so this test is intended to be run manually.
 *
 * @throws IOException if the channel or selector cannot be opened, bound, registered or read
 */
@Test
public void receive() throws IOException {
    // try-with-resources releases the socket and the selector even when an exception escapes.
    try (DatagramChannel dc = DatagramChannel.open();
         Selector selector = Selector.open()) {
        dc.configureBlocking(false);
        dc.bind(new InetSocketAddress(9898));
        dc.register(selector, SelectionKey.OP_READ);
        while (selector.select() > 0) {
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey sk = it.next();
                // Selected keys are never removed automatically; drop each one as soon as
                // it is taken so it is not seen again on the next select().
                it.remove();
                if (!sk.isReadable()) {
                    continue;
                }
                ByteBuffer buf = ByteBuffer.allocate(1024);
                // In non-blocking mode receive() returns null when no datagram is available.
                if (dc.receive(buf) == null) {
                    continue;
                }
                buf.flip();
                System.out.println(new String(buf.array(), 0, buf.limit()));
            }
        }
    }
}
/**
 * Receives one datagram from the key's {@link DatagramChannel} into {@code data}.
 *
 * @param key selection key whose channel is a {@code DatagramChannel}
 * @param data buffer the datagram is written into
 * @return the position of {@code data} after the receive call
 * @throws IOException if receiving from the channel fails
 */
@Override
@TruffleBoundary
protected long receiveDataFrom(final SelectionKey key, final ByteBuffer data) throws IOException {
    ((DatagramChannel) key.channel()).receive(data);
    return data.position();
}
/** Receive loop: read UDP packets, log them and hand them to {@code handleMessages}
 *  for as long as {@code running} is set.
 *  @param udp Socket to use
 *  @param buffer Receive buffer to use
 */
protected void listen(final DatagramChannel udp, final ByteBuffer buffer) {
    logger.log(Level.FINE, "Starting " + Thread.currentThread().getName());
    while (running) {
        try {
            // Reset the buffer, then wait for the next UDP packet
            buffer.clear();
            final InetSocketAddress sender = (InetSocketAddress) udp.receive(buffer);
            buffer.flip();
            // XXX Check against list of ignored addresses?
            logger.log(Level.FINER, () -> "Received UDP from " + sender + "\n" + Hexdump.toHexdump(buffer));
            handleMessages(sender, buffer);
        } catch (Exception ex) {
            // Errors raised after 'running' was cleared are ignored: we are closing
            if (!running) {
                continue;
            }
            logger.log(Level.WARNING, "UDP receive error", ex);
        }
    }
    logger.log(Level.FINE, "Exiting " + Thread.currentThread().getName());
}
/**
 * Tries to read one datagram from the underlying NIO channel into a freshly allocated
 * buffer and, on success, appends it to {@code buf} as a {@code UkcpPacket}.
 *
 * @param buf list that receives the packet that was read
 * @return {@code 1} if a datagram was read and added to {@code buf}; {@code 0} if
 *         {@code receive} returned no sender address
 * @throws Exception any failure raised while reading, passed on via
 *         {@code PlatformDependent.throwException}
 */
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    final DatagramChannel channel = javaChannel();
    final UkcpServerChannelConfig cfg = config();
    final RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
    final ByteBuf payload = handle.allocate(cfg.getAllocator());
    handle.attemptedBytesRead(payload.writableBytes());

    // Becomes true only once the buffer is owned by a packet in buf; otherwise the
    // finally block releases it.
    boolean handedOff = false;
    try {
        final ByteBuffer target = payload.internalNioBuffer(payload.writerIndex(), payload.writableBytes());
        final int startPosition = target.position();
        final InetSocketAddress sender = (InetSocketAddress) channel.receive(target);
        if (sender != null) {
            // Bytes read = how far receive() advanced the NIO buffer.
            handle.lastBytesRead(target.position() - startPosition);
            final ByteBuf filled = payload.writerIndex(payload.writerIndex() + handle.lastBytesRead());
            buf.add(UkcpPacket.newInstance(filled, sender));
            handedOff = true;
            return 1;
        }
        return 0;
    } catch (Throwable cause) {
        PlatformDependent.throwException(cause);
        // Only reached if throwException returns normally.
        return -1;
    } finally {
        if (!handedOff) {
            payload.release();
        }
    }
}
/**
 * Forwards one datagram: receives it from the key's channel into {@code readBuffer} and
 * sends it on to the address stored in the key's {@code ClientRecord} attachment. If the
 * send leaves bytes in the buffer, they are copied into the record's write buffer and the
 * key's interest set is switched to {@code OP_WRITE}.
 *
 * @param key selection key whose channel is a {@code DatagramChannel} and whose
 *            attachment is a {@code ClientRecord}
 * @param readBuffer scratch buffer; its previous content is discarded
 * @throws IOException if receiving or sending fails
 */
public static void handleRead(SelectionKey key, ByteBuffer readBuffer) throws IOException {
    DatagramChannel channel = (DatagramChannel) key.channel();
    ClientRecord clientRecord = (ClientRecord) key.attachment();
    // Ensure the buffer is empty
    readBuffer.clear();
    // Receive the data; a null sender means no datagram was available, so there is
    // nothing to forward (and no empty datagram should be sent on).
    if (channel.receive(readBuffer) == null) {
        return;
    }
    // Get ready to write, then send
    readBuffer.flip();
    channel.send(readBuffer, clientRecord.toAddress);
    // Keep whatever the send did not consume and wait for the channel to become writable
    if (readBuffer.hasRemaining()) {
        clientRecord.writeBuffer.put(readBuffer);
        key.interestOps(SelectionKey.OP_WRITE);
    }
}
/**
 * Sends a zero-length datagram from one blocking channel to another bound to
 * {@code REFLECTOR_ADDRESS} and checks that it arrives with a sender address and no payload.
 *
 * @throws Exception if any socket operation fails
 */
@Test
public void testBlockSockets() throws Exception {
    DatagramChannel sender = DatagramChannel.open();
    sender.configureBlocking(true);
    // Deliberately left unconnected: no empty datagram for a connected socket,
    // see https://bugs.openjdk.java.net/browse/JDK-8013175

    DatagramChannel receiver = DatagramChannel.open();
    receiver.configureBlocking(true);
    receiver.bind(REFLECTOR_ADDRESS);

    ByteBuffer empty = ByteBuffer.allocate(0);
    empty.clear();
    try {
        // Send the zero-length datagram
        empty.flip();
        int sentBytes = sender.send(empty, REFLECTOR_ADDRESS);
        Assert.assertEquals(0, sentBytes);

        Thread.sleep(100);

        // Read it back on the bound channel
        empty.clear();
        InetSocketAddress source = (InetSocketAddress) receiver.receive(empty);
        Assert.assertNotNull(source);
        Assert.assertEquals(0, empty.position());
    } finally {
        NioUtils.close(receiver);
        NioUtils.close(sender);
    }
}
/**
 * Sends a zero-length datagram directly to a {@code DatagramBulkReflector} listening on
 * {@code REFLECTOR_ADDRESS} and checks that an empty datagram comes back from that address.
 *
 * @throws Exception if the reflector or any socket operation fails
 */
@Test
public void testNoCrusher() throws Exception {
    CyclicBarrier startBarrier = new CyclicBarrier(2);
    DatagramBulkReflector reflector =
        new DatagramBulkReflector("REFLECTOR", REFLECTOR_ADDRESS, 1, startBarrier);
    reflector.open();

    DatagramChannel client = DatagramChannel.open();
    client.configureBlocking(true);
    // Deliberately left unconnected: no empty datagram for a connected socket,
    // see https://bugs.openjdk.java.net/browse/JDK-8013175

    startBarrier.await();
    Thread.sleep(1000);

    ByteBuffer empty = ByteBuffer.allocate(0);
    try {
        // Send
        empty.clear();
        empty.flip();
        int sentBytes = client.send(empty, REFLECTOR_ADDRESS);
        Assert.assertEquals(0, sentBytes);

        // Read
        empty.clear();
        InetSocketAddress source = (InetSocketAddress) client.receive(empty);
        Assert.assertNotNull(source);
        Assert.assertEquals(REFLECTOR_ADDRESS, source);
        Assert.assertEquals(0, empty.position());
    } finally {
        NioUtils.close(client);
        NioUtils.close(reflector);
    }
}
/**
 * Tries to read one datagram from the underlying NIO channel into a freshly allocated
 * buffer and, on success, appends it to {@code buf} as a {@code DatagramPacket}. The
 * receive-buffer allocator handle is created on first use and cached in the
 * {@code allocHandle} field.
 *
 * @param buf list that receives the packet that was read
 * @return {@code 1} if a datagram was read and added to {@code buf}; {@code 0} if
 *         {@code receive} returned no sender address
 * @throws Exception any failure raised while reading, passed on via
 *         {@code PlatformDependent.throwException}
 */
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    final DatagramChannel channel = javaChannel();
    final DatagramChannelConfig cfg = config();

    RecvByteBufAllocator.Handle handle = this.allocHandle;
    if (handle == null) {
        handle = cfg.getRecvByteBufAllocator().newHandle();
        this.allocHandle = handle;
    }

    final ByteBuf payload = handle.allocate(cfg.getAllocator());
    // Becomes true only once the buffer is owned by a packet in buf; otherwise the
    // finally block releases it.
    boolean handedOff = false;
    try {
        final ByteBuffer target = payload.internalNioBuffer(payload.writerIndex(), payload.writableBytes());
        final int startPosition = target.position();
        final InetSocketAddress sender = (InetSocketAddress) channel.receive(target);
        if (sender != null) {
            // Bytes read = how far receive() advanced the NIO buffer.
            final int received = target.position() - startPosition;
            payload.writerIndex(payload.writerIndex() + received);
            handle.record(received);
            buf.add(new DatagramPacket(payload, localAddress(), sender));
            handedOff = true;
            return 1;
        }
        return 0;
    } catch (Throwable cause) {
        PlatformDependent.throwException(cause);
        // Only reached if throwException returns normally.
        return -1;
    } finally {
        if (!handedOff) {
            payload.release();
        }
    }
}
/**
 * Receives UDP datagrams from the key's channel into the given buffer. A receive is only
 * attempted while the buffer has more than {@code MAX_UDP_PACKET_SIZE} bytes remaining
 * and the key is valid and readable.
 *
 * @param key selection key whose channel is read
 * @param buffer buffer to fill
 * @return number of bytes added to the buffer
 * @throws IOException if receiving from the channel fails
 */
@Override
protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
    final DatagramChannel channel = (DatagramChannel) key.channel();
    final int startPosition = buffer.position();
    boolean keepReading = true;
    while (keepReading
            && buffer.remaining() > MAX_UDP_PACKET_SIZE
            && key.isValid()
            && key.isReadable()) {
        // Stop when receive() yields no datagram, or after the first one when
        // readSingleDatagram is set.
        keepReading = channel.receive(buffer) != null && !readSingleDatagram;
    }
    return buffer.position() - startPosition;
}
/**
 * Reads the next datagram into {@code readBuffer} and reports where it came from.
 *
 * @return {@code connectedAddress} when the channel is connected; otherwise whatever
 *         {@code receive} returns as the sender address
 * @throws SocketException if the datagram channel field is {@code null}
 * @throws IOException if reading from the channel fails
 */
public InetSocketAddress readFromAddress() throws IOException {
    // Work on a local snapshot of the field.
    final DatagramChannel channel = this.datagramChannel;
    if (channel == null) {
        throw new SocketException("Connection is closed.");
    }
    lastCommunicationTime = System.currentTimeMillis();
    if (channel.isConnected()) {
        channel.read(readBuffer);
        return connectedAddress;
    }
    // Note carried over from the original: always null on Android >= 5.0
    return (InetSocketAddress) channel.receive(readBuffer);
}
/**
 * Receives one datagram from {@code handle} into the buffer returned by {@code buffer.buf()}.
 *
 * @param handle channel to receive from
 * @param buffer wrapper whose {@code buf()} result is filled
 * @return the value returned by {@link DatagramChannel#receive}
 * @throws Exception if receiving fails
 */
@Override
protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception {
    final SocketAddress source = handle.receive(buffer.buf());
    return source;
}
/**
 * Sends a zero-length datagram to {@code CRUSHER_ADDRESS}, checks the crusher's client
 * count and its byte and packet meters, then checks that an empty datagram comes back
 * from that address.
 *
 * @throws Exception if the reflector or any socket operation fails
 */
@Test
public void test() throws Exception {
    CyclicBarrier startBarrier = new CyclicBarrier(2);
    DatagramBulkReflector reflector =
        new DatagramBulkReflector("REFLECTOR", REFLECTOR_ADDRESS, 1, startBarrier);
    reflector.open();
    startBarrier.await();
    Thread.sleep(1000);

    DatagramChannel client = DatagramChannel.open();
    client.configureBlocking(true);
    ByteBuffer payload = ByteBuffer.allocate(100);
    try {
        // Send: clear() followed by flip() leaves nothing to send, i.e. an empty datagram
        payload.clear();
        payload.flip();
        int sentBytes = client.send(payload, CRUSHER_ADDRESS);
        Assert.assertEquals(0, sentBytes);

        // Check: one client, no bytes, one packet in each direction
        Thread.sleep(500);
        Assert.assertEquals(1, crusher.getClientTotalCount());
        RateMeters byteMeters = crusher.getInnerByteMeters();
        Assert.assertEquals(0, byteMeters.getReadMeter().getTotalCount());
        Assert.assertEquals(0, byteMeters.getSentMeter().getTotalCount());
        RateMeters packetMeters = crusher.getInnerPacketMeters();
        Assert.assertEquals(1, packetMeters.getReadMeter().getTotalCount());
        Assert.assertEquals(1, packetMeters.getSentMeter().getTotalCount());

        // Read
        payload.clear();
        InetSocketAddress source = (InetSocketAddress) client.receive(payload);
        Assert.assertNotNull(source);
        Assert.assertEquals(CRUSHER_ADDRESS, source);
        Assert.assertEquals(0, payload.position());
    } finally {
        NioUtils.close(client);
        NioUtils.close(reflector);
    }
}
/**
 * Echo loop: receives datagrams on {@code Common.PING_PORT} and sends the two leading
 * longs of each one (sequence number and timestamp) to {@code Common.PONG_PORT} on
 * localhost. Polls the selector until the SIGINT hook clears the running flag.
 *
 * @throws IOException if a channel or the selector cannot be set up or polled
 */
private void run() throws IOException {
    final InetSocketAddress replyTarget = new InetSocketAddress("localhost", Common.PONG_PORT);
    final ByteBuffer scratch = ByteBuffer.allocateDirect(Configuration.MTU_LENGTH_DEFAULT);

    final DatagramChannel inbound = DatagramChannel.open();
    Common.init(inbound);
    inbound.bind(new InetSocketAddress("localhost", Common.PING_PORT));

    final DatagramChannel outbound = DatagramChannel.open();
    Common.init(outbound);

    final Selector selector = Selector.open();

    // Attached to the selection key and invoked whenever the inbound channel is readable.
    final IntSupplier echoHandler = () -> {
        try {
            scratch.clear();
            inbound.receive(scratch);

            // Absolute reads: the first long is the sequence number, the second the timestamp.
            final long sequence = scratch.getLong(0);
            final long timestamp = scratch.getLong(SIZE_OF_LONG);

            scratch.clear();
            scratch.putLong(sequence).putLong(timestamp);
            scratch.flip();

            outbound.send(scratch, replyTarget);
        } catch (final IOException ex) {
            ex.printStackTrace();
        }
        return 1;
    };
    inbound.register(selector, OP_READ, echoHandler);

    final AtomicBoolean running = new AtomicBoolean(true);
    SigInt.register(() -> running.set(false));

    for (;;) {
        // Poll without blocking until something is selected or shutdown is requested.
        while (selector.selectNow() == 0) {
            if (!running.get()) {
                return;
            }
        }

        for (final Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); keys.hasNext();) {
            final SelectionKey key = keys.next();
            if (key.isReadable()) {
                ((IntSupplier) key.attachment()).getAsInt();
            }
            keys.remove();
        }
    }
}
/**
 * Measurement loop: registers a handler that reads replies arriving on
 * {@code Common.PONG_PORT}, checks their sequence number and records the elapsed time in
 * a histogram, then repeatedly calls {@code measureRoundTrip} until the SIGINT hook
 * clears the running flag.
 *
 * @throws IOException if a channel or the selector cannot be set up, or a round fails
 */
private void run() throws IOException {
    final Histogram histogram = new Histogram(TimeUnit.SECONDS.toNanos(10), 3);
    final ByteBuffer scratch = ByteBuffer.allocateDirect(Configuration.MTU_LENGTH_DEFAULT);

    final DatagramChannel inbound = DatagramChannel.open();
    Common.init(inbound);
    inbound.bind(new InetSocketAddress("localhost", Common.PONG_PORT));

    final DatagramChannel outbound = DatagramChannel.open();
    Common.init(outbound);

    final Selector selector = Selector.open();

    // Attached to the selection key of the inbound channel.
    final IntSupplier replyHandler = () -> {
        try {
            scratch.clear();
            inbound.receive(scratch);

            // Absolute reads: the first long is the sequence number, the second the timestamp.
            final long receivedSequenceNumber = scratch.getLong(0);
            final long timestampNs = scratch.getLong(SIZE_OF_LONG);

            if (receivedSequenceNumber != sequenceNumber) {
                throw new IllegalStateException(
                    "data Loss:" + sequenceNumber + " to " + receivedSequenceNumber);
            }

            histogram.recordValue(System.nanoTime() - timestampNs);
        } catch (final IOException ex) {
            ex.printStackTrace();
        }
        return 1;
    };
    inbound.register(selector, OP_READ, replyHandler);

    final AtomicBoolean running = new AtomicBoolean(true);
    SigInt.register(() -> running.set(false));

    while (running.get()) {
        measureRoundTrip(histogram, SEND_ADDRESS, scratch, outbound, selector, running);

        histogram.reset();
        System.gc();
        // Pause for one second (in nanoseconds) between rounds.
        LockSupport.parkNanos(1_000_000_000L);
    }
}
/**
 * Echo loop: receives datagrams on {@code Common.PING_PORT} and sends the two leading
 * longs of each one (sequence number and timestamp) to {@code Common.PONG_PORT} on
 * localhost. Selected keys are dispatched through the {@code NioSelectedKeySet} obtained
 * from {@code Common.keySet(selector)}; polling continues until the SIGINT hook clears
 * the running flag.
 *
 * @throws IOException if a channel or the selector cannot be set up or polled
 */
private void run() throws IOException {
    final InetSocketAddress replyTarget = new InetSocketAddress("localhost", Common.PONG_PORT);
    final ByteBuffer scratch = ByteBuffer.allocateDirect(Configuration.MTU_LENGTH_DEFAULT);

    final DatagramChannel inbound = DatagramChannel.open();
    Common.init(inbound);
    inbound.bind(new InetSocketAddress("localhost", Common.PING_PORT));

    final DatagramChannel outbound = DatagramChannel.open();
    Common.init(outbound);

    final Selector selector = Selector.open();
    final NioSelectedKeySet selectedKeys = Common.keySet(selector);

    // Applied to every key in the selected-key set; the key itself is not inspected
    // because only the inbound channel is registered.
    final ToIntFunction<SelectionKey> echoHandler = (key) -> {
        try {
            scratch.clear();
            inbound.receive(scratch);

            // Absolute reads: the first long is the sequence number, the second the timestamp.
            final long sequence = scratch.getLong(0);
            final long timestamp = scratch.getLong(SIZE_OF_LONG);

            scratch.clear();
            scratch.putLong(sequence).putLong(timestamp);
            scratch.flip();

            outbound.send(scratch, replyTarget);
        } catch (final IOException ex) {
            ex.printStackTrace();
        }
        return 1;
    };

    // No attachment is needed: the handler is passed to the key set directly.
    inbound.register(selector, OP_READ, null);

    final AtomicBoolean running = new AtomicBoolean(true);
    SigInt.register(() -> running.set(false));

    for (;;) {
        // Poll without blocking until something is selected or shutdown is requested.
        while (selector.selectNow() == 0) {
            if (!running.get()) {
                return;
            }
        }

        selectedKeys.forEach(echoHandler);
    }
}
/**
 * Receives the next datagram into {@code readBuffer} and reports where it came from.
 *
 * @return the sender address returned by {@code receive}
 * @throws SocketException if the datagram channel field is {@code null}
 * @throws IOException if receiving from the channel fails
 */
public InetSocketAddress readFromAddress() throws IOException {
    // Work on a local snapshot of the field.
    final DatagramChannel channel = this.datagramChannel;
    if (channel == null) {
        throw new SocketException("Connection is closed.");
    }
    lastCommunicationTime = System.currentTimeMillis();
    final InetSocketAddress source = (InetSocketAddress) channel.receive(readBuffer);
    return source;
}