下面列出了 io.netty.channel.ChannelFuture#await() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Turns notifications off on dashboard 1, clears {@code notifyWhenOffline} on its
 * notification widget, stops the hardware client and then checks that nothing is
 * sent through the GCM wrapper (checked after 500 ms) and that the app client is
 * not handed {@code deviceOffline(0, "1-0")}.
 */
@Test
public void testNoOfflineNotifsExpected() throws Exception {
    DashBoard dash = parseProfile(readTestUserProfile()).getDashById(1);
    dash.isNotificationsOff = true;
    dash.getNotificationWidget().notifyWhenOffline = false;

    clientPair.appClient.updateDash(dash);
    clientPair.appClient.verifyResult(ok(1));

    // Wait for the hardware client shutdown to complete before verifying.
    clientPair.hardwareClient.stop().await();

    verify(holder.gcmWrapper, after(500).never()).send(any(), any(), any());
    clientPair.appClient.never(deviceOffline(0, "1-0"));
}
/**
 * Starts the service and, when no channel has been established yet, connects to
 * the configured host and port and notifies the start listeners.
 *
 * <p>If the calling thread is interrupted while connecting, the service is stopped
 * and the thread's interrupt status is restored so callers can still observe it.
 * A failed connection attempt is rethrown by {@code sync()}.
 */
@Override
public void startService() {
    super.startService();
    if (this.channel != null) {
        return;
    }
    eventLoopGroup = new NioEventLoopGroup(3);
    Bootstrap boot = NettyUtils.buildBootStrap(eventLoopGroup, this);
    boot.remoteAddress(this.getHost(), this.getPort());
    try {
        // sync() already blocks until the connect attempt completes, so no
        // additional await() is needed afterwards.
        ChannelFuture f = boot.connect().sync();
        this.channel = (AbstractChannel) f.channel();
        this.fireStartNetListeners();
    } catch (InterruptedException e) {
        logger.info("interrupted start to exist");
        try {
            this.stopService();
        } finally {
            // Re-assert the interrupt only after cleanup, so stopService() is not
            // itself cut short by the pending interrupt.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Starts a connect attempt to {@code BAD_HOST:BAD_PORT} and, if it has not completed
 * within one second, cancels it and checks that the channel closes and the future
 * reports cancellation. The channel is closed in all cases.
 *
 * @param cb the bootstrap used for the connection attempt
 * @throws Throwable the failure cause if the attempt completes unsuccessfully within one second
 */
public void testConnectCancellation(Bootstrap cb) throws Throwable {
    cb.handler(new TestHandler()).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);
    ChannelFuture connectFuture = cb.connect(BAD_HOST, BAD_PORT);
    try {
        if (connectFuture.await(1000)) {
            // The attempt finished early: either it failed (propagate) or it
            // unexpectedly succeeded (test failure).
            if (!connectFuture.isSuccess()) {
                throw connectFuture.cause();
            }
            fail("A connection attempt to " + BAD_HOST + " must not succeed.");
        }

        if (!connectFuture.cancel(true)) {
            // Cancellation not supported by the transport.
            return;
        }
        assertThat(connectFuture.channel().closeFuture().await(500), is(true));
        assertThat(connectFuture.isCancelled(), is(true));
    } finally {
        connectFuture.channel().close();
    }
}
/**
 * Enables {@code notifyWhenOffline} on the notification widget of dashboard 1, stops
 * the hardware client and verifies that exactly one message is sent through the GCM
 * wrapper within 500 ms, with JSON equal to the expected "went offline" message.
 */
@Test
public void testHardwareDeviceWentOfflineAndPushWorks() throws Exception {
    Profile userProfile = parseProfile(readTestUserProfile());
    userProfile.getDashById(1).getNotificationWidget().notifyWhenOffline = true;

    clientPair.appClient.updateDash(userProfile.getDashById(1));
    clientPair.appClient.verifyResult(ok(1));

    // Wait for the hardware client shutdown to complete.
    clientPair.hardwareClient.stop().await();

    ArgumentCaptor<AndroidGCMMessage> pushCaptor = ArgumentCaptor.forClass(AndroidGCMMessage.class);
    verify(holder.gcmWrapper, timeout(500).times(1)).send(pushCaptor.capture(), any(), any());

    String expectedJson =
            new AndroidGCMMessage("token", Priority.normal, "Your My Device went offline.", 1).toJson();
    assertEquals(expectedJson, pushCaptor.getValue().toJson());
}
/**
 * Stops the hardware client, checks that an app command is answered with
 * {@code DEVICE_NOT_IN_NETWORK}, then logs a new hardware client in and verifies it
 * receives exactly two messages: the login {@code ok(1)} and the pin-mode command.
 */
@Test
public void testSendPinModeCommandWhenHardwareGoesOnline() throws Exception {
    ChannelFuture stopFuture = clientPair.hardwareClient.stop();
    stopFuture.await();
    assertTrue(stopFuture.isDone());

    clientPair.appClient.send("hardware 1 vw 13 1");
    verify(clientPair.appClient.responseMock, timeout(500))
            .channelRead(any(), eq(new ResponseMessage(1, DEVICE_NOT_IN_NETWORK)));

    TestHardClient freshHardware = new TestHardClient("localhost", properties.getHttpPort());
    freshHardware.start();
    freshHardware.login(clientPair.token);

    verify(freshHardware.responseMock, timeout(1000)).channelRead(any(), eq(ok(1)));
    verify(freshHardware.responseMock, timeout(500))
            .channelRead(any(), eq(hardware(1, "pm 1 out 2 out 3 out 5 out 6 in 7 in 30 in 8 in")));
    // Nothing besides the two messages above may have been delivered.
    verify(freshHardware.responseMock, times(2)).channelRead(any(), any());

    freshHardware.stop().awaitUninterruptibly();
}
/**
 * Attempts to connect to {@code BAD_HOST:BAD_PORT}; when the attempt is still pending
 * after one second it is cancelled, and the test checks that the channel closes and
 * the future is marked cancelled. The channel is always closed at the end.
 *
 * @param cb the bootstrap used for the connection attempt
 * @throws Throwable the failure cause if the attempt completes unsuccessfully within one second
 */
public void testConnectCancellation(Bootstrap cb) throws Throwable {
    cb.handler(new TestHandler()).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);
    ChannelFuture pendingConnect = cb.connect(BAD_HOST, BAD_PORT);
    try {
        boolean finishedWithinWait = pendingConnect.await(1000);
        if (finishedWithinWait) {
            if (!pendingConnect.isSuccess()) {
                throw pendingConnect.cause();
            }
            fail("A connection attempt to " + BAD_HOST + " must not succeed.");
        }

        boolean cancelled = pendingConnect.cancel(true);
        if (cancelled) {
            assertThat(pendingConnect.channel().closeFuture().await(500), is(true));
            assertThat(pendingConnect.isCancelled(), is(true));
        }
        // Otherwise: cancellation not supported by the transport.
    } finally {
        pendingConnect.channel().close();
    }
}
/**
 * Creates the bootstrap if necessary, connects, and stores the resulting channel in
 * {@code client}. A shutdown hook that calls {@code stop()} is registered on every
 * invocation, whether or not the connection succeeded.
 *
 * <p>Failures are printed rather than propagated. If the calling thread is
 * interrupted while waiting, its interrupt status is restored so callers can still
 * observe the interruption.
 */
@Override
public void start() {
    if (bootstrap == null) {
        createBootstrap();
    }
    try {
        ChannelFuture future = doConnect();
        future.await();
        client = future.channel();
        // sync() rethrows the failure cause if the connect attempt failed.
        future.sync();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Do not lose the interruption: re-assert it for the caller.
            Thread.currentThread().interrupt();
        }
        e.printStackTrace();
    } finally {
        // add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                stop();
            }
        }));
    }
}
/**
 * Connects to every replica in the server list except the one whose id equals
 * {@code myServerId}, and records each connected channel in {@code channels}.
 *
 * <p>Each connection to {@code 127.0.0.1:<replicaPort>} is retried until it succeeds.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 *         for a connection attempt to complete
 */
@SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON")
public void connect() throws InterruptedException {
    for (Entry<Integer, Integer> replica : this.serverList.entrySet()) {
        int replicaId = replica.getKey();
        int replicaPort = replica.getValue();
        if (replicaId == myServerId) {
            continue;
        }

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(loopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // enable SSL/TLS support
                        SSLEngine sslEngine = SSLContextFactory.getClientContext().createSSLEngine();
                        sslEngine.setUseClientMode(true);
                        ch.pipeline().addLast(
                                new SslHandler(sslEngine),
                                new ObjectEncoder(),
                                new ObjectDecoder(OzymandiasServer.maxObjectSize, ClassResolvers.cacheDisabled(null)));
                    }
                });

        /* wait till server is connected */
        ChannelFuture attempt;
        do {
            attempt = bootstrap.connect("127.0.0.1", replicaPort);
            attempt.await();
            // After await() the future is done, so success alone decides whether to retry.
        } while (!attempt.isSuccess());

        this.channels.add(attempt.channel());
    }
}
/**
 * Stores a topic alias mapping on the embedded channel, closes the channel and
 * verifies that the topic alias limiter's {@code finishUsage} is called with that mapping.
 */
@Test
public void test_ungraceful_disconnect_remove_mapping() throws Exception {
    final String[] aliasMapping = {"topic1", "topic2", "topic3"};
    embeddedChannel.attr(ChannelAttributes.TOPIC_ALIAS_MAPPING).set(aliasMapping);

    embeddedChannel.close().await();

    verify(topicAliasLimiter).finishUsage(aliasMapping);
}
/**
 * Closes the embedded channel and checks that the closed-connections counter reads 1.
 */
@Test
public void test_ungraceful_disconnect_metric() throws Exception {
    embeddedChannel.close().await();

    final long closedConnections = metricsHolder.getClosedConnectionsCounter().getCount();
    assertEquals(1, closedConnections);
}
/**
 * Applies a 2000 ms connect timeout to every {@code ProxyHandler} among the client
 * handlers, connects to {@code DESTINATION}, and expects exactly one recorded
 * {@code ProxyConnectException} whose text contains "timeout". Both the channel close
 * and the handler latch must complete within twice the timeout.
 */
@Override
protected void test() throws Exception {
    final long connectTimeoutMillis = 2000;
    final long waitMillis = connectTimeoutMillis * 2;

    for (ChannelHandler handler : clientHandlers) {
        if (handler instanceof ProxyHandler) {
            ((ProxyHandler) handler).setConnectTimeoutMillis(connectTimeoutMillis);
        }
    }

    final FailureTestHandler failureHandler = new FailureTestHandler();
    Bootstrap clientBootstrap = new Bootstrap();
    clientBootstrap.group(group);
    clientBootstrap.channel(NioSocketChannel.class);
    clientBootstrap.resolver(NoopAddressResolverGroup.INSTANCE);
    clientBootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(clientHandlers);
            pipeline.addLast(new LineBasedFrameDecoder(64));
            pipeline.addLast(failureHandler);
        }
    });

    ChannelFuture closeFuture = clientBootstrap.connect(DESTINATION).channel().closeFuture();
    // Both waits are always performed; the combined result is asserted last.
    boolean channelClosed = closeFuture.await(waitMillis, TimeUnit.MILLISECONDS);
    boolean handlerFinished = failureHandler.latch.await(waitMillis, TimeUnit.MILLISECONDS);

    logger.debug("Recorded exceptions: {}", failureHandler.exceptions);
    assertProxyHandlers(false);

    assertThat(failureHandler.exceptions.size(), is(1));
    Throwable recorded = failureHandler.exceptions.poll();
    assertThat(recorded, is(instanceOf(ProxyConnectException.class)));
    assertThat(String.valueOf(recorded), containsString("timeout"));
    assertThat(channelClosed & handlerFinished, is(true));
}
/**
 * Writes a frame of a locally defined, unknown {@code Http2Frame} type and checks that
 * the write fails with {@code UnsupportedMessageTypeException} and that the frame's
 * reference count drops from 1 to 0.
 */
@Test
public void unknownFrameTypeShouldThrowAndBeReleased() throws Exception {
    // Minimal reference-counted frame type defined only for this test.
    class UnknownHttp2Frame extends AbstractReferenceCounted implements Http2Frame {
        @Override
        public String name() {
            return "UNKNOWN";
        }

        @Override
        protected void deallocate() {
        }

        @Override
        public ReferenceCounted touch(Object hint) {
            return this;
        }
    }

    UnknownHttp2Frame unknownFrame = new UnknownHttp2Frame();
    assertEquals(1, unknownFrame.refCnt());

    ChannelFuture writeFuture = channel.write(unknownFrame);
    writeFuture.await();

    assertTrue(writeFuture.isDone());
    assertFalse(writeFuture.isSuccess());
    assertThat(writeFuture.cause(), instanceOf(UnsupportedMessageTypeException.class));
    assertEquals(0, unknownFrame.refCnt());
}
/**
 * Binds two server channels on an {@code OioEventLoopGroup} created with argument 1
 * and expects the second bind to fail with a {@code ChannelException} whose message
 * contains "too many channels". Also checks that a listener added to the already
 * failed future is still notified.
 *
 * <p>The event loop group is shut down in a {@code finally} block so that a failing
 * assertion does not leak it.
 */
@Test
public void testTooManyServerChannels() throws Exception {
    EventLoopGroup g = new OioEventLoopGroup(1);
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.channel(OioServerSocketChannel.class);
        b.group(g);
        b.childHandler(new ChannelInboundHandlerAdapter());

        ChannelFuture f1 = b.bind(0);
        f1.sync();

        ChannelFuture f2 = b.bind(0);
        f2.await();
        assertThat(f2.cause(), is(instanceOf(ChannelException.class)));
        assertThat(f2.cause().getMessage().toLowerCase(), containsString("too many channels"));

        final CountDownLatch notified = new CountDownLatch(1);
        f2.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                notified.countDown();
            }
        });
        notified.await();
    } finally {
        g.shutdownGracefully();
    }
}
/**
 * Binds a server channel on an {@code OioEventLoopGroup} created with argument 1, then
 * connects a client on the same group and expects the connect to fail with a
 * {@code ChannelException} whose message contains "too many channels". Also checks
 * that a listener added to the already failed future is still notified.
 *
 * <p>The event loop group is shut down in a {@code finally} block so that a failing
 * assertion does not leak it.
 */
@Test
public void testTooManyClientChannels() throws Exception {
    EventLoopGroup g = new OioEventLoopGroup(1);
    try {
        ServerBootstrap sb = new ServerBootstrap();
        sb.channel(OioServerSocketChannel.class);
        sb.group(g);
        sb.childHandler(new ChannelInboundHandlerAdapter());
        ChannelFuture f1 = sb.bind(0);
        f1.sync();

        Bootstrap cb = new Bootstrap();
        cb.channel(OioSocketChannel.class);
        cb.group(g);
        cb.handler(new ChannelInboundHandlerAdapter());
        ChannelFuture f2 = cb.connect(NetUtil.LOCALHOST, ((InetSocketAddress) f1.channel().localAddress()).getPort());
        f2.await();
        assertThat(f2.cause(), is(instanceOf(ChannelException.class)));
        assertThat(f2.cause().getMessage().toLowerCase(), containsString("too many channels"));

        final CountDownLatch notified = new CountDownLatch(1);
        f2.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                notified.countDown();
            }
        });
        notified.await();
    } finally {
        g.shutdownGracefully();
    }
}
/**
 * Obtains a future from the supplier, waits for it to complete and reports whether
 * it succeeded.
 *
 * <p>If the calling thread is interrupted while waiting, the wait is abandoned, the
 * thread's interrupt status is restored, and the future's current success state is
 * returned.
 *
 * @param supplier provides the future to wait on
 * @return {@code true} if the future completed successfully, {@code false} otherwise
 */
protected boolean execute(final Supplier<ChannelFuture> supplier) {
    ChannelFuture future = supplier.get();
    try {
        future.await();
    } catch (InterruptedException e) {
        // Preserve the interruption for callers instead of silently dropping it.
        Thread.currentThread().interrupt();
    }
    return future.isSuccess();
}
/**
 * Connects to the node's IP and remote port and waits for the attempt to complete.
 *
 * <p>On any exception the error is logged and the node's channel, if present, is
 * closed. If the exception is an interruption, the thread's interrupt status is
 * restored before returning.
 *
 * @return {@code true} if the connection attempt succeeded; {@code false} if it
 *         failed or an exception occurred
 */
public boolean start() {
    try {
        ChannelFuture future = boot.connect(node.getIp(), node.getRemotePort());
        future.await();
        return future.isSuccess();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Do not lose the interruption: re-assert it for the caller.
            Thread.currentThread().interrupt();
        }
        Log.error(e);
        if (node.getChannel() != null) {
            node.getChannel().close();
        }
        return false;
    }
}
/**
 * Stops the hardware client and verifies that exactly one message is sent through the
 * GCM wrapper within 750 ms, with JSON equal to the expected "went offline" message.
 */
@Test
public void testPushWhenHardwareOffline() throws Exception {
    // Wait for the hardware client shutdown to complete.
    clientPair.hardwareClient.stop().await();

    ArgumentCaptor<AndroidGCMMessage> pushCaptor = ArgumentCaptor.forClass(AndroidGCMMessage.class);
    verify(holder.gcmWrapper, timeout(750).times(1)).send(pushCaptor.capture(), any(), any());

    String expectedJson =
            new AndroidGCMMessage("token", Priority.normal, "Your My Device went offline.", 1).toJson();
    assertEquals(expectedJson, pushCaptor.getValue().toJson());
}
/**
 * Binds two server channels on an {@code OioEventLoopGroup} created with argument 1
 * and expects the second bind to fail with a {@code ChannelException} whose message
 * contains "too many channels". Also checks that a listener added to the already
 * failed future is still notified.
 *
 * <p>The event loop group is shut down in a {@code finally} block so that a failing
 * assertion does not leak it.
 */
@Test
public void testTooManyServerChannels() throws Exception {
    EventLoopGroup g = new OioEventLoopGroup(1);
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.channel(OioServerSocketChannel.class);
        b.group(g);
        b.childHandler(new ChannelInboundHandlerAdapter());

        ChannelFuture f1 = b.bind(0);
        f1.sync();

        ChannelFuture f2 = b.bind(0);
        f2.await();
        assertThat(f2.cause(), is(instanceOf(ChannelException.class)));
        assertThat(f2.cause().getMessage().toLowerCase(), containsString("too many channels"));

        final CountDownLatch notified = new CountDownLatch(1);
        f2.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                notified.countDown();
            }
        });
        notified.await();
    } finally {
        g.shutdownGracefully();
    }
}
/**
 * Starts an asynchronous disconnect and blocks until it completes. Does nothing
 * further when {@code disconnectAsync()} returns {@code null}.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
@SuppressWarnings("unused")
public void disconnect() throws InterruptedException {
    final ChannelFuture pendingDisconnect = disconnectAsync();
    if (pendingDisconnect == null) {
        return;
    }
    pendingDisconnect.await();
}
/**
 * Writes and flushes the given message on the client's channel and waits up to one
 * second for the write to complete.
 *
 * @param message the message to send; must be an {@code IMessage}
 * @return the message that was sent
 * @throws EPSCommonException if the channel is {@code null}, the message is not an
 *         {@code IMessage}, or the write future carries a failure cause
 * @throws SendMessageFailedException if the write did not complete successfully
 *         within one second and no failure cause is available
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
@Override
public IMessage send(Object message) throws InterruptedException {
    if (client.getChannel() == null) {
        throw new EPSCommonException("Channel not ready (channel == null)");
    }
    if (!(message instanceof IMessage)) {
        throw new EPSCommonException("Illegal type of Message");
    }
    IMessage msg = (IMessage) message;
    ChannelFuture future = client.getChannel().writeAndFlush(msg)
            .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
    boolean isSendSuccess = true;
    StringBuilder errorMsg = new StringBuilder("Cause: ");
    if (future.await(1000)) {
        // await(...) returns true only once the future has completed, so a separate
        // "not done" check is unreachable here; only the outcome matters.
        if (!future.isSuccess()) {
            errorMsg.append("Write operation was not successful.\n");
            logger.error("Write operation was not successful. Session: {}", this);
            isSendSuccess = false;
        }
    } else {
        errorMsg.append("Send operation is not completed.\n");
        logger.error("Send operation is not completed. Session: {}", this);
        isSendSuccess = false;
    }
    // A concrete failure cause takes precedence over the generic timeout error.
    if (future.cause() != null) {
        throw new EPSCommonException("Message sent failed. Session: " + this, future.cause());
    }
    if (!isSendSuccess) {
        throw new SendMessageFailedException(
                "Message wasn't send during 1 second." + errorMsg + " Session: " + this);
    }
    return msg;
}