类com.google.common.util.concurrent.Uninterruptibles源码实例Demo

下面列出了怎么用com.google.common.util.concurrent.Uninterruptibles的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: hadoop-ozone   文件: AbstractFuture.java
/**
 * Returns the result of a future that is required to have completed already.
 *
 * @param future the future to read; it must already be done
 * @return the value the future completed with
 * @throws ExecutionException if reading the completed future reports a failure
 * @throws IllegalStateException if {@code future} is not done yet
 */
public static <V> V getDone(Future<V> future) throws ExecutionException {
  /*
   * On the choice of exception type: IllegalStateException is thrown because
   * the very same call could succeed later. IllegalArgumentException would
   * also be defensible (the call could succeed with a different argument),
   * and the docs of both exceptions allow either reading. Google's Java
   * Practices page recommends IllegalArgumentException in this situation,
   * partly to keep its rule simple: static methods should throw
   * IllegalStateException only when they use static state.
   *
   * The deviation is deliberate: fluentFuture.getDone() should throw the
   * same exception as Futures.getDone(fluentFuture).
   */
  final boolean done = future.isDone();
  Preconditions.checkState(done, "Future was expected to be done: %s", future);
  return Uninterruptibles.getUninterruptibly(future);
}
 
源代码2 项目: xio   文件: Http1ClientCodecUnitTest.java
/**
 * Feeds a complete HTTP/1.1 response with a body into the channel and checks
 * the decoded {@code Response} that comes out of the codec.
 */
@Test
public void testFullResponse() throws Exception {
  outputReceived = new CountDownLatch(1);
  ByteBuf payload = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "response");
  FullHttpResponse inbound = new DefaultFullHttpResponse(HTTP_1_1, OK, payload);

  // Push the response through the pipeline and wait for the output latch.
  channel.writeInbound(inbound);
  channel.runPendingTasks(); // blocks
  Uninterruptibles.awaitUninterruptibly(outputReceived);

  Response decoded = responses.remove(0);

  // Shape of the decoded message.
  assertTrue(decoded != null);
  assertTrue(decoded instanceof FullResponse);

  // Status line.
  assertEquals("HTTP/1.1", decoded.version());
  assertEquals(OK, decoded.status());

  // Body must be present and equal to what was written.
  assertTrue(decoded.hasBody());
  assertFalse(decoded.body() == null);
  assertEquals(payload, decoded.body());
}
 
源代码3 项目: streams   文件: TheDataGroup.java
/**
 * Posts the request to the {@code sync/append/demographics} endpoint and
 * parses the JSON reply.
 *
 * <p>Any exception is logged and answered with an empty response object. The
 * method always sleeps for one second before returning, on success and on
 * failure alike.
 *
 * @param request the append request sent as the call body
 * @return the parsed response, or an empty response if anything failed
 */
@Override
public DemographicsAppendResponse appendDemographics(AppendRequest request) {
  try {
    String responseJson = restClient
        .doPost(baseUrl() + "sync/append/demographics")
        .body(request)
        .ignoreErrors()
        .getResponseAsString();
    return parser.parse(responseJson, DemographicsAppendResponse.class);
  } catch (Exception e) {
    LOGGER.error("Exception", e);
    return new DemographicsAppendResponse();
  } finally {
    // NOTE(review): presumably a crude rate limit between calls - confirm.
    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
  }
}
 
源代码4 项目: activejpa   文件: JpaThreadFactoryTest.java
/**
 * Runs a task that opens a transaction and then throws, using a thread made by
 * {@code ActiveJpaThreadFactory}, and checks that the transaction is no longer
 * active afterwards.
 */
@Test
public void shouldCloseOpenTransactionOnException() {
	final DummyModel persisted = new DummyModel("value1", "value2", "value3");
	persisted.persist();

	// Captures the transaction opened inside the task so it can be inspected later.
	final AtomicReference<EntityTransaction> startedTxn = new AtomicReference<>();
	Runnable failingTask = new Runnable() {
		@Override
		public void run() {
			startedTxn.set(DummyModel.beginTxn());
			throw new RuntimeException();
		}
	};

	Thread worker = new ActiveJpaThreadFactory().newThread(failingTask);
	// NOTE(review): run() executes the task on the calling thread; start() may
	// have been intended, given the sleep that follows - confirm.
	worker.run();
	Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);

	EntityTransaction observed = startedTxn.get();
	assertFalse(observed.isActive());
}
 
源代码5 项目: openAGV   文件: StandardKernel.java
/**
 * Blocks until the termination semaphore can be acquired, then shuts down.
 *
 * <p>After the wait it pauses for one second, calls {@code terminate()} on
 * every kernel extension and finally shuts down the kernel executor.
 */
@Override
public void run() {
  // Wait until terminated.
  terminationSemaphore.acquireUninterruptibly();
  LOG.info("Terminating...");
  // Sleep a bit so clients have some time to receive an event for the
  // SHUTDOWN state change and shut down gracefully themselves.
  Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
  // Shut down all kernel extensions.
  LOG.debug("Shutting down kernel extensions...");
  for (KernelExtension extension : kernelExtensions) {
    extension.terminate();
  }
  // Stop the executor only after all extensions have been terminated.
  kernelExecutor.shutdown();
  LOG.info("Kernel thread finished.");
}
 
源代码6 项目: twill   文件: Hadoop21YarnNMClient.java
/**
 * Asks the node manager to stop the container and polls its status every
 * 200 ms until it is reported as {@code COMPLETE}.
 *
 * <p>Any exception raised along the way is logged and rethrown via
 * {@code Throwables.propagate}. The polling loop has no upper time bound.
 */
@Override
public void cancel() {
  LOG.info("Request to stop container {}.", container.getId());

  try {
    nmClient.stopContainer(container.getId(), container.getNodeId());

    boolean completed;
    do {
      ContainerStatus status = nmClient.getContainerStatus(container.getId(), container.getNodeId());
      LOG.trace("Container status: {} {}", status, status.getDiagnostics());
      completed = status.getState() == ContainerState.COMPLETE;
      if (!completed) {
        // Not finished yet: back off briefly before asking again.
        Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
      }
    } while (!completed);

    LOG.info("Container {} stopped.", container.getId());
  } catch (Exception e) {
    LOG.error("Fail to stop container {}", container.getId(), e);
    throw Throwables.propagate(e);
  }
}
 
源代码7 项目: openAGV   文件: LoopbackCommunicationAdapter.java
/**
 * Simulates the execution of an operation.
 *
 * <p>Does nothing if the adapter is already terminated. Otherwise the vehicle
 * state is set to {@code EXECUTING} and simulated time is advanced in steps
 * until the configured operating time has passed or the adapter is
 * terminated. Afterwards the load handling device is marked as loaded or
 * unloaded if the operation equals the process model's load or unload
 * operation.
 *
 * @param operation The operation to simulate.
 */
private void simulateOperation(String operation) {
  requireNonNull(operation, "operation");

  if (isTerminated()) {
    return;
  }

  LOG.debug("Operating...");
  final int operatingTime = getProcessModel().getOperatingTime();
  getProcessModel().setVehicleState(Vehicle.State.EXECUTING);

  int timePassed = 0;
  while (timePassed < operatingTime && !isTerminated()) {
    // Sleep for one step of real time, then advance the simulated clock.
    Uninterruptibles.sleepUninterruptibly(ADVANCE_TIME, TimeUnit.MILLISECONDS);
    getProcessModel().getVelocityController().advanceTime(simAdvanceTime);
    timePassed += simAdvanceTime;
  }

  if (operation.equals(getProcessModel().getLoadOperation())) {
    // Load operation: mark the load handling device as full.
    getProcessModel().setVehicleLoadHandlingDevices(
        Arrays.asList(new LoadHandlingDevice(LHD_NAME, true)));
  }
  else if (operation.equals(getProcessModel().getUnloadOperation())) {
    // Unload operation: mark the load handling device as empty.
    getProcessModel().setVehicleLoadHandlingDevices(
        Arrays.asList(new LoadHandlingDevice(LHD_NAME, false)));
  }
}
 
源代码8 项目: streams   文件: TheDataGroup.java
/**
 * Posts the request to the {@code sync/lookup/ip} endpoint and parses the
 * JSON reply.
 *
 * <p>Any exception is logged and answered with an empty response object. The
 * method always sleeps for one second before returning, on success and on
 * failure alike.
 *
 * @param request the IP lookup request sent as the call body
 * @return the parsed response, or an empty response if anything failed
 */
@Override
public LookupResponse lookupIp(IpLookupRequest request) {
  try {
    String responseJson = restClient
        .doPost(baseUrl() + "sync/lookup/ip")
        .body(request)
        .ignoreErrors()
        .getResponseAsString();
    return parser.parse(responseJson, LookupResponse.class);
  } catch (Exception e) {
    LOGGER.error("Exception", e);
    return new LookupResponse();
  } finally {
    // NOTE(review): presumably a crude rate limit between calls - confirm.
    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
  }
}
 
源代码9 项目: caffeine   文件: CacheLoadingTest.java
/**
 * Checks that concurrent requests for one key are coalesced: the loader must
 * be invoked exactly once and every caller must receive the very same result
 * instance.
 */
private static void testConcurrentLoadingDefault(Caffeine<Object, Object> builder)
    throws InterruptedException {

  final int count = 10;
  final AtomicInteger loads = new AtomicInteger();
  final CountDownLatch startSignal = new CountDownLatch(count + 1);
  final Object expected = new Object();

  // The loader records each invocation and then waits on the start signal.
  CacheLoader<String, Object> countingLoader = new CacheLoader<String, Object>() {
    @Override public Object load(String key) {
      loads.incrementAndGet();
      assertTrue(Uninterruptibles.awaitUninterruptibly(startSignal, 300, TimeUnit.SECONDS));
      return expected;
    }
  };
  LoadingCache<String, Object> cache = CaffeinatedGuava.build(builder, countingLoader);

  List<Object> results = doConcurrentGet(cache, "bar", count, startSignal);

  assertEquals(1, loads.get());
  int index = 0;
  while (index < count) {
    assertSame("result(" + index + ") didn't match expected", expected, results.get(index));
    index++;
  }
}
 
源代码10 项目: xio   文件: Http2ClientCodecUnitTest.java
/**
 * Feeds a body-less HTTP/2 response for stream 1 (end-of-stream set) into the
 * channel and checks the decoded {@code Response} that comes out of the codec.
 */
@Test
public void testFullResponse() throws Exception {
  outputReceived = new CountDownLatch(1);
  Http2Headers statusOnly = new DefaultHttp2Headers().status("200");
  Http2Response inbound = Http2Response.build(1, statusOnly, true);

  // Push the response through the pipeline and wait for the output latch.
  channel.writeInbound(inbound);
  channel.runPendingTasks(); // blocks
  Uninterruptibles.awaitUninterruptibly(outputReceived);

  Response decoded = responses.remove(0);

  // Shape of the decoded message.
  assertTrue(decoded != null);
  assertTrue(decoded instanceof FullResponse);

  // Protocol, status, (absent) body and stream id.
  assertEquals("h2", decoded.version());
  assertEquals(OK, decoded.status());
  assertFalse(decoded.hasBody());
  assertEquals(1, decoded.streamId());
}
 
/**
 * Cancels an in-flight unary call while the server handler is still sleeping
 * and verifies how the limiter and its listener were used.
 *
 * <p>The handler sleeps for two seconds before responding; the client cancels
 * after one second.
 */
@Test
public void releaseOnCancellation() {
    // Setup server
    startServer((req, observer) -> {
        Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
        observer.onNext("delayed_response");
        observer.onCompleted();
    });

    // Start the call, let it run for a second, then cancel it from the client side.
    ListenableFuture<String> future = ClientCalls.futureUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT), "foo");
    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
    future.cancel(true);

    // Verify
    // The limiter was asked exactly once, and the listener was never told to ignore.
    Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class));
    Mockito.verify(listener.getResult().get(), Mockito.times(0)).onIgnore();

    // onSuccess is expected once; allow up to 2000 ms for the handler to finish.
    Mockito.verify(listener.getResult().get(), Mockito.timeout(2000).times(1)).onSuccess();

    // NOTE(review): the argument order of verifyCounts is not visible here - confirm which counter is expected to be 1.
    verifyCounts(0, 0, 1, 0);
}
 
/**
 * Sends a "set widget property value" request and waits up to 100 ms for the
 * matching response.
 *
 * @param propertyId id of the property to change
 * @param value      the new value for the property
 * @return the source change extracted from the response's {@code "change"}
 *         object, or {@code null} if no usable response arrived within the
 *         wait
 */
@Nullable
public SourceChange setWidgetPropertyValue(int propertyId, FlutterWidgetPropertyValue value) {
  final CountDownLatch responseArrived = new CountDownLatch(1);
  final AtomicReference<SourceChange> change = new AtomicReference<>();
  final String id = analysisService.generateUniqueId();

  // Register the consumer before sending so the response cannot be missed.
  synchronized (responseConsumers) {
    responseConsumers.put(id, response -> {
      try {
        change.set(SourceChange.fromJson(response.getAsJsonObject("change")));
      }
      catch (Throwable ignored) {
        // Deliberately swallowed: a malformed response simply yields null.
      }
      finally {
        responseArrived.countDown();
      }
    });
  }

  analysisService.sendRequest(id, FlutterRequestUtilities.generateFlutterSetWidgetPropertyValue(id, propertyId, value));

  // The wait result is ignored; after a timeout the reference is still empty.
  Uninterruptibles.awaitUninterruptibly(responseArrived, 100, TimeUnit.MILLISECONDS);
  return change.get();
}
 
源代码13 项目: streams   文件: TheDataGroup.java
/**
 * Posts the request to the {@code sync/append/mobile} endpoint and parses the
 * JSON reply.
 *
 * <p>Any exception is logged and answered with an empty response object. The
 * method always sleeps for one second before returning, on success and on
 * failure alike.
 *
 * @param request the append request sent as the call body
 * @return the parsed response, or an empty response if anything failed
 */
@Override
public MobileAppendResponse appendMobile(AppendRequest request) {
  try {
    String responseJson = restClient
        .doPost(baseUrl() + "sync/append/mobile")
        .body(request)
        .ignoreErrors()
        .getResponseAsString();
    return parser.parse(responseJson, MobileAppendResponse.class);
  } catch (Exception e) {
    LOGGER.error("Exception", e);
    return new MobileAppendResponse();
  } finally {
    // NOTE(review): presumably a crude rate limit between calls - confirm.
    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
  }
}
 
源代码14 项目: bazel   文件: UnixGlob.java
/**
 * Runs {@code globAsync} with the given arguments and waits for its result
 * without responding to interruption.
 *
 * <p>If the asynchronous glob fails, the cause is rethrown as-is when it is an
 * {@code IOException}, a {@code BadPattern}, a {@code RuntimeException} or an
 * {@code Error}; any other failure is wrapped in a {@code RuntimeException}.
 *
 * @throws IOException if the glob failed with an I/O error
 * @throws BadPattern if the glob failed because of a bad pattern
 */
List<Path> globUninterruptible(
    Path base,
    Collection<String> patterns,
    boolean excludeDirectories,
    Predicate<Path> dirPred,
    FilesystemCalls syscalls)
    throws IOException, BadPattern {
  try {
    return Uninterruptibles.getUninterruptibly(
        globAsync(base, patterns, excludeDirectories, dirPred, syscalls));
  } catch (ExecutionException failure) {
    // Rethrow declared and unchecked causes directly; wrap everything else.
    Throwables.propagateIfPossible(failure.getCause(), IOException.class, BadPattern.class);
    throw new RuntimeException(failure);
  }
}
 
源代码15 项目: neural   文件: AdjustableRateLimiter.java
/**
 * Creates a {@code SleepingStopwatch} backed by a started {@code Stopwatch}
 * for reading elapsed time and by {@code Uninterruptibles} for sleeping.
 *
 * @return a new stopwatch whose clock starts at creation time
 */
static SleepingStopwatch createFromSystemTimer() {
    return new SleepingStopwatch() {
        final Stopwatch timer = Stopwatch.createStarted();

        /** Returns the microseconds elapsed since this stopwatch was created. */
        @Override
        protected long readMicros() {
            return timer.elapsed(MICROSECONDS);
        }

        /** Sleeps for the given number of microseconds; non-positive values are a no-op. */
        @Override
        protected void sleepMicrosUninterruptibly(long micros) {
            if (micros <= 0) {
                return;
            }
            Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS);
        }
    };
}
 
源代码16 项目: streams   文件: Sprinklr.java
/**
 * Fetches {@code v1/bootstrap/resources} with {@code types=PARTNER_ACCOUNTS}
 * and parses the JSON reply.
 *
 * <p>Any exception is logged and answered with an empty response object. The
 * method always sleeps for one second before returning, on success and on
 * failure alike.
 *
 * @return the parsed response, or an empty response if anything failed
 */
@Override
public PartnerAccountsResponse getPartnerAccounts() {
  try {
    ObjectMap query = new ObjectMap();
    query.put("types", "PARTNER_ACCOUNTS");
    String responseJson = restClient
        .doGet(baseUrl() + "v1/bootstrap/resources")
        .queryIfNE(query)
        .ignoreErrors()
        .getResponseAsString();
    return parser.parse(responseJson, PartnerAccountsResponse.class);
  } catch (Exception e) {
    LOGGER.error("Exception", e);
    return new PartnerAccountsResponse();
  } finally {
    // NOTE(review): presumably a crude rate limit between calls - confirm.
    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
  }
}
 
源代码17 项目: conductor   文件: TestSQSObservableQueue.java
/**
 * Checks that observing a (mocked) SQS queue delivers every message returned
 * by the first {@code receiveMessages()} call, in order.
 */
@Test
public void test() {

    // Ten messages with ids "0".."9".
    List<Message> expected = new LinkedList<>();
    Observable.range(0, 10).forEach(x -> expected.add(new Message("" + x, "payload: " + x, null)));
    assertEquals(10, expected.size());

    // Mock queue: first poll returns all messages, later polls return nothing;
    // the observable plumbing uses the real implementation.
    SQSObservableQueue queue = mock(SQSObservableQueue.class);
    when(queue.getOrCreateQueue()).thenReturn("junit_queue_url");
    Answer<List<Message>> nothingLeft = invocation -> Collections.emptyList();
    when(queue.receiveMessages()).thenReturn(expected).thenAnswer(nothingLeft);
    when(queue.getOnSubscribe()).thenCallRealMethod();
    when(queue.observe()).thenCallRealMethod();

    List<Message> received = new LinkedList<>();
    Observable<Message> stream = queue.observe();
    assertNotNull(stream);
    stream.subscribe(received::add);

    // Give the polling machinery a second to deliver.
    Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);

    assertEquals(expected.size(), received.size());
    assertEquals(expected, received);
}
 
源代码18 项目: streams   文件: FullContact.java
/**
 * Serializes the request, posts it as JSON to the {@code person.enrich}
 * endpoint and parses the JSON reply.
 *
 * <p>Any exception is logged and answered with a response carrying the
 * exception message. The method always sleeps for one second before
 * returning, on success and on failure alike.
 *
 * @param request the enrichment request to serialize and send
 * @return the parsed response, or a response holding the error message
 */
@Override
public EnrichPersonResponse enrichPerson(EnrichPersonRequest request) {
  try {
    String requestJson = serializer.serialize(request);
    String responseJson = restClient
        .doPost(baseUrl() + "person.enrich")
        .accept("application/json")
        .contentType("application/json")
        .ignoreErrors()
        .body(new StringReader(requestJson))
        .getResponseAsString();
    return parser.parse(responseJson, EnrichPersonResponse.class);
  } catch (Exception e) {
    LOGGER.error("Exception", e);
    return new EnrichPersonResponse().withMessage(e.getMessage());
  } finally {
    // NOTE(review): presumably a crude rate limit between calls - confirm.
    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
  }
}
 
源代码19 项目: yangtools   文件: QueuedNotificationManagerTest.java
/**
 * Submits notifications to a listener configured with a mocked {@code Error}
 * and checks that an {@code Error} escaping a queued task is observed by the
 * executor wrapper and that no tasks remain queued afterwards.
 */
@Test(timeout = 10000)
public void testNotificationsWithListenerJVMError() {

    // Counted down once an Error escapes from a task run by the executor below.
    final CountDownLatch errorCaughtLatch = new CountDownLatch(1);
    // Single-threaded executor whose execute() wraps every command so that an
    // Error thrown by it is caught and signalled through the latch.
    queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
        @Override
        @SuppressWarnings("checkstyle:illegalCatch")
        public void execute(final Runnable command) {
            super.execute(() -> {
                try {
                    command.run();
                } catch (Error e) {
                    errorCaughtLatch.countDown();
                }
            });
        }
    };

    NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(queueExecutor,
            new TestNotifier<>(), 10, "TestMgr");

    TestListener<Integer> listener = new TestListener<>(2, 1);
    listener.jvmError = mock(Error.class);

    manager.submitNotification(listener, 1);

    // Wait (up to 5 seconds) for the wrapper to report the caught Error.
    assertTrue("JVM Error caught", Uninterruptibles.awaitUninterruptibly(errorCaughtLatch, 5, TimeUnit.SECONDS));

    manager.submitNotification(listener, 2);

    listener.verifyNotifications();
    // shutdownNow() returns the tasks that never started; none are expected.
    List<Runnable> tasks = queueExecutor.shutdownNow();
    assertTrue(tasks.isEmpty());
}
 
源代码20 项目: grpc-nebula-java   文件: DetailErrorSample.java
/**
 * Issues {@code sayHello} through the asynchronous stub and waits up to one
 * second for the error callback, which verifies the error reply.
 *
 * @throws RuntimeException with message {@code "timeout!"} if no error
 *         callback arrives within one second
 */
void asyncCall() {
  GreeterStub asyncStub = GreeterGrpc.newStub(channel);
  HelloRequest emptyRequest = HelloRequest.newBuilder().build();
  final CountDownLatch errorSeen = new CountDownLatch(1);

  StreamObserver<HelloReply> observer = new StreamObserver<HelloReply>() {

    @Override
    public void onNext(HelloReply value) {
      // Won't be called.
    }

    @Override
    public void onError(Throwable t) {
      verifyErrorReply(t);
      errorSeen.countDown();
    }

    @Override
    public void onCompleted() {
      // Won't be called, since the server in this example always fails.
    }
  };
  asyncStub.sayHello(emptyRequest, observer);

  boolean finishedInTime = Uninterruptibles.awaitUninterruptibly(errorSeen, 1, TimeUnit.SECONDS);
  if (finishedInTime) {
    return;
  }
  throw new RuntimeException("timeout!");
}
 
源代码21 项目: bitherj   文件: Threading.java
/**
 * Submits a no-op marker task to the user thread and blocks until it has run. Since that executor is single
 * threaded, everything submitted before this call has finished by the time the method returns. This is mainly
 * a convenience for unit tests and is rarely what you want elsewhere. To wait for a particular event, the
 * usual approach is to create a {@link com.google.common.util.concurrent.SettableFuture} and call set on it;
 * that future can then be blocked on, composed, given listeners and so on.
 */
public static void waitForUserCode() {
    final CountDownLatch markerRan = new CountDownLatch(1);
    final Runnable marker = new Runnable() {
        @Override
        public void run() {
            markerRan.countDown();
        }
    };
    USER_THREAD.execute(marker);
    Uninterruptibles.awaitUninterruptibly(markerRan);
}
 
源代码22 项目: bcm-android   文件: Threading.java
/**
 * Submits a no-op marker task to the user thread and blocks until it has run. Since that executor is single
 * threaded, everything submitted before this call has finished by the time the method returns. This is mainly
 * a convenience for unit tests and is rarely what you want elsewhere. To wait for a particular event, the
 * usual approach is to create a {@link com.google.common.util.concurrent.SettableFuture} and call set on it;
 * that future can then be blocked on, composed, given listeners and so on.
 */
public static void waitForUserCode() {
    final CountDownLatch markerRan = new CountDownLatch(1);
    final Runnable marker = new Runnable() {
        @Override
        public void run() {
            markerRan.countDown();
        }
    };
    USER_THREAD.execute(marker);
    Uninterruptibles.awaitUninterruptibly(markerRan);
}
 
源代码23 项目: bcm-android   文件: Threading.java
/**
 * Enqueues the command on the task queue, waiting without responding to
 * interruption if the queue cannot accept it immediately.
 *
 * <p>A warning is logged when the number of pending tasks observed on entry
 * is exactly {@code WARNING_THRESHOLD}.
 *
 * @param command the task to enqueue
 */
@Override
public void execute(Runnable command) {
    // Sample the queue size once; the check and the log message use the same value.
    final int size = tasks.size();
    // Equality (not >=) means the warning fires only when the size is exactly the threshold.
    if (size == WARNING_THRESHOLD) {
        log.warn(
                "User thread has {} pending tasks, memory exhaustion may occur.\n" +
                        "If you see this message, check your memory consumption and see if it's problematic or excessively spikey.\n" +
                        "If it is, check for deadlocked or slow event handlers. If it isn't, try adjusting the constant \n" +
                        "Threading.UserThread.WARNING_THRESHOLD upwards until it's a suitable level for your app, or Integer.MAX_VALUE to disable.", size);
    }
    Uninterruptibles.putUninterruptibly(tasks, command);
}
 
源代码24 项目: hedera-mirror-node   文件: DatabaseUtilities.java
/**
 * Obtains a connection from the data source, retrying every 100 ms until one
 * is available or shutdown begins.
 *
 * @return a connection from the data source
 * @throws IllegalStateException if the application is shutting down
 */
public static final Connection getConnection() {
    while (!ShutdownHelper.isStopping()) {
        try {
            return dataSource.getConnection();
        } catch (Exception e) {
            log.warn("Unable to connect to database: {}", e.getMessage());
            // Don't use 100% CPU if DataSource is null or can't connect
            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
        }
    }
    throw new IllegalStateException("Shutting down");
}
 
源代码25 项目: android-uiconductor   文件: AndroidDeviceDriver.java
/**
 * Starts the XML dumper server for this device through the adb command line
 * utility, waits for a fixed start-up delay and then marks it as started.
 *
 * @throws UicdExternalCommandException declared by this method; presumably raised when the external command fails - confirm
 */
public void startXmlDumperServer() throws UicdExternalCommandException {
  xmlDumperProcess =
      adbCommandLineUtil.startXmlDumperServer(
          getDeviceId(), device.getXmlDumperHostPort(), device.getXmlDumperDevicePort());

  // Give the server time to come up before flagging it as started. The wait is
  // TIME_TO_START_XML_DUMPER_SERVER (5 seconds according to the original note - confirm).
  Uninterruptibles.sleepUninterruptibly(TIME_TO_START_XML_DUMPER_SERVER);
  isXmlDumperStarted = true;
}
 
源代码26 项目: armeria   文件: BraveIntegrationTest.java
/**
 * Takes {@code numSpans} spans from the span queue, blocking without
 * responding to interruption until each one is available.
 *
 * @param numSpans number of spans to collect
 * @return the collected spans in reverse order of arrival
 */
MutableSpan[] take(int numSpans) {
    final List<MutableSpan> collected = new ArrayList<>();
    for (int i = 0; i < numSpans; i++) {
        collected.add(Uninterruptibles.takeUninterruptibly(spans));
    }

    // Flip arrival order; per the original note this sorts the spans by request time.
    Collections.reverse(collected);
    final MutableSpan[] ordered = new MutableSpan[numSpans];
    return collected.toArray(ordered);
}
 
/**
 * Connects to the given container port and returns the first line it sends.
 *
 * <p>Connection attempts are retried for up to 10 seconds, with a one second
 * pause before each attempt.
 *
 * @param container container whose host is connected to
 * @param port      port to connect to
 * @return the first line read from the socket
 * @throws IOException if reading the line or closing the reader fails
 */
private String readResponse(GenericContainer container, Integer port) throws IOException {
    try (BufferedReader lineReader = Unreliables.retryUntilSuccess(10, TimeUnit.SECONDS, () -> {
        // Pause before every attempt so a starting service gets time to listen.
        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
        final Socket connection = new Socket(container.getHost(), port);
        final InputStreamReader decoder = new InputStreamReader(connection.getInputStream());
        return new BufferedReader(decoder);
    })) {
        return lineReader.readLine();
    }
}
 
源代码28 项目: judgels   文件: PlaySessionResource.java
/**
 * Exchanges an auth code for a session cookie and redirects the client.
 *
 * <p>The session is looked up by auth code up to three times, 500 ms apart.
 * When found, the auth code is deleted and a "see other" redirect to
 * {@code redirectUri} is returned with a seven-day cookie holding the session
 * token.
 *
 * @param uriInfo     request URI info; its base URI host becomes the cookie domain
 * @param authCode    the auth code identifying the session
 * @param redirectUri where to send the client afterwards
 * @return a redirect response carrying the session cookie
 * @throws IllegalArgumentException if no session is found for the auth code
 */
@GET
@Path("/client-login/{authCode}/{redirectUri}")
@UnitOfWork
public Response serviceLogIn(
        @Context UriInfo uriInfo,
        @PathParam("authCode") String authCode,
        @PathParam("redirectUri") String redirectUri) {

    Optional<Session> session = Optional.empty();

    // Hack: wait until the auth code actually got written to db
    for (int attemptsLeft = 3; attemptsLeft > 0; attemptsLeft--) {
        session = sessionStore.getSessionByAuthCode(authCode);
        if (session.isPresent()) {
            break;
        }
        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
    }
    if (!session.isPresent()) {
        throw new IllegalArgumentException();
    }

    sessionStore.deleteAuthCode(authCode);

    // NOTE(review): redirectUri comes straight from the path and is used unvalidated - confirm this is intended.
    URI target = URI.create(redirectUri);
    int maxAgeSeconds = (int) Duration.ofDays(7).getSeconds();
    NewCookie sessionCookie = new NewCookie(
            COOKIE_NAME,
            session.get().getToken(),
            "/",
            uriInfo.getBaseUri().getHost(),
            null,
            maxAgeSeconds,
            false,
            true);

    return Response.seeOther(target)
            .cookie(sessionCookie)
            .build();
}
 
源代码29 项目: pxf   文件: ConnectionManager.java
/**
 * Creates a connection manager whose pool cache expires entries after a period
 * without access and closes each removed pool on a separate executor.
 *
 * @param factory            creates a data source for a pool descriptor on cache miss
 * @param ticker             time source used for cache expiration and for the cleanup timeout
 * @param sleepIntervalNanos pause, in nanoseconds, between checks for active connections during cleanup
 */
ConnectionManager(DataSourceFactory factory, Ticker ticker, long sleepIntervalNanos) {
    // Removal listeners run on this executor rather than on the thread that triggered the removal.
    this.datasourceClosingExecutor = Executors.newCachedThreadPool();
    this.dataSources = CacheBuilder.newBuilder()
            .ticker(ticker)
            .expireAfterAccess(POOL_EXPIRATION_TIMEOUT_HOURS, TimeUnit.HOURS)
            .removalListener(RemovalListeners.asynchronous((RemovalListener<PoolDescriptor, HikariDataSource>) notification ->
                    {
                        HikariDataSource hds = notification.getValue();
                        LOG.debug("Processing cache removal of pool {} for server {} and user {} with cause {}",
                                hds.getPoolName(),
                                notification.getKey().getServer(),
                                notification.getKey().getUser(),
                                notification.getCause().toString());
                        // if the connection pool has been removed from the cache while an active query is executing,
                        // wait until all connections finish execution and become idle, but no longer than CLEANUP_TIMEOUT
                        long startTime = ticker.read();
                        while (hds.getHikariPoolMXBean().getActiveConnections() > 0) {
                            if ((ticker.read() - startTime) > CLEANUP_TIMEOUT_NANOS) {
                                LOG.warn("Pool {} has active connections for too long, destroying it", hds.getPoolName());
                                break;
                            }
                            Uninterruptibles.sleepUninterruptibly(sleepIntervalNanos, TimeUnit.NANOSECONDS);
                        }
                        // Reached when the pool is idle or the cleanup timeout has elapsed.
                        LOG.debug("Destroying the pool {}", hds.getPoolName());
                        hds.close();
                    }
                    , datasourceClosingExecutor))
            .build(CacheLoader.from(key -> factory.createDataSource(key)));
}
 
源代码30 项目: streams   文件: BasicTasksTest.java
/**
 * Runs a {@code StreamsProviderTask} on a single-thread executor and checks
 * that it emits every message from the provider and then terminates.
 *
 * <p>Also checks that adding an input queue to a provider task is rejected
 * with {@code UnsupportedOperationException}.
 */
@Test
public void testProviderTask() {
  int numMessages = 100;
  NumericMessageProvider provider = new NumericMessageProvider(numMessages);
  StreamsProviderTask task = new StreamsProviderTask(provider, false, null);
  BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
  task.addOutputQueue(outQueue);
  //Test that adding input queues to providers is not valid
  BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
  Exception exp = null;
  try {
    task.addInputQueue(inQueue);
  } catch (UnsupportedOperationException uoe) {
    exp = uoe;
  }
  assertNotNull(exp);

  ExecutorService service = Executors.newFixedThreadPool(1);
  service.submit(task);
  int attempts = 0;
  while(outQueue.size() != numMessages) {
    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
    // Count every poll; without the increment the guard could never fire and
    // the test would hang if the provider under-delivers.
    if(++attempts == 10) {
      // Stop the worker before failing so the executor thread is not left running.
      service.shutdownNow();
      fail("Provider task failed to output "+numMessages+" in a timely fashion.");
    }
  }
  service.shutdown();
  try {
    if(!service.awaitTermination(10, TimeUnit.SECONDS)){
      service.shutdownNow();
      fail("Service did not terminate.");
    }
    assertTrue("Task should have completed running in allotted time.", service.isTerminated());
  } catch (InterruptedException e) {
    // Restore the interrupt flag for the test runner.
    Thread.currentThread().interrupt();
  }
}
 
 同包方法