下面列出了 com.google.common.util.concurrent.RateLimiter#acquire() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Creates {@code passengerCount} passengers and registers each one with the scheduler.
 *
 * <p>Creation is paced by a method-local limiter built with {@code RateLimiter.create(2)}.
 * Every passenger gets a random start cell and an end cell produced by {@code moveSlightly};
 * the pair is drawn again until the two cells differ.
 *
 * @param random source of randomness for the start cell and the two offsets
 */
private void createBatchOfPassengers(ThreadLocalRandom random) {
    @SuppressWarnings("UnstableApiUsage")
    RateLimiter throttle = RateLimiter.create(2);
    int created = 0;
    while (created < passengerCount) {
        throttle.acquire();
        int origin;
        int destination;
        do {
            origin = random.nextInt(gridDimension * gridDimension);
            // Offsets are drawn from [-10, 10): the upper bound is exclusive.
            int offsetX = random.nextInt(-10, 10);
            int offsetY = random.nextInt(-10, 10);
            destination = moveSlightly(origin, offsetX, offsetY);
        } while (origin == destination);
        String passengerId = "passenger-" + UUID.randomUUID();
        Simulatee traveller = new Passenger(communication, passengerId, origin, destination);
        scheduler.add(traveller);
        created++;
    }
}
/**
 * Generates a batch of {@code passengerCount} passengers and adds them to the scheduler.
 *
 * <p>A limiter created with {@code RateLimiter.create(2)} is acquired once per passenger.
 * The start cell is random; the end cell comes from {@code moveSlightly} applied to the
 * start cell, and both are re-drawn while they are equal.
 *
 * @param random random generator used for the start cell and the displacement
 */
private void createBatchOfPassengers(ThreadLocalRandom random) {
    @SuppressWarnings("UnstableApiUsage")
    RateLimiter pacing = RateLimiter.create(2);
    for (int index = 0; index < passengerCount; index++) {
        pacing.acquire();

        int fromCell;
        int toCell;
        do {
            fromCell = random.nextInt(gridDimension * gridDimension);
            int deltaX = random.nextInt(-10, 10);
            int deltaY = random.nextInt(-10, 10);
            toCell = moveSlightly(fromCell, deltaX, deltaY);
        } while (fromCell == toCell);

        String name = "passenger-" + UUID.randomUUID();
        Simulatee simulatee = new Passenger(communication, name, fromCell, toCell);
        scheduler.add(simulatee);
    }
}
/**
 * Calls the remote order service {@code COUNT} times, acquiring a permit from a
 * method-local limiter ({@code RateLimiter.create(2.0)}) before each call, and then
 * returns a fixed success response carrying the request number of the input.
 *
 * @param userReqVO incoming request; only its request number is read
 * @return a response populated with fixed user data and the success status code
 */
@Override
public BaseResponse<UserResVO> getUserByFeignBatch(@RequestBody UserReqVO userReqVO) {
    // Build the request for the remote service.
    OrderNoReqVO vo = new OrderNoReqVO();
    vo.setReqNo(userReqVO.getReqNo());
    vo.setAppId(1L);

    RateLimiter limiter = RateLimiter.create(2.0);

    // Batch invocation.
    for (int i = 0; i < COUNT; i++) {
        double acquire = limiter.acquire();
        if (logger.isDebugEnabled()) {
            logger.debug("获取令牌成功!,消耗=" + acquire);
        }

        BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNo(vo);
        // Guarded so the response is only serialized to JSON when it will actually be logged.
        if (logger.isDebugEnabled()) {
            logger.debug("远程返回:" + JSON.toJSONString(orderNo));
        }
    }

    UserRes userRes = new UserRes();
    userRes.setUserId(123);
    userRes.setUserName("张三");
    userRes.setReqNo(userReqVO.getReqNo());
    userRes.setCode(StatusEnum.SUCCESS.getCode());
    userRes.setMessage("成功");
    return userRes;
}
/**
 * Posts a store event for every metric that is not blacklisted.
 *
 * <p>When a limiter is supplied and shutdown is not in progress, one permit is acquired
 * before each post.
 *
 * @param metricsToFlush metrics to publish
 * @param rateLimiter    optional limiter; {@code null} disables throttling
 */
private void doFlush(Collection<Metric> metricsToFlush, RateLimiter rateLimiter) {
    logger.debug("Flushing metrics (" + metricsToFlush.size() + ")");

    final boolean throttled = rateLimiter != null;
    if (throttled) {
        logger.debug("QPS is limited to " + (long) rateLimiter.getRate());
    }

    for (Metric candidate : metricsToFlush) {
        if (blacklistService.isBlackListed(candidate)) {
            continue;
        }
        if (throttled && !shuttingDown) {
            rateLimiter.acquire();
        }
        bus.post(new MetricStoreEvent(candidate)).now();
    }
}
/**
 * Gathers the resource items of the given API across all pages into one list.
 *
 * <p>The first page is read directly; each following page is fetched while a
 * {@code "next"} link is available, with one permit acquired from a limiter created via
 * {@code RateLimiter.create(2)} before every fetch.
 *
 * @param api the API whose resources are listed
 * @return the items of all pages, in page order
 */
protected List<Resource> buildResourceList(RestApi api) {
    Resources page = api.getResources();
    List<Resource> collected = new ArrayList<>(page.getItem());

    // The Throwable is created only so the logger prints the current call stack.
    LOG.debug("Building list of resources. Stack trace: ", new Throwable());

    final RateLimiter pageThrottle = RateLimiter.create(2);
    while (page._isLinkAvailable("next")) {
        pageThrottle.acquire();
        page = page.getNext();
        collected.addAll(page.getItem());
    }
    return collected;
}
/**
 * Checks that two consecutive single-permit acquisitions from a limiter created with
 * {@code RateLimiter.create(2)} complete within roughly one second.
 */
@Test
public void givenLimitedResource_whenRequestTwice_thenShouldPermitWithoutBlocking() {
    //given
    RateLimiter rateLimiter = RateLimiter.create(2);

    //when
    // Measure with the monotonic clock: a second-of-minute value wraps at every minute
    // boundary and would yield a negative "elapsed" time there.
    long startNanos = System.nanoTime();
    rateLimiter.acquire(1);
    doSomeLimitedOperation();
    rateLimiter.acquire(1);
    doSomeLimitedOperation();
    long elapsedTimeSeconds = (System.nanoTime() - startNanos) / 1_000_000_000L;

    //then
    // assertThat(boolean) on its own verifies nothing; the assertion has to be completed.
    assertThat(elapsedTimeSeconds <= 1).isTrue();
}
/**
 * Acquires permits from the limiter registered for a topic.
 *
 * @param topic   key used to look up the limiter in {@code httpRateLimiterMap}
 * @param permits number of permits to acquire
 * @return the value returned by {@code RateLimiter.acquire(int)}, or {@code 0} when no
 *         limiter is registered for the topic
 */
public double acquire(String topic, int permits) {
    RateLimiter topicLimiter = httpRateLimiterMap.get(topic);
    return topicLimiter == null ? 0 : topicLimiter.acquire(permits);
}
/**
 * Takes {@code permits} permits from the topic's limiter, if one exists.
 *
 * @param topic   topic whose limiter is looked up in {@code httpRateLimiterMap}
 * @param permits how many permits to take
 * @return whatever {@code RateLimiter.acquire(int)} reports, or {@code 0} if the topic has
 *         no limiter
 */
public double acquire(String topic, int permits) {
    final RateLimiter registered = httpRateLimiterMap.get(topic);
    if (registered == null) {
        // No limiter configured for this topic: nothing to wait for.
        return 0;
    }
    return registered.acquire(permits);
}
/**
 * Demonstrates throttling task submission with a {@code RateLimiter}.
 *
 * <p>A {@code RateLimiter} is comparable to the JDK {@code Semaphore}, but instead of
 * bounding how many threads access a resource concurrently it bounds the rate at which
 * permits are handed out.
 */
public static void testRateLimiter() {
    ListeningExecutorService executorService = MoreExecutors
            .listeningDecorator(Executors.newCachedThreadPool());
    RateLimiter limiter = RateLimiter.create(5.0); // at most 5 tasks submitted per second
    try {
        for (int i = 0; i < 10; i++) {
            limiter.acquire(); // blocks until a permit is available
            executorService.submit(new Task("is " + i));
        }
    } finally {
        // Already-submitted tasks still run to completion; without the shutdown the
        // pool's idle worker threads keep the JVM alive after this method returns.
        executorService.shutdown();
    }
}
/**
 * Acquires five single permits from a limiter created with {@code RateLimiter.create(1.0)}
 * and prints, for each acquisition, the current epoch second and the value returned by
 * {@code acquire}.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // The argument to create() is the number of permits generated per second.
    RateLimiter limiter = RateLimiter.create(1.0);
    int remaining = 5;
    while (remaining-- > 0) {
        /*
         * Creates a RateLimiter that hands out permits at a steady rate, ensuring that on
         * average no more than permitsPerSecond requests are served per second.
         * When requests arrive faster than permitsPerSecond, only permitsPerSecond
         * requests are processed per second.
         * When the RateLimiter is under-used (requests arrive slower than
         * permitsPerSecond), it stores up at most permitsPerSecond requests.
         */
        double waited = limiter.acquire(1);
        long epochSecond = System.currentTimeMillis() / 1000;
        System.out.println(epochSecond + " , " + waited);
    }
}
/**
 * Submits ten tasks to a cached thread pool, acquiring one permit from a
 * {@code RateLimiter} before each submission.
 *
 * <p>A {@code RateLimiter} resembles the JDK {@code Semaphore}, except that it limits the
 * rate at which permits are issued rather than the number of threads holding a permit at
 * the same time.
 */
public static void testRateLimiter() {
    ListeningExecutorService executorService = MoreExecutors
            .listeningDecorator(Executors.newCachedThreadPool());
    RateLimiter limiter = RateLimiter.create(5.0); // no more than 5 submissions per second
    try {
        for (int i = 0; i < 10; i++) {
            limiter.acquire(); // waits here when permits are requested faster than the rate
            executorService.submit(new Task("is " + i));
        }
    } finally {
        // shutdown() lets the submitted tasks finish but releases the pool afterwards,
        // so its worker threads do not keep the JVM running.
        executorService.shutdown();
    }
}
/**
 * Releases ownership of every configured stream, acquiring one permit from a limiter
 * created at {@code rate} before each release.
 *
 * @param client client used to issue the release requests
 * @return {@code 0} once all streams have been released
 * @throws Exception the first failure raised while releasing a stream; remaining streams
 *                   are not processed
 */
@Override
protected int runCmd(DistributedLogClient client) throws Exception {
    final RateLimiter throttle = RateLimiter.create(rate);
    for (String streamName : streams) {
        throttle.acquire();
        try {
            Await.result(client.release(streamName));
            System.out.println("Release ownership of stream " + streamName);
        } catch (Exception failure) {
            // Report which stream failed, then abort the command.
            System.err.println("Failed to release ownership of stream " + streamName);
            throw failure;
        }
    }
    return 0;
}
/**
 * Wraps a task so that each run first acquires a permit from the limiter associated with
 * the placement's cluster.
 *
 * <p>The limiter is looked up in {@code _rateLimiterCache} once, when the wrapper is
 * created, not on every run.
 *
 * @param placement placement whose keyspace cluster name selects the limiter
 * @param delegate  task to run after the permit has been acquired
 * @return a runnable that acquires a permit and then runs {@code delegate}
 */
private Runnable rateLimited(Placement placement, final Runnable delegate) {
    final RateLimiter clusterLimiter =
            _rateLimiterCache.get(placement.getKeyspace().getClusterName());

    Runnable throttledTask = new Runnable() {
        @Override
        public void run() {
            clusterLimiter.acquire();
            delegate.run();
        }
    };
    return throttledTask;
}
/**
 * Posts a store event for each rollup metric, optionally throttled.
 *
 * <p>A permit is acquired before each post only when a limiter was supplied and shutdown
 * is not in progress.
 *
 * @param metricsToFlush rollup metrics to publish
 * @param rateLimiter    optional limiter; {@code null} means no throttling
 */
private void doFlush(Collection<Metric> metricsToFlush, RateLimiter rateLimiter) {
    // Metrics should be fed gently, but without letting the queue grow.
    logger.debug("Flushing rollup metrics (" + metricsToFlush.size() + ")");

    final boolean hasLimiter = rateLimiter != null;
    if (hasLimiter) {
        logger.debug("QPS is limited to " + rateLimiter.getRate());
    }

    for (Metric rollup : metricsToFlush) {
        if (hasLimiter && !shuttingDown) {
            rateLimiter.acquire();
        }
        bus.post(new MetricStoreEvent(rollup)).now();
    }
}
/**
 * Publishes every given rollup metric as a store event.
 *
 * <p>If a limiter is present its rate is logged (truncated to a whole number), and one
 * permit is acquired before each post unless shutdown is in progress.
 *
 * @param metricsToFlush rollup metrics to publish
 * @param rateLimiter    limiter to pace the posts with, or {@code null} for none
 */
private void doFlush(Collection<Metric> metricsToFlush, RateLimiter rateLimiter) {
    // Feed metrics gently while still preventing the queue from growing.
    logger.debug("Flushing rollup metrics (" + metricsToFlush.size() + ")");

    if (rateLimiter == null) {
        // No throttling requested: post everything back to back.
        for (Metric unthrottled : metricsToFlush) {
            bus.post(new MetricStoreEvent(unthrottled)).now();
        }
        return;
    }

    logger.debug("QPS is limited to " + (long) rateLimiter.getRate());
    for (Metric paced : metricsToFlush) {
        if (!shuttingDown) {
            rateLimiter.acquire();
        }
        bus.post(new MetricStoreEvent(paced)).now();
    }
}
/**
 * Clones the prototype crons and submits the clones to the scheduler, acquiring one permit
 * from a limiter created at {@code hatchRate} before each clone.
 *
 * <p>Each prototype receives a share of {@code cronCount} proportional to its weight. When
 * all weights sum to zero the count is divided evenly (integer division) between the
 * prototypes. Spawning ends early, without calling {@code onHatchCompleted()}, as soon as
 * {@code isStopped()} returns true.
 *
 * @param cronCount total number of clients to distribute across the prototypes
 */
private void spawn(int cronCount) {
    logger.info(
        "Hatching and swarming {} clients at the rate of {} clients/s...",
        cronCount,
        this.hatchRate);

    final RateLimiter hatchThrottle = RateLimiter.create(this.hatchRate);

    float totalWeight = 0;
    for (Cron candidate : this.prototypes) {
        totalWeight += candidate.getWeight();
    }

    // Rescale the requested count into a per-prototype amount.
    for (Cron prototype : this.prototypes) {
        final int amount;
        if (totalWeight == 0) {
            amount = cronCount / this.prototypes.size();
        } else {
            float share = prototype.getWeight() / totalWeight;
            amount = Math.round(cronCount * share);
        }
        logger.info("> {}={}", prototype.getName(), amount);

        for (int hatched = 0; hatched < amount; hatched++) {
            hatchThrottle.acquire();
            if (isStopped()) {
                return;
            }
            Cron copy = prototype.clone();
            copy.initialize(); // must be initialized before it is scheduled
            this.scheduler.submit(copy);
            actualNumClients.incrementAndGet();
        }
    }

    this.onHatchCompleted();
}
/**
 * Writes records from a background thread for five seconds, throttled by a limiter created
 * with {@code RateLimiter.create(1000)}, while one non-blocking reader thread consumes
 * them. After the writer stops, the test waits until the reader's count reaches the
 * writer's count and asserts that both are equal.
 *
 * @throws Exception if setup, writing the initial records, or waiting fails
 */
@Test(timeout = 60000)
public void testMultiReaders() throws Exception {
    String name = "distrlog-multireaders";
    final RateLimiter limiter = RateLimiter.create(1000);

    DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
    confLocal.setOutputBufferSize(0);
    confLocal.setImmediateFlushEnabled(true);

    DistributedLogManager dlmwrite = createNewDLM(confLocal, name);

    final AsyncLogWriter writer = dlmwrite.startAsyncLogSegmentNonPartitioned();
    // Two records are written up front, which is why writeCount and txid both start at 2.
    Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(0)));
    Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1)));
    final AtomicInteger writeCount = new AtomicInteger(2);

    DistributedLogManager dlmread = createNewDLM(conf, name);

    BKSyncLogReader reader0 = (BKSyncLogReader) dlmread.getInputStream(0);

    try {
        ReaderThread[] readerThreads = new ReaderThread[1];
        readerThreads[0] = new ReaderThread("reader0-non-blocking", reader0, false);
        // readerThreads[1] = new ReaderThread("reader1-non-blocking", reader0, false);

        final AtomicBoolean running = new AtomicBoolean(true);
        Thread writerThread = new Thread("WriteThread") {
            @Override
            public void run() {
                try {
                    long txid = 2;
                    DLSN dlsn = DLSN.InvalidDLSN;
                    while (running.get()) {
                        limiter.acquire();
                        long curTxId = txid++;
                        dlsn = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(curTxId)));
                        writeCount.incrementAndGet();
                        if (curTxId % 1000 == 0) {
                            LOG.info("writer write {}", curTxId);
                        }
                    }
                    LOG.info("Completed writing record at {}", dlsn);
                    Utils.close(writer);
                } catch (DLInterruptedException die) {
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    // The writer still stops on failure, but the cause is no longer
                    // silently discarded.
                    LOG.error("Writer thread failed", e);
                }
            }
        };

        for (ReaderThread rt : readerThreads) {
            rt.start();
        }

        writerThread.start();

        TimeUnit.SECONDS.sleep(5);

        LOG.info("Stopping writer");

        running.set(false);
        writerThread.join();

        LOG.info("Writer stopped after writing {} records, waiting for reader to complete",
                writeCount.get());
        // Poll until the reader has caught up; the test timeout bounds this loop.
        while (writeCount.get() > (readerThreads[0].getReadCount())) {
            LOG.info("Write Count = {}, Read Count = {}",
                    new Object[] { writeCount.get(), readerThreads[0].getReadCount() });
            TimeUnit.MILLISECONDS.sleep(100);
        }
        assertEquals(writeCount.get(),
                (readerThreads[0].getReadCount()));

        for (ReaderThread readerThread : readerThreads) {
            readerThread.stopReading();
        }
    } finally {
        dlmwrite.close();
        reader0.close();
        dlmread.close();
    }
}
/**
 * Writes timestamped records to a stream at a fixed rate until a write fails.
 *
 * <p>Expects exactly three arguments: the finagle name string, the stream name, and the
 * write rate (parsed as a double and passed to {@code RateLimiter.create}). With any other
 * argument count the help text is printed and the method returns. The first write failure
 * stops the loop; the client is closed afterwards.
 *
 * @param args finagle name, stream name, rate
 * @throws Exception if argument parsing, waiting on the latch, or client interaction fails
 */
public static void main(String[] args) throws Exception {
    if (args.length != 3) {
        System.out.println(HELP);
        return;
    }

    final String finagleName = args[0];
    final String streamName = args[1];
    final double writesPerSecond = Double.parseDouble(args[2]);
    final RateLimiter throttle = RateLimiter.create(writesPerSecond);

    DistributedLogClient client = DistributedLogClientBuilder.newBuilder()
            .clientId(ClientId$.MODULE$.apply("record-generator"))
            .name("record-generator")
            .thriftmux(true)
            .finagleNameStr(finagleName)
            .build();

    final CountDownLatch failureLatch = new CountDownLatch(1);
    final AtomicLong successCount = new AtomicLong(0);
    final AtomicBoolean keepWriting = new AtomicBoolean(true);

    // The listener only touches the shared counters above, so one instance serves every write.
    final FutureEventListener<DLSN> writeListener = new FutureEventListener<DLSN>() {
        @Override
        public void onFailure(Throwable cause) {
            System.out.println("Encountered error on writing data");
            cause.printStackTrace(System.err);
            keepWriting.set(false);
            failureLatch.countDown();
        }

        @Override
        public void onSuccess(DLSN value) {
            long succeeded = successCount.incrementAndGet();
            if (succeeded % 100 == 0) {
                System.out.println("Write " + succeeded + " records.");
            }
        }
    };

    while (keepWriting.get()) {
        throttle.acquire();
        String payload = "record-" + System.currentTimeMillis();
        client.write(streamName, ByteBuffer.wrap(payload.getBytes(UTF_8)))
                .addEventListener(writeListener);
    }

    failureLatch.await();
    client.close();
}
/**
 * Replays the mutations of this batch and charges the batch's remaining data size against
 * the given limiter.
 *
 * @param rateLimiter limiter from which {@code data.remaining()} permits are acquired once
 *                    the replays have been sent
 * @return the number of replay handlers created, or {@code 0} when there are no mutations
 *         to replay or the computed hint TTL is not positive
 * @throws IOException declared by this method; the visible body does not throw it directly
 */
public int replay(RateLimiter rateLimiter) throws IOException
{
    logger.debug("Replaying batch {}", id);

    final List<Mutation> pending = replayingMutations();
    if (pending.isEmpty())
        return 0;

    final int hintTtl = calculateHintTTL(pending);
    if (hintTtl <= 0)
        return 0;

    replayHandlers = sendReplays(pending, writtenAt, hintTtl);

    // Acquire only after sending, so waiting on the limiter cannot skew the TTL calculation.
    rateLimiter.acquire(data.remaining());

    return replayHandlers.size();
}
/**
 * Produces the configured message bodies to a topic, {@code numTimesProduce} times over.
 *
 * <p>When {@code publishRate} is positive, one permit is acquired from a limiter created at
 * that rate before each message. Configured properties (in {@code key=value} form) and the
 * message key, if any, are attached to every message. Failures are logged rather than
 * thrown, and the number of messages sent is logged in every case.
 *
 * @param topic topic to produce to
 * @return {@code 0} on success, {@code -1} if any step failed
 */
private int publish(String topic) {
    int numMessagesSent = 0;
    int returnCode = 0;

    // try-with-resources closes the client even when producing fails part-way through.
    try (PulsarClient client = clientBuilder.build()) {
        ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topic);
        if (this.chunkingAllowed) {
            producerBuilder.enableChunking(true);
            producerBuilder.enableBatching(false);
        }
        Producer<byte[]> producer = producerBuilder.create();

        List<byte[]> messageBodies = generateMessageBodies(this.messages, this.messageFileNames);
        RateLimiter limiter = (this.publishRate > 0) ? RateLimiter.create(this.publishRate) : null;

        Map<String, String> kvMap = new HashMap<>();
        for (String property : properties) {
            // Split on the first '=' only, so that values may themselves contain '='.
            String[] kv = property.split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException(
                        "Invalid property '" + property + "', expected key=value");
            }
            kvMap.put(kv[0], kv[1]);
        }

        for (int i = 0; i < this.numTimesProduce; i++) {
            for (byte[] content : messageBodies) {
                if (limiter != null) {
                    limiter.acquire();
                }

                TypedMessageBuilder<byte[]> message = producer.newMessage();

                if (!kvMap.isEmpty()) {
                    message.properties(kvMap);
                }

                if (key != null && !key.isEmpty()) {
                    message.key(key);
                }

                message.value(content).send();

                numMessagesSent++;
            }
        }
    } catch (Exception e) {
        LOG.error("Error while producing messages");
        LOG.error(e.getMessage(), e);
        returnCode = -1;
    } finally {
        LOG.info("{} messages successfully produced", numMessagesSent);
    }

    return returnCode;
}