下面列出了怎么用io.netty.util.HashedWheelTimer的API类实例代码及写法,或者点击链接到github查看源代码。
@Inject
public AlexaPlatformService(AlexaPlatformServiceConfig config, PlatformMessageBus bus) {
this.bus = bus;
workerPool = new ThreadPoolBuilder()
.withMaxPoolSize(config.getMaxListenerThreads())
.withKeepAliveMs(config.getListenerThreadKeepAliveMs())
.withNameFormat(DISPATCHER_POOL_NAME + "-%d")
.withBlockingBacklog()
.withMetrics("alexa.bridge")
.build();
timeoutPool = new HashedWheelTimer(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(TIMEOUT_POOL_NAME + "-%d")
.setUncaughtExceptionHandler(new LoggingUncaughtExceptionHandler(logger))
.build());
defaultTimeoutSecs = config.getDefaultTimeoutSecs();
}
@Inject
public CommandExecutor(
PlatformMessageBus bus,
@Named(VoiceConfig.NAME_EXECUTOR) ExecutorService executor,
VoiceConfig config,
@Named(VoiceConfig.NAME_TIMEOUT_TIMER) HashedWheelTimer timeoutTimer,
ResponseCompleter responseCompleter,
PlacePopulationCacheManager populationCacheMgr
) {
this.executor = executor;
this.config = config;
this.timeoutTimer = timeoutTimer;
this.responseCompleter = responseCompleter;
this.populationCacheMgr = populationCacheMgr;
this.busClient = new PlatformBusClient(bus, executor, ImmutableSet.of(AddressMatchers.equals(Address.platformService(VoiceService.NAMESPACE))));
}
/**
* Starts Socket.IO server with current configuration settings.
*
* @throws IllegalStateException
* if server already started
*/
public synchronized void start() {
if (isStarted()) {
throw new IllegalStateException("Failed to start Socket.IO server: server already started");
}
log.info("Socket.IO server starting");
// Configure heartbeat scheduler
timer = new HashedWheelTimer();
timer.start();
SocketIOHeartbeatScheduler.setHashedWheelTimer(timer);
SocketIOHeartbeatScheduler.setHeartbeatInterval(configuration.getHeartbeatInterval());
SocketIOHeartbeatScheduler.setHeartbeatTimeout(configuration.getHeartbeatTimeout());
// Configure and bind server
ServerBootstrapFactory bootstrapFactory = serverBootstrapFactory != null
? serverBootstrapFactory
: new DefaultServerBootstrapFactory(configuration);
bootstrap = bootstrapFactory.createServerBootstrap();
bootstrap.childHandler(new SocketIOChannelInitializer(configuration, listener, pipelineModifier));
bootstrap.bind(configuration.getPort()).syncUninterruptibly();
state = State.STARTED;
log.info("Socket.IO server started: {}", configuration);
}
@Inject
public GoogleCommandExecutor(
@Named(VoiceConfig.NAME_EXECUTOR) ExecutorService executor,
@Named(VoiceConfig.NAME_TIMEOUT_TIMER) HashedWheelTimer timeoutTimer,
GoogleConfig config,
CommandExecutor commandExecutor,
GoogleWhitelist whitelist,
ProductCatalogManager prodCat,
Provider<VoiceContextExecutorRegistry> registry
) {
this.executor = executor;
this.timeoutTimer = timeoutTimer;
this.config = config;
this.commandExecutor = commandExecutor;
this.whitelist = whitelist;
this.prodCat = prodCat;
this.registry = registry;
}
@Inject
public HomeGraphAPI(GoogleConfig config,
GoogleRpcContext rpcContext,
ProductCatalogManager prodCat,
GoogleWhitelist whitelist,
@Named(EXECUTOR_NAME) HashedWheelTimer executor
) {
this.config = config;
requestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(config.getConnectionRequestTimeoutMs())
.setConnectTimeout(config.getConnectionTimeoutMs())
.setSocketTimeout(config.getSocketTimeoutMs())
.build();
pool = new PoolingHttpClientConnectionManager(config.getTimeToLiveMs(), TimeUnit.MILLISECONDS);
pool.setDefaultMaxPerRoute(config.getRouteMaxConnections());
pool.setMaxTotal(config.getMaxConnections());
pool.setValidateAfterInactivity(config.getValidateAfterInactivityMs());
this.gRpcContext = rpcContext;
this.prodCat = prodCat;
this.whitelist = whitelist;
this.executor = executor;
}
@Inject
@SuppressWarnings("null")
public ReflexController(
ReflexLocalProcessing localProcessing,
ZigbeeLocalProcessing zigbee,
ZWaveLocalProcessing zwave,
/*
SercommLocalProcessing sercomm,
*/
Router router
) {
this.timer = new HashedWheelTimer();
this.router = router;
this.zigbee = zigbee;
this.zwave = zwave;
/*
this.sercomm = sercomm;
*/
this.localProcessing = localProcessing;
this.pinToUser = new HashMap<>();
this.userToPin = new HashMap<>();
this.processors = new HashMap<>();
}
public DnsNamingService(BrpcURL namingUrl) {
Validate.notNull(namingUrl);
Validate.notEmpty(namingUrl.getHostPorts());
this.namingUrl = namingUrl;
String[] splits = namingUrl.getHostPorts().split(":");
this.host = splits[0];
if (splits.length == 2) {
this.port = Integer.valueOf(splits[1]);
} else {
this.port = 80;
}
this.hostPort = this.host + ":" + this.port;
this.updateInterval = namingUrl.getIntParameter(
Constants.INTERVAL, Constants.DEFAULT_INTERVAL);
namingServiceTimer = new HashedWheelTimer(new CustomThreadFactory("namingService-timer-thread"));
}
@Test
public void testTryWithResourcesShouldCloseAllClustersButNotTimerIfProvided() throws Exception {
EventLoopGroup eventLoop;
Timer timer = new HashedWheelTimer();
try (Server server = Server.builder().withTimer(timer).build()) {
// Do nothing here, since this is a unit test, we don't want to create any inet sockets
// which is what Server does by default.
eventLoop = server.eventLoopGroup;
}
// event loop should have been closed since a custom one was not provided.
assertThat(eventLoop.isShutdown()).isTrue();
// timer should not have been closed since a custom one was provided.
timer.newTimeout(
timeout -> {
// noop
},
1,
TimeUnit.SECONDS);
timer.stop();
}
public static AsyncHttpClient newClient(final String accountName,
final boolean isSecure,
final HashedWheelTimer poolTimer) {
final DefaultAsyncHttpClientConfig.Builder configBuilder = config()
// TODO: Confirm a new thread pool is not getting created everytime
.setThreadPoolName(accountName + "-azurestorage-async-client")
.setChannelPool(new DefaultChannelPool(DEFAULT_IDLE_TIME, -1, poolTimer, DEFAULT_CLEANER_PERIOD))
.setRequestTimeout(DEFAULT_REQUEST_TIMEOUT)
.setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY)
.setMaxRequestRetry(MAX_RETRIES);
try {
if (isSecure) {
configBuilder.setSslContext(SslContextBuilder.forClient().build());
}
} catch (SSLException e) {
logger.error("Error while setting ssl context in Async Client", e);
}
poolTimer.start();
return asyncHttpClient(configBuilder.build());
}
@Test
public void testResourceClosures() throws IOException {
AsyncHttpClient asyncHttpClient = mock(AsyncHttpClient.class);
PowerMockito.mockStatic(AzureAsyncHttpClientUtils.class);
AtomicReference<HashedWheelTimer> timerInstance = new AtomicReference<>();
when(AzureAsyncHttpClientUtils.newClient(eq(ACCOUNT), eq(true), any(HashedWheelTimer.class))).then(invocationOnMock -> {
timerInstance.set(invocationOnMock.getArgument(2, HashedWheelTimer.class));
return asyncHttpClient;
});
AzureStorageFileSystem azureStorageFileSystem = new AzureStorageFileSystem();
azureStorageFileSystem.setup(getMockHadoopConf());
// Close
azureStorageFileSystem.close();
verify(asyncHttpClient, times(1)).close();
try {
timerInstance.get().start();
fail("Timer cannot be started if it was stopped properly at resource closure");
} catch (IllegalStateException e) {
assertEquals("cannot be started once stopped", e.getMessage());
}
}
@Override
protected void doStart(Listener listener) throws Throwable {
workerGroup = new NioEventLoopGroup(http_work, new DefaultThreadFactory(ThreadNames.T_HTTP_CLIENT));
b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.TCP_NODELAY, true);
b.option(ChannelOption.SO_REUSEADDR, true);
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new HttpResponseDecoder());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(maxContentLength));
ch.pipeline().addLast("encoder", new HttpRequestEncoder());
ch.pipeline().addLast("handler", new HttpClientHandler(NettyHttpClient.this));
}
});
timer = new HashedWheelTimer(new NamedThreadFactory(T_HTTP_TIMER), 1, TimeUnit.SECONDS, 64);
listener.onSuccess();
}
BookKeeperClient(DistributedLogConfiguration conf,
String name,
String zkServers,
ZooKeeperClient zkc,
String ledgersPath,
EventLoopGroup eventLoopGroup,
HashedWheelTimer requestTimer,
StatsLogger statsLogger,
Optional<FeatureProvider> featureProvider) {
this.conf = conf;
this.name = name;
this.zkServers = zkServers;
this.ledgersPath = ledgersPath;
this.passwd = conf.getBKDigestPW().getBytes(UTF_8);
this.eventLoopGroup = eventLoopGroup;
this.requestTimer = requestTimer;
this.statsLogger = statsLogger;
this.featureProvider = featureProvider;
this.ownZK = null == zkc;
if (null != zkc) {
// reference the passing zookeeper client
this.zkc = zkc;
}
}
private BookKeeperClientBuilder createBKCBuilder(String bkcName,
DistributedLogConfiguration conf,
String zkServers,
String ledgersPath,
EventLoopGroup eventLoopGroup,
HashedWheelTimer requestTimer,
Optional<FeatureProvider> featureProviderOptional,
StatsLogger statsLogger) {
BookKeeperClientBuilder builder = BookKeeperClientBuilder.newBuilder()
.name(bkcName)
.dlConfig(conf)
.zkServers(zkServers)
.ledgersPath(ledgersPath)
.eventLoopGroup(eventLoopGroup)
.requestTimer(requestTimer)
.featureProvider(featureProviderOptional)
.statsLogger(statsLogger);
LOG.info("Created shared client builder {} : zkServers = {}, ledgersPath = {}, numIOThreads = {}",
new Object[] { bkcName, zkServers, ledgersPath, conf.getBKClientNumberIOThreads() });
return builder;
}
@Test
public void testCreateProducerOnNotExistsTopic() throws PulsarClientException, InterruptedException {
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
try {
pulsarClient.newProducer()
.topic("persistent://public/default/" + UUID.randomUUID().toString())
.sendTimeout(100, TimeUnit.MILLISECONDS)
.create();
Assert.fail("Create producer should failed while topic does not exists.");
} catch (PulsarClientException ignore) {
}
Thread.sleep(2000);
HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
Assert.assertEquals(timer.pendingTimeouts(), 0);
Assert.assertEquals(((PulsarClientImpl) pulsarClient).producersCount(), 0);
pulsarClient.close();
}
@Test
public void testCreateConsumerOnNotExistsTopic() throws PulsarClientException, InterruptedException {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 1);
try {
pulsarClient.newConsumer()
.topic("persistent://public/default/" + UUID.randomUUID().toString())
.subscriptionName("test")
.subscribe();
Assert.fail("Create consumer should failed while topic does not exists.");
} catch (PulsarClientException ignore) {
}
Thread.sleep(2000);
HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
Assert.assertEquals(timer.pendingTimeouts(), 0);
Assert.assertEquals(((PulsarClientImpl) pulsarClient).consumersCount(), 0);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
}
this.eventLoopGroup = eventLoopGroup;
setAuth(conf);
this.conf = conf;
this.clientClock = conf.getClock();
conf.getAuthentication().start();
this.cnxPool = cnxPool;
externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), getThreadFactory("pulsar-external-listener"));
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, eventLoopGroup);
} else {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), externalExecutorProvider.getExecutor());
}
timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
producers = Maps.newIdentityHashMap();
consumers = Maps.newIdentityHashMap();
state.set(State.Open);
}
@Test
public void testIncrementNumAcksReceived() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setStatsIntervalSeconds(1);
PulsarClientImpl client = mock(PulsarClientImpl.class);
when(client.getConfiguration()).thenReturn(conf);
Timer timer = new HashedWheelTimer();
when(client.timer()).thenReturn(timer);
ProducerImpl<?> producer = mock(ProducerImpl.class);
when(producer.getTopic()).thenReturn("topic-test");
when(producer.getProducerName()).thenReturn("producer-test");
when(producer.getPendingQueueSize()).thenReturn(1);
ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
ProducerStatsRecorderImpl recorder = new ProducerStatsRecorderImpl(client, producerConfigurationData, producer);
long latencyNs = TimeUnit.SECONDS.toNanos(1);
recorder.incrementNumAcksReceived(latencyNs);
Thread.sleep(1200);
assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5);
}
protected void initTimer(MasterSlaveServersConfig config) {
int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()};
Arrays.sort(timeouts);
int minTimeout = timeouts[0];
if (minTimeout % 100 != 0) {
minTimeout = (minTimeout % 100) / 2;
} else if (minTimeout == 100) {
minTimeout = 50;
} else {
minTimeout = 100;
}
timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false);
connectionWatcher = new IdleConnectionWatcher(this, config);
subscribeService = new PublishSubscribeService(this, config);
}
public static <T> CompletableFuture<T> enforceTimeout(
CompletionStage<T> underlyingFuture,
HashedWheelTimer timer,
long timeout,
TimeUnit timeUnit,
Supplier<Exception> exceptionSupplier
) {
// We don't want to muck with the underlying future passed in, so
// chaining a .thenApply(x -> x) forces a new future to be created with its own
// completion tracking. In this way, the original future is left alone and can
// time out on its own schedule.
CompletableFuture<T> future = underlyingFuture
.thenApply(x -> x)
.toCompletableFuture();
Timeout hwtTimeout = timer.newTimeout(
ignored -> future.completeExceptionally(exceptionSupplier.get()),
timeout,
timeUnit
);
future.whenComplete((result, throwable) -> hwtTimeout.cancel());
return future;
}
public static void init(int timeoutCheckerThreads) {
timeoutCheckers = new RoundRobinPickerList<>(timeoutCheckerThreads);
for (int i = 0; i < timeoutCheckerThreads; i++) {
//1024*10ms = 10s for one wheel.
HashedWheelTimer timeoutChecker = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("ServerTimeoutChecker-%d").setDaemon(true).build(),
10, TimeUnit.MILLISECONDS, 1024);
timeoutChecker.start();
timeoutCheckers.add(timeoutChecker);
}
isInited = true;
}
@Inject
public PlatformBusClient(@Named(IpcdService.PROP_THREADPOOL) Executor executor, PlatformMessageBus bus) {
this.executor = executor;
this.bus = bus;
timeoutPool = new HashedWheelTimer(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(TIMEOUT_POOL_NAME + "-%d")
.setUncaughtExceptionHandler(new LoggingUncaughtExceptionHandler(logger))
.build());
}
@Provides
@Named(VoiceConfig.NAME_TIMEOUT_TIMER)
@Singleton
public HashedWheelTimer timeoutTimer() {
return new HashedWheelTimer(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("voice-execute-timeout-%d")
.setUncaughtExceptionHandler(new LoggingUncaughtExceptionHandler(LoggerFactory.getLogger(CommandExecutor.class)))
.build());
}
public FileNamingService(BrpcURL namingUrl) {
Validate.notNull(namingUrl);
Validate.notNull(namingUrl.getPath());
this.namingUrl = namingUrl;
this.filePath = namingUrl.getPath();
this.updateInterval = namingUrl.getIntParameter(
Constants.INTERVAL, Constants.DEFAULT_INTERVAL);
namingServiceTimer = new HashedWheelTimer(new CustomThreadFactory("namingService-timer-thread"));
}
public SpringCloudNamingService(BrpcURL namingUrl) {
this.updateInterval = Constants.DEFAULT_INTERVAL;
namingServiceTimer = new HashedWheelTimer(new CustomThreadFactory("namingService-timer-thread"));
discoveryClient = BrpcApplicationContextUtils.getBean("discoveryClient", DiscoveryClient.class);
if (discoveryClient == null) {
throw new RuntimeException("discovery client is null");
}
}
public static void init(int timeoutCheckerThreads) {
timeoutCheckers = new RoundRobinPickerList<>(timeoutCheckerThreads);
for (int i = 0; i < timeoutCheckerThreads; i++) {
//1024*10ms = 10s for one wheel.
HashedWheelTimer timeoutChecker = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("ServerTimeoutChecker-%d").setDaemon(true).build(),
10, TimeUnit.MILLISECONDS, 1024);
timeoutChecker.start();
timeoutCheckers.add(timeoutChecker);
}
isInited = true;
}
public static void main(String[] args) throws InterruptedException {
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(1000, TimeUnit.MILLISECONDS, 16);
hashedWheelTimer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println(System.currentTimeMillis() + " === executed");
}
}, 1, TimeUnit.SECONDS);
}
@Override
public void doRun() {
// 获取真实房间 ID
String roomID = getRoomId(BakaDanmakuConfig.livePlatform.bilibiliRoom.liveRoom);
// 提示,相关房间信息已经获取
sendChatMessage("§8§l直播房间 ID 已经获取,ID 为 " + roomID);
EventLoopGroup group = new NioEventLoopGroup();
io.netty.util.Timer timer = new HashedWheelTimer();
try {
Bootstrap clientBootstrap = new Bootstrap();
clientBootstrap.group(group);
clientBootstrap.channel(NioSocketChannel.class);
clientBootstrap.remoteAddress(LIVE_URL, PORT);
clientBootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new BilibiliChannalInboundHandler(roomID));
}
});
ChannelFuture channelFuture = clientBootstrap.connect().sync();
timer.newTimeout(timeout -> {
channelFuture.channel().writeAndFlush(sendDataPack(Unpooled.buffer(), 2, ""));
}, 30000, TimeUnit.MILLISECONDS);
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException ioe) {
ioe.printStackTrace();
} finally {
try {
timer.stop();
group.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Test
public void testTryWithResourcesShouldCloseAllClustersButNotEventLoopAndTimerIfProvided()
throws Exception {
EventLoopGroup eventLoop = new DefaultEventLoopGroup();
Timer timer = new HashedWheelTimer();
BoundCluster cluster;
MockClient client;
try (Server server =
Server.builder()
.withAddressResolver(localAddressResolver)
.withTimer(timer)
.withEventLoopGroup(eventLoop, LocalServerChannel.class)
.build()) {
cluster = server.register(ClusterSpec.builder().withNodes(5));
BoundNode node = cluster.node(0);
SocketAddress address = node.getAddress();
client = new MockClient(eventLoop);
client.connect(address);
}
// event loop should not have been closed.
assertThat(eventLoop.isShutdown()).isFalse();
// timer should not have since a custom one was not provided.
cluster
.getServer()
.timer
.newTimeout(
timeout -> {
// noop
},
1,
TimeUnit.SECONDS);
eventLoop.shutdownGracefully(0, 0, TimeUnit.SECONDS);
timer.stop();
}
@Override
public void init() {
if (heartbeatCheck) {
long tickDuration = TimeUnit.SECONDS.toMillis(1);//1s 每秒钟走一步,一个心跳周期内大致走一圈
int ticksPerWheel = (int) (CC.mp.core.max_heartbeat / tickDuration);
this.timer = new HashedWheelTimer(
new NamedThreadFactory(ThreadNames.T_CONN_TIMER),
tickDuration, TimeUnit.MILLISECONDS, ticksPerWheel
);
}
}
@Override
public EnsemblePlacementPolicy initialize(
ClientConfiguration conf,
Optional<DNSToSwitchMapping> optionalDnsResolver,
HashedWheelTimer hashedWheelTimer,
FeatureProvider featureProvider, StatsLogger statsLogger
) {
return this;
}