com.google.common.util.concurrent.RateLimiter#acquire ( )源码实例Demo

下面列出了com.google.common.util.concurrent.RateLimiter#acquire ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: stateful-functions   文件: Simulation.java
/**
 * Creates {@code passengerCount} passengers and hands each one to the scheduler, pacing the
 * creation with a limiter built by {@code RateLimiter.create(2)}.
 *
 * <p>Each passenger gets a random start cell and an end cell derived from it through
 * {@code moveSlightly}; the pair is re-drawn until the two cells differ.
 *
 * @param random source of randomness for the start cell and the displacement
 */
private void createBatchOfPassengers(ThreadLocalRandom random) {
  @SuppressWarnings("UnstableApiUsage")
  final RateLimiter throttle = RateLimiter.create(2);

  int created = 0;
  while (created < passengerCount) {
    throttle.acquire();

    int origin;
    int destination;
    do {
      origin = random.nextInt(gridDimension * gridDimension);
      // displacement on each axis is drawn from [-10, 10)
      final int offsetX = random.nextInt(-10, 10);
      final int offsetY = random.nextInt(-10, 10);
      destination = moveSlightly(origin, offsetX, offsetY);
    } while (origin == destination);

    final String passengerId = "passenger-" + UUID.randomUUID();
    final Simulatee traveller = new Passenger(communication, passengerId, origin, destination);
    scheduler.add(traveller);

    created++;
  }
}
 
源代码2 项目: flink-statefun   文件: Simulation.java
/**
 * Generates one batch of {@code passengerCount} passengers and registers them with the
 * scheduler. A limiter built by {@code RateLimiter.create(2)} is acquired once per passenger.
 *
 * @param random generator used to pick the start cell and the x/y displacement
 */
private void createBatchOfPassengers(ThreadLocalRandom random) {
  @SuppressWarnings("UnstableApiUsage")
  final RateLimiter limiter = RateLimiter.create(2);

  for (int remaining = passengerCount; remaining > 0; remaining--) {
    limiter.acquire();

    // keep drawing until the trip actually goes somewhere
    int from;
    int to;
    do {
      from = random.nextInt(gridDimension * gridDimension);
      final int deltaX = random.nextInt(-10, 10);
      final int deltaY = random.nextInt(-10, 10);
      to = moveSlightly(from, deltaX, deltaY);
    } while (from == to);

    final String passengerId = "passenger-" + UUID.randomUUID();
    final Simulatee newcomer = new Passenger(communication, passengerId, from, to);
    scheduler.add(newcomer);
  }
}
 
源代码3 项目: springboot-cloud   文件: UserController.java
/**
 * Calls the remote order service {@code COUNT} times, acquiring a permit from a limiter built
 * by {@code RateLimiter.create(2.0)} before each call, then returns a fixed user payload that
 * echoes the request number.
 *
 * @param userReqVO incoming request; only its request number is read here
 * @return a response carrying a hard-coded user and the success status code
 */
@Override
public BaseResponse<UserResVO> getUserByFeignBatch(@RequestBody UserReqVO userReqVO) {
    // request sent to the remote service
    final OrderNoReqVO orderRequest = new OrderNoReqVO();
    orderRequest.setReqNo(userReqVO.getReqNo());
    orderRequest.setAppId(1L);

    final RateLimiter throttle = RateLimiter.create(2.0);

    // batch invocation
    int call = 0;
    while (call < COUNT) {
        final double waitedSeconds = throttle.acquire();
        logger.debug("获取令牌成功!,消耗=" + waitedSeconds);

        final BaseResponse<OrderNoResVO> remoteReply = orderServiceClient.getOrderNo(orderRequest);
        logger.debug("远程返回:" + JSON.toJSONString(remoteReply));

        call++;
    }

    final UserRes result = new UserRes();
    result.setUserId(123);
    result.setUserName("张三");

    result.setReqNo(userReqVO.getReqNo());
    result.setCode(StatusEnum.SUCCESS.getCode());
    result.setMessage("成功");

    return result;
}
 
源代码4 项目: disthene   文件: SumService.java
/**
 * Posts a {@code MetricStoreEvent} for every metric that is not blacklisted.
 *
 * @param metricsToFlush metrics to publish
 * @param rateLimiter optional limiter; when non-null and the service is not shutting down, one
 *     permit is acquired before each post
 */
private void doFlush(Collection<Metric> metricsToFlush, RateLimiter rateLimiter) {
    logger.debug("Flushing metrics (" + metricsToFlush.size() + ")");

    final boolean throttled = rateLimiter != null;
    if (throttled) {
        logger.debug("QPS is limited to " + (long) rateLimiter.getRate());
    }

    for (Metric candidate : metricsToFlush) {
        if (blacklistService.isBlackListed(candidate)) {
            continue;
        }

        // throttling is skipped once shutdown has begun
        if (throttled && !shuttingDown) {
            rateLimiter.acquire();
        }
        bus.post(new MetricStoreEvent(candidate)).now();
    }
}
 
/**
 * Collects the resources of an API by following the {@code "next"} link page after page.
 *
 * <p>A limiter built by {@code RateLimiter.create(2)} is acquired before each follow-up page
 * is fetched; the first page is fetched without waiting.
 *
 * @param api the API whose resources are listed
 * @return the items of every page, in page order
 */
protected List<Resource> buildResourceList(RestApi api) {
    Resources page = api.getResources();
    final List<Resource> collected = new ArrayList<>(page.getItem());

    LOG.debug("Building list of resources. Stack trace: ", new Throwable());

    final RateLimiter pageThrottle = RateLimiter.create(2);
    for (; page._isLinkAvailable("next"); ) {
        pageThrottle.acquire();
        page = page.getNext();
        collected.addAll(page.getItem());
    }

    return collected;
}
 
/**
 * Verifies that two single-permit acquisitions against a limiter built by
 * {@code RateLimiter.create(2)}, each followed by the limited operation, finish within
 * one second.
 */
@Test
public void givenLimitedResource_whenRequestTwice_thenShouldPermitWithoutBlocking() {
    //given
    RateLimiter rateLimiter = RateLimiter.create(2);

    //when
    // System.nanoTime() is monotonic. The previous second-of-minute reading wrapped around at
    // minute boundaries and could produce a negative "elapsed" value.
    long startNanos = System.nanoTime();
    rateLimiter.acquire(1);
    doSomeLimitedOperation();
    rateLimiter.acquire(1);
    doSomeLimitedOperation();
    long elapsedTimeSeconds = (System.nanoTime() - startNanos) / 1_000_000_000L;

    //then
    // a bare assertThat(boolean) only builds an assertion object; isTrue() performs the check
    assertThat(elapsedTimeSeconds <= 1).isTrue();
}
 
源代码7 项目: DDMQ   文件: HttpRateLimiter.java
/**
 * Acquires permits from the limiter registered for a topic.
 *
 * @param topic topic whose limiter is looked up in {@code httpRateLimiterMap}
 * @param permits number of permits to acquire
 * @return the value returned by the limiter's {@code acquire(permits)}, or {@code 0} when no
 *     limiter is registered for the topic
 */
public double acquire(String topic, int permits) {
    final RateLimiter topicLimiter = httpRateLimiterMap.get(topic);
    if (topicLimiter == null) {
        // no limiter registered for this topic: nothing to wait for
        return 0;
    }
    return topicLimiter.acquire(permits);
}
 
源代码8 项目: DDMQ   文件: HttpRateLimiter.java
/**
 * Takes {@code permits} permits from the limiter mapped to {@code topic}, if one exists.
 *
 * @param topic key into {@code httpRateLimiterMap}
 * @param permits number of permits requested
 * @return what the limiter's {@code acquire(permits)} returns; {@code 0} if the topic has no
 *     limiter
 */
public double acquire(String topic, int permits) {
    final RateLimiter found = httpRateLimiterMap.get(topic);
    return (found != null) ? found.acquire(permits) : 0;
}
 
源代码9 项目: micro-service   文件: ListenableFutureDemo.java
/**
 * Demonstrates pacing task submission with a Guava {@code RateLimiter}.
 *
 * <p>A RateLimiter is conceptually similar to the JDK {@code Semaphore}, but it limits the
 * rate at which permits are handed out rather than the number of concurrent accesses.
 */
public static void testRateLimiter() {

	ListeningExecutorService executorService = MoreExecutors
			.listeningDecorator(Executors.newCachedThreadPool());

	RateLimiter limiter = RateLimiter.create(5.0); // 5 permits per second

	try {
		for (int i = 0; i < 10; i++) {
			limiter.acquire(); // blocks until a permit is available
			executorService.submit(new Task("is " + i));
		}
	} finally {
		// shutdown() still runs the tasks already submitted, but lets the pool's
		// threads terminate afterwards instead of lingering
		executorService.shutdown();
	}

}
 
源代码10 项目: tools-journey   文件: RateLimiterTest.java
/**
 * Acquires a single permit five times from a limiter created with {@code RateLimiter.create(1.0)}
 * and prints, for each acquisition, the current epoch second and the value returned by
 * {@code acquire}.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // the argument of create() is the number of permits generated per second
    final RateLimiter limiter = RateLimiter.create(1.0);

    int round = 0;
    while (round < 5) {
        /*
         * Translated from the original note: the limiter hands out permits at a steady pace so
         * that, on average, no more than permitsPerSecond requests go through per second. When
         * requests arrive faster than that, only permitsPerSecond of them are served each
         * second; when the limiter is under-used, it stores up to permitsPerSecond permits.
         */
        final double secondsWaited = limiter.acquire(1);
        final long epochSeconds = System.currentTimeMillis() / 1000;
        System.out.println(epochSeconds + " , " + secondsWaited);
        round++;
    }
}
 
源代码11 项目: util4j   文件: ListenableFutureDemo.java
/**
 * Demonstrates pacing task submission with a Guava {@code RateLimiter}.
 *
 * <p>A RateLimiter is conceptually similar to the JDK {@code Semaphore}, but it limits the
 * rate at which permits are handed out rather than the number of concurrent accesses.
 */
public static void testRateLimiter() {
	ListeningExecutorService executorService = MoreExecutors
			.listeningDecorator(Executors.newCachedThreadPool());
	RateLimiter limiter = RateLimiter.create(5.0); // 5 permits per second
	try {
		for (int i = 0; i < 10; i++) {
			limiter.acquire(); // blocks until a permit is available
			executorService.submit(new Task("is " + i));
		}
	} finally {
		// shutdown() still runs the tasks already submitted, but lets the pool's
		// threads terminate afterwards instead of lingering
		executorService.shutdown();
	}
}
 
源代码12 项目: distributedlog   文件: ProxyTool.java
/**
 * Releases ownership of every stream in {@code streams}, acquiring one permit from a limiter
 * created with {@code RateLimiter.create(rate)} before each release.
 *
 * @param client client used to issue the release requests
 * @return {@code 0} once every stream has been released
 * @throws Exception the first failure raised while releasing a stream; it is reported on
 *     stderr and rethrown, so the remaining streams are not processed
 */
@Override
protected int runCmd(DistributedLogClient client) throws Exception {
    final RateLimiter throttle = RateLimiter.create(rate);

    for (final String streamName : streams) {
        throttle.acquire();
        try {
            Await.result(client.release(streamName));
            System.out.println("Release ownership of stream " + streamName);
        } catch (Exception failure) {
            System.err.println("Failed to release ownership of stream " + streamName);
            throw failure;
        }
    }

    return 0;
}
 
源代码13 项目: emodb   文件: AstyanaxTableDAO.java
/**
 * Wraps a task so that it acquires one permit before running.
 *
 * <p>The limiter is looked up in {@code _rateLimiterCache} by the cluster name of the
 * placement's keyspace when this method is called, not when the returned task runs.
 *
 * @param placement placement whose keyspace cluster name selects the limiter
 * @param delegate the task to run after a permit has been acquired
 * @return a runnable that acquires a permit and then runs {@code delegate}
 */
private Runnable rateLimited(Placement placement, final Runnable delegate) {
    final String clusterName = placement.getKeyspace().getClusterName();
    final RateLimiter clusterLimiter = _rateLimiterCache.get(clusterName);

    final Runnable throttledTask = new Runnable() {
        @Override
        public void run() {
            clusterLimiter.acquire();
            delegate.run();
        }
    };
    return throttledTask;
}
 
源代码14 项目: disthene   文件: AggregateService.java
/**
 * Posts a {@code MetricStoreEvent} for each metric, optionally throttled.
 *
 * @param metricsToFlush metrics to publish
 * @param rateLimiter optional limiter; when non-null and the service is not shutting down, one
 *     permit is acquired before each post
 */
private void doFlush(Collection<Metric> metricsToFlush, RateLimiter rateLimiter) {
    // Feed metrics gently, but do not let the queue grow.
    logger.debug("Flushing rollup metrics (" + metricsToFlush.size() + ")");

    final boolean throttled = rateLimiter != null;
    if (throttled) {
        logger.debug("QPS is limited to " + rateLimiter.getRate());
    }

    for (Metric pending : metricsToFlush) {
        // throttling is skipped once shutdown has begun
        final boolean mustWait = throttled && !shuttingDown;
        if (mustWait) {
            rateLimiter.acquire();
        }
        bus.post(new MetricStoreEvent(pending)).now();
    }
}
 
源代码15 项目: disthene   文件: RollupService.java
/**
 * Publishes every metric as a {@code MetricStoreEvent}, acquiring a permit before each one
 * when a limiter is supplied and the service is not shutting down.
 *
 * @param metricsToFlush metrics to publish
 * @param rateLimiter limiter to pace the posts with, or {@code null} for no pacing
 */
private void doFlush(Collection<Metric> metricsToFlush, RateLimiter rateLimiter) {
    // Feed metrics gently, but do not let the queue grow.
    final int total = metricsToFlush.size();
    logger.debug("Flushing rollup metrics (" + total + ")");

    if (null != rateLimiter) {
        final long qps = (long) rateLimiter.getRate();
        logger.debug("QPS is limited to " + qps);
    }

    for (Metric item : metricsToFlush) {
        if (null != rateLimiter && !shuttingDown) {
            rateLimiter.acquire();
        }
        bus.post(new MetricStoreEvent(item)).now();
    }
}
 
源代码16 项目: swarm   文件: Locust.java
/**
 * Clones, initializes and submits clients from the registered prototypes, acquiring one permit
 * from a limiter created with {@code RateLimiter.create(hatchRate)} before each clone.
 *
 * <p>Each prototype receives a share of {@code cronCount} proportional to its weight. When the
 * weights sum to zero, the count is split evenly using integer division. Spawning stops
 * without calling {@code onHatchCompleted()} as soon as {@code isStopped()} returns true.
 *
 * @param cronCount total number of clients to distribute across the prototypes
 */
private void spawn(int cronCount) {
    logger.info(
            "Hatching and swarming {} clients at the rate of {} clients/s...",
            cronCount,
            this.hatchRate);
    final RateLimiter hatchThrottle = RateLimiter.create(this.hatchRate);

    float totalWeight = 0;
    for (Cron candidate : this.prototypes) {
        totalWeight += candidate.getWeight();
    }
    final boolean unweighted = (0 == totalWeight);

    // rescale
    for (Cron prototype : this.prototypes) {
        final int amount;
        if (unweighted) {
            // no weights at all: even split
            amount = cronCount / this.prototypes.size();
        } else {
            final float share = prototype.getWeight() / totalWeight;
            amount = Math.round(cronCount * share);
        }

        logger.info("> {}={}", prototype.getName(), amount);

        for (int hatched = 0; hatched < amount; hatched++) {
            hatchThrottle.acquire();
            if (isStopped()) {
                return;
            }

            final Cron instance = prototype.clone();
            instance.initialize(); // initialize them first
            this.scheduler.submit(instance);
            actualNumClients.incrementAndGet();
        }
    }

    this.onHatchCompleted();
}
 
/**
 * Writes records from a background thread for five seconds, paced by a limiter created with
 * {@code RateLimiter.create(1000)}, while a non-blocking reader thread consumes the same log,
 * then waits until the reader's count catches up with the writer's and asserts they are equal.
 *
 * @throws Exception on any failure while setting up, writing, reading or closing the log
 */
@Test(timeout = 60000)
public void testMultiReaders() throws Exception {
    String name = "distrlog-multireaders";
    final RateLimiter limiter = RateLimiter.create(1000);

    DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
    confLocal.setOutputBufferSize(0);
    confLocal.setImmediateFlushEnabled(true);

    DistributedLogManager dlmwrite = createNewDLM(confLocal, name);

    final AsyncLogWriter writer = dlmwrite.startAsyncLogSegmentNonPartitioned();
    // two records are written up front, hence the initial write count of 2
    Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(0)));
    Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1)));
    final AtomicInteger writeCount = new AtomicInteger(2);

    DistributedLogManager dlmread = createNewDLM(conf, name);

    BKSyncLogReader reader0 = (BKSyncLogReader) dlmread.getInputStream(0);

    try {
        ReaderThread[] readerThreads = new ReaderThread[1];
        readerThreads[0] = new ReaderThread("reader0-non-blocking", reader0, false);
        // readerThreads[1] = new ReaderThread("reader1-non-blocking", reader0, false);

        final AtomicBoolean running = new AtomicBoolean(true);
        Thread writerThread = new Thread("WriteThread") {
            @Override
            public void run() {
                try {
                    long txid = 2;
                    DLSN dlsn = DLSN.InvalidDLSN;
                    while (running.get()) {
                        limiter.acquire();
                        long curTxId = txid++;
                        dlsn = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(curTxId)));
                        writeCount.incrementAndGet();
                        if (curTxId % 1000 == 0) {
                            LOG.info("writer write {}", curTxId);
                        }
                    }
                    LOG.info("Completed writing record at {}", dlsn);
                    Utils.close(writer);
                } catch (DLInterruptedException die) {
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    // Previously swallowed silently; log it so a writer failure is
                    // visible when diagnosing the test.
                    LOG.error("Writer thread failed", e);
                }
            }
        };

        for (ReaderThread rt : readerThreads) {
            rt.start();
        }

        writerThread.start();

        // let the writer and the reader run side by side for a while
        TimeUnit.SECONDS.sleep(5);

        LOG.info("Stopping writer");

        running.set(false);
        writerThread.join();

        LOG.info("Writer stopped after writing {} records, waiting for reader to complete",
                writeCount.get());
        // poll until the reader has seen everything that was written
        while (writeCount.get() > (readerThreads[0].getReadCount())) {
            LOG.info("Write Count = {}, Read Count = {}",
                    new Object[] { writeCount.get(), readerThreads[0].getReadCount() });
            TimeUnit.MILLISECONDS.sleep(100);
        }
        assertEquals(writeCount.get(),
                (readerThreads[0].getReadCount()));

        for (ReaderThread readerThread : readerThreads) {
            readerThread.stopReading();
        }
    } finally {
        dlmwrite.close();
        reader0.close();
        dlmread.close();
    }
}
 
源代码18 项目: distributedlog   文件: RecordGenerator.java
/**
 * Writes timestamped records to a stream, acquiring one permit per write from a limiter
 * created with the rate given on the command line, until a write fails.
 *
 * <p>Expects exactly three arguments: the finagle name string, the stream name and the rate.
 * With any other argument count, {@code HELP} is printed and the method returns.
 *
 * @param args finagle name, stream name, rate (parsed with {@code Double.parseDouble})
 * @throws Exception if the rate cannot be parsed or the wait on the latch is interrupted
 */
public static void main(String[] args) throws Exception {
    if (args.length != 3) {
        System.out.println(HELP);
        return;
    }

    final String finagleName = args[0];
    final String streamName = args[1];
    final double permitsPerSecond = Double.parseDouble(args[2]);
    final RateLimiter throttle = RateLimiter.create(permitsPerSecond);

    final DistributedLogClient client = DistributedLogClientBuilder.newBuilder()
            .clientId(ClientId$.MODULE$.apply("record-generator"))
            .name("record-generator")
            .thriftmux(true)
            .finagleNameStr(finagleName)
            .build();

    final CountDownLatch failureLatch = new CountDownLatch(1);
    final AtomicLong successCounter = new AtomicLong(0);
    final AtomicBoolean keepWriting = new AtomicBoolean(true);

    while (keepWriting.get()) {
        throttle.acquire();

        final String payload = "record-" + System.currentTimeMillis();
        final ByteBuffer buffer = ByteBuffer.wrap(payload.getBytes(UTF_8));

        client.write(streamName, buffer)
                .addEventListener(new FutureEventListener<DLSN>() {
                    @Override
                    public void onFailure(Throwable cause) {
                        System.out.println("Encountered error on writing data");
                        cause.printStackTrace(System.err);
                        // stop the write loop and release the main thread
                        keepWriting.set(false);
                        failureLatch.countDown();
                    }

                    @Override
                    public void onSuccess(DLSN value) {
                        final long written = successCounter.incrementAndGet();
                        if (written % 100 != 0) {
                            return;
                        }
                        System.out.println("Write " + written + " records.");
                    }
                });
    }

    failureLatch.await();
    client.close();
}
 
源代码19 项目: stratio-cassandra   文件: BatchlogManager.java
/**
 * Replays the mutations of this batch and then acquires {@code data.remaining()} permits.
 *
 * <p>Nothing is sent, and no permits are acquired, when there are no mutations to replay or
 * when the calculated hint TTL is not positive.
 *
 * @param rateLimiter limiter charged after the replays have been sent
 * @return the number of replay handlers created, or {@code 0} when nothing was sent
 * @throws IOException declared by the signature; NOTE(review): presumably raised by one of the
 *     helpers called here, confirm against their declarations
 */
public int replay(RateLimiter rateLimiter) throws IOException
{
    logger.debug("Replaying batch {}", id);

    int handlerCount = 0;

    final List<Mutation> pending = replayingMutations();
    if (!pending.isEmpty())
    {
        final int hintTtl = calculateHintTTL(pending);
        if (hintTtl > 0)
        {
            replayHandlers = sendReplays(pending, writtenAt, hintTtl);

            // acquire afterwards, to not mess up ttl calculation.
            rateLimiter.acquire(data.remaining());

            handlerCount = replayHandlers.size();
        }
    }

    return handlerCount;
}
 
源代码20 项目: pulsar   文件: CmdProduce.java
/**
 * Publishes every generated message body to a topic, {@code numTimesProduce} times over.
 *
 * <p>When {@code publishRate} is positive, one permit is acquired from a limiter created with
 * that rate before each message. Any exception is logged and turned into a {@code -1} return
 * code; the number of messages sent is logged in all cases.
 *
 * @param topic topic to produce to
 * @return {@code 0} on success, {@code -1} if anything failed
 */
private int publish(String topic) {
    int numMessagesSent = 0;
    int returnCode = 0;

    // try-with-resources closes the client even when producing fails part-way;
    // previously close() was only reached on the success path.
    try (PulsarClient client = clientBuilder.build()) {
        ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topic);
        if (this.chunkingAllowed) {
            producerBuilder.enableChunking(true);
            producerBuilder.enableBatching(false);
        }
        Producer<byte[]> producer = producerBuilder.create();

        List<byte[]> messageBodies = generateMessageBodies(this.messages, this.messageFileNames);
        RateLimiter limiter = (this.publishRate > 0) ? RateLimiter.create(this.publishRate) : null;

        Map<String, String> kvMap = new HashMap<>();
        for (String property : properties) {
            // limit 2 keeps any '=' inside the value instead of truncating at the second one
            String[] kv = property.split("=", 2);
            if (kv.length != 2) {
                // reported through the catch block below, like any other failure
                throw new IllegalArgumentException(
                        "Invalid property, expected key=value: " + property);
            }
            kvMap.put(kv[0], kv[1]);
        }

        for (int i = 0; i < this.numTimesProduce; i++) {
            for (byte[] content : messageBodies) {
                if (limiter != null) {
                    limiter.acquire();
                }

                TypedMessageBuilder<byte[]> message = producer.newMessage();

                if (!kvMap.isEmpty()) {
                    message.properties(kvMap);
                }

                if (key != null && !key.isEmpty()) {
                    message.key(key);
                }

                message.value(content).send();

                numMessagesSent++;
            }
        }
    } catch (Exception e) {
        LOG.error("Error while producing messages");
        LOG.error(e.getMessage(), e);
        returnCode = -1;
    } finally {
        LOG.info("{} messages successfully produced", numMessagesSent);
    }

    return returnCode;
}