下面列出了io.netty.channel.epoll.EpollDatagramChannel#io.netty.util.concurrent.DefaultThreadFactory 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* 如果有参数值指定了要自定义线程池,则根据参数值确定线程池大小并创建线程池
*/
private static Executor getExecutor(String[] args) {
Executor executor = null;
if (args != null && args.length == 2 && "set-thread-pool".equals(args[0])) {
String numStr = args[1];
int nThreads = 0;
if (MathUtils.isInteger(numStr)) {
nThreads = Integer.parseInt(numStr);
}
if (nThreads == 0) {
nThreads = Runtime.getRuntime().availableProcessors() * 2;
}
ThreadFactory factory = new DefaultThreadFactory("grpc-server-executor", true);
executor = Executors.newFixedThreadPool(nThreads, factory);
}
return executor;
}
public void run() throws Exception {
final ThreadFactory connectFactory = new DefaultThreadFactory("rendezvous");
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
connectFactory, NioUdtProvider.BYTE_PROVIDER);
try {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(connectGroup)
.channelFactory(NioUdtProvider.BYTE_RENDEZVOUS)
.handler(new ChannelInitializer<UdtChannel>() {
@Override
protected void initChannel(UdtChannel ch) throws Exception {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new ByteEchoPeerHandler(messageSize));
}
});
final ChannelFuture future = bootstrap.connect(peerAddress, myAddress).sync();
future.channel().closeFuture().sync();
} finally {
connectGroup.shutdownGracefully();
}
}
public synchronized void start(NettyClientConfig config) {
if (started.get()) {
return;
}
initHandler();
Bootstrap bootstrap = new Bootstrap();
eventLoopGroup = new NioEventLoopGroup(1, new DefaultThreadFactory(clientName + "-boss"));
eventExecutors = new DefaultEventExecutorGroup(config.getClientWorkerThreads(), new DefaultThreadFactory(clientName + "-worker"));
connectManager = new NettyConnectManageHandler(bootstrap, config.getConnectTimeoutMillis());
bootstrap.group(this.eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, config.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, config.getClientSocketRcvBufSize())
.handler(newChannelInitializer(config, eventExecutors, connectManager));
started.set(true);
}
public Client(int id, ClientConfig config) throws SchedulerException {
this.id = id;
log.info("client cluster {}", config.getCluster());
log.info("client serverName {}", config.getServerName());
log.info("client address {}", config.getAddress());
log.info("client port {}", config.getPort());
log.info("client protocol {}", config.getProtocol());
log.info("client charset {}", config.getCharset());
log.info("client password {}", config.getPassword());
log.info("client workerThread {}", config.getWorkerThread());
log.info("client eventThread {}", config.getEventThread());
log.info("client msgLength {}", config.getMsgLength());
log.info("client heartSec {}", config.getHeartSec());
log.info("client reconnectMs {}", config.getReconnectMs());
log.info("client syncRemoteTimeOutMs {}", config.getSyncRemoteTimeOutMs());
log.info("client connectNum {}", config.getConnectNum());
workerGroup = new NioEventLoopGroup(config.getWorkerThread(), new DefaultThreadFactory("ClientWorker", true));
clientContext = new ClientContext(config, this, new NioEventLoopGroup(config.getEventThread(), new DefaultThreadFactory("ClientEvent", true)));
clientRemote = new ClientRemote(clientContext);
startCheckHeartTimeTask();
}
public static Server create(ServerConfig config) {
log.info("server cluster {}", config.getCluster());
log.info("server serverName {}", config.getServerName());
log.info("server address {}", config.getAddress());
log.info("server port {}", config.getPort());
log.info("server protocol {}", config.getProtocol());
log.info("server charset {}", config.getCharset());
log.info("server password {}", config.getPassword());
log.info("server bossThread {}", config.getBossThread());
log.info("server workerThread {}", config.getWorkerThread());
log.info("server eventThread {}", config.getEventThread());
log.info("server msgLength {}", config.getMsgLength());
log.info("server heartSec {}", config.getHeartSec());
log.info("server coldDownMs {}", config.getColdDownMs());
log.info("server allowAddressEnable {}", config.isAllowAddressEnable());
log.info("server allowAddressList {}", Arrays.toString(config.getAllowAddressList()));
log.info("server optionSoBacklog {}", config.getOptionSoBacklog());
config.setUseMainServerThreadPool(false);
return new Server(config,
new NioEventLoopGroup(config.getBossThread(), new DefaultThreadFactory("ServerBoss")),
new NioEventLoopGroup(config.getWorkerThread(), new DefaultThreadFactory("ServerWorker")),
Executors.newFixedThreadPool(ThreadCountUtil.convert(config.getEventThread()), new DefaultThreadFactory("ServerEvent")),
Executors.newSingleThreadExecutor(new DefaultThreadFactory("ServerPush")));
}
@Test
public void testGetStats() throws Exception {
String topicName = "test-stats";
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("pulsar://localhost:6650");
conf.setStatsIntervalSeconds(100);
ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);
ProducerConfigurationData producerConfData = new ProducerConfigurationData();
producerConfData.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
producerConfData.setCustomMessageRouter(new CustomMessageRouter());
assertEquals(Long.parseLong("100"), clientImpl.getConfiguration().getStatsIntervalSeconds());
PartitionedProducerImpl impl = new PartitionedProducerImpl(
clientImpl, topicName, producerConfData,
1, null, null, null);
impl.getStats();
}
public NettySharedConnPool(MergeConfig config, NettyClient client) {
poolName = "eagleClientPool-" + poolNum.getAndIncrement();
this.config = config;
this.client = client;
totalConnections = new AtomicInteger(0);
connectionBag = new ConcurrentBag<>(this);
ThreadFactory threadFactory = new DefaultThreadFactory(poolName + " housekeeper", true);
int maxClientConnection = config.getExtInt(ConfigEnum.maxClientConnection.getName(), ConfigEnum.maxClientConnection.getIntValue());
this.addConnectionExecutor = createThreadPoolExecutor(maxClientConnection, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy());
this.closeConnectionExecutor = createThreadPoolExecutor(maxClientConnection, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
this.houseKeepingExecutorService = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy());
this.houseKeepingExecutorService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.houseKeepingExecutorService.setRemoveOnCancelPolicy(true);
this.houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, HOUSEKEEPING_PERIOD_MS, MILLISECONDS);
fillPool();
}
/**
* Configures an {@link Executor}, {@link ProjectManager} and {@link CommandExecutor},
* then starts the {@link CommandExecutor} and initializes internal projects.
*/
@Override
public void before(ExtensionContext context) throws Exception {
tempDir.create();
final Executor repositoryWorker = newWorker();
purgeWorker = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("purge-worker", true));
projectManager = newProjectManager(repositoryWorker, purgeWorker);
executor = newCommandExecutor(projectManager, repositoryWorker);
executor.start().get();
ProjectInitializer.initializeInternalProject(executor);
afterExecutorStarted();
}
/**
* Returns a new {@link PluginGroup} which holds the {@link Plugin}s loaded from the classpath.
* {@code null} is returned if there is no {@link Plugin} whose target equals to the specified
* {@code target}.
*
* @param classLoader which is used to load the {@link Plugin}s
* @param target the {@link PluginTarget} which would be loaded
*/
@Nullable
static PluginGroup loadPlugins(ClassLoader classLoader, PluginTarget target, CentralDogmaConfig config) {
requireNonNull(classLoader, "classLoader");
requireNonNull(target, "target");
requireNonNull(config, "config");
final ServiceLoader<Plugin> loader = ServiceLoader.load(Plugin.class, classLoader);
final Builder<Plugin> plugins = new Builder<>();
for (Plugin plugin : loader) {
if (target == plugin.target() && plugin.isEnabled(config)) {
plugins.add(plugin);
}
}
final List<Plugin> list = plugins.build();
if (list.isEmpty()) {
return null;
}
return new PluginGroup(list, Executors.newSingleThreadExecutor(new DefaultThreadFactory(
"plugins-for-" + target.name().toLowerCase().replace("_", "-"), true)));
}
/**
* Create a LoadSimulationClient with the given JCommander arguments.
*
* @param arguments
* Arguments to configure this from.
*/
public LoadSimulationClient(final MainArguments arguments) throws Exception {
payloadCache = new ConcurrentHashMap<>();
topicsToTradeUnits = new ConcurrentHashMap<>();
admin = PulsarAdmin.builder()
.serviceHttpUrl(arguments.serviceURL)
.build();
client = PulsarClient.builder()
.serviceUrl(arguments.serviceURL)
.connectionsPerBroker(4)
.ioThreads(Runtime.getRuntime().availableProcessors())
.statsInterval(0, TimeUnit.SECONDS)
.build();
port = arguments.port;
executor = Executors.newCachedThreadPool(new DefaultThreadFactory("test-client"));
}
public NettyClientImpl(URL url) {
super(url);
this.remoteAddress = new InetSocketAddress(url.getHost(), url.getPort());
this.timeout = url.getIntParameter(URLParam.requestTimeout.getName(), URLParam.requestTimeout.getIntValue());
this.scheduledExecutorService = Executors.newScheduledThreadPool(5,
new DefaultThreadFactory(String.format("%s-%s", Constants.FRAMEWORK_NAME, "future")));
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
scanRpcFutureTable();
}
}, 0, 5000, TimeUnit.MILLISECONDS);
}
private LeaderLatch createNewLeaderLatch(String leaderPath) {
final LeaderLatch newLeaderLatch = new LeaderLatch(curator, leaderPath, "127.0.0.1");
newLeaderLatch.addListener(
new LeaderLatchListener() {
@Override
public void isLeader() {
announceLeader();
}
@Override
public void notLeader() {
leaderActivator.stopBeingLeader();
}
}, Executors.newSingleThreadExecutor(new DefaultThreadFactory("LeaderLatchListener-%s")));
return newLeaderLatch;
}
@Override
protected void doStart(Listener listener) throws Throwable {
workerGroup = new NioEventLoopGroup(http_work, new DefaultThreadFactory(ThreadNames.T_HTTP_CLIENT));
b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.TCP_NODELAY, true);
b.option(ChannelOption.SO_REUSEADDR, true);
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new HttpResponseDecoder());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(maxContentLength));
ch.pipeline().addLast("encoder", new HttpRequestEncoder());
ch.pipeline().addLast("handler", new HttpClientHandler(NettyHttpClient.this));
}
});
timer = new HashedWheelTimer(new NamedThreadFactory(T_HTTP_TIMER), 1, TimeUnit.SECONDS, 64);
listener.onSuccess();
}
public PulsarService(ServiceConfiguration config, Optional<WorkerService> functionWorkerService,
Consumer<Integer> processTerminator) {
// Validate correctness of configuration
PulsarConfigurationLoader.isComplete(config);
// validate `advertisedAddress`, `advertisedListeners`, `internalListenerName`
Map<String, AdvertisedListener> result = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
if (result != null) {
this.advertisedListeners = Collections.unmodifiableMap(result);
} else {
this.advertisedListeners = Collections.unmodifiableMap(Collections.emptyMap());
}
state = State.Init;
// use `internalListenerName` listener as `advertisedAddress`
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
if (!this.advertisedListeners.isEmpty()) {
this.advertisedAddress = this.advertisedListeners.get(config.getInternalListenerName()).getBrokerServiceUrl().getHost();
} else {
this.advertisedAddress = advertisedAddress(config);
}
this.brokerVersion = PulsarVersion.getVersion();
this.config = config;
this.shutdownService = new MessagingServiceShutdownHook(this, processTerminator);
this.loadManagerExecutor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager"));
this.functionWorkerService = functionWorkerService;
}
@Test
public void testSingleIpAddress() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
List<InetAddress> result = Lists.newArrayList();
result.add(InetAddress.getByName("127.0.0.1"));
Mockito.when(pool.resolveName("non-existing-dns-name")).thenReturn(CompletableFuture.completedFuture(result));
client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create();
client.close();
eventLoop.shutdownGracefully();
}
@Test
public void testDoubleIpAddress() throws Exception {
String serviceUrl = "pulsar://non-existing-dns-name:" + pulsar.getBrokerListenPort().get();
ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
List<InetAddress> result = Lists.newArrayList();
// Add a non existent IP to the response to check that we're trying the 2nd address as well
result.add(InetAddress.getByName("127.0.0.99"));
result.add(InetAddress.getByName("127.0.0.1"));
Mockito.when(pool.resolveName("non-existing-dns-name")).thenReturn(CompletableFuture.completedFuture(result));
// Create producer should succeed by trying the 2nd IP
client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create();
client.close();
eventLoop.shutdownGracefully();
}
@Test
public void testNoConnectionPool() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setConnectionsPerBroker(0);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved("127.0.0.1", pulsar.getBrokerListenPort().get());
IntStream.range(1, 5).forEach(i -> {
pool.getConnection(brokerAddress).thenAccept(cnx -> {
Assert.assertTrue(cnx.channel().isActive());
pool.releaseConnection(cnx);
Assert.assertTrue(cnx.channel().isActive());
});
});
Assert.assertEquals(pool.getPoolSize(), 0);
pool.closeAllConnections();
pool.close();
}
@Test
public void testEnableConnectionPool() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setConnectionsPerBroker(5);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved("127.0.0.1", pulsar.getBrokerListenPort().get());
IntStream.range(1, 10).forEach(i -> {
pool.getConnection(brokerAddress).thenAccept(cnx -> {
Assert.assertTrue(cnx.channel().isActive());
pool.releaseConnection(cnx);
Assert.assertTrue(cnx.channel().isActive());
});
});
Assert.assertTrue(pool.getPoolSize() <= 5 && pool.getPoolSize() > 0);
pool.closeAllConnections();
pool.close();
}
private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
throws Exception {
ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> {
return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) {
@Override
protected void handleActiveConsumerChange(CommandActiveConsumerChange change) {
throw new UnsupportedOperationException();
}
};
});
return new PulsarClientImpl(conf, eventLoopGroup, cnxPool);
}
private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
throws Exception {
ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> {
return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) {
@Override
protected void handleActiveConsumerChange(CommandActiveConsumerChange change) {
throw new UnsupportedOperationException();
}
};
});
return new PulsarClientImpl(conf, eventLoopGroup, cnxPool);
}
@Override
public EventLoopGroup create() {
// Use Netty's DefaultThreadFactory in order to get the benefit of FastThreadLocal.
boolean useDaemonThreads = true;
ThreadFactory threadFactory = new DefaultThreadFactory(name, useDaemonThreads);
int parallelism = numEventLoops == 0
? Runtime.getRuntime().availableProcessors() * 2 : numEventLoops;
return new NioEventLoopGroup(parallelism, threadFactory);
}
private void start() throws IOException {
int port = 50051;
Executor executorPool = Executors.newFixedThreadPool(1,
new DefaultThreadFactory("grpc-server-executor", true));
EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup(1,
new DefaultThreadFactory("grpc-worker-group", true));
//server = ServerBuilder.forPort(port)
server = NettyServerBuilder.forPort(port)
.executor(executorPool)// 自定义grpc服务端线程池
.workerEventLoopGroup(workerEventLoopGroup)// 自定义netty的worker线程池
.addService(new GreeterImpl())
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
SingleThreadServer.this.stop();
System.err.println("*** server shut down");
}
});
}
private static NettyChannelBuilder newNettyClientChannel(Transport transport,
SocketAddress address, boolean tls, boolean testca, int flowControlWindow)
throws IOException {
NettyChannelBuilder builder =
NettyChannelBuilder.forAddress(address).flowControlWindow(flowControlWindow);
if (!tls) {
builder.usePlaintext();
} else if (testca) {
File cert = TestUtils.loadCert("ca.pem");
builder.sslContext(GrpcSslContexts.forClient().trustManager(cert).build());
}
DefaultThreadFactory tf = new DefaultThreadFactory("client-elg-", true /*daemon */);
switch (transport) {
case NETTY_NIO:
builder
.eventLoopGroup(new NioEventLoopGroup(0, tf))
.channelType(NioSocketChannel.class);
break;
case NETTY_EPOLL:
// These classes only work on Linux.
builder
.eventLoopGroup(new EpollEventLoopGroup(0, tf))
.channelType(EpollSocketChannel.class);
break;
case NETTY_UNIX_DOMAIN_SOCKET:
// These classes only work on Linux.
builder
.eventLoopGroup(new EpollEventLoopGroup(0, tf))
.channelType(EpollDomainSocketChannel.class);
break;
default:
// Should never get here.
throw new IllegalArgumentException("Unsupported transport: " + transport);
}
return builder;
}
/** Returns a fixed object pool of handshaker service channel for testing only. */
static FixedObjectPool<ManagedChannel> getHandshakerChannelPoolForTesting(
String handshakerAddress) {
ThreadFactory clientThreadFactory = new DefaultThreadFactory("handshaker pool", true);
ManagedChannel channel =
NettyChannelBuilder.forTarget(handshakerAddress)
.directExecutor()
.eventLoopGroup(new NioEventLoopGroup(1, clientThreadFactory))
.usePlaintext()
.build();
return new FixedObjectPool<ManagedChannel>(channel);
}
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
// iothreads参数值,默认cpu线程数+1 小于32
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
/**
* start server, bind port
*/
public void start() throws Throwable {
if (!hasStarted.compareAndSet(false, true)) {
return;
}
boss = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-boss", true));
worker = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-worker", true));
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new QosProcessHandler(welcome, acceptForeignIp));
}
});
try {
serverBootstrap.bind(port).sync();
logger.info("qos-server bind localhost:" + port);
} catch (Throwable throwable) {
logger.error("qos-server can not bind localhost:" + port, throwable);
throw throwable;
}
}
public static void main(String[] args) throws Exception {
// Configure the client.
final ThreadFactory connectFactory = new DefaultThreadFactory("connect");
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
connectFactory, NioUdtProvider.MESSAGE_PROVIDER);
try {
final Bootstrap boot = new Bootstrap();
boot.group(connectGroup)
.channelFactory(NioUdtProvider.MESSAGE_CONNECTOR)
.handler(new ChannelInitializer<UdtChannel>() {
@Override
public void initChannel(final UdtChannel ch)
throws Exception {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new MsgEchoClientHandler());
}
});
// Start the client.
final ChannelFuture f = boot.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
connectGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
final ThreadFactory connectFactory = new DefaultThreadFactory("connect");
final NioEventLoopGroup acceptGroup =
new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.MESSAGE_PROVIDER);
final NioEventLoopGroup connectGroup =
new NioEventLoopGroup(1, connectFactory, NioUdtProvider.MESSAGE_PROVIDER);
// Configure the server.
try {
final ServerBootstrap boot = new ServerBootstrap();
boot.group(acceptGroup, connectGroup)
.channelFactory(NioUdtProvider.MESSAGE_ACCEPTOR)
.option(ChannelOption.SO_BACKLOG, 10)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<UdtChannel>() {
@Override
public void initChannel(final UdtChannel ch)
throws Exception {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new MsgEchoServerHandler());
}
});
// Start the server.
final ChannelFuture future = boot.bind(PORT).sync();
// Wait until the server socket is closed.
future.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
acceptGroup.shutdownGracefully();
connectGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
// Configure the client.
final ThreadFactory connectFactory = new DefaultThreadFactory("connect");
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
connectFactory, NioUdtProvider.BYTE_PROVIDER);
try {
final Bootstrap boot = new Bootstrap();
boot.group(connectGroup)
.channelFactory(NioUdtProvider.BYTE_CONNECTOR)
.handler(new ChannelInitializer<UdtChannel>() {
@Override
public void initChannel(final UdtChannel ch)
throws Exception {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new ByteEchoClientHandler());
}
});
// Start the client.
final ChannelFuture f = boot.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
connectGroup.shutdownGracefully();
}
}
private void init(ExecutorService executor) {
tree = Maps.newTreeMap();
if (executor != null) {
this.executor = executor;
} else {
this.executor = Executors.newFixedThreadPool(1, new DefaultThreadFactory("mock-zookeeper"));
}
SetMultimap<String, Watcher> w = HashMultimap.create();
watchers = Multimaps.synchronizedSetMultimap(w);
stopped = false;
alwaysFail = new AtomicReference<>(KeeperException.Code.OK);
failures = new CopyOnWriteArrayList<>();
}