下面列出了 io.netty.util.concurrent.Future#get() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Sends {@code byteData} to the push endpoint and waits for the reply.
 *
 * <p>The payload is wrapped in a {@code TransportPackageData} carrying the site
 * server version header, sent as a Redis command, and the reply is awaited for
 * at most 5 seconds.
 *
 * @param action   command action added to the Redis command after the protocol version
 * @param byteData payload copied into the transport package
 * @return the bytes produced by {@code getResponseBytes} when the response is
 *         non-null and successful; {@code null} otherwise, including when any
 *         exception occurs (the exception is logged, not rethrown)
 */
public static byte[] syncWrite(String action, byte[] byteData) {
    try {
        CoreProto.TransportPackageData.Builder packageDataBuilder = CoreProto.TransportPackageData.newBuilder();
        Map<Integer, String> header = new HashMap<Integer, String>();
        header.put(CoreProto.HeaderKey.SITE_SERVER_VERSION_VALUE, CommandConst.SITE_VERSION);
        packageDataBuilder.putAllHeader(header);
        packageDataBuilder.setData(ByteString.copyFrom(byteData));

        PlatformSSLClient nettyClient = new PlatformSSLClient();
        IRedisCommandResponse response;
        try {
            nettyClient.connect(AKAXIN_PUSH_ADDRESS, AKAXIN_PUSH_PORT);
            Future<IRedisCommandResponse> future = nettyClient.sendRedisCommand(new RedisCommand()
                    .add(CommandConst.PROTOCOL_VERSION).add(action).add(packageDataBuilder.build().toByteArray()));
            response = future.get(5, TimeUnit.SECONDS);
        } finally {
            // Disconnect on every path: previously a timeout or send failure
            // skipped this call and left the client connected.
            nettyClient.disconnect();
        }

        if (response != null && response.isSuccess()) {
            return getResponseBytes(response.getRedisCommand());
        }
        logger.debug("write push to platform finish response={}", response);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to callers instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        logger.error("sync send package error ", e);
    }
    return null;
}
/**
 * Reacts to the completion of an SSL handshake.
 *
 * <p>On success, a client name is extracted from the handler and, when one is
 * found, the channel is bound to the client loaded from the registry. On
 * failure, the cause is logged at debug level unless it is a
 * {@link ClosedChannelException}. Any exception raised along the way is logged
 * at debug level and not propagated.
 *
 * @param result  the handshake future
 * @param handler the SSL handler passed to {@code extractClientName}
 */
private void onSslHandshakeComplete(Future<? super Channel> result, SslHandler handler) {
    try {
        if (result.isSuccess()) {
            final String name = extractClientName(handler);
            if (name == null) {
                return;
            }
            final Channel boundChannel = (Channel) result.get();
            Client.bind(boundChannel, registry.load(name));
            return;
        }

        // Handshake failed: only report it when debug logging is on.
        if (!logger.isDebugEnabled()) {
            return;
        }
        final Throwable failure = result.cause();
        if (failure instanceof ClosedChannelException) {
            return;
        }
        final String reason = (failure == null) ? "unknown" : failure.getMessage();
        logger.debug("SSL handshake failed: {}", reason, failure);
    } catch (Exception e) {
        logger.debug("Unable to determine client auth", e);
    }
}
/**
 * Creates a client against a freshly started loopback server and checks that
 * both listeners report {@code State.UP}, that the session exposes the expected
 * AS number and table types, and that both listeners pass the idle-state check
 * after the session is closed.
 */
@Test(timeout = 20000)
public void testCreateClient() throws InterruptedException, ExecutionException {
    final InetSocketAddress loopbackAddress = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress();
    final Channel listeningChannel = createServer(loopbackAddress);
    final Future<BGPSessionImpl> pendingSession =
            this.clientDispatcher.createClient(this.clientAddress, loopbackAddress, 2, true);

    // Wait for the future to complete, then read the established session.
    final BGPSessionImpl bgpSession = pendingSession.sync().get();

    Assert.assertEquals(State.UP, this.clientListener.getState());
    Assert.assertEquals(State.UP, this.serverListener.getState());
    Assert.assertEquals(AS_NUMBER, bgpSession.getAsNumber());
    Assert.assertEquals(Sets.newHashSet(IPV_4_TT), bgpSession.getAdvertisedTableTypes());
    Assert.assertTrue(listeningChannel.isWritable());

    // Tear down and confirm both sides pass the idle-state check.
    bgpSession.close();
    this.serverListener.releaseConnection();
    checkIdleState(this.clientListener);
    checkIdleState(this.serverListener);
}
/**
 * Sends {@code byteData} to the push endpoint and logs the reply.
 *
 * <p>Despite the name, this method blocks the caller: it waits up to 5 seconds
 * for the response before returning. Failures are logged and never rethrown.
 *
 * @param action   command action added to the Redis command after the protocol version
 * @param byteData payload copied into the transport package
 */
public static void asyncWrite(String action, byte[] byteData) {
    try {
        CoreProto.TransportPackageData.Builder packageDataBuilder = CoreProto.TransportPackageData.newBuilder();
        Map<Integer, String> header = new HashMap<Integer, String>();
        header.put(CoreProto.HeaderKey.SITE_SERVER_VERSION_VALUE, CommandConst.SITE_VERSION);
        packageDataBuilder.putAllHeader(header);
        packageDataBuilder.setData(ByteString.copyFrom(byteData));

        PlatformSSLClient nettyClient = new PlatformSSLClient();
        try {
            nettyClient.connect(AKAXIN_PUSH_ADDRESS, AKAXIN_PUSH_PORT);
            Future<IRedisCommandResponse> future = nettyClient.sendRedisCommand(new RedisCommand()
                    .add(CommandConst.PROTOCOL_VERSION).add(action).add(packageDataBuilder.build().toByteArray()));
            IRedisCommandResponse response = future.get(5, TimeUnit.SECONDS);
            logger.debug("write push to platform finish response={}", response);
        } finally {
            // Disconnect on every path: previously a timeout or send failure
            // skipped this call and left the client connected.
            nettyClient.disconnect();
        }
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to callers instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        logger.error("async send package to platform error", e);
    }
}
/**
 * Sends {@code byteData} to the platform endpoint and waits for the reply.
 *
 * <p>The payload is wrapped in a {@code TransportPackageData} carrying the site
 * server version header, sent as a Redis command, and the reply is awaited for
 * at most 5 seconds.
 *
 * @param action   command action added to the Redis command after the protocol version
 * @param byteData payload copied into the transport package
 * @return the bytes produced by {@code getResponseBytes} when the response is
 *         non-null and successful; {@code null} otherwise, including when any
 *         exception occurs (the exception is logged, not rethrown)
 */
public static byte[] syncWrite(String action, byte[] byteData) {
    try {
        CoreProto.TransportPackageData.Builder packageDataBuilder = CoreProto.TransportPackageData.newBuilder();
        Map<Integer, String> header = new HashMap<Integer, String>();
        header.put(CoreProto.HeaderKey.SITE_SERVER_VERSION_VALUE, CommandConst.SITE_VERSION);
        packageDataBuilder.putAllHeader(header);
        packageDataBuilder.setData(ByteString.copyFrom(byteData));

        PlatformSSLClient nettyClient = new PlatformSSLClient();
        IRedisCommandResponse response;
        try {
            nettyClient.connect(AKAXIN_PLATFROM_HOST, AKAXIN_PLATFROM_PORT);
            Future<IRedisCommandResponse> future = nettyClient.sendRedisCommand(new RedisCommand()
                    .add(CommandConst.PROTOCOL_VERSION).add(action).add(packageDataBuilder.build().toByteArray()));
            response = future.get(5, TimeUnit.SECONDS);
        } finally {
            // Disconnect on every path: previously a timeout or send failure
            // skipped this call and left the client connected.
            nettyClient.disconnect();
        }

        if (response != null && response.isSuccess()) {
            return getResponseBytes(response.getRedisCommand());
        }
        logger.debug("sync write data to platform with response={}", response);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to callers instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        logger.error("sync send package error ", e);
    }
    return null;
}
/**
 * Sends {@code byteData} to the push endpoint and waits for the reply.
 *
 * <p>The payload is wrapped in a {@code TransportPackageData} carrying the site
 * server version header, sent as a Redis command, and the reply is awaited for
 * at most 5 seconds.
 *
 * @param action   command action added to the Redis command after the protocol version
 * @param byteData payload copied into the transport package
 * @return the bytes produced by {@code getResponseBytes} when the response is
 *         non-null and successful; {@code null} otherwise, including when any
 *         exception occurs (the exception is logged, not rethrown)
 */
public static byte[] syncWrite(String action, byte[] byteData) {
    try {
        CoreProto.TransportPackageData.Builder packageDataBuilder = CoreProto.TransportPackageData.newBuilder();
        Map<Integer, String> header = new HashMap<Integer, String>();
        header.put(CoreProto.HeaderKey.SITE_SERVER_VERSION_VALUE, CommandConst.SITE_VERSION);
        packageDataBuilder.putAllHeader(header);
        packageDataBuilder.setData(ByteString.copyFrom(byteData));

        PlatformSSLClient nettyClient = new PlatformSSLClient();
        IRedisCommandResponse response;
        try {
            nettyClient.connect(AKAXIN_PUSH_ADDRESS, AKAXIN_PUSH_PORT);
            Future<IRedisCommandResponse> future = nettyClient.sendRedisCommand(new RedisCommand()
                    .add(CommandConst.PROTOCOL_VERSION).add(action).add(packageDataBuilder.build().toByteArray()));
            response = future.get(5, TimeUnit.SECONDS);
        } finally {
            // Disconnect on every path: previously a timeout or send failure
            // skipped this call and left the client connected.
            nettyClient.disconnect();
        }

        if (response != null && response.isSuccess()) {
            return getResponseBytes(response.getRedisCommand());
        }
        logger.debug("write push to platform finish response={}", response);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to callers instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        logger.error("sync send package error ", e);
    }
    return null;
}
/**
 * Sends {@code byteData} to the push endpoint and logs the reply.
 *
 * <p>Despite the name, this method blocks the caller: it waits up to 5 seconds
 * for the response before returning. Failures are logged and never rethrown.
 *
 * @param action   command action added to the Redis command after the protocol version
 * @param byteData payload copied into the transport package
 */
public static void asyncWrite(String action, byte[] byteData) {
    try {
        CoreProto.TransportPackageData.Builder packageDataBuilder = CoreProto.TransportPackageData.newBuilder();
        Map<Integer, String> header = new HashMap<Integer, String>();
        header.put(CoreProto.HeaderKey.SITE_SERVER_VERSION_VALUE, CommandConst.SITE_VERSION);
        packageDataBuilder.putAllHeader(header);
        packageDataBuilder.setData(ByteString.copyFrom(byteData));

        PlatformSSLClient nettyClient = new PlatformSSLClient();
        try {
            nettyClient.connect(AKAXIN_PUSH_ADDRESS, AKAXIN_PUSH_PORT);
            Future<IRedisCommandResponse> future = nettyClient.sendRedisCommand(new RedisCommand()
                    .add(CommandConst.PROTOCOL_VERSION).add(action).add(packageDataBuilder.build().toByteArray()));
            IRedisCommandResponse response = future.get(5, TimeUnit.SECONDS);
            logger.debug("write push to platform finish response={}", response);
        } finally {
            // Disconnect on every path: previously a timeout or send failure
            // skipped this call and left the client connected.
            nettyClient.disconnect();
        }
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to callers instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        logger.error("async send package to platform error", e);
    }
}
/**
 * Sends {@code byteData} to the platform endpoint and waits for the reply.
 *
 * <p>The payload is wrapped in a {@code TransportPackageData} carrying the site
 * server version header, sent as a Redis command, and the reply is awaited for
 * at most 5 seconds.
 *
 * @param action   command action added to the Redis command after the protocol version
 * @param byteData payload copied into the transport package
 * @return the bytes produced by {@code getResponseBytes} when the response is
 *         non-null and successful; {@code null} otherwise, including when any
 *         exception occurs (the exception is logged, not rethrown)
 */
public static byte[] syncWrite(String action, byte[] byteData) {
    try {
        CoreProto.TransportPackageData.Builder packageDataBuilder = CoreProto.TransportPackageData.newBuilder();
        Map<Integer, String> header = new HashMap<Integer, String>();
        header.put(CoreProto.HeaderKey.SITE_SERVER_VERSION_VALUE, CommandConst.SITE_VERSION);
        packageDataBuilder.putAllHeader(header);
        packageDataBuilder.setData(ByteString.copyFrom(byteData));

        PlatformSSLClient nettyClient = new PlatformSSLClient();
        IRedisCommandResponse response;
        try {
            nettyClient.connect(AKAXIN_PLATFROM_HOST, AKAXIN_PLATFROM_PORT);
            Future<IRedisCommandResponse> future = nettyClient.sendRedisCommand(new RedisCommand()
                    .add(CommandConst.PROTOCOL_VERSION).add(action).add(packageDataBuilder.build().toByteArray()));
            response = future.get(5, TimeUnit.SECONDS);
        } finally {
            // Disconnect on every path: previously a timeout or send failure
            // skipped this call and left the client connected.
            nettyClient.disconnect();
        }

        if (response != null && response.isSuccess()) {
            return getResponseBytes(response.getRedisCommand());
        }
        logger.debug("sync write data to platform with response={}", response);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to callers instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        logger.error("sync send package error ", e);
    }
    return null;
}
/**
 * Starts a mock TLS backend on port 8888, connects the subject to it, and
 * checks the state transition from {@code CONNECTING} to {@code CONNECTED}.
 *
 * <p>The mock server is closed in a {@code finally} block that wraps the whole
 * scenario, so a failed assertion or setup step no longer leaves port 8888 bound.
 */
@Test
public void testConnectingSuccessfulConnection() throws Exception {
    // set up fake origin backend server so we can connect to it, we connect to port 8888
    // the outbound 8888 is specified in the ClientConnectionManagerIntegrationTest.conf
    TlsConfig tlsConfig =
            TlsConfig.builderFrom(ConfigFactory.load().getConfig("xio.h2BackendServer.settings.tls"))
                    .build();
    server = OkHttpUnsafe.getSslMockWebServer(TlsHelper.getKeyManagers(tlsConfig));
    server.setProtocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1));
    // tell the server to bind to 8888
    server.start(8888);
    try {
        subject = subjectFactory(true);
        Future<Void> connectionResult = subject.connect();
        assertEquals(ClientConnectionState.CONNECTING, subject.connectionState());
        Thread.sleep(100); // todo: (WK) do something smarter
        try {
            connectionResult.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            // Deliberately not rethrown: the state assertion below decides the outcome.
            System.out.println("Connection exception = " + e.toString());
        }
        assertEquals(ClientConnectionState.CONNECTED, subject.connectionState());
    } finally {
        // Always release the port, even when an assertion above fails.
        server.close();
    }
}
/**
 * Checks that a failing downstream acquire completes the pool's future
 * unsuccessfully with an {@code IOException} cause after exactly one
 * downstream acquire call.
 */
@Test
public void badDownstreamAcquiresCausesException() throws Exception {
    stubForIgnoredTimeout();
    stubBadDownstreamAcquire();

    final Future<Channel> pendingAcquire = channelPool.acquire();
    try {
        pendingAcquire.get(5, TimeUnit.SECONDS);
    } catch (ExecutionException expected) {
        // Expected: the assertions below inspect the future itself.
    }

    assertThat(pendingAcquire.isDone()).isTrue();
    assertThat(pendingAcquire.isSuccess()).isFalse();
    assertThat(pendingAcquire.cause()).isInstanceOf(IOException.class);

    Mockito.verify(downstreamChannelPool, Mockito.times(1)).acquire(any());
}
/**
 * Checks that running the scheduled timeout task while the downstream acquire
 * is still incomplete fails the pool's future with a {@code TimeoutException}
 * after exactly one downstream acquire call.
 */
@Test
public void slowAcquireTimesOut() throws Exception {
    stubIncompleteDownstreamAcquire();
    Mockito.when(eventLoopGroup.schedule(Mockito.any(Runnable.class), Mockito.eq(10), Mockito.eq(TimeUnit.MILLISECONDS)))
            .thenAnswer(invocation -> scheduledFuture);

    final Future<Channel> pendingAcquire = channelPool.acquire();

    // Capture the task handed to the event loop and run it by hand.
    final ArgumentCaptor<Runnable> scheduledTask = ArgumentCaptor.forClass(Runnable.class);
    Mockito.verify(eventLoopGroup).schedule(scheduledTask.capture(), anyLong(), any());
    scheduledTask.getValue().run();

    try {
        pendingAcquire.get(5, TimeUnit.SECONDS);
    } catch (ExecutionException expected) {
        // Expected: the assertions below inspect the future itself.
    }

    assertThat(pendingAcquire.isDone()).isTrue();
    assertThat(pendingAcquire.isSuccess()).isFalse();
    assertThat(pendingAcquire.cause()).isInstanceOf(TimeoutException.class);

    Mockito.verify(downstreamChannelPool, Mockito.times(1)).acquire(any());
}
/**
 * Connects the subject without any backend server running and checks that the
 * state moves from {@code CONNECTING} to {@code CLOSED_CONNECTION}.
 */
@Test
public void testConnectingFailingConnection() throws Exception {
    subject = subjectFactory(false);

    // No fake origin backend server is started, so the connect attempt has no peer.
    final Future<Void> pendingConnect = subject.connect();
    assertEquals(ClientConnectionState.CONNECTING, subject.connectionState());

    Thread.sleep(100); // todo: (WK) do something smarter
    try {
        pendingConnect.get(30, TimeUnit.SECONDS);
    } catch (Exception ignored) {
        // The outcome is judged by the connection state checked in finally.
    } finally {
        assertEquals(ClientConnectionState.CLOSED_CONNECTION, subject.connectionState());
    }
}
/**
 * Shuts down the worker and boss event loop groups and waits for both.
 *
 * <p>Both shutdowns are started first; the waits then share one 10-second
 * budget: the worker group may use all of it, and the boss group gets whatever
 * remains (never less than zero).
 *
 * @throws Exception if either wait fails, times out, or is interrupted
 */
void close() throws Exception {
    logger.debug("close(): stopping http server");

    final Future<?> workerStopped = workerGroup.shutdownGracefully(1, 5, SECONDS);
    final Future<?> bossStopped = bossGroup.shutdownGracefully(1, 5, SECONDS);

    final Stopwatch timer = Stopwatch.createStarted();
    workerStopped.get(10, SECONDS);

    // Give the boss group only the part of the 10 s budget the worker did not use.
    final long spentMillis = timer.elapsed(MILLISECONDS);
    final long leftMillis = Math.max(0, 10000 - spentMillis);
    bossStopped.get(leftMillis, MILLISECONDS);

    logger.debug("close(): http server stopped");
}
/**
 * Handles completion of the SSL handshake future.
 *
 * <p>On success, the completion is forwarded to the parent's SSL connection
 * handler and the resulting channel is stored through {@code setSslChannel}.
 *
 * @param future the handshake future
 * @throws RpcException   if no parent is set
 * @throws DrillException if the handshake failed; carries the future's cause
 */
@Override
public void operationComplete(Future<Channel> future) throws Exception {
    if (parent == null) {
        throw new RpcException("RPC Setup error. SSL handshake complete handler is not set up.");
    }
    if (!future.isSuccess()) {
        throw new DrillException("SSL handshake failed.", future.cause());
    }

    // Read the channel before notifying the handler, as the order matters to callers.
    final Channel sslChannel = future.get();
    parent.sslConnectionHandler.operationComplete(future);
    parent.parent.setSslChannel(sslChannel);
}
/**
 * Sends {@code httpSend} over a pooled channel and waits for the exchange to finish.
 *
 * <p>A channel is acquired from the pool (waiting at most {@code timeout}), a
 * handler is added to its pipeline, the request is written, and the method then
 * waits up to {@code timeout} for the channel to close. If the receive object is
 * not marked done by then, it is flagged as an error. Exceptions are logged and
 * recorded on the result rather than thrown.
 *
 * @param httpSend the request to send
 * @param timeout  maximum wait, applied separately to the acquire and to the close wait
 * @param timeUnit unit of {@code timeout}
 * @return the receive object, carrying either the response state or the error details
 */
public HttpReceive request(HttpSend httpSend, long timeout, TimeUnit timeUnit) {
    final HttpReceive httpReceive = new HttpReceive();
    final Future<Channel> fch = channelPool.acquire();
    Channel channel = null;
    try {
        channel = fch.get(timeout, timeUnit);
        ChannelPipeline p = channel.pipeline();
        p.addLast(new HttpClientHandler(httpSend, httpReceive));
        final FullHttpRequest fullHttpRequest = convertRequest(httpSend);
        p.writeAndFlush(fullHttpRequest);
        channel.closeFuture().await(timeout, timeUnit);
        if (!httpReceive.getIsDone()) {
            httpReceive.setHaveError(true);
            httpReceive.setErrMsg("请求已经超时");
        }
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to callers instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        if (log.isWarnEnabled()) {
            log.warn(e.getMessage(), e);
        }
        httpReceive.setHaveError(true)
                .setErrMsg(e.getMessage())
                .setThrowable(e)
                .setIsDone(true);
    } finally {
        if (channel != null) {
            channelPool.release(channel);
        } else {
            // The acquire did not hand over a channel in time. If it succeeds
            // later, return that channel to the pool so it is not lost.
            fch.addListener(f -> {
                if (f.isSuccess()) {
                    channelPool.release((Channel) f.getNow());
                }
            });
        }
    }
    return httpReceive;
}
/**
 * Checks that the acquire succeeds with the first stubbed channel after
 * exactly one downstream acquire call.
 */
@Test
public void acquireCanMakeJustOneCall() throws Exception {
    stubForIgnoredTimeout();
    stubAcquireHealthySequence(true);

    final Future<Channel> pendingAcquire = channelPool.acquire();
    pendingAcquire.get(5, TimeUnit.SECONDS);

    assertThat(pendingAcquire.isDone()).isTrue();
    assertThat(pendingAcquire.isSuccess()).isTrue();
    assertThat(pendingAcquire.getNow()).isEqualTo(channels.get(0));

    Mockito.verify(downstreamChannelPool, Mockito.times(1)).acquire(any());
}
/**
 * Checks that the acquire succeeds with the fifth stubbed channel after five
 * downstream acquire calls, given the stubbed sequence
 * {@code false, false, false, false, true}.
 */
@Test
public void acquireCanMakeManyCalls() throws Exception {
    stubForIgnoredTimeout();
    stubAcquireHealthySequence(false, false, false, false, true);

    final Future<Channel> pendingAcquire = channelPool.acquire();
    pendingAcquire.get(5, TimeUnit.SECONDS);

    assertThat(pendingAcquire.isDone()).isTrue();
    assertThat(pendingAcquire.isSuccess()).isTrue();
    assertThat(pendingAcquire.getNow()).isEqualTo(channels.get(4));

    Mockito.verify(downstreamChannelPool, Mockito.times(5)).acquire(any());
}
/**
 * Checks that, with the active-and-keep-alive stub in place, the acquire
 * succeeds with the first channel after exactly one downstream acquire call.
 */
@Test
public void acquireActiveAndKeepAliveTrue_shouldAcquireOnce() throws Exception {
    stubForIgnoredTimeout();
    stubAcquireActiveAndKeepAlive();

    final Future<Channel> pendingAcquire = channelPool.acquire();
    pendingAcquire.get(5, TimeUnit.SECONDS);

    assertThat(pendingAcquire.isDone()).isTrue();
    assertThat(pendingAcquire.isSuccess()).isTrue();
    assertThat(pendingAcquire.getNow()).isEqualTo(channels.get(0));

    Mockito.verify(downstreamChannelPool, Mockito.times(1)).acquire(any());
}
/**
 * Checks that, with the "first time not keep-alive" stub in place, the acquire
 * succeeds with the second channel after two downstream acquire calls.
 */
@Test
public void acquire_firstChannelKeepAliveFalse_shouldAcquireAnother() throws Exception {
    stubForIgnoredTimeout();
    stubAcquireTwiceFirstTimeNotKeepAlive();

    final Future<Channel> pendingAcquire = channelPool.acquire();
    pendingAcquire.get(5, TimeUnit.SECONDS);

    assertThat(pendingAcquire.isDone()).isTrue();
    assertThat(pendingAcquire.isSuccess()).isTrue();
    assertThat(pendingAcquire.getNow()).isEqualTo(channels.get(1));

    Mockito.verify(downstreamChannelPool, Mockito.times(2)).acquire(any());
}
/**
 * Stops the network service.
 *
 * <p>Marks the service as stopped, starts a graceful shutdown of both event
 * loop groups, and waits up to 5000 ms for each. Each group is awaited
 * independently, so a timeout or failure on the boss group no longer skips the
 * wait for the worker group. Failures are logged and never rethrown.
 */
@Override
public void stop() {
    this.state = ServiceState.STOPPED;
    Future<?> bf = bossGroup.shutdownGracefully();
    Future<?> wf = workerGroup.shutdownGracefully();
    for (Future<?> shutdown : new Future<?>[] {bf, wf}) {
        try {
            shutdown.get(5000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            LOGGER.info("Netty服务器关闭失败", e);
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag and stop waiting; the caller asked us to hurry.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    LOGGER.info("Netty Server on port:{} is closed", port);
}