下面列出了 io.netty.channel.ChannelFuture#sync() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Binds an OIO server on an ephemeral port, connects a plain socket to it and asserts that the
 * client immediately observes end-of-stream ({@code read() == -1}).
 *
 * <p>NOTE(review): presumably the {@code 1} passed to {@link OioEventLoopGroup} caps the number
 * of channels so that the accepted connection is rejected; confirm against the Netty API.
 *
 * @throws Exception if binding, connecting or reading fails
 */
@Test
public void testTooManyAcceptedChannels() throws Exception {
    EventLoopGroup g = new OioEventLoopGroup(1);
    try {
        ServerBootstrap sb = new ServerBootstrap();
        sb.channel(OioServerSocketChannel.class);
        sb.group(g);
        sb.childHandler(new ChannelInboundHandlerAdapter());
        ChannelFuture f1 = sb.bind(0);
        f1.sync();

        int boundPort = ((InetSocketAddress) f1.channel().localAddress()).getPort();
        Socket s = new Socket(NetUtil.LOCALHOST, boundPort);
        try {
            assertThat(s.getInputStream().read(), is(-1));
        } finally {
            // Close the client socket even when the assertion fails.
            s.close();
        }
    } finally {
        // Always release the event loop threads, including on bind or assertion failure.
        g.shutdownGracefully();
    }
}
/**
 * Starts a server through the mock dispatcher, connects a client created by a fresh
 * {@code BmpDispatcherImpl}, checks that both channels are active and that the session
 * listener reports "up", then closes everything and checks the listener reports "down".
 *
 * @throws Exception if the server or the client cannot be established
 */
@Test(timeout = 20000)
public void testCreateServer() throws Exception {
    final int serverPort = InetSocketAddressUtil.getRandomPort();
    final BmpDispatcherImpl clientDispatcher = new BmpDispatcherImpl(
            new NioEventLoopGroup(), new NioEventLoopGroup(), this.registry, this.sessionFactory);

    // Server side: listen on all interfaces on the randomly chosen port.
    final InetSocketAddress listenAddress =
            new InetSocketAddress(InetAddresses.forString("0.0.0.0"), serverPort);
    final ChannelFuture serverFuture = this.bmpMockDispatcher.createServer(listenAddress);
    serverFuture.sync();

    // Client side: connect and wait for the channel to be established.
    final InetSocketAddress remoteAddress =
            InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(serverPort);
    final ChannelFuture clientFuture =
            clientDispatcher.createClient(remoteAddress, this.slf, KeyMapping.getKeyMapping());
    clientFuture.sync();
    final Channel clientChannel = clientFuture.channel();

    assertTrue(clientChannel.isActive());
    checkEquals(() -> assertTrue(this.sl.getStatus()));
    assertTrue(serverFuture.channel().isActive());

    // Tear down the client first, then both dispatchers; the session must then be down.
    clientChannel.close();
    clientDispatcher.close();
    this.bmpMockDispatcher.close();
    checkEquals(() -> assertFalse(this.sl.getStatus()));
}
/**
 * Binds the bootstrap to the given address and blocks until the bind has completed.
 *
 * <p>The channel is added to {@code channelGroup} immediately. Once bound, it is recorded in
 * {@code boundAddresses} under its local address and removed again when the channel closes.
 *
 * @param address the address to bind; must be an {@link InetSocketAddress}
 * @throws IOException if the bind fails or the calling thread is interrupted while waiting
 */
@Override
public void bind(SocketAddress address) throws IOException {
    InetSocketAddress inetAddress = (InetSocketAddress) address;
    ChannelFuture f = bootstrap.bind(inetAddress);
    Channel channel = f.channel();
    channelGroup.add(channel);
    try {
        f.sync();
        SocketAddress bound = channel.localAddress();
        boundAddresses.put(bound, channel);
        channel.closeFuture().addListener(fut -> {
            boundAddresses.remove(bound);
        });
    } catch (InterruptedException e) {
        // Preserve the interrupt status for callers; the IOException alone would hide it.
        Thread.currentThread().interrupt();
        throw Helper.toIOException(e);
    } catch (Exception e) {
        throw Helper.toIOException(e);
    }
}
/**
 * Creates the bootstrap on first use, connects, and stores the resulting channel in
 * {@code client}. Connection failures are printed rather than propagated. A JVM shutdown
 * hook that calls {@code stop()} is registered on every invocation, whether or not the
 * connection succeeded.
 */
@Override
public void start() {
    if (bootstrap == null) {
        createBootstrap();
    }
    try {
        ChannelFuture future = doConnect();
        future.await();
        client = future.channel();
        // sync() re-throws the failure cause if the connection attempt did not succeed.
        future.sync();
    } catch (InterruptedException e) {
        // Restore the interrupt status so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                stop();
            }
        }));
    }
}
/**
 * Closes the channel registered for the given address, if any, and waits for the close to
 * finish. Does nothing when no channel is registered for the address.
 *
 * @param address the previously bound address
 */
@Override
public void unbind(SocketAddress address) {
    final Channel bound = boundAddresses.get(address);
    if (bound == null) {
        return;
    }
    // Start a close for an open channel; otherwise just wait on the existing close future.
    final ChannelFuture closing = bound.isOpen() ? bound.close() : bound.closeFuture();
    try {
        closing.sync();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
/**
 * Creates and configures the server bootstrap, binds it to every port in
 * {@code lispPorts}, and then waits for each bind to complete.
 */
private void run() {
    try {
        final Bootstrap serverBootstrap = createServerBootstrap();
        configBootstrapOptions(serverBootstrap);

        // Kick off all binds first; they are awaited together below.
        lispPorts.forEach(port -> {
            final InetSocketAddress bindAddress = new InetSocketAddress(port);
            channelFutures.add(serverBootstrap.bind(bindAddress));
            log.info("Listening for LISP router connections on {}", bindAddress);
        });

        for (ChannelFuture pendingBind : channelFutures) {
            pendingBind.sync();
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
/**
 * Looks up the channel bound to {@code address} and blocks until it is closed. An address
 * with no registered channel is ignored.
 *
 * @param address the previously bound address
 */
@Override
public void unbind(SocketAddress address) {
    final Channel registered = boundAddresses.get(address);
    if (registered == null) {
        return;
    }
    // An open channel is closed now; an already-closing one only needs to be awaited.
    final ChannelFuture closeResult =
            registered.isOpen() ? registered.close() : registered.closeFuture();
    try {
        closeResult.sync();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
/**
 * Connects to the given address and port and wraps the resulting channel in an
 * {@code OvsdbClient}.
 *
 * <p>When {@code certificateManagerSrv} is {@code null} the plain client channel initializer
 * is installed; otherwise the SSL initializer is used.
 *
 * @param address remote address to connect to
 * @param port remote port to connect to
 * @param certificateManagerSrv certificate manager for the SSL initializer, or {@code null}
 * @return the client for the established channel, or {@code null} if the calling thread was
 *         interrupted while waiting for the connection
 */
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public OvsdbClient connectWithSsl(final InetAddress address, final int port,
        final ICertificateManager certificateManagerSrv) {
    final ChannelFuture future = bootstrapFactory.newClient()
            .handler(certificateManagerSrv == null ? new ClientChannelInitializer() :
                    new SslClientChannelInitializer(certificateManagerSrv, address, port))
            .connect(address, port);
    try {
        future.sync();
    } catch (InterruptedException e) {
        // Restore the interrupt status; returning null alone would hide the interruption.
        Thread.currentThread().interrupt();
        LOG.warn("Failed to connect {}:{}", address, port, e);
        return null;
    } catch (Throwable throwable) {
        // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
        LOG.error("Error while connecting to address {}, port {}", address, port, throwable);
        throw throwable;
    }
    return getChannelClient(future.channel(), ConnectionType.ACTIVE, SocketConnectionType.SSL);
}
/**
 * Builds a key mapping for two loopback client addresses, stubs the dispatcher dependencies,
 * starts the server on the spied dispatcher and verifies that
 * {@code createServerBootstrap} was invoked with a channel pipeline initializer.
 *
 * @throws InterruptedException if interrupted while waiting for the server to start
 */
@Test(timeout = 20000)
public void testCustomizeBootstrap() throws InterruptedException {
    final int listenPort = InetSocketAddressUtil.getRandomPort();
    final InetSocketAddress firstClient =
            InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(listenPort);
    final InetSocketAddress secondClient =
            InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(listenPort);

    final KeyMapping keyMapping = KeyMapping.getKeyMapping(firstClient.getAddress(), "CLIENT1_ADDRESS");
    keyMapping.put(secondClient.getAddress(), "CLIENT2_ADDRESS".getBytes());

    // Stub the dependencies consumed by createServer().
    doReturn(new InetSocketAddress("0.0.0.0", listenPort)).when(this.dispatcherDependencies).getAddress();
    doReturn(this.listenerFactory).when(this.dispatcherDependencies).getListenerFactory();
    doReturn(new SimpleSessionListener()).when(this.listenerFactory).getSessionListener();

    final ChannelFuture serverFuture = this.disp2Spy.createServer(this.dispatcherDependencies);
    serverFuture.sync();

    Mockito.verify(this.disp2Spy).createServerBootstrap(any(PCEPDispatcherImpl.ChannelPipelineInitializer.class));
}
/**
 * Deploys {@code BmpMock} in passive mode on a random loopback address, connects a client
 * through the BMP dispatcher and verifies that the session listener sees the session come up
 * and receives 13 messages.
 *
 * @throws Exception if the mock server or the client cannot be started
 */
@Test(timeout = 20000)
public void testMainInPassiveMode() throws Exception {
    final InetSocketAddress serverAddr = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress();
    final BmpSessionListenerFactory listenerFactory = () -> BmpMockTest.this.sessionListener;

    // create a local server in passive mode instead
    final String[] mockArgs = {
        "--local_address", InetSocketAddressUtil.toHostAndPort(serverAddr).toString(),
        "--peers_count", "3", "--pre_policy_routes", "3", "--passive"};
    final List<ChannelFuture> serverFutures = BmpMock.deploy(mockArgs);
    Assert.assertEquals(1, serverFutures.size());
    serverFutures.get(0).sync();

    final ChannelFuture clientFuture = this.bmpDispatcher.createClient(serverAddr,
            listenerFactory, KeyMapping.getKeyMapping());
    clientFuture.sync();

    final boolean connected = clientFuture.isSuccess();
    final Channel connectedChannel = connected ? clientFuture.channel() : null;
    // wait longer for the reconnection attempt when the first connect did not succeed
    final int sessionUpWaitSeconds = connected ? 10 : 40;

    verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(sessionUpWaitSeconds)))
            .onSessionUp(Mockito.any(BmpSession.class));
    //1 * Initiate message + 3 * PeerUp Notification + 9 * Route Monitoring message
    verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(10)).times(13))
            .onMessage(Mockito.any(Notification.class));

    if (connectedChannel != null) {
        connectedChannel.close().sync();
    }
}
/**
 * Creates a client, starts a server and checks that a session comes up; then closes the
 * server channel, recreates the event loop groups and the dispatcher, starts a second server
 * and checks that a session listener on the new server also reports an established session.
 *
 * @throws Exception if a server cannot be started or the session future fails
 */
@Test(timeout = 20000)
public void testClientReconnect() throws Exception {
    final Future<PCEPSession> sessionFuture = this.dispatcher
            .createClient(this.serverAddress, 1, new TestingSessionListenerFactory(), this.nf,
                    KeyMapping.getKeyMapping(), this.clientAddress);

    // First server instance.
    final TestingSessionListenerFactory firstFactory = new TestingSessionListenerFactory();
    doReturn(firstFactory).when(this.dispatcherDependencies).getListenerFactory();
    final ChannelFuture firstServerFuture = this.pcepDispatcher.createServer(this.dispatcherDependencies);
    firstServerFuture.sync();
    final Channel firstServerChannel = firstServerFuture.channel();

    Assert.assertNotNull(sessionFuture.get());
    checkSessionListenerNotNull(firstFactory, this.clientAddress.getHostString());
    final TestingSessionListener firstListener
            = checkSessionListenerNotNull(firstFactory, this.clientAddress.getAddress().getHostAddress());
    Assert.assertNotNull(firstListener.getSession());
    Assert.assertTrue(firstListener.isUp());

    // Take the first server down and rebuild the dispatcher on fresh event loop groups.
    firstServerChannel.close().get();
    closeEventLoopGroups();
    this.workerGroup = new NioEventLoopGroup();
    this.bossGroup = new NioEventLoopGroup();
    this.pcepDispatcher = new PCEPDispatcherImpl(
            ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry(),
            this.nf, this.bossGroup, this.workerGroup);

    // Second server instance on the same dependencies.
    final TestingSessionListenerFactory secondFactory = new TestingSessionListenerFactory();
    doReturn(secondFactory).when(this.dispatcherDependencies).getListenerFactory();
    final ChannelFuture secondServerFuture = this.pcepDispatcher.createServer(this.dispatcherDependencies);
    secondServerFuture.sync();
    final Channel secondServerChannel = secondServerFuture.channel();

    final TestingSessionListener secondListener
            = checkSessionListenerNotNull(secondFactory, this.clientAddress.getAddress().getHostAddress());
    Assert.assertNotNull(secondListener.getSession());
    Assert.assertTrue(secondListener.isUp());
    secondServerChannel.close();
}
/**
 * Writes the given object to the channel, flushes it, and blocks until the write has
 * completed.
 *
 * @param obj the message to write
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void send(Object obj) throws InterruptedException {
    final ChannelFuture writeFuture = channel.writeAndFlush(obj);
    if (writeFuture == null) {
        return;
    }
    writeFuture.sync();
}
/**
 * Binds two server channels on the same OIO event loop group and asserts that the second
 * bind fails with a {@code ChannelException} whose message mentions "too many channels",
 * and that a listener added after the failure is still notified.
 *
 * <p>NOTE(review): presumably the {@code 1} passed to {@link OioEventLoopGroup} is the channel
 * limit that makes the second bind fail; confirm against the Netty API.
 *
 * @throws Exception if the first bind fails or the test is interrupted
 */
@Test
public void testTooManyServerChannels() throws Exception {
    EventLoopGroup g = new OioEventLoopGroup(1);
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.channel(OioServerSocketChannel.class);
        b.group(g);
        b.childHandler(new ChannelInboundHandlerAdapter());
        ChannelFuture f1 = b.bind(0);
        f1.sync();

        ChannelFuture f2 = b.bind(0);
        // await() rather than sync(): the failure is inspected, not re-thrown.
        f2.await();
        assertThat(f2.cause(), is(instanceOf(ChannelException.class)));
        assertThat(f2.cause().getMessage().toLowerCase(), containsString("too many channels"));

        final CountDownLatch notified = new CountDownLatch(1);
        f2.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                notified.countDown();
            }
        });
        notified.await();
    } finally {
        // Always release the event loop threads, including when an assertion fails.
        g.shutdownGracefully();
    }
}
/**
 * Configures a server bootstrap on the boss/worker groups, binds it to the given host and
 * port, and waits for the bind to complete.
 *
 * @param host host name or address to bind to
 * @param port port to bind to
 * @param pipeliner callback that populates the pipeline of every accepted channel
 */
private void initFactory(String host, int port, final PipelineInitializer pipeliner) {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    try {
                        pipeliner.init(pipeline);
                    } catch (Throwable th) {
                        LOG.error("Severe error during pipeline creation", th);
                        throw th;
                    }
                }
            })
            .option(ChannelOption.SO_BACKLOG, 128)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
    try {
        // Bind and start to accept incoming connections.
        ChannelFuture future = serverBootstrap.bind(host, port);
        future.sync();
        // Log only after sync() returned, i.e. once the bind has actually succeeded.
        LOG.info("Server binded host: {}, port: {}", host, port);
    } catch (InterruptedException ex) {
        // Restore the interrupt status so the caller can react to the interruption.
        Thread.currentThread().interrupt();
        LOG.error("Interrupted while binding server", ex);
    }
}
/**
 * Connects to {@code HOST:PORT} over SSL using an insecure trust manager and forwards every
 * line read from standard input to the server until end of input or the {@code bye} command.
 *
 * @param args unused
 * @throws Exception if the connection fails or reading or writing is interrupted
 */
public static void main(String[] args) throws Exception {
    // Configure SSL.
    final SslContext sslCtx = SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE);
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .handler(new SecureChatClientInitializer(sslCtx));

        // Start the connection attempt.
        Channel ch = b.connect(HOST, PORT).sync().channel();

        // Read commands from the stdin.
        ChannelFuture lastWriteFuture = null;
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
            // Sends the received line to the server.
            lastWriteFuture = ch.writeAndFlush(line + "\r\n");

            // On 'bye', wait until the server closes the connection, then stop reading.
            if ("bye".equals(line.toLowerCase())) {
                ch.closeFuture().sync();
                break;
            }
        }

        // Wait until all messages are flushed before closing the channel.
        if (lastWriteFuture != null) {
            lastWriteFuture.sync();
        }
    } finally {
        // The connection is closed automatically on shutdown.
        group.shutdownGracefully();
    }
}
/**
 * Closes the client, server-side connected and server channels, waits for those closes,
 * cleans up both SSL contexts, and finally shuts down the event loop groups of the server
 * and client bootstraps. All channel/context/exception fields handled here are reset to
 * {@code null}.
 *
 * @throws InterruptedException if interrupted while waiting for a close or shutdown future
 */
@After
public void tearDown() throws InterruptedException {
ChannelFuture clientCloseFuture = null;
ChannelFuture serverConnectedCloseFuture = null;
ChannelFuture serverCloseFuture = null;
// Initiate all channel closes first; they are awaited together further down.
if (clientChannel != null) {
clientCloseFuture = clientChannel.close();
clientChannel = null;
}
if (serverConnectedChannel != null) {
serverConnectedCloseFuture = serverConnectedChannel.close();
serverConnectedChannel = null;
}
if (serverChannel != null) {
serverCloseFuture = serverChannel.close();
serverChannel = null;
}
// We must wait for the Channel cleanup to finish. In the case of the ReferenceCountedOpenSslEngineTest
// the ReferenceCountedOpenSslEngine depends upon the SslContext, so we must wait for the channels to be
// closed before cleaning up the SslContext to avoid JVM core dumps!
//
// See https://github.com/netty/netty/issues/5692
if (clientCloseFuture != null) {
clientCloseFuture.sync();
}
if (serverConnectedCloseFuture != null) {
serverConnectedCloseFuture.sync();
}
if (serverCloseFuture != null) {
serverCloseFuture.sync();
}
// Only now, with every channel closed, release the SSL contexts.
if (serverSslCtx != null) {
cleanupServerSslContext(serverSslCtx);
serverSslCtx = null;
}
if (clientSslCtx != null) {
cleanupClientSslContext(clientSslCtx);
clientSslCtx = null;
}
Future<?> serverGroupShutdownFuture = null;
Future<?> serverChildGroupShutdownFuture = null;
Future<?> clientGroupShutdownFuture = null;
// Request shutdown of all groups with zero quiet period and zero timeout.
if (sb != null) {
serverGroupShutdownFuture = sb.config().group().shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
serverChildGroupShutdownFuture = sb.config().childGroup().shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
}
if (cb != null) {
clientGroupShutdownFuture = cb.config().group().shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
}
// Both server futures are assigned together above, so checking the first one is sufficient.
if (serverGroupShutdownFuture != null) {
serverGroupShutdownFuture.sync();
serverChildGroupShutdownFuture.sync();
}
if (clientGroupShutdownFuture != null) {
clientGroupShutdownFuture.sync();
}
serverException = null;
}
/**
 * Uploads the given stream under {@code key} and blocks until the write has completed.
 *
 * <p>The upload is skipped when {@code storedBlobs} already has an entry for the prefixed
 * key. If the upload fails with an {@code HttpException} caused by an expired auth token,
 * credentials are refreshed and the upload is retried once, provided the stream can be reset.
 *
 * @param key the blob key
 * @param length number of bytes to upload
 * @param in the data to upload; closed by this method when an upload is attempted
 * @param casUpload selects the {@code CAS_PREFIX} (true) or {@code AC_PREFIX} (false) namespace
 * @throws IOException if the upload fails
 * @throws InterruptedException if interrupted while waiting for the upload
 */
@SuppressWarnings("FutureReturnValueIgnored")
private void uploadBlocking(String key, long length, InputStream in, boolean casUpload)
        throws IOException, InterruptedException {
    InputStream wrappedIn =
            new FilterInputStream(in) {
                @Override
                public void close() {
                    // Ensure that the InputStream can't be closed somewhere in the Netty
                    // pipeline, so that we can support retries. The InputStream is closed in
                    // the finally block below.
                }
            };
    UploadCommand upload = new UploadCommand(uri, casUpload, key, wrappedIn, length);
    Channel ch = null;
    boolean success = false;
    // The marker must be inserted and removed under the same (prefixed) key.
    final String storedKey = (casUpload ? CAS_PREFIX : AC_PREFIX) + key;
    // NOTE(review): when the blob is already marked as stored, `in` is not closed here;
    // presumably the caller owns it in that case - confirm.
    if (storedBlobs.putIfAbsent(storedKey, true) == null) {
        try {
            ch = acquireUploadChannel();
            ChannelFuture uploadFuture = ch.writeAndFlush(upload);
            uploadFuture.sync();
            success = true;
        } catch (Exception e) {
            // e can be of type HttpException, because Netty uses Unsafe.throwException to re-throw a
            // checked exception that hasn't been declared in the method signature.
            if (e instanceof HttpException) {
                HttpResponse response = ((HttpException) e).response();
                if (authTokenExpired(response)) {
                    refreshCredentials();
                    // The error is due to an auth token having expired. Let's try again.
                    if (!reset(in)) {
                        // The InputStream can't be reset and thus we can't retry as most likely
                        // bytes have already been read from the InputStream.
                        throw e;
                    }
                    putAfterCredentialRefresh(upload);
                    success = true;
                    return;
                }
            }
            throw e;
        } finally {
            if (!success) {
                // Drop the marker so that a later attempt for the same blob is not skipped.
                storedBlobs.remove(storedKey);
            }
            in.close();
            if (ch != null) {
                releaseUploadChannel(ch);
            }
        }
    }
}
/**
 * Connects to {@code HOST:PORT}, optionally over SSL with an insecure trust manager, and
 * forwards every line read from standard input to the server until end of input or the
 * {@code bye} command.
 *
 * @param args unused
 * @throws Exception if the connection fails or reading or writing is interrupted
 */
public static void main(String[] args) throws Exception {
    // Configure SSL.
    final SslContext sslCtx;
    if (SSL) {
        sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
    } else {
        sslCtx = null;
    }

    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .handler(new TelnetClientInitializer(sslCtx));

        // Start the connection attempt.
        Channel ch = b.connect(HOST, PORT).sync().channel();

        // Read commands from the stdin.
        ChannelFuture lastWriteFuture = null;
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
            // Sends the received line to the server.
            lastWriteFuture = ch.writeAndFlush(line + "\r\n");

            // On 'bye', wait until the server closes the connection, then stop reading.
            if ("bye".equals(line.toLowerCase())) {
                ch.closeFuture().sync();
                break;
            }
        }

        // Wait until all messages are flushed before closing the channel.
        if (lastWriteFuture != null) {
            lastWriteFuture.sync();
        }
    } finally {
        group.shutdownGracefully();
    }
}
/**
 * Telnet client entry point: connects to {@code HOST:PORT}, optionally over SSL with an
 * insecure trust manager, and sends each line typed on standard input to the server until
 * end of input or the {@code bye} command.
 *
 * @param args unused
 * @throws Exception if the connection fails or reading or writing is interrupted
 */
public static void main(String[] args) throws Exception {
    // Configure SSL.
    final SslContext sslCtx;
    if (SSL) {
        sslCtx = SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE);
    } else {
        sslCtx = null;
    }

    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .handler(new TelnetClientInitializer(sslCtx));

        // Start the connection attempt.
        Channel ch = b.connect(HOST, PORT).sync().channel();

        // Read commands from the stdin.
        ChannelFuture lastWriteFuture = null;
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
            // Sends the received line to the server.
            lastWriteFuture = ch.writeAndFlush(line + "\r\n");

            // On 'bye', wait until the server closes the connection, then stop reading.
            if ("bye".equals(line.toLowerCase())) {
                ch.closeFuture().sync();
                break;
            }
        }

        // Wait until all messages are flushed before closing the channel.
        if (lastWriteFuture != null) {
            lastWriteFuture.sync();
        }
    } finally {
        group.shutdownGracefully();
    }
}
/**
 * Sends the text of {@code tfSend} over the channel on a background thread.
 *
 * <p>Nothing is sent when the client is not connected. While the task runs, the status
 * controls are bound to its running, message and progress properties. If the write fails,
 * the error is logged, shown in an alert, and the client is marked as disconnected.
 */
@FXML
public void send() {
    if (logger.isDebugEnabled()) {
        logger.debug("[SEND]");
    }

    if (!connected.get()) {
        if (logger.isWarnEnabled()) {
            logger.warn("client not connected; skipping write");
        }
        return;
    }

    // Capture the text now; the task body runs on another thread.
    final String payload = tfSend.getText();

    Task<Void> sendTask = new Task<Void>() {
        @Override
        protected Void call() throws Exception {
            // Block the worker thread until the write has completed (or failed).
            channel.writeAndFlush(Unpooled.copiedBuffer(payload, CharsetUtil.UTF_8)).sync();
            return null;
        }

        @Override
        protected void failed() {
            Throwable cause = getException();
            logger.error("client send error", cause);

            Alert errorDialog = new Alert(AlertType.ERROR);
            errorDialog.setTitle("Client");
            errorDialog.setHeaderText(cause.getClass().getName());
            errorDialog.setContentText(cause.getMessage());
            errorDialog.showAndWait();

            connected.set(false);
        }
    };

    hboxStatus.visibleProperty().bind(sendTask.runningProperty());
    lblStatus.textProperty().bind(sendTask.messageProperty());
    piStatus.progressProperty().bind(sendTask.progressProperty());

    new Thread(sendTask).start();
}