下面列出了 java.util.concurrent.ScheduledFuture#get() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
@Test
void shouldNotExecuteTwiceAtTheSameTime() throws ExecutionException, InterruptedException {
    // Counts task bodies that ran to completion.
    AtomicInteger completedRuns = new AtomicInteger();
    // Counts task bodies currently executing; the body asserts it is alone on entry and exit.
    AtomicInteger inFlight = new AtomicInteger();

    Runnable guardedBody = () -> {
        assertThat(inFlight.getAndIncrement()).isEqualTo(0);
        sleep(50);
        assertThat(inFlight.decrementAndGet()).isEqualTo(0);
        completedRuns.incrementAndGet();
    };

    // Submit the same body twice, each wrapped in its own LockableRunnable sharing one lock manager.
    ScheduledFuture<?> first =
        executor.schedule(new LockableRunnable(guardedBody, lockManager), 1, TimeUnit.MILLISECONDS);
    ScheduledFuture<?> second =
        executor.schedule(new LockableRunnable(guardedBody, lockManager), 1, TimeUnit.MILLISECONDS);

    // Block until both submissions have finished.
    first.get();
    second.get();

    // this assertion can fail if the tasks are scheduled one after another
    assertThat(completedRuns.get()).isEqualTo(1);
}
/**
 * Schedules {@code threadCount} fixed-rate tasks on the single-thread scheduled executor built by
 * {@code IoTDBThreadPoolFactory} and waits on each returned future.
 *
 * <p>Every {@code get()} that ends in an {@link ExecutionException} must carry {@code reason} as
 * the message of its cause; each such failure is counted and counts the latch down. The test
 * passes once the latch has been released and the counter equals {@code threadCount}.
 *
 * @throws InterruptedException if the thread is interrupted while waiting on a future
 */
@Test
public void testNewSingleThreadScheduledExecutor() throws InterruptedException {
    String reason = "(can be ignored in Tests) NewSingleThreadScheduledExecutor";
    Thread.UncaughtExceptionHandler handler = new TestExceptionHandler(reason);
    int threadCount = 2;
    latch = new CountDownLatch(threadCount);
    ScheduledExecutorService exec = IoTDBThreadPoolFactory
        .newSingleThreadScheduledExecutor(POOL_NAME, handler);
    try {
        for (int i = 0; i < threadCount; i++) {
            Runnable task = new TestThread(reason);
            ScheduledFuture<?> future = exec.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
            try {
                future.get();
            } catch (ExecutionException e) {
                assertEquals(reason, e.getCause().getMessage());
                count.addAndGet(1);
                latch.countDown();
            }
        }
        try {
            latch.await();
            // JUnit convention: expected value first, actual value second.
            assertEquals(threadCount, count.get());
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the test runner before failing.
            Thread.currentThread().interrupt();
            fail();
        }
    } finally {
        // Release the worker thread so it does not outlive the test.
        exec.shutdownNow();
    }
}
/**
 * Schedules {@code threadCount} fixed-rate tasks on a scheduled pool of {@code threadCount / 2}
 * threads built by {@code IoTDBThreadPoolFactory} and waits on each returned future.
 *
 * <p>Every {@code get()} that ends in an {@link ExecutionException} must carry {@code reason} as
 * the message of its cause; each such failure is counted and counts the latch down. The test
 * passes once the latch has been released and the counter equals {@code threadCount}.
 *
 * @throws InterruptedException if the thread is interrupted while waiting on a future
 */
@Test
public void testNewScheduledThreadPool() throws InterruptedException {
    String reason = "(can be ignored in Tests) NewScheduledThreadPool";
    Thread.UncaughtExceptionHandler handler = new TestExceptionHandler(reason);
    int threadCount = 4;
    latch = new CountDownLatch(threadCount);
    ScheduledExecutorService exec = IoTDBThreadPoolFactory
        .newScheduledThreadPool(threadCount / 2, POOL_NAME, handler);
    try {
        for (int i = 0; i < threadCount; i++) {
            Runnable task = new TestThread(reason);
            ScheduledFuture<?> future = exec.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
            try {
                future.get();
            } catch (ExecutionException e) {
                assertEquals(reason, e.getCause().getMessage());
                count.addAndGet(1);
                latch.countDown();
            }
        }
        try {
            latch.await();
            // JUnit convention: expected value first, actual value second.
            assertEquals(threadCount, count.get());
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the test runner before failing.
            Thread.currentThread().interrupt();
            fail();
        }
    } finally {
        // Release the pool threads so they do not outlive the test.
        exec.shutdownNow();
    }
}
@Test
public void testScheduleWithDelay() throws ExecutionException, InterruptedException {
    MockRunLoop loop = new MockRunLoop();
    try {
        // Core pool size is checked before and after scheduling; it must stay at 1.
        assertEquals(1, loop.getThreadPool().getCorePoolSize());

        Runnable noOp = new Runnable() {
            @Override
            public void run() {
            }
        };
        ScheduledFuture pending = loop.schedule(noOp, 500L);

        assertEquals(1, loop.getThreadPool().getCorePoolSize());

        // Wait for the delayed no-op to finish, then make sure the loop recorded no errors.
        pending.get();
        assertTrue(loop.errors.isEmpty());
    } finally {
        loop.getExecutorService().shutdownNow();
    }
}
/**
 * Tries to cancel the task that belongs to {@code actionName} submitted to the queue.
 *
 * <p>Nothing happens when no future is registered under {@code actionName} or when that future
 * is already done. Otherwise a non-interrupting cancel is attempted; if the cancel is refused,
 * the method waits up to {@code TIMEOUT_MS} milliseconds for the future to finish and logs any
 * failure of that wait. In both cases the handle is then removed from {@code futureHandles}.
 *
 * <p>If the calling thread is interrupted while waiting, the interrupt status is restored so
 * callers further up the stack can still observe it.
 *
 * @param actionName the name of action to cancel.
 */
private void tryCancelScheduledAction(String actionName) {
    LOG.info("Trying to cancel the action: {}.", actionName);
    ScheduledFuture<?> scheduledFuture = futureHandles.get(actionName);
    if (scheduledFuture == null || scheduledFuture.isDone()) {
        return;
    }

    LOG.info("Attempting to cancel the future of action: {}", actionName);
    // Attempt to cancel without interrupting a run that is already in progress.
    if (!scheduledFuture.cancel(false)) {
        try {
            scheduledFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Best effort: do not propagate, but do not lose the interrupt either.
            Thread.currentThread().interrupt();
            LOG.warn("Cancelling the future of action: {} failed.", actionName, e);
        } catch (Exception e) {
            // we ignore the exception
            LOG.warn("Cancelling the future of action: {} failed.", actionName, e);
        }
    }
    futureHandles.remove(actionName);
}
@Test
public void testApplicationTaskFinishesWhenApplicationFinishes() throws Exception {
    // Gateway stub: every submission is acknowledged, every job reports FINISHED with a successful result.
    final TestingDispatcherGateway.Builder gatewayBuilder = new TestingDispatcherGateway.Builder()
        .setSubmitFunction(graph -> CompletableFuture.completedFuture(Acknowledge.get()))
        .setRequestJobStatusFunction(id -> CompletableFuture.completedFuture(JobStatus.FINISHED))
        .setRequestJobResultFunction(id -> CompletableFuture.completedFuture(createSuccessfulJobResult(id)));

    ApplicationDispatcherBootstrap dispatcherBootstrap = createApplicationDispatcherBootstrap(3);

    final CompletableFuture<Acknowledge> clusterShutdown =
        dispatcherBootstrap.runApplicationAndShutdownClusterAsync(gatewayBuilder.build(), scheduledExecutor);
    ScheduledFuture<?> applicationTask = dispatcherBootstrap.getApplicationExecutionFuture();

    // wait until the bootstrap "thinks" it's done
    clusterShutdown.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

    // make sure the task finishes
    applicationTask.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
@Test
public void testNoSpoolDirWithWaiting() throws Exception{
    DirectorySpooler.Builder spoolerBuilder = initializeAndGetBuilder()
        .setMaxSpoolFiles(1)
        .waitForPathAppearance(true);
    final DirectorySpooler spooler = spoolerBuilder.build();
    spooler.init("x2");

    ScheduledExecutorService poller = new SafeScheduledExecutorService(1, "One Time pooler");
    boolean fileFound = false;
    try {
        // Polls the spooler once; reports false if the poll is interrupted before a file shows up.
        Callable<Boolean> pollOnce = new Callable<Boolean>() {
            public Boolean call() {
                try {
                    return spooler.poolForFile(intervalMillis, TimeUnit.MILLISECONDS) != null;
                } catch (InterruptedException e) {
                    //Task Interrupted as it did not finish the task.
                    return false;
                }
            }
        };
        ScheduledFuture<Boolean> pollResult = poller.schedule(pollOnce, 0, TimeUnit.MILLISECONDS);

        // Create the spool directory and an empty log file only after the poll has been submitted.
        Assert.assertTrue(spoolDir.mkdirs());
        File emptyLog = new File(spoolDir, "x2.log").getAbsoluteFile();
        new FileWriter(emptyLog).close();

        //Wait for 10 secs at max and then report false;
        fileFound = pollResult.get(10000, TimeUnit.MILLISECONDS);
    } finally {
        spooler.destroy();
        poller.shutdownNow();
    }
    Assert.assertTrue("Test did not pass, Spooler did not find files", fileFound);
}
/**
 * Decides whether the given Hazelcast instance should act as master.
 *
 * <p>The instance must pass {@code isOldestMember} twice: once immediately and once more after
 * {@code delaySeconds}, with the second check run on {@code executor}. The method blocks until
 * the delayed check completes or {@code TIMEOUT_SECONDS} elapses.
 *
 * @param hazelcastInstance the instance to check; {@code null} yields {@code false}
 * @param executor scheduler used to run the delayed re-check
 * @param delaySeconds delay, in seconds, before the re-check is executed
 * @return {@code true} only if both checks report the instance as the oldest member
 * @throws IllegalStateException if the re-check fails, times out, or the calling thread is
 *         interrupted while waiting (the interrupt status is restored before throwing)
 */
public static boolean isMaster(final HazelcastInstance hazelcastInstance, ScheduledExecutorService executor,
        int delaySeconds) {
    if (hazelcastInstance == null || !isOldestMember(hazelcastInstance)) {
        return false;
    }
    try {
        Callable<Boolean> callable = () -> isOldestMember(hazelcastInstance);
        ScheduledFuture<Boolean> future = executor.schedule(callable, delaySeconds, TimeUnit.SECONDS);
        return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // Wrapping alone would hide the interrupt from callers; restore the flag first.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    } catch (ExecutionException | TimeoutException e) {
        throw new IllegalStateException(e);
    }
}
@Test
public void testScheduleRunnable() throws Exception {
    final OneShotLatch fired = new OneShotLatch();
    final long delayMillis = 100L;

    final long startedAtNanos = System.nanoTime();
    ScheduledFuture<?> handle =
        akkaRpcService.scheduleRunnable(fired::trigger, delayMillis, TimeUnit.MILLISECONDS);

    // Block until the scheduled runnable has completed, then verify it really ran.
    handle.get();
    assertTrue(fired.isTriggered());

    final long finishedAtNanos = System.nanoTime();
    final long elapsedMillis = (finishedAtNanos - startedAtNanos) / 1000000;
    assertTrue("call was not properly delayed", elapsedMillis >= delayMillis);
}
@Test
public void testScheduleRunnable() throws Exception {
    final OneShotLatch triggerLatch = new OneShotLatch();
    final long requestedDelayMs = 100L;

    final long t0 = System.nanoTime();
    ScheduledFuture<?> pending =
        akkaRpcService.scheduleRunnable(triggerLatch::trigger, requestedDelayMs, TimeUnit.MILLISECONDS);

    // Wait for completion first; the end timestamp is taken only after the latch check.
    pending.get();
    assertTrue(triggerLatch.isTriggered());

    final long t1 = System.nanoTime();
    // Whole milliseconds elapsed between scheduling and completion.
    final long observedDelayMs = TimeUnit.NANOSECONDS.toMillis(t1 - t0);
    assertTrue("call was not properly delayed", observedDelayMs >= requestedDelayMs);
}
/**
 * Runs a non-blocking read against a log while a writer, scheduled 100 ms later on a separate
 * single-thread executor, writes records with the {@code true} flag passed to
 * {@code writeRecordsForNonBlockingReads}.
 *
 * <p>If the writer fails it interrupts the test thread, which the test detects through the
 * interrupt flag. Clean-up is nested so the executor is shut down and the log manager is closed
 * even when waiting for the writer throws.
 */
@Test(timeout = 100000)
public void testNonBlockingReadRecovery() throws Exception {
    String name = "distrlog-non-blocking-reader-recovery";
    final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
    confLocal.loadConf(conf);
    confLocal.setReadAheadBatchSize(10);
    confLocal.setReadAheadMaxRecords(10);
    final DistributedLogManager dlm = createNewDLM(confLocal, name);
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    ScheduledFuture<?> writerClosedFuture = null;
    try {
        final Thread currentThread = Thread.currentThread();
        writerClosedFuture = executor.schedule(
            new Runnable() {
                @Override
                public void run() {
                    try {
                        writeRecordsForNonBlockingReads(confLocal, dlm, true);
                    } catch (Exception exc) {
                        // Signal the failure to the reading (test) thread.
                        currentThread.interrupt();
                    }
                }
            }, 100, TimeUnit.MILLISECONDS);
        readNonBlocking(dlm, false);
        assertFalse(currentThread.isInterrupted());
    } finally {
        try {
            if (writerClosedFuture != null) {
                // ensure writer.closeAndComplete is done before we close dlm
                writerClosedFuture.get();
            }
        } finally {
            // Must run even if get() above throws (e.g. the test thread was interrupted).
            executor.shutdown();
            dlm.close();
        }
    }
}
/**
 * Runs a non-blocking read against a log while a writer, scheduled 100 ms later on a separate
 * single-thread executor, writes records with the {@code false} flag passed to
 * {@code writeRecordsForNonBlockingReads}.
 *
 * <p>If the writer fails it interrupts the test thread, which the test detects through the
 * interrupt flag. Clean-up is nested so the executor is shut down and the log manager is closed
 * even when waiting for the writer throws.
 */
@Test(timeout = 100000)
public void testNonBlockingRead() throws Exception {
    String name = "distrlog-non-blocking-reader";
    final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
    confLocal.loadConf(conf);
    confLocal.setReadAheadBatchSize(1);
    confLocal.setReadAheadMaxRecords(1);
    confLocal.setReaderIdleWarnThresholdMillis(100);
    final DistributedLogManager dlm = createNewDLM(confLocal, name);
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    ScheduledFuture<?> writerClosedFuture = null;
    try {
        final Thread currentThread = Thread.currentThread();
        writerClosedFuture = executor.schedule(
            new Runnable() {
                @Override
                public void run() {
                    try {
                        writeRecordsForNonBlockingReads(confLocal, dlm, false);
                    } catch (Exception exc) {
                        // Signal the failure to the reading (test) thread.
                        currentThread.interrupt();
                    }
                }
            }, 100, TimeUnit.MILLISECONDS);
        readNonBlocking(dlm, false);
        assertFalse(currentThread.isInterrupted());
    } finally {
        try {
            if (writerClosedFuture != null) {
                // ensure writer.closeAndComplete is done before we close dlm
                writerClosedFuture.get();
            }
        } finally {
            // Must run even if get() above throws (e.g. the test thread was interrupted).
            executor.shutdown();
            dlm.close();
        }
    }
}
/**
 * Waits, for at most the given time, on the future that is current at the moment of the call.
 *
 * <p>The current future is looked up while holding {@code triggerContextMonitor}; the wait
 * itself happens after the monitor has been released.
 */
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    final ScheduledFuture<?> snapshot;
    synchronized (this.triggerContextMonitor) {
        snapshot = obtainCurrentFuture();
    }
    return snapshot.get(timeout, unit);
}
/**
 * Verifies quiescing of {@code ProcessingTimeServiceImpl}: timers registered after
 * {@code quiesce()} yield a {@code NeverCompleteFuture}, and the future returned by
 * {@code quiesce()} completes only once the timer that was already running has finished.
 */
@Test
public void testQuiesce() throws Exception {
    ProcessingTimeServiceImpl processingTimeService = new ProcessingTimeServiceImpl(timerService, v -> v);

    // Diamond instead of the raw type: same runtime behaviour, no unchecked warning.
    final CompletableFuture<?> timerRunFuture = new CompletableFuture<>();
    final OneShotLatch timerWaitLatch = new OneShotLatch();

    // The timer announces that it has started and then parks until the latch is triggered.
    ScheduledFuture<?> timer = processingTimeService.registerTimer(0, timestamp -> {
        timerRunFuture.complete(null);
        timerWaitLatch.await();
    });

    // wait for the timer to run, then quiesce the time service
    timerRunFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    CompletableFuture<?> quiesceCompletedFuture = processingTimeService.quiesce();

    // after the timer server is quiesced, tests #registerTimer() and #scheduleAtFixedRate()
    assertThat(processingTimeService.registerTimer(0, timestamp -> {}), is(instanceOf(NeverCompleteFuture.class)));
    assertThat(processingTimeService.scheduleAtFixedRate(
        timestamp -> {}, 0, Long.MAX_VALUE), is(instanceOf(NeverCompleteFuture.class)));

    // when the timer is finished, the quiesce-completed future should be completed
    assertFalse(quiesceCompletedFuture.isDone());
    timerWaitLatch.trigger();
    timer.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    assertTrue(quiesceCompletedFuture.isDone());
}
/**
 * Runs {@code task} on {@code scheduledExecutorService} and blocks until it has finished.
 *
 * <p>The requested {@code delay} is only printed; the task is always submitted with a fixed
 * delay of one millisecond. Any exception raised while waiting is printed and turned into an
 * assertion failure.
 */
@Override
public void schedule(Callable<?> task, long delay) {
    System.out.println("Requested to schedule " + task + ", delay: " + delay);
    ScheduledFuture<?> pending = scheduledExecutorService.schedule(task, 1, TimeUnit.MILLISECONDS);
    try {
        pending.get();
    } catch (Exception failure) {
        failure.printStackTrace();
        Assert.fail("Unexpected exception");
    }
}
/**
 * Simulate a JVM Pause by pausing the health check process
 * Ensure that none of the nodes with heartbeats become Dead or Stale.
 *
 * <p>A failure of the resumed health check is reported as an {@link AssertionError} that keeps
 * the original {@link ExecutionException} as its cause.
 *
 * @throws IOException
 * @throws InterruptedException
 * @throws AuthenticationException
 */
@Test
public void testScmHandleJvmPause()
    throws IOException, InterruptedException, AuthenticationException {
    final int healthCheckInterval = 200; // milliseconds
    final int heartbeatInterval = 1; // seconds
    final int staleNodeInterval = 3; // seconds
    final int deadNodeInterval = 6; // seconds

    OzoneConfiguration conf = getConf();
    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
        healthCheckInterval, MILLISECONDS);
    conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
        heartbeatInterval, SECONDS);
    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL,
        staleNodeInterval, SECONDS);
    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL,
        deadNodeInterval, SECONDS);

    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
        DatanodeDetails node1 =
            TestUtils.createRandomDatanodeAndRegister(nodeManager);
        DatanodeDetails node2 =
            TestUtils.createRandomDatanodeAndRegister(nodeManager);

        nodeManager.processHeartbeat(node1);
        nodeManager.processHeartbeat(node2);

        // Sleep so that heartbeat processing thread gets to run.
        Thread.sleep(1000);

        //Assert all nodes are healthy.
        assertEquals(2, nodeManager.getAllNodes().size());
        assertEquals(2, nodeManager.getNodeCount(HEALTHY));

        /*
         * Simulate a JVM Pause and subsequent handling in following steps:
         * Step 1 : stop heartbeat check process for stale node interval
         * Step 2 : resume heartbeat check
         * Step 3 : wait for 1 iteration of heartbeat check thread
         * Step 4 : retrieve the state of all nodes - assert all are HEALTHY
         * Step 5 : heartbeat for node1
         * [TODO : what if there is scheduling delay of test thread in Step 5?]
         * Step 6 : wait for some time to allow iterations of check process
         * Step 7 : retrieve the state of all nodes - assert node2 is STALE
         * and node1 is HEALTHY
         */

        // Step 1 : stop health check process (simulate JVM pause)
        nodeManager.pauseHealthCheck();
        Thread.sleep(MILLISECONDS.convert(staleNodeInterval, SECONDS));

        // Step 2 : resume health check
        assertTrue("Unexpected, already skipped heartbeat checks",
            (nodeManager.getSkippedHealthChecks() == 0));
        ScheduledFuture<?> schedFuture = nodeManager.unpauseHealthCheck();

        // Step 3 : wait for 1 iteration of health check
        try {
            schedFuture.get();
            assertTrue("We did not skip any heartbeat checks",
                nodeManager.getSkippedHealthChecks() > 0);
        } catch (ExecutionException e) {
            // Fail explicitly and keep the cause, instead of asserting 0 == 1.
            throw new AssertionError(
                "Unexpected exception waiting for Scheduled Health Check", e);
        }

        // Step 4 : all nodes should still be HEALTHY
        assertEquals(2, nodeManager.getAllNodes().size());
        assertEquals(2, nodeManager.getNodeCount(HEALTHY));

        // Step 5 : heartbeat for node1
        nodeManager.processHeartbeat(node1);

        // Step 6 : wait for health check process to run
        Thread.sleep(1000);

        // Step 7 : node2 should transition to STALE
        assertEquals(1, nodeManager.getNodeCount(HEALTHY));
        assertEquals(1, nodeManager.getNodeCount(STALE));
    }
}
/**
 * Command-line entry point of the health manager.
 *
 * <p>Parses the command line, builds the {@code Config} for the selected mode, sets up logging,
 * registers the system config, initializes the {@code HealthManager}, starts the policy
 * executor and the metric publishing thread, and then blocks on the executor's future.
 *
 * @param args raw command-line arguments
 * @throws Exception if option parsing, configuration loading or the policy executor fails
 */
public static void main(String[] args) throws Exception {
CommandLineParser parser = new DefaultParser();
Options slaManagerCliOptions = constructCliOptions();
// parse the help options first.
Options helpOptions = constructHelpOptions();
CommandLine cmd = parser.parse(helpOptions, args, true);
if (cmd.hasOption("h")) {
usage(slaManagerCliOptions);
return;
}
// Second pass: parse against the full option set; print usage before failing.
try {
cmd = parser.parse(slaManagerCliOptions, args);
} catch (ParseException e) {
usage(slaManagerCliOptions);
throw new RuntimeException("Error parsing command line options: ", e);
}
// Mode is cluster unless the MODE option is given on the command line.
HealthManagerMode mode = HealthManagerMode.cluster;
if (hasOption(cmd, CliArgs.MODE)) {
mode = HealthManagerMode.valueOf(getOptionValue(cmd, CliArgs.MODE));
}
// Build the config for the chosen mode; command-line values are put after the loaded ones.
Config config;
switch (mode) {
case cluster:
config = Config.toClusterMode(Config.newBuilder()
.putAll(ConfigLoader.loadClusterConfig())
.putAll(commandLineConfigs(cmd))
.build());
break;
case local:
// Local mode requires both the Heron home and the config path options.
if (!hasOption(cmd, CliArgs.HERON_HOME) || !hasOption(cmd, CliArgs.CONFIG_PATH)) {
throw new IllegalArgumentException("Missing heron_home or config_path argument");
}
String heronHome = getOptionValue(cmd, CliArgs.HERON_HOME);
String configPath = getOptionValue(cmd, CliArgs.CONFIG_PATH);
config = Config.toLocalMode(Config.newBuilder()
.putAll(ConfigLoader.loadConfig(heronHome, configPath, null, null))
.putAll(commandLineConfigs(cmd))
.build());
break;
default:
throw new IllegalArgumentException("Invalid mode: " + getOptionValue(cmd, CliArgs.MODE));
}
setupLogging(cmd, config);
LOG.fine(Arrays.toString(cmd.getOptions()));
// Add the SystemConfig into SingletonRegistry
SystemConfig systemConfig = SystemConfig.newBuilder(true)
.putAll(Context.systemFile(config), true)
.putAll(Context.overrideFile(config), true).build();
SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, systemConfig);
LOG.info("Static Heron config loaded successfully ");
LOG.fine(config.toString());
// load the default config value and override with any command line values
String metricSourceClassName = config.getStringValue(PolicyConfigKey.METRIC_SOURCE_TYPE.key());
metricSourceClassName = getOptionValue(cmd, CliArgs.METRIC_SOURCE_TYPE, metricSourceClassName);
String metricsUrl = config.getStringValue(PolicyConfigKey.METRIC_SOURCE_URL.key());
metricsUrl = getOptionValue(cmd, CliArgs.METRIC_SOURCE_URL, metricsUrl);
// metrics reporting thread
// The METRICSMGR_PORT option value is parsed as an integer and passed to the metrics publisher.
HealthManagerMetrics publishingMetrics =
new HealthManagerMetrics(Integer.valueOf(getOptionValue(cmd, CliArgs.METRICSMGR_PORT)));
AbstractModule module
= buildBaseModule(metricsUrl, metricSourceClassName, publishingMetrics);
HealthManager healthManager = new HealthManager(config, module);
LOG.info("Initializing health manager");
healthManager.initialize();
LOG.info("Starting Health Manager");
PoliciesExecutor policyExecutor = new PoliciesExecutor(healthManager.healthPolicies);
ScheduledFuture<?> future = policyExecutor.start();
LOG.info("Starting Health Manager metric posting thread");
new Thread(publishingMetrics).start();
// Block on the policy executor's future; always release the executor and the metrics publisher.
try {
future.get();
} finally {
policyExecutor.destroy();
publishingMetrics.close();
}
}
/**
 * Repeatedly opens a long poll on the channel, posts two messages, and checks that both
 * payloads come back within {@code maxTimePerRun} milliseconds and are not duplicates.
 *
 * <p>Payloads are {@code byte[]}, so the duplicate check compares array contents; a plain
 * {@code equals} on arrays would only compare references and could never detect a duplicate.
 */
@Test(timeout = 20000)
@Ignore
// This is a test to see if the atmos bug still exists. If the bug exists,
// the server will hang 20 secs
public void testSendAndReceiveMessagesOnAtmosphereServer() throws Exception {
    final long maxTimePerRun = 5000;
    final int maxRuns = 100;

    bpMock.createChannel(channelId);
    // createChannel(channelIdProvider);

    RestAssured.baseURI = getBounceProxyBaseUri();

    int index = 1;
    List<byte[]> expectedPayloads = new ArrayList<byte[]>();

    for (int i = 0; i < maxRuns; i++) {
        expectedPayloads.clear();

        long startTime_ms = System.currentTimeMillis();
        ScheduledFuture<Response> longPollConsumer = bpMock.longPollInOwnThread(channelId, 30000);

        byte[] postPayload = (payload + index++ + "-" + createUuidString()).getBytes(StandardCharsets.UTF_8);
        expectedPayloads.add(postPayload);
        ScheduledFuture<Response> postMessage = bpMock.postMessageInOwnThread(channelId, 5000, postPayload);

        byte[] postPayload2 = (payload + index++ + "-" + createUuidString()).getBytes(StandardCharsets.UTF_8);
        expectedPayloads.add(postPayload2);
        ScheduledFuture<Response> postMessage2 = bpMock.postMessageInOwnThread(channelId, 5000, postPayload2);

        // wait until the long poll returns
        Response responseLongPoll = longPollConsumer.get();
        byte[] responseBody = responseLongPoll.getBody().asByteArray();
        List<ImmutableMessage> receivedMessages = Utilities.splitSMRF(responseBody);

        // wait until the POSTs are finished.
        postMessage.get();
        postMessage2.get();

        long elapsedTime_ms = System.currentTimeMillis() - startTime_ms;

        // The first long poll may return with only one message; poll once more for the rest.
        if (receivedMessages.size() < 2 && elapsedTime_ms < maxTimePerRun) {
            // Thread.sleep(100);
            Response responseLongPoll2 = bpMock.longPollInOwnThread(channelId, 30000).get();
            byte[] responseBody2 = responseLongPoll2.getBody().asByteArray();
            List<ImmutableMessage> receivedMessages2 = Utilities.splitSMRF(responseBody2);
            receivedMessages.addAll(receivedMessages2);
        }

        ArrayList<byte[]> payloads = new ArrayList<byte[]>();
        for (ImmutableMessage message : receivedMessages) {
            payloads.add(message.getUnencryptedBody());
        }

        elapsedTime_ms = System.currentTimeMillis() - startTime_ms;
        log.info(i + ": time elapsed to send messages and return long poll:" + elapsedTime_ms);
        assertThat("the long poll did not receive the messages in time", elapsedTime_ms, lessThan(maxTimePerRun));

        if (payloads.size() == 2) {
            // Compare array contents: byte[].equals() is reference equality.
            assertFalse("Unresolved bug that causes duplicate messages to be sent",
                java.util.Arrays.equals(payloads.get(0), payloads.get(1)));
        }
        assertThat(payloads, hasItems(postPayload, postPayload2));
    }
}
@Test
public void testScheduleRunnable() throws Exception {
    // Before anything is scheduled, every metric is still at zero.
    assertThat(submitted.getCount()).isZero();
    assertThat(running.getCount()).isZero();
    assertThat(completed.getCount()).isZero();
    assertThat(duration.getCount()).isZero();
    assertThat(scheduledOnce.getCount()).isZero();
    assertThat(scheduledRepetitively.getCount()).isZero();
    assertThat(scheduledOverrun.getCount()).isZero();
    assertThat(percentOfPeriod.getCount()).isZero();

    // Checked from inside the task: it counts as running and as scheduled once, nothing else.
    Runnable midFlightCheck = new Runnable() {
        @Override
        public void run() {
            assertThat(submitted.getCount()).isZero();
            assertThat(running.getCount()).isEqualTo(1);
            assertThat(completed.getCount()).isZero();
            assertThat(duration.getCount()).isZero();
            assertThat(scheduledOnce.getCount()).isEqualTo(1);
            assertThat(scheduledRepetitively.getCount()).isZero();
            assertThat(scheduledOverrun.getCount()).isZero();
            assertThat(percentOfPeriod.getCount()).isZero();
        }
    };
    ScheduledFuture<?> oneShot =
        instrumentedScheduledExecutor.schedule(midFlightCheck, 10L, TimeUnit.MILLISECONDS);
    oneShot.get();

    // After completion: one completed run with one recorded duration, no longer running.
    assertThat(submitted.getCount()).isZero();
    assertThat(running.getCount()).isZero();
    assertThat(completed.getCount()).isEqualTo(1);
    assertThat(duration.getCount()).isEqualTo(1);
    assertThat(duration.getSnapshot().size()).isEqualTo(1);
    assertThat(scheduledOnce.getCount()).isEqualTo(1);
    assertThat(scheduledRepetitively.getCount()).isZero();
    assertThat(scheduledOverrun.getCount()).isZero();
    assertThat(percentOfPeriod.getCount()).isZero();
}
@Test(timeout = 1000000)
@Ignore
// This is a test to see if sending and receiving messages at the same time
// results in duplicate messages in the long poll.
public void testSendAndReceiveMessagesConcurrently() throws Exception {
    final int maxRuns = 1000;

    bpMock.createChannel(channelId);
    // createChannel(channelIdProvider);

    RestAssured.baseURI = getBounceProxyBaseUri();

    List<byte[]> expectedPayloads = new ArrayList<byte[]>();

    for (int run = 0; run < maxRuns; run++) {
        expectedPayloads.clear();

        // Open the long poll first, then post a single uniquely tagged message.
        ScheduledFuture<Response> pollHandle = bpMock.longPollInOwnThread(channelId, 30000);

        byte[] sentPayload = (payload + run + "-" + createUuidString()).getBytes(StandardCharsets.UTF_8);
        expectedPayloads.add(sentPayload);
        ScheduledFuture<Response> postHandle = bpMock.postMessageInOwnThread(channelId, 5000, sentPayload);

        // wait until the long poll returns
        Response pollResponse = pollHandle.get();
        byte[] pollBody = pollResponse.getBody().asByteArray();
        List<ImmutableMessage> delivered = Utilities.splitSMRF(pollBody);

        // wait until the POSTs are finished.
        postHandle.get();

        ArrayList<byte[]> deliveredPayloads = new ArrayList<byte[]>();
        for (ImmutableMessage msg : delivered) {
            deliveredPayloads.add(msg.getUnencryptedBody());
        }

        // assertFalse("Unresolved bug that causes duplicate messages to be sent", payloads.size() == 2);
        // assertEquals(1, payloads.size());
        assertThat(deliveredPayloads, hasItems(sentPayload));
    }
}