下面列出了io.netty.channel.ChannelFuture#syncUninterruptibly ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test(expected = ClosedChannelException.class)
public void streamClosedErrorTranslatedToClosedChannelExceptionOnWrites() throws Exception {
writer = new Writer() {
@Override
void write(Object msg, ChannelPromise promise) {
promise.tryFailure(new StreamException(inboundStream.id(), Http2Error.STREAM_CLOSED, "Stream Closed"));
}
};
LastInboundHandler inboundHandler = new LastInboundHandler();
childChannelInitializer.handler = inboundHandler;
Channel childChannel = newOutboundStream();
assertTrue(childChannel.isActive());
ChannelFuture future = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
parentChannel.flush();
assertFalse(childChannel.isActive());
assertFalse(childChannel.isOpen());
inboundHandler.checkException();
future.syncUninterruptibly();
}
@Test(expected = Http2NoMoreStreamIdsException.class)
public void failedOutboundStreamCreationThrowsAndClosesChannel() throws Exception {
writer = new Writer() {
@Override
void write(Object msg, ChannelPromise promise) {
promise.tryFailure(new Http2NoMoreStreamIdsException());
}
};
LastInboundHandler inboundHandler = new LastInboundHandler();
childChannelInitializer.handler = inboundHandler;
Channel childChannel = newOutboundStream();
assertTrue(childChannel.isActive());
ChannelFuture future = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
parentChannel.flush();
assertFalse(childChannel.isActive());
assertFalse(childChannel.isOpen());
inboundHandler.checkException();
future.syncUninterruptibly();
}
@Test
public void testIntegratedAction_Success_Embedded()
throws IOException, SAXException, UndeterminedStateException {
// We simply use an embedded channel in this instance
setupEmbeddedChannel(messageHandler, actionHandler);
ChannelFuture future = actionHandler.getFinishedFuture();
channel.writeOutbound(message);
channel.writeInbound(
EppUtils.docToByteBuf(getResponse(message.getExpectedResponse(), false, USER_CLIENT_TRID)));
ChannelFuture unusedFuture = future.syncUninterruptibly();
assertThat(future.isSuccess()).isTrue();
}
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
// iothreads参数值,默认cpu线程数+1 小于32
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
@Override
protected void finishConnectEvent(ChannelFuture ch) {
super.finishConnectEvent(ch);
if (ch.isSuccess()) {
// start handskake
ChannelFuture handFuture = this.handshaker.handshake(this.getChannel());
if (this.isSyncConnect()) {
handFuture.syncUninterruptibly();
}
}
}
@Override
public Channel connect() {
final String ip = serviceInstance.getIp();
final int port = serviceInstance.getPort();
final ChannelFuture future = bootstrap.connect(new InetSocketAddress(ip, port));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
log.debug("future callback, connect to {}:{} success, channel={}",
ip, port, channelFuture.channel());
// 发送clientName包到server
if (communicationOptions.getProtocol() instanceof ServerPushProtocol) {
sendClientNameToServer(future);
}
} else {
log.debug("future callback, connect to {}:{} failed due to {}",
ip, port, channelFuture.cause().getMessage());
}
}
});
future.syncUninterruptibly();
if (future.isSuccess()) {
return future.channel();
} else {
// throw exception when connect failed to the connection pool acquirer
log.error("connect to {}:{} failed, msg={}", ip, port, future.cause().getMessage());
throw new RpcException(future.cause());
}
}
private ChannelFuture start() {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChatServerInitializer(channelGroup,sslContext)) ;
ChannelFuture future = bootstrap.bind(new InetSocketAddress(PORT));
//同步
future.syncUninterruptibly();
channel = future.channel() ;
return future ;
}
@Test
public void testProbingActionGenerate_newChannel() throws UndeterminedStateException {
// Sets up Protocol for when we create a new channel.
Protocol testProtocol =
Protocol.builder()
.setHandlerProviders(ImmutableList.of(() -> conversionHandler, () -> testHandler))
.setName(PROTOCOL_NAME)
.setPort(PROTOCOL_PORT)
.setPersistentConnection(false)
.build();
// Sets up generic ProbingStep that we are testing.
ProbingStep testStep =
ProbingStep.builder()
.setMessageTemplate(new TestMessage(TEST_MESSAGE))
.setBootstrap(bootstrap)
.setDuration(Duration.ZERO)
.setProtocol(testProtocol)
.build();
// Sets up testToken to return arbitrary values, and no channel. Used when we create a new
// channel.
Token testToken = testToken(ADDRESS_NAME);
// Sets up server listening at LocalAddress so generated action can have successful connection.
nettyRule.setUpServer(address);
ProbingAction testAction = testStep.generateAction(testToken);
ChannelFuture connectionFuture = testAction.channel().attr(CONNECTION_FUTURE_KEY).get();
connectionFuture = connectionFuture.syncUninterruptibly();
assertThat(connectionFuture.isSuccess()).isTrue();
assertThat(testAction.delay()).isEqualTo(Duration.ZERO);
assertThat(testAction.outboundMessage().toString()).isEqualTo(ADDRESS_NAME);
assertThat(testAction.host()).isEqualTo(ADDRESS_NAME);
assertThat(testAction.protocol()).isEqualTo(testProtocol);
}
@Test
public void testSuccess_newChannel() throws Exception {
// setup
LocalAddress address = new LocalAddress(ADDRESS_NAME);
Bootstrap bootstrap =
new Bootstrap().group(nettyRule.getEventLoopGroup()).channel(LocalChannel.class);
// Sets up a Protocol corresponding to when a new connection is created.
Protocol protocol =
Protocol.builder()
.setHandlerProviders(ImmutableList.of(() -> conversionHandler, () -> testHandler))
.setName(PROTOCOL_NAME)
.setPort(TEST_PORT)
.setPersistentConnection(false)
.build();
nettyRule.setUpServer(address);
// Sets up a ProbingAction with existing channel using test specified attributes.
ProbingAction action =
ProbingAction.builder()
.setBootstrap(bootstrap)
.setProtocol(protocol)
.setDelay(Duration.ZERO)
.setOutboundMessage(new TestMessage(TEST_MESSAGE))
.setHost(ADDRESS_NAME)
.build();
// tests main function of ProbingAction
ChannelFuture future = action.call();
// Tests to see if message is properly sent to remote server
nettyRule.assertReceivedMessage(TEST_MESSAGE);
future = future.syncUninterruptibly();
// Tests to see that, since server responds, we have set future to true
assertThat(future.isSuccess()).isTrue();
assertThat(((TestActionHandler) testHandler).getResponse().toString()).isEqualTo(TEST_MESSAGE);
}
public ChannelFuture start(InetSocketAddress address) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(createInitializer());
ChannelFuture future = bootstrap.bind(address);
future.syncUninterruptibly();
channel = future.channel();
return future;
}
@Override
public void shutdown() {
final ChannelFuture future = disconnectAsync();
if (future != null) {
future.syncUninterruptibly();
}
super.shutdown();
}
@Test
public void testBasicAction_Success_Embedded()
throws SAXException, IOException, EppClientException, FailureException {
// We simply use an embedded channel in this instance
setupEmbeddedChannel(actionHandler);
ChannelFuture future = actionHandler.getFinishedFuture();
EppResponseMessage response = message.getExpectedResponse();
response.getDocument(
EppUtils.docToByteBuf(getResponse(message.getExpectedResponse(), false, USER_CLIENT_TRID)));
channel.writeInbound(response);
ChannelFuture unusedFuture = future.syncUninterruptibly();
assertThat(future.isSuccess()).isTrue();
}
/**
*
*/
private void startHttpServer() {
try {
int port = Integer.parseInt(getConfig(HttpConstants.HTTP_PORT, "8018"));
int bossCount = Integer.parseInt(getConfig(HttpConstants.HTTP_BOSS_COUNT, "1"));
int workCount = Integer.parseInt(getConfig(HttpConstants.HTTP_WORK_COUNT, "200"));
SslContext sslCtx = null;
if (isHttpsEnabled()) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
}
LOG.info("Starting http server, listen on port->{}", port);
bossGroup = new NioEventLoopGroup(bossCount);
workerGroup = new NioEventLoopGroup(workCount);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
// .handler(new LoggingHandler(LogLevel.ERROR))
.childHandler(new HttpServerInitializer(sslCtx));
b.option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_TIMEOUT, 3000)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000).option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_LINGER, 10).option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
// .option(ChannelOption.ALLOCATOR,
// PooledByteBufAllocator.DEFAULT)
;
ChannelFuture future = b.bind(port);
future.syncUninterruptibly();
srvCh = future.channel();
LOG.info("Start http server successfully!");
} catch (Exception e) {
LOG.error(e.getMessage());
try {
this.stop();
} catch (PluginException e1) {
}
}
}
/**
* Returns a link for the remote address if cached; otherwise opens, caches and returns.
* When it opens a link for the remote address, only one attempt for the address is made at a given time
*
* @param remoteAddr the remote socket address
* @param encoder the encoder
* @param listener the link listener
* @return a link associated with the address
*/
@Override
public <T> Link<T> open(final SocketAddress remoteAddr, final Encoder<? super T> encoder,
final LinkListener<? super T> listener) throws IOException {
Link<T> link = null;
for (int i = 0; i <= this.numberOfTries; ++i) {
LinkReference linkRef = this.addrToLinkRefMap.get(remoteAddr);
if (linkRef != null) {
link = (Link<T>) linkRef.getLink();
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "Link {0} for {1} found", new Object[]{link, remoteAddr});
}
if (link != null) {
return link;
}
}
if (i == this.numberOfTries) {
// Connection failure
throw new ConnectException("Connection to " + remoteAddr + " refused");
}
LOG.log(Level.FINE, "No cached link for {0} thread {1}",
new Object[]{remoteAddr, Thread.currentThread()});
// no linkRef
final LinkReference newLinkRef = new LinkReference();
final LinkReference prior = this.addrToLinkRefMap.putIfAbsent(remoteAddr, newLinkRef);
final AtomicInteger flag = prior != null ?
prior.getConnectInProgress() : newLinkRef.getConnectInProgress();
synchronized (flag) {
if (!flag.compareAndSet(0, 1)) {
while (flag.get() == 1) {
try {
flag.wait();
} catch (final InterruptedException ex) {
LOG.log(Level.WARNING, "Wait interrupted", ex);
}
}
}
}
linkRef = this.addrToLinkRefMap.get(remoteAddr);
link = (Link<T>) linkRef.getLink();
if (link != null) {
return link;
}
ChannelFuture connectFuture = null;
try {
connectFuture = this.clientBootstrap.connect(remoteAddr);
connectFuture.syncUninterruptibly();
link = new NettyLink<>(connectFuture.channel(), encoder, listener);
linkRef.setLink(link);
synchronized (flag) {
flag.compareAndSet(1, 2);
flag.notifyAll();
}
break;
} catch (final Exception e) {
if (e instanceof ConnectException) {
LOG.log(Level.WARNING, "Connection refused. Retry {0} of {1}",
new Object[]{i + 1, this.numberOfTries});
synchronized (flag) {
flag.compareAndSet(1, 0);
flag.notifyAll();
}
if (i < this.numberOfTries) {
try {
Thread.sleep(retryTimeout);
} catch (final InterruptedException interrupt) {
LOG.log(Level.WARNING, "Thread {0} interrupted while sleeping", Thread.currentThread());
}
}
} else {
throw e;
}
}
}
return link;
}