下面列出了 java.nio.channels.DatagramChannel#register() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Listens for UDP datagrams on port 9898 using a non-blocking channel and a
 * selector, printing the payload of every datagram received.
 *
 * <p>The loop runs for as long as {@code select()} reports at least one ready
 * key. The channel and the selector are closed when the method exits, whether
 * normally or through an exception.
 *
 * @throws IOException if opening, binding, selecting or receiving fails
 */
@Test
public void receive() throws IOException {
    try (DatagramChannel dc = DatagramChannel.open();
         Selector selector = Selector.open()) {
        dc.configureBlocking(false);
        dc.bind(new InetSocketAddress(9898));
        dc.register(selector, SelectionKey.OP_READ);

        // One buffer is reused for every datagram instead of allocating per read.
        ByteBuffer buf = ByteBuffer.allocate(1024);
        while (selector.select() > 0) {
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey sk = it.next();
                // Every handled key must be removed from the selected-key set;
                // the selector never removes keys from that set by itself.
                it.remove();
                if (!sk.isReadable()) {
                    continue;
                }
                buf.clear();
                // A non-blocking receive returns null when no datagram was available.
                if (dc.receive(buf) == null) {
                    continue;
                }
                buf.flip();
                System.out.println(new String(buf.array(), 0, buf.limit()));
            }
        }
    }
}
/**
 * Opens a datagram channel, applies the session configuration, switches it to
 * non-blocking mode, binds it to {@code localAddress} and registers it with
 * {@code selector} for read events.
 *
 * <p>If any of those steps throws, the channel is handed to {@code close(...)}
 * before the exception propagates.
 *
 * @param localAddress the local address to bind the channel's socket to
 * @return the opened, bound and registered channel
 * @throws Exception if configuring, binding or registering the channel fails
 */
@Override
protected DatagramChannel open(SocketAddress localAddress) throws Exception {
    final DatagramChannel channel = DatagramChannel.open();
    // Stays true until every setup step has completed.
    boolean failed = true;
    try {
        final NioDatagramSessionConfig channelConfig = new NioDatagramSessionConfig(channel);
        channelConfig.setAll(getSessionConfig());
        channel.configureBlocking(false);
        channel.socket().bind(localAddress);
        channel.register(selector, SelectionKey.OP_READ);
        failed = false;
    } finally {
        if (failed) {
            close(channel);
        }
    }
    return channel;
}
/**
 * Listens for UDP datagrams on port 9898 using a non-blocking channel and a
 * selector, printing the payload of every datagram received.
 *
 * <p>The loop runs for as long as {@code select()} reports at least one ready
 * key. The channel and the selector are closed when the method exits, whether
 * normally or through an exception.
 *
 * @throws IOException if opening, binding, selecting or receiving fails
 */
@Test
public void receive() throws IOException {
    try (DatagramChannel dc = DatagramChannel.open();
         Selector selector = Selector.open()) {
        dc.configureBlocking(false);
        dc.bind(new InetSocketAddress(9898));
        dc.register(selector, SelectionKey.OP_READ);

        // One buffer is reused for every datagram instead of allocating per read.
        ByteBuffer buf = ByteBuffer.allocate(1024);
        while (selector.select() > 0) {
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey sk = it.next();
                // Every handled key must be removed from the selected-key set;
                // the selector never removes keys from that set by itself.
                it.remove();
                if (!sk.isReadable()) {
                    continue;
                }
                buf.clear();
                // A non-blocking receive returns null when no datagram was available.
                if (dc.receive(buf) == null) {
                    continue;
                }
                buf.flip();
                System.out.println(new String(buf.array(), 0, buf.limit()));
            }
        }
    }
}
/**
 * Opens a UDP channel and registers it with this instance's selector for read
 * events.
 *
 * @param host local address to bind to; when {@code null} only the port is
 *             used for the bind address
 * @param port port number to bind; an explicit bind is performed only when
 *             this is greater than 0 (the original note describes 0 as
 *             "assigned randomly")
 * @return the {@code UdpChannel} wrapping the new datagram channel; it is also
 *         attached to the channel's selection key
 * @throws IOException if opening, configuring, binding or registering fails
 */
public UdpChannel<Request> open(String host, int port) throws IOException {
    // Create the selector on first use, guarded by this instance's monitor.
    if (selector == null) {
        synchronized (this) {
            if (selector == null) {
                selector = Selector.open();
            }
        }
    }

    DatagramChannel datagramChannel = DatagramChannel.open();
    datagramChannel.configureBlocking(false);
    if (port > 0) {
        InetSocketAddress localAddress;
        if (host == null) {
            localAddress = new InetSocketAddress(port);
        } else {
            localAddress = new InetSocketAddress(host, port);
        }
        datagramChannel.socket().bind(localAddress);
    }

    // Wake the selector before registering when the service is already running.
    if (status == Status.STATUS_RUNNING) {
        selector.wakeup();
    }
    SelectionKey readKey = datagramChannel.register(selector, SelectionKey.OP_READ);
    UdpChannel<Request> result = new UdpChannel<>(datagramChannel, readKey, config, bufferPage);
    readKey.attach(result);

    // Start the worker threads if the service is still in its initial state.
    if (status == Status.STATUS_INIT) {
        initThreadServer();
    }
    return result;
}
/**
 * Receives messages from a multicast group and invokes the message listener
 * on each of them. The loop keeps running while the status listener's
 * {@code receive} flag is set; per the original documentation this lasts until
 * a packet indicating the End of Session is received.
 *
 * @param multicastInterface the multicast interface
 * @param multicastGroup the multicast group
 * @param requestAddress the request address
 * @param listener a message listener
 * @throws IOException if an I/O error occurs
 */
public static void receive(NetworkInterface multicastInterface,
        InetSocketAddress multicastGroup, InetSocketAddress requestAddress,
        MessageListener listener) throws IOException {
    // Multicast side: bind to the group's port, join the group, go non-blocking.
    DatagramChannel multicastChannel = DatagramChannel.open(StandardProtocolFamily.INET);
    multicastChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    multicastChannel.bind(new InetSocketAddress(multicastGroup.getPort()));
    multicastChannel.join(multicastGroup.getAddress(), multicastInterface);
    multicastChannel.configureBlocking(false);

    // Request side: an unbound non-blocking channel.
    DatagramChannel unicastChannel = DatagramChannel.open(StandardProtocolFamily.INET);
    unicastChannel.configureBlocking(false);

    StatusListener statusListener = new StatusListener();

    try (Selector selector = Selector.open();
            MoldUDP64Client client = new MoldUDP64Client(multicastChannel, unicastChannel,
                    requestAddress, listener, statusListener)) {
        SelectionKey multicastKey = multicastChannel.register(selector, SelectionKey.OP_READ);
        SelectionKey unicastKey = unicastChannel.register(selector, SelectionKey.OP_READ);

        while (statusListener.receive) {
            // Keep selecting until at least one key is reported ready.
            int readyCount;
            do {
                readyCount = selector.select();
            } while (readyCount == 0);

            Set<SelectionKey> readyKeys = selector.selectedKeys();
            if (readyKeys.contains(multicastKey)) {
                client.receive();
            }
            if (readyKeys.contains(unicastKey)) {
                client.receiveResponse();
            }
            readyKeys.clear();
        }
    }
}
/**
 * Opens a non-blocking IPv4 datagram channel with {@code SO_REUSEADDR} and
 * {@code SO_BROADCAST} enabled, binds it to {@code broadcastPort} and
 * registers it with {@code selector} for read and write events.
 *
 * <p>On success the channel is intentionally left open; it stays reachable
 * through the returned key. If any setup step fails, the channel is closed
 * before the exception propagates so that the port is not left bound.
 *
 * @param selector the selector to register with; may be {@code null}
 * @param logId identifier used as a prefix in log messages
 * @param broadcastPort the local port to bind the channel to
 * @return the selection key of the registered channel, or {@code null} when
 *         {@code selector} is {@code null}
 * @throws IOException if the channel cannot be opened, configured, bound or
 *         registered
 */
@SuppressWarnings("resource")
public static @Nullable SelectionKey openBroadcastChannel(@Nullable Selector selector, String logId,
        int broadcastPort) throws IOException {
    if (selector == null) {
        return null;
    }
    DatagramChannel broadcastChannel = DatagramChannel.open(StandardProtocolFamily.INET);
    try {
        broadcastChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        broadcastChannel.setOption(StandardSocketOptions.SO_BROADCAST, true);
        broadcastChannel.configureBlocking(false);
        LOGGER.debug("{} : Binding the broadcast channel on port {}", logId, broadcastPort);
        broadcastChannel.bind(new InetSocketAddress(broadcastPort));
        return broadcastChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    } catch (IOException | RuntimeException e) {
        // Release the socket on failure; the caller never receives a handle to it.
        try {
            broadcastChannel.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Opens a non-blocking IPv4 datagram channel with {@code SO_REUSEADDR}
 * enabled, connects it to {@code address} and registers it with
 * {@code selector} for read and write events.
 *
 * <p>On success the channel is intentionally left open; it stays reachable
 * through the returned key. If any setup step fails, the channel is closed
 * before the exception propagates.
 *
 * @param selector the selector to register with; may be {@code null}
 * @param logId identifier used as a prefix in log messages
 * @param address the remote address to connect to; may be {@code null}
 * @return the selection key of the registered channel, or {@code null} when
 *         {@code selector} or {@code address} is {@code null}
 * @throws IOException if the channel cannot be opened, configured, connected
 *         or registered
 */
@SuppressWarnings("resource")
public static @Nullable SelectionKey openUnicastChannel(@Nullable Selector selector, String logId,
        @Nullable InetSocketAddress address) throws IOException {
    if (selector == null || address == null) {
        return null;
    }
    DatagramChannel unicastChannel = DatagramChannel.open(StandardProtocolFamily.INET);
    try {
        unicastChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        unicastChannel.configureBlocking(false);
        unicastChannel.connect(address);
        LOGGER.trace("{} : Connected to light via {}", logId, unicastChannel.getLocalAddress().toString());
        return unicastChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    } catch (IOException | RuntimeException e) {
        // Release the socket on failure; the caller never receives a handle to it.
        try {
            unicastChannel.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
private AsyncUdpSocketNio(@NotNull Eventloop eventloop, @NotNull DatagramChannel channel) throws IOException {
this.eventloop = eventloop;
this.channel = channel;
this.key = channel.register(eventloop.ensureSelector(), 0, this);
}
/**
 * Echo loop: receives datagrams on {@code Common.PING_PORT} and sends the two
 * leading {@code long} values of each one back to {@code Common.PONG_PORT} on
 * localhost.
 *
 * <p>The selector is polled with {@code selectNow()} in a spin loop; the
 * method returns once the SIGINT hook has cleared the running flag and no key
 * is ready.
 *
 * @throws IOException if a channel or the selector cannot be set up or polled
 */
private void run() throws IOException
{
    final InetSocketAddress sendAddress = new InetSocketAddress("localhost", Common.PONG_PORT);
    final ByteBuffer buffer = ByteBuffer.allocateDirect(Configuration.MTU_LENGTH_DEFAULT);

    final DatagramChannel receiveChannel = DatagramChannel.open();
    Common.init(receiveChannel);
    receiveChannel.bind(new InetSocketAddress("localhost", Common.PING_PORT));

    final DatagramChannel sendChannel = DatagramChannel.open();
    Common.init(sendChannel);

    final Selector selector = Selector.open();

    // Attached to the receive key: reads one datagram and echoes its header back.
    final IntSupplier echoHandler = () ->
    {
        try
        {
            buffer.clear();
            receiveChannel.receive(buffer);

            final long sequence = buffer.getLong(0);
            final long timestamp = buffer.getLong(SIZE_OF_LONG);

            buffer.clear();
            buffer.putLong(sequence);
            buffer.putLong(timestamp);
            buffer.flip();

            sendChannel.send(buffer, sendAddress);
        }
        catch (final IOException ex)
        {
            ex.printStackTrace();
        }

        return 1;
    };

    receiveChannel.register(selector, OP_READ, echoHandler);

    final AtomicBoolean running = new AtomicBoolean(true);
    SigInt.register(() -> running.set(false));

    while (true)
    {
        // Spin until a key is ready, leaving as soon as shutdown is requested.
        while (0 == selector.selectNow())
        {
            if (!running.get())
            {
                return;
            }
        }

        for (final Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); keys.hasNext();)
        {
            final SelectionKey key = keys.next();
            if (key.isReadable())
            {
                ((IntSupplier)key.attachment()).getAsInt();
            }

            keys.remove();
        }
    }
}
/**
 * Measurement loop: sets up a receive channel on {@code Common.PONG_PORT}
 * whose handler records, for every datagram, the difference between
 * {@code System.nanoTime()} and the timestamp carried in the datagram.
 *
 * <p>While the running flag is set, each iteration calls
 * {@code measureRoundTrip}, resets the histogram, requests a GC and parks for
 * one second. The flag is cleared by the SIGINT hook.
 *
 * @throws IOException if a channel or the selector cannot be set up
 */
private void run() throws IOException
{
    final Histogram histogram = new Histogram(TimeUnit.SECONDS.toNanos(10), 3);
    final ByteBuffer buffer = ByteBuffer.allocateDirect(Configuration.MTU_LENGTH_DEFAULT);

    final DatagramChannel receiveChannel = DatagramChannel.open();
    Common.init(receiveChannel);
    receiveChannel.bind(new InetSocketAddress("localhost", Common.PONG_PORT));

    final DatagramChannel sendChannel = DatagramChannel.open();
    Common.init(sendChannel);

    final Selector selector = Selector.open();

    // Attached to the receive key: validates the sequence number and records the latency.
    final IntSupplier latencyRecorder = () ->
    {
        try
        {
            buffer.clear();
            receiveChannel.receive(buffer);

            final long receivedSequenceNumber = buffer.getLong(0);
            final long sentAtNs = buffer.getLong(SIZE_OF_LONG);

            // A sequence mismatch is reported as data loss.
            if (sequenceNumber != receivedSequenceNumber)
            {
                throw new IllegalStateException(
                    "data Loss:" + sequenceNumber + " to " + receivedSequenceNumber);
            }

            histogram.recordValue(System.nanoTime() - sentAtNs);
        }
        catch (final IOException ex)
        {
            ex.printStackTrace();
        }

        return 1;
    };

    receiveChannel.register(selector, OP_READ, latencyRecorder);

    final AtomicBoolean running = new AtomicBoolean(true);
    SigInt.register(() -> running.set(false));

    while (running.get())
    {
        measureRoundTrip(histogram, SEND_ADDRESS, buffer, sendChannel, selector, running);

        histogram.reset();
        System.gc();
        // Pause for one second between measurement runs.
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    }
}
/**
 * Echo loop that iterates ready keys through the {@code NioSelectedKeySet}
 * obtained from {@code Common.keySet(selector)}: receives datagrams on
 * {@code Common.PING_PORT} and sends the two leading {@code long} values of
 * each one back to {@code Common.PONG_PORT} on localhost.
 *
 * <p>The receive channel is registered without an attachment; the handler is
 * instead passed to {@code keySet.forEach} after every successful poll. The
 * method returns once the SIGINT hook has cleared the running flag and no key
 * is ready.
 *
 * @throws IOException if a channel or the selector cannot be set up or polled
 */
private void run() throws IOException
{
    final InetSocketAddress sendAddress = new InetSocketAddress("localhost", Common.PONG_PORT);
    final ByteBuffer buffer = ByteBuffer.allocateDirect(Configuration.MTU_LENGTH_DEFAULT);

    final DatagramChannel receiveChannel = DatagramChannel.open();
    Common.init(receiveChannel);
    receiveChannel.bind(new InetSocketAddress("localhost", Common.PING_PORT));

    final DatagramChannel sendChannel = DatagramChannel.open();
    Common.init(sendChannel);

    final Selector selector = Selector.open();
    final NioSelectedKeySet keySet = Common.keySet(selector);

    // Applied to every ready key; the key itself is not inspected.
    final ToIntFunction<SelectionKey> echoHandler = (readyKey) ->
    {
        try
        {
            buffer.clear();
            receiveChannel.receive(buffer);

            final long sequence = buffer.getLong(0);
            final long timestamp = buffer.getLong(SIZE_OF_LONG);

            buffer.clear();
            buffer.putLong(sequence);
            buffer.putLong(timestamp);
            buffer.flip();

            sendChannel.send(buffer, sendAddress);
        }
        catch (final IOException ex)
        {
            ex.printStackTrace();
        }

        return 1;
    };

    receiveChannel.register(selector, OP_READ, null);

    final AtomicBoolean running = new AtomicBoolean(true);
    SigInt.register(() -> running.set(false));

    while (true)
    {
        // Spin until a key is ready, leaving as soon as shutdown is requested.
        while (0 == selector.selectNow())
        {
            if (!running.get())
            {
                return;
            }
        }

        keySet.forEach(echoHandler);
    }
}
/**
 * Binds to listen for datagrams on the given local IP address/port and
 * restricts receipt of datagrams to those sent from the provided host and
 * port. Both must be specified for the restriction to apply; per the original
 * documentation this improves performance for datagrams coming from a sender
 * that is known a priori.
 *
 * <p>If either {@code sendingHost} or {@code sendingPort} is {@code null}, no
 * sender restriction is applied and the call is delegated to
 * {@link #addDatagramChannel(InetAddress, int, int)}.
 *
 * <p>If connecting or registering the channel fails, the newly bound channel
 * is closed before the exception propagates, so the local port is released.
 *
 * @param nicIPAddress - if null will listen on wildcard address, which
 * means datagrams will be received on all local network interfaces.
 * Otherwise, will bind to the provided IP address associated with some NIC.
 * @param port - the port to listen on. This is used to provide a well-known
 * destination for a sender.
 * @param receiveBufferSize - the number of bytes to request for a receive
 * buffer from OS
 * @param sendingHost - the hostname, or IP address, of the sender of
 * datagrams. Only datagrams from this host will be received. If this is
 * null, datagrams are accepted from any sender.
 * @param sendingPort - the port used by the sender of datagrams. Only
 * datagrams from this port will be received. If this is null, datagrams
 * are accepted from any sender.
 * @throws IOException if unable to add channel
 */
public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize, final String sendingHost,
        final Integer sendingPort) throws IOException {
    if (sendingHost == null || sendingPort == null) {
        addDatagramChannel(nicIPAddress, port, receiveBufferSize);
        return;
    }

    final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
    try {
        dChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
        dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
    } catch (final IOException | RuntimeException e) {
        // Do not leave the port bound by a channel nobody holds a reference to.
        try {
            dChannel.close();
        } catch (final IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Binds to listen for datagrams on the given local IP address/port and
 * restricts receipt of datagrams to those sent from the provided host and
 * port. Both must be specified for the restriction to apply; per the original
 * documentation this improves performance for datagrams coming from a sender
 * that is known a priori.
 *
 * <p>If either {@code sendingHost} or {@code sendingPort} is {@code null}, no
 * sender restriction is applied and the call is delegated to
 * {@link #addDatagramChannel(InetAddress, int, int)}.
 *
 * <p>If connecting or registering the channel fails, the newly bound channel
 * is closed before the exception propagates, so the local port is released.
 *
 * @param nicIPAddress - if null will listen on wildcard address, which
 * means datagrams will be received on all local network interfaces.
 * Otherwise, will bind to the provided IP address associated with some NIC.
 * @param port - the port to listen on. This is used to provide a well-known
 * destination for a sender.
 * @param receiveBufferSize - the number of bytes to request for a receive
 * buffer from OS
 * @param sendingHost - the hostname, or IP address, of the sender of
 * datagrams. Only datagrams from this host will be received. If this is
 * null, datagrams are accepted from any sender.
 * @param sendingPort - the port used by the sender of datagrams. Only
 * datagrams from this port will be received. If this is null, datagrams
 * are accepted from any sender.
 * @throws IOException if unable to add channel
 */
public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize, final String sendingHost,
        final Integer sendingPort) throws IOException {
    if (sendingHost == null || sendingPort == null) {
        addDatagramChannel(nicIPAddress, port, receiveBufferSize);
        return;
    }

    final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
    try {
        dChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
        dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
    } catch (final IOException | RuntimeException e) {
        // Do not leave the port bound by a channel nobody holds a reference to.
        try {
            dChannel.close();
        } catch (final IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Creates a datagram channel bound to the given local IP address/port and
 * registers it with {@code socketChannelSelector} for read events.
 *
 * @param nicIPAddress - if null will listen on wildcard address, which
 * means datagrams will be received on all local network interfaces.
 * Otherwise, will bind to the provided IP address associated with some NIC.
 * @param port - the port to listen on
 * @param receiveBufferSize - the number of bytes to request for a receive
 * buffer from OS
 * @throws IOException if unable to add channel
 */
public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
        throws IOException {
    createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize)
            .register(socketChannelSelector, SelectionKey.OP_READ);
}
/**
 * Creates a datagram channel bound to the given local IP address/port and
 * registers it with {@code socketChannelSelector} for read events.
 *
 * @param nicIPAddress - if null will listen on wildcard address, which
 * means datagrams will be received on all local network interfaces.
 * Otherwise, will bind to the provided IP address associated with some NIC.
 * @param port - the port to listen on
 * @param receiveBufferSize - the number of bytes to request for a receive
 * buffer from OS
 * @throws IOException if unable to add channel
 */
public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
        throws IOException {
    createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize)
            .register(socketChannelSelector, SelectionKey.OP_READ);
}