下面列出了com.google.common.util.concurrent.RateLimiter#create ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static void main(String[] args) {
try {
RateLimiter limit = RateLimiter.create(100d);
PropertyConfigurator.configure("conf/log4j.properties");
while (true) {
if (limit.tryAcquire()) {
final HelloCommand command = new HelloCommand();
// System.out.println("result: " + command.execute());
// System.out.println("");
Future<String> future = command.queue();
System.out.println("result: " + future.get());
System.out.println("");
}
}
// Observable<String> observe = command.observe();
// observe.asObservable().subscribe((result) -> {
// System.out.println(result);
// });
} catch (Exception e) {
e.printStackTrace();
}
}
SendMessageTask(DataSourceInfoModel datasourceInfo, DatabaseDriverMapping mapping, CachedMessageClientStore messageClientStore,
Gson serializer, long timeout, MessageProducer messageProducer) {
this.messageProducer = messageProducer;
Preconditions.checkNotNull(datasourceInfo);
Preconditions.checkNotNull(mapping);
Preconditions.checkNotNull(messageClientStore);
Preconditions.checkNotNull(serializer);
this.dataSourceInfo = datasourceInfo;
this.driver = mapping.getDatabaseMapping(dataSourceInfo.getScheme());
this.dataSource = driver.makeDataSource(dataSourceInfo.getUrl(), datasourceInfo.getUserName(), datasourceInfo.getPassword());
this.messageClientStore = messageClientStore;
this.serializer = serializer;
this.timeout = timeout;
this.limiter = RateLimiter.create(50);
}
/**
* Constructor to initialize QueryScheduler
* @param queryExecutor QueryExecutor engine to use
* @param resourceManager for managing server thread resources
* @param serverMetrics server metrics collector
*/
public QueryScheduler(@Nonnull Configuration config, @Nonnull QueryExecutor queryExecutor,
@Nonnull ResourceManager resourceManager, @Nonnull ServerMetrics serverMetrics,
@Nonnull LongAccumulator latestQueryTime) {
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(queryExecutor);
Preconditions.checkNotNull(resourceManager);
Preconditions.checkNotNull(serverMetrics);
this.serverMetrics = serverMetrics;
this.resourceManager = resourceManager;
this.queryExecutor = queryExecutor;
this.latestQueryTime = latestQueryTime;
this.queryLogRateLimiter = RateLimiter.create(config.getDouble(QUERY_LOG_MAX_RATE_KEY, DEFAULT_QUERY_LOG_MAX_RATE));
this.numDroppedLogRateLimiter = RateLimiter.create(1.0d);
this.numDroppedLogCounter = new AtomicInteger(0);
LOGGER.info("Query log max rate: {}", queryLogRateLimiter.getRate());
}
private void createBatchOfPassengers(ThreadLocalRandom random) {
@SuppressWarnings("UnstableApiUsage")
RateLimiter rate = RateLimiter.create(2);
for (int j = 0; j < passengerCount; j++) {
rate.acquire();
int startCell, endCell;
do {
startCell = random.nextInt(gridDimension * gridDimension);
int dx = random.nextInt(-10, 10);
int dy = random.nextInt(-10, 10);
endCell = moveSlightly(startCell, dx, dy);
} while (startCell == endCell);
String id = "passenger-" + UUID.randomUUID();
Simulatee passenger = new Passenger(communication, id, startCell, endCell);
scheduler.add(passenger);
}
}
public static void main(String[] args) throws InterruptedException {
// qps设置为5,代表一秒钟只允许处理五个并发请求
RateLimiter rateLimiter = RateLimiter.create(5);
ExecutorService executorService = Executors.newFixedThreadPool(5);
int nTasks = 10;
CountDownLatch countDownLatch = new CountDownLatch(nTasks);
long start = System.currentTimeMillis();
for (int i = 0; i < nTasks; i++) {
final int j = i;
executorService.submit(() -> {
rateLimiter.acquire(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread().getName() + " gets job " + j + " done");
countDownLatch.countDown();
});
}
executorService.shutdown();
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("10 jobs gets done by 5 threads concurrently in " + (end - start) + " milliseconds");
}
@Provides
@Named("cloudDns")
static RateLimiter provideRateLimiter() {
// This is the default max QPS for Cloud DNS. It can be increased by contacting the team
// via the Quotas page on the Cloud Console.
int cloudDnsMaxQps = 50;
return RateLimiter.create(cloudDnsMaxQps);
}
ProfilerFileForProxyTask(String id, long maxRunningMs, ResponseHandler handler, String profilerId) {
this.id = id;
this.maxRunningMs = maxRunningMs;
this.handler = handler;
this.profilerId = profilerId;
rateLimiter = RateLimiter.create(META_STORE.getIntProperty("download.kb.per.second", DEFAULT_DOWNLOAD_LIMIT_KB) * 1024);
}
/**
* Constructs a {@link StackdriverWriter}.
*
* <p>The monitoringClient must have read and write permissions to the Cloud Monitoring API v3 on
* the provided project.
*/
public StackdriverWriter(
Monitoring monitoringClient,
String project,
MonitoredResource monitoredResource,
int maxQps,
int maxPointsPerRequest) {
this.monitoringClient = checkNotNull(monitoringClient);
this.projectResource = "projects/" + checkNotNull(project);
this.monitoredResource = monitoredResource;
this.maxPointsPerRequest = maxPointsPerRequest;
this.timeSeriesBuffer = new ArrayDeque<>(maxPointsPerRequest);
this.rateLimiter = RateLimiter.create(maxQps);
}
@Override
public void afterPropertiesSet() throws Exception {
if(maxConnectionLimit > 0)gloabalLimiter = RateLimiter.create(maxConnectionLimit);
String[] segments = frequencyRule.split("/");
int time = Integer.parseInt(segments[1].replaceAll("[^0-9]", ""));
if(segments[1].toLowerCase().endsWith("m")){
time = time * 60;
}
perLimiter = new PerFrequencyLimiter(time, Integer.parseInt(segments[0]));
log.info("per-Frequency:{}/{}s",segments[0],time);
}
private synchronized void decreaseRateLimit() {
Instant now = _clock.instant();
if (now.isAfter(_endDecreaseCooldownPeriod)) {
// Decrease by half
_rateLimit = Math.max(_rateLimit / 2, minRateLimit);
_rateLimiter = RateLimiter.create(_rateLimit, 1, TimeUnit.SECONDS);
_endDecreaseCooldownPeriod = now.plus(decreaseCooldown);
_endIncreaseCooldownPeriod = now.plus(increaseCooldown);
LOGGER.info("S3 rate limit decreased to {}", _rateLimit);
}
}
public RateLimiter getOrCreateRateLimiter(String resource, int rate) {
RateLimiter limiter = rateLimiterCache.get(resource);
if (limiter == null || limiter.getRate() != rate) {
synchronized (resource.intern()) {
limiter = rateLimiterCache.get(resource);
if (limiter == null) {
limiter = RateLimiter.create(rate);
rateLimiterCache.put(resource, limiter);
}
}
}
return limiter;
}
private void init() throws IOException {
boolean gzip = "true".equals(props.getProperty(WARP10_GZIP));
String endpoint = props.getProperty(WARP10_ENDPOINT);
String token = props.getProperty(WARP10_TOKEN);
String maxrate = props.getProperty(WARP10_MAXRATE);
if (null != maxrate) {
this.limiter = RateLimiter.create(Double.parseDouble(maxrate));
}
conn = (HttpURLConnection) new URL(endpoint).openConnection();
conn.setRequestMethod("POST");
conn.setDoOutput(true);
conn.setDoInput(true);
conn.setRequestProperty(Constants.HTTP_HEADER_TOKEN_DEFAULT, token);
conn.setChunkedStreamingMode(65536);
if (gzip) {
conn.setRequestProperty("Content-Type", "application/gzip");
}
conn.connect();
OutputStream out = conn.getOutputStream();
if (gzip) {
out = new GZIPOutputStream(out);
}
pw = new PrintWriter(out);
this.init = true;
}
@Override
public RateLimiter load(CacheKey key) {
double rateLimit = DEFAULT_RATE;
Double endpointLimit = ENDPOINT_RATES.get(key.endpoint);
if (endpointLimit != null) {
rateLimit = endpointLimit;
}
logger.debug("new RateLimiter ip={} endpoint={} -> limit={}",
key.ip, key.endpoint, rateLimit);
return RateLimiter.create(rateLimit);
}
private Function<InputStream, InputStream> getUploadingInputStreamFunction(final AtomicBoolean shouldCancel) {
return request.bandwidth == null ? identity() : inputStream -> {
final RateLimiter rateLimiter = RateLimiter.create(request.bandwidth.asBytesPerSecond().value);
logger.info("Upload bandwidth capped at {}.", request.bandwidth);
return new RateLimitedInputStream(inputStream, rateLimiter, shouldCancel);
};
}
/** {@inheritDoc} */
@Override
protected List<ConfigIssue> init() {
List<ConfigIssue> issues = super.init();
errorRecordHandler = new DefaultErrorRecordHandler(getContext()); // NOSONAR
double rateLimit = conf.rateLimit > 0 ? (1000.0 / conf.rateLimit) : Double.MAX_VALUE;
rateLimiter = RateLimiter.create(rateLimit);
httpClientCommon.init(issues, getContext());
conf.dataFormatConfig.init(
getContext(),
conf.dataFormat,
Groups.HTTP.name(),
HttpClientCommon.DATA_FORMAT_CONFIG_PREFIX,
issues
);
bodyVars = getContext().createELVars();
bodyEval = getContext().createELEval(REQUEST_BODY_CONFIG_NAME);
if (issues.isEmpty()) {
parserFactory = conf.dataFormatConfig.getParserFactory();
}
return issues;
}
public ShiftableRateLimiter(double initialRate,
double maxRate,
double changeRate,
long changeInterval,
TimeUnit changeIntervalUnit) {
this.initialRate = initialRate;
this.maxRate = maxRate;
this.changeRate = changeRate;
this.nextRate = initialRate;
this.changeInterval = changeInterval;
this.changeIntervalUnit = changeIntervalUnit;
this.rateLimiter = RateLimiter.create(initialRate);
this.executor = Executors.newSingleThreadScheduledExecutor();
this.executor.scheduleAtFixedRate(this, changeInterval, changeInterval, changeIntervalUnit);
}
public static void main(String[] args) throws Exception {
if (3 != args.length) {
System.out.println(HELP);
return;
}
String finagleNameStr = args[0];
final String streamName = args[1];
double rate = Double.parseDouble(args[2]);
RateLimiter limiter = RateLimiter.create(rate);
DistributedLogClient client = DistributedLogClientBuilder.newBuilder()
.clientId(ClientId$.MODULE$.apply("record-generator"))
.name("record-generator")
.thriftmux(true)
.finagleNameStr(finagleNameStr)
.build();
final CountDownLatch keepAliveLatch = new CountDownLatch(1);
final AtomicLong numWrites = new AtomicLong(0);
final AtomicBoolean running = new AtomicBoolean(true);
while (running.get()) {
limiter.acquire();
String record = "record-" + System.currentTimeMillis();
client.write(streamName, ByteBuffer.wrap(record.getBytes(UTF_8)))
.addEventListener(new FutureEventListener<DLSN>() {
@Override
public void onFailure(Throwable cause) {
System.out.println("Encountered error on writing data");
cause.printStackTrace(System.err);
running.set(false);
keepAliveLatch.countDown();
}
@Override
public void onSuccess(DLSN value) {
long numSuccesses = numWrites.incrementAndGet();
if (numSuccesses % 100 == 0) {
System.out.println("Write " + numSuccesses + " records.");
}
}
});
}
keepAliveLatch.await();
client.close();
}
public AsyncRateLimiter(double framesPerSecond, Computable<CompletableFuture<?>> callback, Disposable parentDisposable) {
this.callback = callback;
rateLimiter = RateLimiter.create(framesPerSecond);
requestScheduler = new Alarm(Alarm.ThreadToUse.POOLED_THREAD, parentDisposable);
}
RateLimitingWrapperStream(T stream, long totalStreamSize, double rateLimit) {
super(stream);
Utils.checkArgument(rateLimit > 0, "Rate limit for this stream should be greater than 0.");
rateLimiter = RateLimiter.create(rateLimit);
remainingStreamSize = totalStreamSize;
}
/**
* 返回漏桶算法的RateLimiter
*
* @permitsPerSecond 期望的QPS, RateLimiter将QPS平滑到毫秒级别上,但有蓄水及桶外预借的能力.
*/
public static RateLimiter rateLimiter(int permitsPerSecond) {
return RateLimiter.create(permitsPerSecond);
}