java.util.concurrent.ScheduledFuture#get ( )源码实例Demo

下面列出了java.util.concurrent.ScheduledFuture#get ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: ShedLock   文件: ReentrantLockProviderTest.java
@Test
void shouldNotExecuteTwiceAtTheSameTime() throws ExecutionException, InterruptedException {
    AtomicInteger executedTasks = new AtomicInteger();
    AtomicInteger runningTasks = new AtomicInteger();

    Runnable task = () -> {
        assertThat(runningTasks.getAndIncrement()).isEqualTo(0);
        sleep(50);
        assertThat(runningTasks.decrementAndGet()).isEqualTo(0);
        executedTasks.incrementAndGet();
    };
    ScheduledFuture<?> scheduledFuture1 = executor.schedule(new LockableRunnable(task, lockManager), 1, TimeUnit.MILLISECONDS);
    ScheduledFuture<?> scheduledFuture2 = executor.schedule(new LockableRunnable(task, lockManager), 1, TimeUnit.MILLISECONDS);
    scheduledFuture1.get();
    scheduledFuture2.get();

    // this assertion can fail if the tasks are scheduled one after another
    assertThat(executedTasks.get()).isEqualTo(1);
}
 
@Test
public void testNewSingleThreadScheduledExecutor() throws InterruptedException {
  String reason = "(can be ignored in Tests) NewSingleThreadScheduledExecutor";
  Thread.UncaughtExceptionHandler handler = new TestExceptionHandler(reason);
  int threadCount = 2;
  latch = new CountDownLatch(threadCount);
  ScheduledExecutorService exec = IoTDBThreadPoolFactory
      .newSingleThreadScheduledExecutor(POOL_NAME, handler);
  for (int i = 0; i < threadCount; i++) {
    Runnable task = new TestThread(reason);
    ScheduledFuture<?> future = exec.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
    try {
      future.get();
    } catch (ExecutionException e) {
      assertEquals(reason, e.getCause().getMessage());
      count.addAndGet(1);
      latch.countDown();
    }
  }
  try {
    latch.await();
    assertEquals(count.get(), threadCount);
  } catch (InterruptedException E) {
    fail();
  }
}
 
@Test
public void testNewScheduledThreadPool() throws InterruptedException {
  String reason = "(can be ignored in Tests) NewScheduledThreadPool";
  Thread.UncaughtExceptionHandler handler = new TestExceptionHandler(reason);
  int threadCount = 4;
  latch = new CountDownLatch(threadCount);
  ScheduledExecutorService exec = IoTDBThreadPoolFactory
      .newScheduledThreadPool(threadCount / 2, POOL_NAME, handler);
  for (int i = 0; i < threadCount; i++) {
    Runnable task = new TestThread(reason);
    ScheduledFuture<?> future = exec.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
    try {
      future.get();
    } catch (ExecutionException e) {
      assertEquals(reason, e.getCause().getMessage());
      count.addAndGet(1);
      latch.countDown();
    }
  }
  try {
    latch.await();
    assertEquals(count.get(), threadCount);
  } catch (InterruptedException E) {
    fail();
  }
}
 
源代码4 项目: firebase-admin-java   文件: DefaultRunLoopTest.java
@Test
public void testScheduleWithDelay() throws ExecutionException, InterruptedException {
  MockRunLoop runLoop = new MockRunLoop();
  try {
    assertEquals(1, runLoop.getThreadPool().getCorePoolSize());
    ScheduledFuture future = runLoop.schedule(new Runnable() {
      @Override
      public void run() {
      }
    }, 500L);
    assertEquals(1, runLoop.getThreadPool().getCorePoolSize());

    future.get();
    assertTrue(runLoop.errors.isEmpty());
  } finally {
    runLoop.getExecutorService().shutdownNow();
  }
}
 
源代码5 项目: samza   文件: ScheduleAfterDebounceTime.java
/**
 * Tries to cancel the task that belongs to {@code actionName} submitted to the queue.
 *
 * @param actionName the name of action to cancel.
 */
private void tryCancelScheduledAction(String actionName) {
  LOG.info("Trying to cancel the action: {}.", actionName);
  ScheduledFuture scheduledFuture = futureHandles.get(actionName);
  if (scheduledFuture != null && !scheduledFuture.isDone()) {
    LOG.info("Attempting to cancel the future of action: {}", actionName);
    // Attempt to cancel
    if (!scheduledFuture.cancel(false)) {
      try {
        scheduledFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
      } catch (Exception e) {
        // we ignore the exception
        LOG.warn("Cancelling the future of action: {} failed.", actionName, e);
      }
    }
    futureHandles.remove(actionName);
  }
}
 
@Test
public void testApplicationTaskFinishesWhenApplicationFinishes() throws Exception {
	final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
			.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
			.setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
			.setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createSuccessfulJobResult(jobId)));

	ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3);

	final CompletableFuture<Acknowledge> shutdownFuture =
			bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor);

	ScheduledFuture<?> applicationExecutionFuture = bootstrap.getApplicationExecutionFuture();

	// wait until the bootstrap "thinks" it's done
	shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

	// make sure the task finishes
	applicationExecutionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
 
@Test
public void testNoSpoolDirWithWaiting() throws Exception{
  DirectorySpooler.Builder builder = initializeAndGetBuilder()
      .setMaxSpoolFiles(1)
      .waitForPathAppearance(true);

  final DirectorySpooler spooler = builder.build();
  spooler.init("x2");
  ScheduledExecutorService schedService = new SafeScheduledExecutorService(1, "One Time pooler");
  boolean test_passed = false;
  try {
    Callable<Boolean> task = new Callable<Boolean>(){
      public Boolean call() {
        try {
          return (spooler.poolForFile(intervalMillis, TimeUnit.MILLISECONDS) != null);
        }
        catch (InterruptedException e) {
          //Task Interrupted as it did not finish the task.
        }
        return false;
      }
    };
    ScheduledFuture<Boolean> test_status = schedService.schedule(task, 0, TimeUnit.MILLISECONDS);
    Assert.assertTrue(spoolDir.mkdirs());

    File logFile = new File(spoolDir, "x2.log").getAbsoluteFile();
    new FileWriter(logFile).close();
    //Wait for 10 secs at max and then report false;
    test_passed = test_status.get(10000, TimeUnit.MILLISECONDS);

  } finally {
    spooler.destroy();
    schedService.shutdownNow();
  }
  Assert.assertTrue("Test did not pass, Spooler did not find files", test_passed);
}
 
源代码8 项目: hazelcast-simulator   文件: HazelcastUtils.java
public static boolean isMaster(final HazelcastInstance hazelcastInstance, ScheduledExecutorService executor,
                               int delaySeconds) {
    if (hazelcastInstance == null || !isOldestMember(hazelcastInstance)) {
        return false;
    }
    try {
        Callable<Boolean> callable = () -> isOldestMember(hazelcastInstance);
        ScheduledFuture<Boolean> future = executor.schedule(callable, delaySeconds, TimeUnit.SECONDS);
        return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        throw new IllegalStateException(e);
    }
}
 
源代码9 项目: Flink-CEPplus   文件: AkkaRpcServiceTest.java
@Test
public void testScheduleRunnable() throws Exception {
	final OneShotLatch latch = new OneShotLatch();
	final long delay = 100L;
	final long start = System.nanoTime();

	ScheduledFuture<?> scheduledFuture = akkaRpcService.scheduleRunnable(latch::trigger, delay, TimeUnit.MILLISECONDS);

	scheduledFuture.get();

	assertTrue(latch.isTriggered());
	final long stop = System.nanoTime();

	assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
}
 
源代码10 项目: flink   文件: AkkaRpcServiceTest.java
@Test
public void testScheduleRunnable() throws Exception {
	final OneShotLatch latch = new OneShotLatch();
	final long delay = 100L;
	final long start = System.nanoTime();

	ScheduledFuture<?> scheduledFuture = akkaRpcService.scheduleRunnable(latch::trigger, delay, TimeUnit.MILLISECONDS);

	scheduledFuture.get();

	assertTrue(latch.isTriggered());
	final long stop = System.nanoTime();

	assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
}
 
源代码11 项目: distributedlog   文件: TestNonBlockingReads.java
@Test(timeout = 100000)
public void testNonBlockingReadRecovery() throws Exception {
    String name = "distrlog-non-blocking-reader-recovery";
    final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
    confLocal.loadConf(conf);
    confLocal.setReadAheadBatchSize(10);
    confLocal.setReadAheadMaxRecords(10);
    final DistributedLogManager dlm = createNewDLM(confLocal, name);
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    ScheduledFuture writerClosedFuture = null;
    try {
        final Thread currentThread = Thread.currentThread();
        writerClosedFuture = executor.schedule(
                new Runnable() {
                    @Override
                    public void run() {
                        try {
                            writeRecordsForNonBlockingReads(confLocal, dlm, true);
                        } catch (Exception exc) {
                            currentThread.interrupt();
                        }

                    }
                }, 100, TimeUnit.MILLISECONDS);


        readNonBlocking(dlm, false);
        assertFalse(currentThread.isInterrupted());
    } finally {
        if (writerClosedFuture != null){
            // ensure writer.closeAndComplete is done before we close dlm
            writerClosedFuture.get();
        }
        executor.shutdown();
        dlm.close();
    }
}
 
源代码12 项目: distributedlog   文件: TestNonBlockingReads.java
@Test(timeout = 100000)
public void testNonBlockingRead() throws Exception {
    String name = "distrlog-non-blocking-reader";
    final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
    confLocal.loadConf(conf);
    confLocal.setReadAheadBatchSize(1);
    confLocal.setReadAheadMaxRecords(1);
    confLocal.setReaderIdleWarnThresholdMillis(100);
    final DistributedLogManager dlm = createNewDLM(confLocal, name);
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    ScheduledFuture writerClosedFuture = null;
    try {
        final Thread currentThread = Thread.currentThread();
        writerClosedFuture = executor.schedule(
                new Runnable() {
                    @Override
                    public void run() {
                        try {
                            writeRecordsForNonBlockingReads(confLocal, dlm, false);
                        } catch (Exception exc) {
                            currentThread.interrupt();
                        }

                    }
                }, 100, TimeUnit.MILLISECONDS);

        readNonBlocking(dlm, false);
        assertFalse(currentThread.isInterrupted());
    } finally {
        if (writerClosedFuture != null){
            // ensure writer.closeAndComplete is done before we close dlm
            writerClosedFuture.get();
        }
        executor.shutdown();
        dlm.close();
    }
}
 
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
	ScheduledFuture<?> curr;
	synchronized (this.triggerContextMonitor) {
		curr = obtainCurrentFuture();
	}
	return curr.get(timeout, unit);
}
 
源代码14 项目: flink   文件: ProcessingTimeServiceImplTest.java
@Test
public void testQuiesce() throws Exception {
	ProcessingTimeServiceImpl processingTimeService = new ProcessingTimeServiceImpl(timerService, v -> v);

	final CompletableFuture<?> timerRunFuture = new CompletableFuture();
	final OneShotLatch timerWaitLatch = new OneShotLatch();

	ScheduledFuture<?> timer = processingTimeService.registerTimer(0, timestamp -> {
		timerRunFuture.complete(null);
		timerWaitLatch.await();
	});

	// wait for the timer to run, then quiesce the time service
	timerRunFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
	CompletableFuture<?> quiesceCompletedFuture = processingTimeService.quiesce();

	// after the timer server is quiesced, tests #registerTimer() and #scheduleAtFixedRate()
	assertThat(processingTimeService.registerTimer(0, timestamp -> {}), is(instanceOf(NeverCompleteFuture.class)));
	assertThat(processingTimeService.scheduleAtFixedRate(
		timestamp -> {}, 0, Long.MAX_VALUE), is(instanceOf(NeverCompleteFuture.class)));

	// when the timer is finished, the quiesce-completed future should be completed
	assertFalse(quiesceCompletedFuture.isDone());
	timerWaitLatch.trigger();
	timer.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
	assertTrue(quiesceCompletedFuture.isDone());
}
 
源代码15 项目: scheduling   文件: MockSchedulingInfrastructure.java
@Override
public void schedule(Callable<?> task, long delay) {
    System.out.println("Requested to schedule " + task + ", delay: " + delay);
    ScheduledFuture<?> future = scheduledExecutorService.schedule(task, 1, TimeUnit.MILLISECONDS);
    try {
        future.get();
    } catch (Exception e) {
        e.printStackTrace();
        Assert.fail("Unexpected exception");
    }
}
 
源代码16 项目: hadoop-ozone   文件: TestSCMNodeManager.java
/**
 * Simulate a JVM Pause by pausing the health check process
 * Ensure that none of the nodes with heartbeats become Dead or Stale.
 * @throws IOException
 * @throws InterruptedException
 * @throws AuthenticationException
 */
@Test
public void testScmHandleJvmPause()
    throws IOException, InterruptedException, AuthenticationException {
  final int healthCheckInterval = 200; // milliseconds
  final int heartbeatInterval = 1; // seconds
  final int staleNodeInterval = 3; // seconds
  final int deadNodeInterval = 6; // seconds
  ScheduledFuture schedFuture;

  OzoneConfiguration conf = getConf();
  conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
      healthCheckInterval, MILLISECONDS);
  conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
      heartbeatInterval, SECONDS);
  conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL,
      staleNodeInterval, SECONDS);
  conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL,
      deadNodeInterval, SECONDS);

  try (SCMNodeManager nodeManager = createNodeManager(conf)) {
    DatanodeDetails node1 =
        TestUtils.createRandomDatanodeAndRegister(nodeManager);
    DatanodeDetails node2 =
        TestUtils.createRandomDatanodeAndRegister(nodeManager);

    nodeManager.processHeartbeat(node1);
    nodeManager.processHeartbeat(node2);

    // Sleep so that heartbeat processing thread gets to run.
    Thread.sleep(1000);

    //Assert all nodes are healthy.
    assertEquals(2, nodeManager.getAllNodes().size());
    assertEquals(2, nodeManager.getNodeCount(HEALTHY));

    /**
     * Simulate a JVM Pause and subsequent handling in following steps:
     * Step 1 : stop heartbeat check process for stale node interval
     * Step 2 : resume heartbeat check
     * Step 3 : wait for 1 iteration of heartbeat check thread
     * Step 4 : retrieve the state of all nodes - assert all are HEALTHY
     * Step 5 : heartbeat for node1
     * [TODO : what if there is scheduling delay of test thread in Step 5?]
     * Step 6 : wait for some time to allow iterations of check process
     * Step 7 : retrieve the state of all nodes -  assert node2 is STALE
     * and node1 is HEALTHY
     */

    // Step 1 : stop health check process (simulate JVM pause)
    nodeManager.pauseHealthCheck();
    Thread.sleep(MILLISECONDS.convert(staleNodeInterval, SECONDS));

    // Step 2 : resume health check
    assertTrue("Unexpected, already skipped heartbeat checks",
        (nodeManager.getSkippedHealthChecks() == 0));
    schedFuture = nodeManager.unpauseHealthCheck();

    // Step 3 : wait for 1 iteration of health check
    try {
      schedFuture.get();
      assertTrue("We did not skip any heartbeat checks",
          nodeManager.getSkippedHealthChecks() > 0);
    } catch (ExecutionException e) {
      assertEquals("Unexpected exception waiting for Scheduled Health Check",
          0, 1);
    }

    // Step 4 : all nodes should still be HEALTHY
    assertEquals(2, nodeManager.getAllNodes().size());
    assertEquals(2, nodeManager.getNodeCount(HEALTHY));

    // Step 5 : heartbeat for node1
    nodeManager.processHeartbeat(node1);

    // Step 6 : wait for health check process to run
    Thread.sleep(1000);

    // Step 7 : node2 should transition to STALE
    assertEquals(1, nodeManager.getNodeCount(HEALTHY));
    assertEquals(1, nodeManager.getNodeCount(STALE));
  }
}
 
源代码17 项目: incubator-heron   文件: HealthManager.java
public static void main(String[] args) throws Exception {
  CommandLineParser parser = new DefaultParser();
  Options slaManagerCliOptions = constructCliOptions();

  // parse the help options first.
  Options helpOptions = constructHelpOptions();
  CommandLine cmd = parser.parse(helpOptions, args, true);
  if (cmd.hasOption("h")) {
    usage(slaManagerCliOptions);
    return;
  }

  try {
    cmd = parser.parse(slaManagerCliOptions, args);
  } catch (ParseException e) {
    usage(slaManagerCliOptions);
    throw new RuntimeException("Error parsing command line options: ", e);
  }

  HealthManagerMode mode = HealthManagerMode.cluster;
  if (hasOption(cmd, CliArgs.MODE)) {
    mode = HealthManagerMode.valueOf(getOptionValue(cmd, CliArgs.MODE));
  }

  Config config;
  switch (mode) {
    case cluster:
      config = Config.toClusterMode(Config.newBuilder()
          .putAll(ConfigLoader.loadClusterConfig())
          .putAll(commandLineConfigs(cmd))
          .build());
      break;

    case local:
      if (!hasOption(cmd, CliArgs.HERON_HOME) || !hasOption(cmd, CliArgs.CONFIG_PATH)) {
        throw new IllegalArgumentException("Missing heron_home or config_path argument");
      }
      String heronHome = getOptionValue(cmd, CliArgs.HERON_HOME);
      String configPath = getOptionValue(cmd, CliArgs.CONFIG_PATH);
      config = Config.toLocalMode(Config.newBuilder()
          .putAll(ConfigLoader.loadConfig(heronHome, configPath, null, null))
          .putAll(commandLineConfigs(cmd))
          .build());
      break;

    default:
      throw new IllegalArgumentException("Invalid mode: " + getOptionValue(cmd, CliArgs.MODE));
  }

  setupLogging(cmd, config);

  LOG.fine(Arrays.toString(cmd.getOptions()));

  // Add the SystemConfig into SingletonRegistry
  SystemConfig systemConfig = SystemConfig.newBuilder(true)
      .putAll(Context.systemFile(config), true)
      .putAll(Context.overrideFile(config), true).build();
  SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, systemConfig);

  LOG.info("Static Heron config loaded successfully ");
  LOG.fine(config.toString());

  // load the default config value and override with any command line values
  String metricSourceClassName = config.getStringValue(PolicyConfigKey.METRIC_SOURCE_TYPE.key());
  metricSourceClassName = getOptionValue(cmd, CliArgs.METRIC_SOURCE_TYPE, metricSourceClassName);

  String metricsUrl = config.getStringValue(PolicyConfigKey.METRIC_SOURCE_URL.key());
  metricsUrl = getOptionValue(cmd, CliArgs.METRIC_SOURCE_URL, metricsUrl);

  // metrics reporting thread
  HealthManagerMetrics publishingMetrics =
      new HealthManagerMetrics(Integer.valueOf(getOptionValue(cmd, CliArgs.METRICSMGR_PORT)));

  AbstractModule module
      = buildBaseModule(metricsUrl, metricSourceClassName, publishingMetrics);
  HealthManager healthManager = new HealthManager(config, module);

  LOG.info("Initializing health manager");
  healthManager.initialize();

  LOG.info("Starting Health Manager");
  PoliciesExecutor policyExecutor = new PoliciesExecutor(healthManager.healthPolicies);
  ScheduledFuture<?> future = policyExecutor.start();

  LOG.info("Starting Health Manager metric posting thread");
  new Thread(publishingMetrics).start();

  try {
    future.get();
  } finally {
    policyExecutor.destroy();
    publishingMetrics.close();
  }
}
 
源代码18 项目: joynr   文件: AbstractBounceProxyServerTest.java
@Test(timeout = 20000)
@Ignore
// This is a test to see if the atmos bug still exists. If the bug exists,
// the server will hang 20 secs
public void testSendAndReceiveMessagesOnAtmosphereServer() throws Exception {

    final long maxTimePerRun = 5000;
    final int maxRuns = 100;

    bpMock.createChannel(channelId);
    // createChannel(channelIdProvider);

    RestAssured.baseURI = getBounceProxyBaseUri();

    int index = 1;
    List<byte[]> expectedPayloads = new ArrayList<byte[]>();

    for (int i = 0; i < maxRuns; i++) {
        expectedPayloads.clear();

        long startTime_ms = System.currentTimeMillis();
        ScheduledFuture<Response> longPollConsumer = bpMock.longPollInOwnThread(channelId, 30000);

        byte[] postPayload = (payload + index++ + "-" + createUuidString()).getBytes(StandardCharsets.UTF_8);
        expectedPayloads.add(postPayload);
        ScheduledFuture<Response> postMessage = bpMock.postMessageInOwnThread(channelId, 5000, postPayload);

        byte[] postPayload2 = (payload + index++ + "-" + createUuidString()).getBytes(StandardCharsets.UTF_8);
        expectedPayloads.add(postPayload2);
        ScheduledFuture<Response> postMessage2 = bpMock.postMessageInOwnThread(channelId, 5000, postPayload2);

        // wait until the long poll returns
        Response responseLongPoll = longPollConsumer.get();
        byte[] responseBody = responseLongPoll.getBody().asByteArray();
        List<ImmutableMessage> receivedMessages = Utilities.splitSMRF(responseBody);

        // wait until the POSTs are finished.
        postMessage.get();
        postMessage2.get();

        long elapsedTime_ms = System.currentTimeMillis() - startTime_ms;

        if (receivedMessages.size() < 2 && elapsedTime_ms < maxTimePerRun) {
            // Thread.sleep(100);
            Response responseLongPoll2 = bpMock.longPollInOwnThread(channelId, 30000).get();
            byte[] responseBody2 = responseLongPoll2.getBody().asByteArray();
            List<ImmutableMessage> receivedMessages2 = Utilities.splitSMRF(responseBody2);
            receivedMessages.addAll(receivedMessages2);
        }

        ArrayList<byte[]> payloads = new ArrayList<byte[]>();
        for (ImmutableMessage message : receivedMessages) {
            payloads.add(message.getUnencryptedBody());
        }

        elapsedTime_ms = System.currentTimeMillis() - startTime_ms;
        log.info(i + ": time elapsed to send messages and return long poll:" + elapsedTime_ms);
        assertThat("the long poll did not receive the messages in time", elapsedTime_ms, lessThan(maxTimePerRun));

        if (payloads.size() == 2) {
            assertFalse("Unresolved bug that causes duplicate messages to be sent",
                        payloads.get(0).equals(payloads.get(1)));
        }
        assertThat(payloads, hasItems(postPayload, postPayload2));
    }

}
 
@Test
public void testScheduleRunnable() throws Exception {
    assertThat(submitted.getCount()).isZero();

    assertThat(running.getCount()).isZero();
    assertThat(completed.getCount()).isZero();
    assertThat(duration.getCount()).isZero();

    assertThat(scheduledOnce.getCount()).isZero();
    assertThat(scheduledRepetitively.getCount()).isZero();
    assertThat(scheduledOverrun.getCount()).isZero();
    assertThat(percentOfPeriod.getCount()).isZero();

    ScheduledFuture<?> theFuture = instrumentedScheduledExecutor.schedule(new Runnable() {
        public void run() {
            assertThat(submitted.getCount()).isZero();

            assertThat(running.getCount()).isEqualTo(1);
            assertThat(completed.getCount()).isZero();
            assertThat(duration.getCount()).isZero();

            assertThat(scheduledOnce.getCount()).isEqualTo(1);
            assertThat(scheduledRepetitively.getCount()).isZero();
            assertThat(scheduledOverrun.getCount()).isZero();
            assertThat(percentOfPeriod.getCount()).isZero();
        }
    }, 10L, TimeUnit.MILLISECONDS);

    theFuture.get();

    assertThat(submitted.getCount()).isZero();

    assertThat(running.getCount()).isZero();
    assertThat(completed.getCount()).isEqualTo(1);
    assertThat(duration.getCount()).isEqualTo(1);
    assertThat(duration.getSnapshot().size()).isEqualTo(1);

    assertThat(scheduledOnce.getCount()).isEqualTo(1);
    assertThat(scheduledRepetitively.getCount()).isZero();
    assertThat(scheduledOverrun.getCount()).isZero();
    assertThat(percentOfPeriod.getCount()).isZero();
}
 
源代码20 项目: joynr   文件: AbstractBounceProxyServerTest.java
@Test(timeout = 1000000)
@Ignore
// This is a test to see if sending and receiving messages at the same time
// results in duplicate messages in the long poll.
public void testSendAndReceiveMessagesConcurrently() throws Exception {

    final int maxRuns = 1000;

    bpMock.createChannel(channelId);
    // createChannel(channelIdProvider);

    RestAssured.baseURI = getBounceProxyBaseUri();

    List<byte[]> expectedPayloads = new ArrayList<byte[]>();

    for (int i = 0; i < maxRuns; i++) {

        expectedPayloads.clear();

        ScheduledFuture<Response> longPollConsumer = bpMock.longPollInOwnThread(channelId, 30000);

        byte[] postPayload = (payload + i + "-" + createUuidString()).getBytes(StandardCharsets.UTF_8);
        expectedPayloads.add(postPayload);
        ScheduledFuture<Response> postMessage = bpMock.postMessageInOwnThread(channelId, 5000, postPayload);

        // wait until the long poll returns
        Response responseLongPoll = longPollConsumer.get();
        byte[] responseBody = responseLongPoll.getBody().asByteArray();
        List<ImmutableMessage> receivedMessages = Utilities.splitSMRF(responseBody);

        // wait until the POSTs are finished.
        postMessage.get();

        ArrayList<byte[]> payloads = new ArrayList<byte[]>();
        for (ImmutableMessage message : receivedMessages) {
            payloads.add(message.getUnencryptedBody());
        }

        // assertFalse("Unresolved bug that causes duplicate messages to be sent", payloads.size() == 2);
        // assertEquals(1, payloads.size());
        assertThat(payloads, hasItems(postPayload));
    }

}