下面列出了java.nio.channels.AlreadyBoundException#com.sun.nio.sctp.MessageInfo 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
void checkGetterSetters(MessageInfo info) {
check(info.streamNumber(TEST_STREAM_NUMBER).streamNumber() ==
TEST_STREAM_NUMBER, "stream number not being set correctly");
check(info.complete(false).isComplete() == false,
"complete not being set correctly");
check(info.unordered(true).isUnordered() == true,
"unordered not being set correctly");
check(info.payloadProtocolID(TEST_PPID).payloadProtocolID() ==
TEST_PPID, "PPID not being set correctly");
check(info.timeToLive(TEST_TTL).timeToLive() == TEST_TTL,
"TTL not being set correctly");
}
void checkGetterSetters(MessageInfo info) {
check(info.streamNumber(TEST_STREAM_NUMBER).streamNumber() ==
TEST_STREAM_NUMBER, "stream number not being set correctly");
check(info.complete(false).isComplete() == false,
"complete not being set correctly");
check(info.unordered(true).isUnordered() == true,
"unordered not being set correctly");
check(info.payloadProtocolID(TEST_PPID).payloadProtocolID() ==
TEST_PPID, "PPID not being set correctly");
check(info.timeToLive(TEST_TTL).timeToLive() == TEST_TTL,
"TTL not being set correctly");
}
void checkGetterSetters(MessageInfo info) {
check(info.streamNumber(TEST_STREAM_NUMBER).streamNumber() ==
TEST_STREAM_NUMBER, "stream number not being set correctly");
check(info.complete(false).isComplete() == false,
"complete not being set correctly");
check(info.unordered(true).isUnordered() == true,
"unordered not being set correctly");
check(info.payloadProtocolID(TEST_PPID).payloadProtocolID() ==
TEST_PPID, "PPID not being set correctly");
check(info.timeToLive(TEST_TTL).timeToLive() == TEST_TTL,
"TTL not being set correctly");
}
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
SctpMessage packet = (SctpMessage) msg;
ByteBuf data = packet.content();
int dataLen = data.readableBytes();
if (dataLen == 0) {
return true;
}
ByteBufAllocator alloc = alloc();
boolean needsCopy = data.nioBufferCount() != 1;
if (!needsCopy) {
if (!data.isDirect() && alloc.isDirectBufferPooled()) {
needsCopy = true;
}
}
ByteBuffer nioData;
if (!needsCopy) {
nioData = data.nioBuffer();
} else {
data = alloc.directBuffer(dataLen).writeBytes(data);
nioData = data.nioBuffer();
}
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
mi.payloadProtocolID(packet.protocolIdentifier());
mi.streamNumber(packet.streamIdentifier());
final int writtenBytes = javaChannel().send(nioData, mi);
return writtenBytes > 0;
}
private int send(int fd,
ByteBuffer src,
int assocId,
SocketAddress target,
MessageInfo messageInfo)
throws IOException {
int streamNumber = messageInfo.streamNumber();
boolean unordered = messageInfo.isUnordered();
int ppid = messageInfo.payloadProtocolID();
if (src instanceof DirectBuffer)
return sendFromNativeBuffer(fd, src, target, assocId,
streamNumber, unordered, ppid);
/* Substitute a native buffer */
int pos = src.position();
int lim = src.limit();
assert (pos <= lim && streamNumber >= 0);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
bb.put(src);
bb.flip();
/* Do not update src until we see how many bytes were written */
src.position(pos);
int n = sendFromNativeBuffer(fd, bb, target, assocId,
streamNumber, unordered, ppid);
if (n > 0) {
/* now update src */
src.position(pos + n);
}
return n;
} finally {
Util.releaseTemporaryDirectBuffer(bb);
}
}
void checkDefaults(MessageInfo info) {
check(info.isUnordered() == false, "default unordered value not false");
check(info.timeToLive() == 0L, "timeToLive should be 0L");
check(info.isComplete() == true, "default complete value not true");
check(info.payloadProtocolID() == 0, "default PPID not 0");
check(info.bytes() == 0, "default bytes value not 0");
check(info.streamNumber() == DEFAULT_STREAM_NUMBER,
"incorrect default stream number");
check(info.address().equals(addr), "incorrect address");
}
void checkDefaults(MessageInfo info) {
check(info.isUnordered() == false, "default unordered value not false");
check(info.timeToLive() == 0L, "timeToLive should be 0L");
check(info.isComplete() == true, "default complete value not true");
check(info.payloadProtocolID() == 0, "default PPID not 0");
check(info.bytes() == 0, "default bytes value not 0");
check(info.streamNumber() == DEFAULT_STREAM_NUMBER,
"incorrect default stream number");
check(info.address().equals(addr), "incorrect address");
}
void test(String[] args) {
/* TEST 1 : createOutGoing(SocketAddress,int) */
MessageInfo info = MessageInfo.createOutgoing(addr,
DEFAULT_STREAM_NUMBER);
checkDefaults(info);
checkGetterSetters(info);
/* TEST 2 : createOutGoing(Association,SocketAddress,int) */
info = MessageInfo.createOutgoing(assoc, addr, DEFAULT_STREAM_NUMBER);
checkDefaults(info);
check(info.association().equals(assoc), "incorrect association");
checkGetterSetters(info);
/* TEST 3: null values */
info = MessageInfo.createOutgoing(null, 0);
check(info.address() == null, "address should be null");
check(info.association() == null, "association should be null");
info = MessageInfo.createOutgoing(assoc, null, 0);
check(info.address() == null, "address should be null");
/* Test 4: IllegalArgumentException */
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(addr, -1); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(addr, 65537); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(null, addr, 0); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(assoc, addr, -1); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(assoc, addr, 65537);}});
final MessageInfo iaeInfo = MessageInfo.createOutgoing(assoc, addr, 0);
testIAE(new Runnable() {
public void run() { iaeInfo.streamNumber(-1); } });
testIAE(new Runnable() {
public void run() { iaeInfo.streamNumber(65537); } });
}
private int send(int fd,
ByteBuffer src,
int assocId,
SocketAddress target,
MessageInfo messageInfo)
throws IOException {
int streamNumber = messageInfo.streamNumber();
boolean unordered = messageInfo.isUnordered();
int ppid = messageInfo.payloadProtocolID();
if (src instanceof DirectBuffer)
return sendFromNativeBuffer(fd, src, target, assocId,
streamNumber, unordered, ppid);
/* Substitute a native buffer */
int pos = src.position();
int lim = src.limit();
assert (pos <= lim && streamNumber >= 0);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
bb.put(src);
bb.flip();
/* Do not update src until we see how many bytes were written */
src.position(pos);
int n = sendFromNativeBuffer(fd, bb, target, assocId,
streamNumber, unordered, ppid);
if (n > 0) {
/* now update src */
src.position(pos + n);
}
return n;
} finally {
Util.releaseTemporaryDirectBuffer(bb);
}
}
private int send(int fd,
ByteBuffer src,
int assocId,
SocketAddress target,
MessageInfo messageInfo)
throws IOException {
int streamNumber = messageInfo.streamNumber();
boolean unordered = messageInfo.isUnordered();
int ppid = messageInfo.payloadProtocolID();
if (src instanceof DirectBuffer)
return sendFromNativeBuffer(fd, src, target, assocId,
streamNumber, unordered, ppid);
/* Substitute a native buffer */
int pos = src.position();
int lim = src.limit();
assert (pos <= lim && streamNumber >= 0);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
bb.put(src);
bb.flip();
/* Do not update src until we see how many bytes were written */
src.position(pos);
int n = sendFromNativeBuffer(fd, bb, target, assocId,
streamNumber, unordered, ppid);
if (n > 0) {
/* now update src */
src.position(pos + n);
}
return n;
} finally {
Util.releaseTemporaryDirectBuffer(bb);
}
}
private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
throws IOException {
int streamNumber = messageInfo.streamNumber();
SocketAddress target = messageInfo.address();
boolean unordered = messageInfo.isUnordered();
int ppid = messageInfo.payloadProtocolID();
if (src instanceof DirectBuffer)
return sendFromNativeBuffer(fd, src, target, streamNumber,
unordered, ppid);
/* Substitute a native buffer */
int pos = src.position();
int lim = src.limit();
assert (pos <= lim && streamNumber >= 0);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
bb.put(src);
bb.flip();
/* Do not update src until we see how many bytes were written */
src.position(pos);
int n = sendFromNativeBuffer(fd, bb, target, streamNumber,
unordered, ppid);
if (n > 0) {
/* now update src */
src.position(pos + n);
}
return n;
} finally {
Util.releaseTemporaryDirectBuffer(bb);
}
}
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SctpChannel ch = javaChannel();
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
ByteBuf buffer = allocHandle.allocate(config().getAllocator());
boolean free = true;
try {
ByteBuffer data = buffer.internalNioBuffer(buffer.writerIndex(), buffer.writableBytes());
int pos = data.position();
MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
if (messageInfo == null) {
return 0;
}
allocHandle.lastBytesRead(data.position() - pos);
buf.add(new SctpMessage(messageInfo,
buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead())));
free = false;
return 1;
} catch (Throwable cause) {
PlatformDependent.throwException(cause);
return -1;
} finally {
if (free) {
buffer.release();
}
}
}
private int send(int fd,
ByteBuffer src,
int assocId,
SocketAddress target,
MessageInfo messageInfo)
throws IOException {
int streamNumber = messageInfo.streamNumber();
boolean unordered = messageInfo.isUnordered();
int ppid = messageInfo.payloadProtocolID();
if (src instanceof DirectBuffer)
return sendFromNativeBuffer(fd, src, target, assocId,
streamNumber, unordered, ppid);
/* Substitute a native buffer */
int pos = src.position();
int lim = src.limit();
assert (pos <= lim && streamNumber >= 0);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
bb.put(src);
bb.flip();
/* Do not update src until we see how many bytes were written */
src.position(pos);
int n = sendFromNativeBuffer(fd, bb, target, assocId,
streamNumber, unordered, ppid);
if (n > 0) {
/* now update src */
src.position(pos + n);
}
return n;
} finally {
Util.releaseTemporaryDirectBuffer(bb);
}
}
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SctpChannel ch = javaChannel();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config().getRecvByteBufAllocator().newHandle();
}
ByteBuf buffer = allocHandle.allocate(config().getAllocator());
boolean free = true;
try {
ByteBuffer data = buffer.internalNioBuffer(buffer.writerIndex(), buffer.writableBytes());
int pos = data.position();
MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
if (messageInfo == null) {
return 0;
}
buf.add(new SctpMessage(messageInfo, buffer.writerIndex(buffer.writerIndex() + data.position() - pos)));
free = false;
return 1;
} catch (Throwable cause) {
PlatformDependent.throwException(cause);
return -1;
} finally {
int bytesRead = buffer.readableBytes();
allocHandle.record(bytesRead);
if (free) {
buffer.release();
}
}
}
void checkDefaults(MessageInfo info) {
check(info.isUnordered() == false, "default unordered value not false");
check(info.timeToLive() == 0L, "timeToLive should be 0L");
check(info.isComplete() == true, "default complete value not true");
check(info.payloadProtocolID() == 0, "default PPID not 0");
check(info.bytes() == 0, "default bytes value not 0");
check(info.streamNumber() == DEFAULT_STREAM_NUMBER,
"incorrect default stream number");
check(info.address().equals(addr), "incorrect address");
}
@Override
public int send(ByteBuffer buffer, MessageInfo messageInfo)
throws IOException {
if (buffer == null)
throw new IllegalArgumentException("buffer cannot be null");
if (messageInfo == null)
throw new IllegalArgumentException("messageInfo cannot be null");
checkAssociation(messageInfo.association());
checkStreamNumber(messageInfo.streamNumber());
synchronized (sendLock) {
ensureSendOpen();
int n = 0;
try {
begin();
synchronized (stateLock) {
if(!isOpen())
return 0;
senderThread = NativeThread.current();
}
do {
n = send(fdVal, buffer, messageInfo);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
senderCleanup();
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
}
}
private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
throws IOException {
int streamNumber = messageInfo.streamNumber();
SocketAddress target = messageInfo.address();
boolean unordered = messageInfo.isUnordered();
int ppid = messageInfo.payloadProtocolID();
if (src instanceof DirectBuffer)
return sendFromNativeBuffer(fd, src, target, streamNumber,
unordered, ppid);
/* Substitute a native buffer */
int pos = src.position();
int lim = src.limit();
assert (pos <= lim && streamNumber >= 0);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
bb.put(src);
bb.flip();
/* Do not update src until we see how many bytes were written */
src.position(pos);
int n = sendFromNativeBuffer(fd, bb, target, streamNumber,
unordered, ppid);
if (n > 0) {
/* now update src */
src.position(pos + n);
}
return n;
} finally {
Util.releaseTemporaryDirectBuffer(bb);
}
}
void test(String[] args) {
/* TEST 1 : createOutGoing(SocketAddress,int) */
MessageInfo info = MessageInfo.createOutgoing(addr,
DEFAULT_STREAM_NUMBER);
checkDefaults(info);
checkGetterSetters(info);
/* TEST 2 : createOutGoing(Association,SocketAddress,int) */
info = MessageInfo.createOutgoing(assoc, addr, DEFAULT_STREAM_NUMBER);
checkDefaults(info);
check(info.association().equals(assoc), "incorrect association");
checkGetterSetters(info);
/* TEST 3: null values */
info = MessageInfo.createOutgoing(null, 0);
check(info.address() == null, "address should be null");
check(info.association() == null, "association should be null");
info = MessageInfo.createOutgoing(assoc, null, 0);
check(info.address() == null, "address should be null");
/* Test 4: IllegalArgumentException */
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(addr, -1); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(addr, 65537); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(null, addr, 0); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(assoc, addr, -1); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(assoc, addr, 65537);}});
final MessageInfo iaeInfo = MessageInfo.createOutgoing(assoc, addr, 0);
testIAE(new Runnable() {
public void run() { iaeInfo.streamNumber(-1); } });
testIAE(new Runnable() {
public void run() { iaeInfo.streamNumber(65537); } });
}
void test(String[] args) {
/* TEST 1 : createOutGoing(SocketAddress,int) */
MessageInfo info = MessageInfo.createOutgoing(addr,
DEFAULT_STREAM_NUMBER);
checkDefaults(info);
checkGetterSetters(info);
/* TEST 2 : createOutGoing(Association,SocketAddress,int) */
info = MessageInfo.createOutgoing(assoc, addr, DEFAULT_STREAM_NUMBER);
checkDefaults(info);
check(info.association().equals(assoc), "incorrect association");
checkGetterSetters(info);
/* TEST 3: null values */
info = MessageInfo.createOutgoing(null, 0);
check(info.address() == null, "address should be null");
check(info.association() == null, "association should be null");
info = MessageInfo.createOutgoing(assoc, null, 0);
check(info.address() == null, "address should be null");
/* Test 4: IllegalArgumentException */
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(addr, -1); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(addr, 65537); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(null, addr, 0); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(assoc, addr, -1); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(assoc, addr, 65537);}});
final MessageInfo iaeInfo = MessageInfo.createOutgoing(assoc, addr, 0);
testIAE(new Runnable() {
public void run() { iaeInfo.streamNumber(-1); } });
testIAE(new Runnable() {
public void run() { iaeInfo.streamNumber(65537); } });
}
void test(String[] args) {
/* TEST 1 : createOutGoing(SocketAddress,int) */
MessageInfo info = MessageInfo.createOutgoing(addr,
DEFAULT_STREAM_NUMBER);
checkDefaults(info);
checkGetterSetters(info);
/* TEST 2 : createOutGoing(Association,SocketAddress,int) */
info = MessageInfo.createOutgoing(assoc, addr, DEFAULT_STREAM_NUMBER);
checkDefaults(info);
check(info.association().equals(assoc), "incorrect association");
checkGetterSetters(info);
/* TEST 3: null values */
info = MessageInfo.createOutgoing(null, 0);
check(info.address() == null, "address should be null");
check(info.association() == null, "association should be null");
info = MessageInfo.createOutgoing(assoc, null, 0);
check(info.address() == null, "address should be null");
/* Test 4: IllegalArgumentException */
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(addr, -1); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(addr, 65537); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(null, addr, 0); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(assoc, addr, -1); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(assoc, addr, 65537);}});
final MessageInfo iaeInfo = MessageInfo.createOutgoing(assoc, addr, 0);
testIAE(new Runnable() {
public void run() { iaeInfo.streamNumber(-1); } });
testIAE(new Runnable() {
public void run() { iaeInfo.streamNumber(65537); } });
}
void checkDefaults(MessageInfo info) {
check(info.isUnordered() == false, "default unordered value not false");
check(info.timeToLive() == 0L, "timeToLive should be 0L");
check(info.isComplete() == true, "default complete value not true");
check(info.payloadProtocolID() == 0, "default PPID not 0");
check(info.bytes() == 0, "default bytes value not 0");
check(info.streamNumber() == DEFAULT_STREAM_NUMBER,
"incorrect default stream number");
check(info.address().equals(addr), "incorrect address");
}
void test(String[] args) {
/* TEST 1 : createOutGoing(SocketAddress,int) */
MessageInfo info = MessageInfo.createOutgoing(addr,
DEFAULT_STREAM_NUMBER);
checkDefaults(info);
checkGetterSetters(info);
/* TEST 2 : createOutGoing(Association,SocketAddress,int) */
info = MessageInfo.createOutgoing(assoc, addr, DEFAULT_STREAM_NUMBER);
checkDefaults(info);
check(info.association().equals(assoc), "incorrect association");
checkGetterSetters(info);
/* TEST 3: null values */
info = MessageInfo.createOutgoing(null, 0);
check(info.address() == null, "address should be null");
check(info.association() == null, "association should be null");
info = MessageInfo.createOutgoing(assoc, null, 0);
check(info.address() == null, "address should be null");
/* Test 4: IllegalArgumentException */
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(addr, -1); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(addr, 65537); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(null, addr, 0); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(assoc, addr, -1); } });
testIAE(new Runnable() {
public void run() { MessageInfo.createOutgoing(assoc, addr, 65537);}});
final MessageInfo iaeInfo = MessageInfo.createOutgoing(assoc, addr, 0);
testIAE(new Runnable() {
public void run() { iaeInfo.streamNumber(-1); } });
testIAE(new Runnable() {
public void run() { iaeInfo.streamNumber(65537); } });
}
private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
throws IOException {
int streamNumber = messageInfo.streamNumber();
SocketAddress target = messageInfo.address();
boolean unordered = messageInfo.isUnordered();
int ppid = messageInfo.payloadProtocolID();
if (src instanceof DirectBuffer)
return sendFromNativeBuffer(fd, src, target, streamNumber,
unordered, ppid);
/* Substitute a native buffer */
int pos = src.position();
int lim = src.limit();
assert (pos <= lim && streamNumber >= 0);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
bb.put(src);
bb.flip();
/* Do not update src until we see how many bytes were written */
src.position(pos);
int n = sendFromNativeBuffer(fd, bb, target, streamNumber,
unordered, ppid);
if (n > 0) {
/* now update src */
src.position(pos + n);
}
return n;
} finally {
Util.releaseTemporaryDirectBuffer(bb);
}
}
private int send(int fd,
ByteBuffer src,
int assocId,
SocketAddress target,
MessageInfo messageInfo)
throws IOException {
int streamNumber = messageInfo.streamNumber();
boolean unordered = messageInfo.isUnordered();
int ppid = messageInfo.payloadProtocolID();
if (src instanceof DirectBuffer)
return sendFromNativeBuffer(fd, src, target, assocId,
streamNumber, unordered, ppid);
/* Substitute a native buffer */
int pos = src.position();
int lim = src.limit();
assert (pos <= lim && streamNumber >= 0);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
bb.put(src);
bb.flip();
/* Do not update src until we see how many bytes were written */
src.position(pos);
int n = sendFromNativeBuffer(fd, bb, target, assocId,
streamNumber, unordered, ppid);
if (n > 0) {
/* now update src */
src.position(pos + n);
}
return n;
} finally {
Util.releaseTemporaryDirectBuffer(bb);
}
}
@Override
public int send(ByteBuffer buffer, MessageInfo messageInfo)
throws IOException {
if (buffer == null)
throw new IllegalArgumentException("buffer cannot be null");
if (messageInfo == null)
throw new IllegalArgumentException("messageInfo cannot be null");
checkAssociation(messageInfo.association());
checkStreamNumber(messageInfo.streamNumber());
synchronized (sendLock) {
ensureSendOpen();
int n = 0;
try {
begin();
synchronized (stateLock) {
if(!isOpen())
return 0;
senderThread = NativeThread.current();
}
do {
n = send(fdVal, buffer, messageInfo);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
senderCleanup();
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
}
}
private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
throws IOException {
int streamNumber = messageInfo.streamNumber();
SocketAddress target = messageInfo.address();
boolean unordered = messageInfo.isUnordered();
int ppid = messageInfo.payloadProtocolID();
if (src instanceof DirectBuffer)
return sendFromNativeBuffer(fd, src, target, streamNumber,
unordered, ppid);
/* Substitute a native buffer */
int pos = src.position();
int lim = src.limit();
assert (pos <= lim && streamNumber >= 0);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
bb.put(src);
bb.flip();
/* Do not update src until we see how many bytes were written */
src.position(pos);
int n = sendFromNativeBuffer(fd, bb, target, streamNumber,
unordered, ppid);
if (n > 0) {
/* now update src */
src.position(pos + n);
}
return n;
} finally {
Util.releaseTemporaryDirectBuffer(bb);
}
}
@Override
public int send(ByteBuffer buffer, MessageInfo messageInfo)
throws IOException {
throw new UnsupportedOperationException(message);
}
@Override
public <T> MessageInfo receive(ByteBuffer dst, T attachment,
NotificationHandler<T> handler) throws IOException {
throw new UnsupportedOperationException(message);
}
void doTest(SocketAddress peerAddress) {
SctpMultiChannel channel = null;
ByteBuffer buffer = ByteBuffer.allocate(Util.LARGE_BUFFER);
MessageInfo info = MessageInfo.createOutgoing(null, 0);
try {
channel = SctpMultiChannel.open();
/* setup an association implicitly by sending a small message */
int streamNumber = 0;
debug("sending to " + peerAddress + " on stream number: " + streamNumber);
info = MessageInfo.createOutgoing(peerAddress, streamNumber);
buffer.put(Util.SMALL_MESSAGE.getBytes("ISO-8859-1"));
buffer.flip();
int position = buffer.position();
int remaining = buffer.remaining();
debug("sending small message: " + buffer);
int sent = channel.send(buffer, info);
check(sent == remaining, "sent should be equal to remaining");
check(buffer.position() == (position + sent),
"buffers position should have been incremented by sent");
/* Receive the COMM_UP */
buffer.clear();
BranchNotificationHandler handler = new BranchNotificationHandler();
info = channel.receive(buffer, null, handler);
check(handler.receivedCommUp(), "COMM_UP no received");
Set<Association> associations = channel.associations();
check(!associations.isEmpty(),"There should be some associations");
Association bassoc = associations.iterator().next();
/* TEST 1: branch */
SctpChannel bchannel = channel.branch(bassoc);
check(!bchannel.getAllLocalAddresses().isEmpty(),
"branched channel should be bound");
check(!bchannel.getRemoteAddresses().isEmpty(),
"branched channel should be connected");
check(channel.associations().isEmpty(),
"there should be no associations since the only one was branched off");
buffer.clear();
info = bchannel.receive(buffer, null, null);
buffer.flip();
check(info != null, "info is null");
check(info.streamNumber() == streamNumber,
"message not sent on the correct stream");
check(info.bytes() == Util.SMALL_MESSAGE.getBytes("ISO-8859-1").
length, "bytes received not equal to message length");
check(info.bytes() == buffer.remaining(), "bytes != remaining");
check(Util.compare(buffer, Util.SMALL_MESSAGE),
"received message not the same as sent message");
} catch (IOException ioe) {
unexpected(ioe);
} finally {
clientFinishedLatch.countDown();
try { serverFinishedLatch.await(10L, TimeUnit.SECONDS); }
catch (InterruptedException ie) { unexpected(ie); }
if (channel != null) {
try { channel.close(); }
catch (IOException e) { unexpected (e);}
}
}
}
@Override
public MessageInfo timeToLive(long millis) {
timeToLive = millis;
return this;
}