下面列出了 com.google.common.util.concurrent.RateLimiter#setRate() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Builds a customized bursty {@link RateLimiter} by reflecting on Guava's non-public
 * {@code RateLimiter$SleepingStopwatch} and {@code SmoothRateLimiter$SmoothBursty} classes.
 *
 * @param permitsPerSecond permits allowed per second; can be thought of as the QPS
 * @param maxBurstSeconds  maximum burst buffer time, used to absorb traffic spikes (per the
 *                         original author's note, Guava's own factory uses 1 second);
 *                         {@code permitsPerSecond * maxBurstSeconds} is the number of tokens
 *                         kept in reserve while the limiter is idle
 * @param filledWithToken  whether the limiter should start out already holding
 *                         {@code permitsPerSecond * maxBurstSeconds} tokens
 * @return the newly constructed rate limiter
 * @throws ReflectiveOperationException if the Guava internals cannot be located or invoked
 */
public static RateLimiter create(double permitsPerSecond, double maxBurstSeconds, boolean filledWithToken)
        throws ReflectiveOperationException {
    Class<?> sleepingStopwatchClass = Class
            .forName("com.google.common.util.concurrent.RateLimiter$SleepingStopwatch");
    Method createStopwatchMethod = sleepingStopwatchClass.getDeclaredMethod("createFromSystemTimer");
    createStopwatchMethod.setAccessible(true);
    Object stopwatch = createStopwatchMethod.invoke(null);

    Class<?> burstyRateLimiterClass = Class
            .forName("com.google.common.util.concurrent.SmoothRateLimiter$SmoothBursty");
    // Look the constructor up by signature: the array returned by getDeclaredConstructors()
    // has no guaranteed order, so indexing it with [0] is fragile.
    Constructor<?> burstyRateLimiterConstructor =
            burstyRateLimiterClass.getDeclaredConstructor(sleepingStopwatchClass, double.class);
    burstyRateLimiterConstructor.setAccessible(true);

    // The second constructor argument sets maxBurstSeconds.
    RateLimiter rateLimiter = (RateLimiter) burstyRateLimiterConstructor.newInstance(stopwatch, maxBurstSeconds);
    rateLimiter.setRate(permitsPerSecond);
    if (filledWithToken) {
        // Overwrite storedPermits so the limiter starts with a full burst buffer.
        setField(rateLimiter, "storedPermits", permitsPerSecond * maxBurstSeconds);
    }
    return rateLimiter;
}
/**
 * Re-applies a network rate to every limiter in {@code rateLimiters}.
 *
 * <p>Non-positive rates are rejected with an error log. Otherwise the rate reserved by
 * {@code properties.getSystemNeedRate()} is subtracted when the input exceeds it; if it does
 * not, half of the input (rounded up) is used. The result is multiplied by 1024 * 1024 and
 * pushed to each limiter whose current rate differs from it by at least 1024.
 *
 * @param rate the new net rate (the log message labels it as MB)
 */
public void freshNetRate(int rate) {
    if (rate <= 0) {
        logger.error("net rate:{} is illegal", rate);
        return;
    }

    final int downRate;
    if (rate > properties.getSystemNeedRate()) {
        downRate = rate - properties.getSystemNeedRate();
    } else {
        downRate = (rate + 1) / 2;
    }
    final long targetBytes = downRate * 1024L * 1024L;

    try {
        boolean anyChanged = false;
        for (RateLimiter limiter : rateLimiters) {
            double drift = Math.abs(limiter.getRate() - targetBytes);
            if (drift >= 1024) {
                // Only touch limiters whose rate has moved noticeably.
                limiter.setRate(targetBytes);
                anyChanged = true;
            }
        }
        if (anyChanged) {
            logger.info("update net rate to {} MB", rate);
        }
    } catch (Exception e) {
        logger.error("E_freshNetRate", e);
    }
}
/**
 * Applies a new rate to the limiter registered for the given topic in
 * {@code httpRateLimiterMap}. Does nothing when no limiter is registered for the topic.
 *
 * @param topic       key used to look up the limiter
 * @param httpNewRate rate passed to {@link RateLimiter#setRate(double)}
 */
public void adjustRateLimit(String topic, double httpNewRate) {
    final RateLimiter limiter = httpRateLimiterMap.get(topic);
    if (limiter != null) {
        limiter.setRate(httpNewRate);
    }
}
/**
 * Updates the rate of the topic's existing limiter, if one is present in
 * {@code httpRateLimiterMap}; topics without a limiter are ignored.
 *
 * @param topic       topic whose limiter should be adjusted
 * @param httpNewRate new rate handed to {@link RateLimiter#setRate(double)}
 */
public void adjustRateLimit(String topic, double httpNewRate) {
    final RateLimiter existing = httpRateLimiterMap.get(topic);
    if (existing != null) {
        existing.setRate(httpNewRate);
    }
}
/**
 * Returns the QPS rate limiter for a topic partition, creating and caching it on first use.
 * When a cached limiter exists and the configured limit has changed, the limiter's rate is
 * updated in place.
 *
 * @param topic     topic name used to look up the configured limit
 * @param partition partition number; together with the topic it forms the cache key
 * @return the limiter cached for the topic/partition pair
 */
public RateLimiter getPartitionProduceQPSRateLimiter(String topic,
        int partition) {
    Double limit = getLimit(m_topicQPSRateLimits.get(), topic);
    Pair<String, Integer> tp = new Pair<String, Integer>(topic, partition);
    RateLimiter rateLimiter = m_topicPartitionQPSRateLimiters.get(tp);
    if (rateLimiter == null) {
        synchronized (m_topicPartitionQPSRateLimiters) {
            // Re-check under the lock so only one limiter is created per key.
            rateLimiter = m_topicPartitionQPSRateLimiters.get(tp);
            if (rateLimiter == null) {
                rateLimiter = RateLimiter.create(limit);
                m_topicPartitionQPSRateLimiters.put(tp, rateLimiter);
                log.info(
                        "Set single partition's qps rate limit to {} for topic {} and partition {}",
                        limit, topic, partition);
            }
        }
    } else {
        synchronized (rateLimiter) {
            // getRate() may differ from the value last passed to setRate() by floating-point
            // rounding, so an exact != comparison can report a change on every call.
            // Compare with a small relative tolerance instead.
            double currentRate = rateLimiter.getRate();
            double tolerance = 1e-9 * Math.max(1.0, Math.abs(limit));
            if (Math.abs(currentRate - limit) > tolerance) {
                rateLimiter.setRate(limit);
                log.info(
                        "Single partition's qps rate limit changed to {} for topic {} and partition {}",
                        limit, topic, partition);
            }
        }
    }
    return rateLimiter;
}
/**
 * Returns the bytes rate limiter for a topic partition, creating and caching it on first use.
 * When a cached limiter exists and the configured limit has changed, the limiter's rate is
 * updated in place.
 *
 * @param topic     topic name used to look up the configured limit
 * @param partition partition number; together with the topic it forms the cache key
 * @return the limiter cached for the topic/partition pair
 */
public RateLimiter getPartitionProduceBytesRateLimiter(String topic,
        int partition) {
    Double limit = getLimit(m_topicBytesRateLimits.get(), topic);
    Pair<String, Integer> tp = new Pair<String, Integer>(topic, partition);
    RateLimiter rateLimiter = m_topicPartitionBytesRateLimiters.get(tp);
    if (rateLimiter == null) {
        synchronized (m_topicPartitionBytesRateLimiters) {
            // Re-check under the lock so only one limiter is created per key.
            rateLimiter = m_topicPartitionBytesRateLimiters.get(tp);
            if (rateLimiter == null) {
                rateLimiter = RateLimiter.create(limit);
                m_topicPartitionBytesRateLimiters.put(tp, rateLimiter);
                log.info(
                        "Set single partition's bytes rate limit to {} for topic {}",
                        limit, topic);
            }
        }
    } else {
        synchronized (rateLimiter) {
            // getRate() may differ from the value last passed to setRate() by floating-point
            // rounding, so an exact != comparison can report a change on every call.
            // Compare with a small relative tolerance instead.
            double currentRate = rateLimiter.getRate();
            double tolerance = 1e-9 * Math.max(1.0, Math.abs(limit));
            if (Math.abs(currentRate - limit) > tolerance) {
                rateLimiter.setRate(limit);
                log.info(
                        "Single partition's bytes rate limit changed to {} for topic {}",
                        limit, topic);
            }
        }
    }
    return rateLimiter;
}
/**
 * Synchronizes the limiter with the submission rate found in the supplied configuration.
 *
 * <p>The configured rate falls back to {@code StyxScheduler.DEFAULT_SUBMISSION_RATE_PER_SEC}
 * when absent. The limiter is only updated when the configured rate differs from the current
 * one by at least 0.1. Any exception is logged as a warning and the update is skipped.
 *
 * @param config      supplier of the current configuration
 * @param rateLimiter limiter whose rate is refreshed
 */
private static void updateRuntimeConfig(Supplier<StyxConfig> config, RateLimiter rateLimiter) {
    try {
        final double existing = rateLimiter.getRate();
        final Double desired = config.get()
                .submissionRateLimit()
                .orElse(StyxScheduler.DEFAULT_SUBMISSION_RATE_PER_SEC);
        final double delta = Math.abs(desired - existing);
        if (delta >= 0.1) {
            LOG.info("Updating submission rate limit: {} -> {}", existing, desired);
            rateLimiter.setRate(desired);
        }
    } catch (Exception e) {
        LOG.warn("Failed to fetch the submission rate config from storage, "
                + "skipping RateLimiter update", e);
    }
}
/**
 * Sets the limiter's rate to the given throughput limit if it is not already at that value.
 * A limit of 0 disables throttling and is mapped to {@link Double#MAX_VALUE}.
 *
 * @param limit       desired throughput; 0 means unlimited
 * @param rateLimiter limiter to update
 */
private void mayUpdateThroughput(double limit, RateLimiter rateLimiter)
{
    // Zero is the "throttling disabled" sentinel.
    final double effectiveLimit = (limit == 0) ? Double.MAX_VALUE : limit;
    if (rateLimiter.getRate() != effectiveLimit)
    {
        rateLimiter.setRate(effectiveLimit);
    }
}