java.util.concurrent.LinkedBlockingQueue#add ( )源码实例Demo

下面列出了java.util.concurrent.LinkedBlockingQueue#add ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: distributedlog   文件: DLAuditor.java
private Map<String, Long> calculateStreamSpaceUsage(
        final URI uri, final Namespace namespace)
    throws IOException {
    Iterator<String> streams = namespace.getLogs();
    final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
    while (streams.hasNext()) {
        streamQueue.add(streams.next());
    }

    final Map<String, Long> streamSpaceUsageMap =
            new ConcurrentSkipListMap<String, Long>();
    final AtomicInteger numStreamsCollected = new AtomicInteger(0);

    executeAction(streamQueue, 10, new Action<String>() {
        @Override
        public void execute(String stream) throws IOException {
            streamSpaceUsageMap.put(stream,
                    calculateStreamSpaceUsage(namespace, stream));
            if (numStreamsCollected.incrementAndGet() % 1000 == 0) {
                logger.info("Calculated {} streams from uri {}.", numStreamsCollected.get(), uri);
            }
        }
    });

    return streamSpaceUsageMap;
}
 
源代码2 项目: distributedlog   文件: DLAuditor.java
private void collectLedgersFromDL(final URI uri,
                                  final Namespace namespace,
                                  final Set<Long> ledgers) throws IOException {
    logger.info("Enumerating {} to collect streams.", uri);
    Iterator<String> streams = namespace.getLogs();
    final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
    while (streams.hasNext()) {
        streamQueue.add(streams.next());
    }

    logger.info("Collected {} streams from uri {} : {}",
                new Object[] { streamQueue.size(), uri, streams });

    executeAction(streamQueue, 10, new Action<String>() {
        @Override
        public void execute(String stream) throws IOException {
            collectLedgersFromStream(namespace, stream, ledgers);
        }
    });
}
 
源代码3 项目: cyclops   文件: ConnectableTest.java
@Test
public void backpressureScheduledCron(){

    captured= "";

       diff =  System.currentTimeMillis();
      LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
      blockingQueue.add("10");
      blockingQueue.offer("10");
      ReactiveSeq.range(0, Integer.MAX_VALUE)
          .limit(2)
          .peek(v-> diff = System.currentTimeMillis())
          .map(i -> i.toString())
          .schedule("* * * * * ?", scheduled)
          .connect(blockingQueue)
          .onePer(2, TimeUnit.SECONDS)
          .peek(i->System.out.println("BQ " + blockingQueue))
          .peek(System.out::println)
          .forEach(c->captured=c);

      assertThat(System.currentTimeMillis() - diff,greaterThan(1500l));
}
 
源代码4 项目: sdl_java_suite   文件: ChoiceSetManagerTests.java
public void testDismissingQueuedKeyboard(){
	Integer testCancelID = 42;

	// Currently executing operation
	PresentKeyboardOperation testKeyboardOp = mock(PresentKeyboardOperation.class);
	doReturn(true).when(testKeyboardOp).isExecuting();
	doReturn(96).when(testKeyboardOp).getCancelID();
	csm.currentlyPresentedKeyboardOperation = testKeyboardOp;

	// Queued operations
	PresentKeyboardOperation testKeyboardOp2 = mock(PresentKeyboardOperation.class);
	doReturn(true).when(testKeyboardOp2).isExecuting();
	doReturn(testCancelID).when(testKeyboardOp2).getCancelID();
	LinkedBlockingQueue<Runnable> testOperationQueue = new LinkedBlockingQueue<>();
	testOperationQueue.add(testKeyboardOp2);
	csm.operationQueue = testOperationQueue;

	// Queued operation should be canceled
	csm.dismissKeyboard(testCancelID);
	verify(testKeyboardOp, times(0)).dismissKeyboard();
	verify(testKeyboardOp2, times(1)).dismissKeyboard();
}
 
源代码5 项目: openjdk-jdk9   文件: LinkedBlockingQueueTest.java
/**
 * drainTo(c) empties queue into another collection c
 */
public void testDrainTo() {
    LinkedBlockingQueue q = populatedQueue(SIZE);
    ArrayList l = new ArrayList();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(SIZE, l.size());
    for (int i = 0; i < SIZE; ++i)
        assertEquals(l.get(i), new Integer(i));
    q.add(zero);
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(zero));
    assertTrue(q.contains(one));
    l.clear();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(2, l.size());
    for (int i = 0; i < 2; ++i)
        assertEquals(l.get(i), new Integer(i));
}
 
源代码6 项目: openjdk-jdk9   文件: LinkedBlockingQueueTest.java
/**
 * offer transfers elements across Executor tasks
 */
public void testOfferInExecutor() {
    final LinkedBlockingQueue q = new LinkedBlockingQueue(2);
    q.add(one);
    q.add(two);
    final CheckedBarrier threadsStarted = new CheckedBarrier(2);
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    try (PoolCleaner cleaner = cleaner(executor)) {
        executor.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                assertFalse(q.offer(three));
                threadsStarted.await();
                assertTrue(q.offer(three, LONG_DELAY_MS, MILLISECONDS));
                assertEquals(0, q.remainingCapacity());
            }});

        executor.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadsStarted.await();
                assertSame(one, q.take());
            }});
    }
}
 
源代码7 项目: phoenix   文件: ScannerLeaseRenewalTest.java
@Test
public void testRenewLeaseTaskBehaviorOnError() throws Exception {
    // add connection to the queue
    PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
    LinkedBlockingQueue<WeakReference<PhoenixConnection>> connectionsQueue = new LinkedBlockingQueue<>();
    connectionsQueue.add(new WeakReference<PhoenixConnection>(pconn));
    
    // create a scanner and add it to the queue
    int numLeaseRenewals = 4;
    int lockNotAcquiredAt = 1;
    int thresholdNotReachedCount = 2;
    int failLeaseRenewalAt = 3;
    RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, thresholdNotReachedCount, lockNotAcquiredAt, failLeaseRenewalAt);
    LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = pconn.getScanners();
    scannerQueue.add(new WeakReference<TableResultIterator>(itr));
    
    RenewLeaseTask task = new RenewLeaseTask(connectionsQueue);
    assertTrue(connectionsQueue.size() == 1);
    assertTrue(scannerQueue.size() == 1);
    
    task.run();
    assertTrue(connectionsQueue.size() == 1); 
    assertTrue(scannerQueue.size() == 1); // lock not acquired
    assertEquals(LOCK_NOT_ACQUIRED, itr.getLastRenewLeaseStatus());
    
    task.run();
    assertTrue(scannerQueue.size() == 1);
    assertTrue(connectionsQueue.size() == 1); // renew lease skipped but scanner still in the queue
    assertEquals(THRESHOLD_NOT_REACHED, itr.getLastRenewLeaseStatus());
    
    task.run();
    assertTrue(scannerQueue.size() == 0);
    assertTrue(connectionsQueue.size() == 0); // there was only one connection in the connectionsQueue and it wasn't added back because of error
    
    pconn.close();
    task.run();
    assertTrue(scannerQueue.size() == 0);
    assertTrue("Closing the connection should have removed it from the queue", connectionsQueue.size() == 0);
}
 
源代码8 项目: flink   文件: ChannelWriterOutputView.java
/**
 * Creates an new ChannelWriterOutputView that writes to the given channel and buffers data
 * in the given memory segments. If the given memory segments are null, the writer takes its buffers
 * directly from the return queue of the writer. Note that this variant locks if no buffers are contained
 * in the return queue.
 * 
 * @param writer The writer to write to.
 * @param memory The memory used to buffer data, or null, to utilize solely the return queue.
 * @param segmentSize The size of the memory segments.
 */
public ChannelWriterOutputView(BlockChannelWriter<MemorySegment> writer, List<MemorySegment> memory, int segmentSize) {
	super(segmentSize, HEADER_LENGTH);
	
	if (writer == null) {
		throw new NullPointerException();
	}
	
	this.writer = writer;
	
	if (memory == null) {
		this.numSegments = 0;
	} else {
		this.numSegments = memory.size();
		// load the segments into the queue
		final LinkedBlockingQueue<MemorySegment> queue = writer.getReturnQueue();
		for (int i = memory.size() - 1; i >= 0; --i) {
			final MemorySegment seg = memory.get(i);
			if (seg.size() != segmentSize) {
				throw new IllegalArgumentException("The supplied memory segments are not of the specified size.");
			}
			queue.add(seg);
		}
	}
	
	// get the first segment
	try {
		advance();
	}
	catch (IOException ioex) {
		throw new RuntimeException("BUG: IOException occurred while getting first block for ChannelWriterOutputView.", ioex);
	}
}
 
源代码9 项目: incubator-crail   文件: CrailUtils.java
public static URI getPrimaryNameNode() {
	StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
	LinkedBlockingQueue<URI> namenodes = new LinkedBlockingQueue<URI>();
	while(tupleTokenizer.hasMoreTokens()){
		String address = tupleTokenizer.nextToken();
		URI uri = URI.create(address);
		namenodes.add(uri);
	}
	
	URI master = namenodes.poll();
	return master;
}
 
源代码10 项目: j2objc   文件: LinkedBlockingQueueTest.java
/**
 * iterator ordering is FIFO
 */
public void testIteratorOrdering() {
    final LinkedBlockingQueue q = new LinkedBlockingQueue(3);
    q.add(one);
    q.add(two);
    q.add(three);
    assertEquals(0, q.remainingCapacity());
    int k = 0;
    for (Iterator it = q.iterator(); it.hasNext();) {
        assertEquals(++k, it.next());
    }
    assertEquals(3, k);
}
 
源代码11 项目: openjdk-jdk9   文件: LinkedBlockingQueueTest.java
/**
 * Queue transitions from empty to full when elements added
 */
public void testEmptyFull() {
    LinkedBlockingQueue q = new LinkedBlockingQueue(2);
    assertTrue(q.isEmpty());
    assertEquals("should have room for 2", 2, q.remainingCapacity());
    q.add(one);
    assertFalse(q.isEmpty());
    q.add(two);
    assertFalse(q.isEmpty());
    assertEquals(0, q.remainingCapacity());
    assertFalse(q.offer(three));
}
 
源代码12 项目: j2objc   文件: LinkedBlockingQueueTest.java
/**
 * clear removes all elements
 */
public void testClear() {
    LinkedBlockingQueue q = populatedQueue(SIZE);
    q.clear();
    assertTrue(q.isEmpty());
    assertEquals(0, q.size());
    assertEquals(SIZE, q.remainingCapacity());
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(one));
    q.clear();
    assertTrue(q.isEmpty());
}
 
源代码13 项目: crail   文件: CrailUtils.java
public static URI getPrimaryNameNode() {
	StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
	LinkedBlockingQueue<URI> namenodes = new LinkedBlockingQueue<URI>();
	while(tupleTokenizer.hasMoreTokens()){
		String address = tupleTokenizer.nextToken();
		URI uri = URI.create(address);
		namenodes.add(uri);
	}
	
	URI master = namenodes.poll();
	return master;
}
 
源代码14 项目: MiniWeChat-Server   文件: ServerModel_Chatting.java
/**
 * 往消息队列中添加一条未接收的消息
 * 
 * @param chatting
 * @author Feng
 */
public void addChatting(Chatting chatting) {
	LinkedBlockingQueue<Chatting> chattingQueue;

	if (!chattingHashtable.containsKey(chatting.getReceiverUserId())) {
		chattingQueue = new LinkedBlockingQueue<Chatting>();
		chattingHashtable.put(chatting.getReceiverUserId() + "", chattingQueue);
	} else
		chattingQueue = chattingHashtable.get(chatting.getReceiverUserId());

	chattingQueue.add(chatting);
}
 
源代码15 项目: ReactionDecoder   文件: MCSSThread.java
private synchronized LinkedBlockingQueue<IAtomContainer> singleSolution() {

        LOGGER.debug("Calling MCSSTask " + taskNumber + " with " + mcssList.size() + " items");
        LinkedBlockingQueue<IAtomContainer> mcss = new LinkedBlockingQueue<>();
        long startTime = getInstance().getTimeInMillis();
        IAtomContainer querySeed = mcssList.get(0);
        long calcTime = startTime;

        try {
            for (int index = 1; index < mcssList.size(); index++) {
                IAtomContainer target = removeHydrogens(mcssList.get(index));
                Collection<Fragment> fragmentsFomMCS;
                BaseMapping comparison;

                comparison = new Isomorphism(querySeed, target, DEFAULT, atomMatcher, bondMatcher);
                comparison.setChemFilters(true, true, true);
                fragmentsFomMCS = getMCSS(comparison);

                LOGGER.debug("comparison for task " + taskNumber + " has " + fragmentsFomMCS.size()
                        + " unique matches of size " + comparison.getFirstAtomMapping().getCount());
                LOGGER.debug("MCSS for task " + taskNumber + " has " + querySeed.getAtomCount() + " atoms, and " + querySeed.getBondCount() + " bonds");
                LOGGER.debug("Target for task " + taskNumber + " has " + target.getAtomCount() + " atoms, and " + target.getBondCount() + " bonds");
                long endCalcTime = getInstance().getTimeInMillis();
                LOGGER.debug("Task " + taskNumber + " index " + index + " took " + (endCalcTime - calcTime) + "ms");
                calcTime = endCalcTime;

                if (fragmentsFomMCS.isEmpty()) {
                    break;
                }
                querySeed = fragmentsFomMCS.iterator().next().getContainer();
            }

            if (querySeed != null) {
                mcss.add(querySeed);
                long endTime = getInstance().getTimeInMillis();
                LOGGER.debug("Done: task " + taskNumber + " took " + (endTime - startTime) + "ms");
                LOGGER.debug(" and mcss has " + querySeed.getAtomCount() + " atoms, and " + querySeed.getBondCount() + " bonds");
            }
        } catch (Exception e) {
            LOGGER.error("ERROR IN MCS Thread: ", e.getMessage());
        }
        return mcss;
    }
 
源代码16 项目: distributedlog   文件: DistributedLogTool.java
@Override
protected int runCmd() throws Exception {
    String rootPath = getUri().getPath() + "/" + allocationPoolPath;
    final ScheduledExecutorService allocationExecutor = Executors.newSingleThreadScheduledExecutor();
    ExecutorService executorService = Executors.newFixedThreadPool(concurrency);
    checkArgument(getNamespace() instanceof BKDistributedLogNamespace);
    BKDistributedLogNamespace bkns = (BKDistributedLogNamespace) getNamespace();
    final ZooKeeperClient zkc = ((BKNamespaceDriver) bkns.getNamespaceDriver()).getWriterZKC();
    final BookKeeperClient bkc = ((BKNamespaceDriver) bkns.getNamespaceDriver()).getReaderBKC();
    try {
        List<String> pools = zkc.get().getChildren(rootPath, false);
        final LinkedBlockingQueue<String> poolsToDelete = new LinkedBlockingQueue<String>();
        if (getForce() || IOUtils.confirmPrompt("Are you sure you want to delete allocator pools : " + pools)) {
            for (String pool : pools) {
                poolsToDelete.add(rootPath + "/" + pool);
            }
            final CountDownLatch doneLatch = new CountDownLatch(concurrency);
            for (int i = 0; i < concurrency; i++) {
                final int tid = i;
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        while (!poolsToDelete.isEmpty()) {
                            String poolPath = poolsToDelete.poll();
                            if (null == poolPath) {
                                break;
                            }
                            try {
                                LedgerAllocator allocator =
                                        LedgerAllocatorUtils.createLedgerAllocatorPool(poolPath, 0, getConf(),
                                                zkc, bkc,
                                                allocationExecutor);
                                allocator.delete();
                                System.out.println("Deleted allocator pool : " + poolPath + " .");
                            } catch (IOException ioe) {
                                System.err.println("Failed to delete allocator pool "
                                        + poolPath + " : " + ioe.getMessage());
                            }
                        }
                        doneLatch.countDown();
                        System.out.println("Thread " + tid + " is done.");
                    }
                });
            }
            doneLatch.await();
        }
    } finally {
        executorService.shutdown();
        allocationExecutor.shutdown();
    }
    return 0;
}
 
源代码17 项目: mysql_perf_analyzer   文件: AlertScanner.java
public void scan()
{
	Set<String> clusternames = frameworkContext.getDbInfoManager().getMyDatabases(appUser.getName(), false).getMyDbList();
	logger.info("Start scan alerts");
	LinkedBlockingQueue<DBInstanceInfo> dbqueue = new LinkedBlockingQueue<DBInstanceInfo>();
	for(String cl: clusternames)
	{
		DBCredential cred = DBUtils.findDBCredential(frameworkContext, cl, appUser);
		if(cred==null)
		{
			logger.info("No credential for group "+cl+", skip it");
			continue;//log the error
		}
		DBGroupInfo cls = frameworkContext.getDbInfoManager().findGroup(cl);
		if(cls==null)
		{
			logger.info("Group "+cl+" might have been deleted.");
			continue;
		}
		
		for(DBInstanceInfo dbinfo: cls.getInstances())
		{
			dbqueue.add(dbinfo);
		}
	}
		
	int mythreadcnt = this.threadCount;
	if(dbqueue.size()<mythreadcnt)mythreadcnt = dbqueue.size();
	Thread th[] = new Thread[mythreadcnt];
	for(int i=0;i<mythreadcnt;i++)
	{
		AlertScannerRunner runner = new 
				AlertScannerRunner(frameworkContext,
					dbqueue,
					appUser);
		th[i] = new Thread(runner);
		th[i].setName("AlertScannerRunner - "+i);
		th[i].start();
	}
	for(int i=0;i<th.length;i++)try{th[i].join();}catch(Exception ex){}

	logger.info("Done alert scanner");
	this.frameworkContext.getAutoScanner().getMetricDb().flush();//notify persistent store
}
 
源代码18 项目: localization_nifi   文件: TestConsumeMqttCommon.java
@Test
public void testOnStoppedFinish() throws Exception {
    testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");

    testRunner.assertValid();

    MqttMessage innerMessage = new MqttMessage();
    innerMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()).array());
    innerMessage.setQos(2);
    MQTTQueueMessage testMessage = new MQTTQueueMessage("testTopic", innerMessage);

    ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
    consumeMQTT.onScheduled(testRunner.getProcessContext());
    reconnect(consumeMQTT);

    Thread.sleep(PUBLISH_WAIT_MS);

    assertTrue(isConnected(consumeMQTT));

    consumeMQTT.processSessionFactory = testRunner.getProcessSessionFactory();

    Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
    f.setAccessible(true);
    LinkedBlockingQueue<MQTTQueueMessage> queue = (LinkedBlockingQueue<MQTTQueueMessage>) f.get(consumeMQTT);
    queue.add(testMessage);

    consumeMQTT.onUnscheduled(testRunner.getProcessContext());
    consumeMQTT.onStopped(testRunner.getProcessContext());

    testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
    assertProvenanceEvents(1);

    List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
    MockFlowFile flowFile = flowFiles.get(0);

    flowFile.assertContentEquals("testMessage");
    flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
    flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
    flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
    flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
    flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
 
源代码19 项目: nifi   文件: TestConsumeMqttCommon.java
@Test
public void testOnStoppedFinish() throws Exception {
    testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");

    testRunner.assertValid();

    MqttMessage innerMessage = new MqttMessage();
    innerMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()).array());
    innerMessage.setQos(2);
    MQTTQueueMessage testMessage = new MQTTQueueMessage("testTopic", innerMessage);

    ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
    consumeMQTT.onScheduled(testRunner.getProcessContext());
    reconnect(consumeMQTT, testRunner.getProcessContext());

    Thread.sleep(PUBLISH_WAIT_MS);

    assertTrue(isConnected(consumeMQTT));

    consumeMQTT.processSessionFactory = testRunner.getProcessSessionFactory();

    Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
    f.setAccessible(true);
    LinkedBlockingQueue<MQTTQueueMessage> queue = (LinkedBlockingQueue<MQTTQueueMessage>) f.get(consumeMQTT);
    queue.add(testMessage);

    consumeMQTT.onUnscheduled(testRunner.getProcessContext());
    consumeMQTT.onStopped(testRunner.getProcessContext());

    testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
    assertProvenanceEvents(1);

    List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
    MockFlowFile flowFile = flowFiles.get(0);

    flowFile.assertContentEquals("testMessage");
    flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
    flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
    flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
    flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
    flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
 
源代码20 项目: pravega   文件: ConditionalOutputStreamTest.java
/**
 * It is necessary to only have one outstanding conditional append per writerId to make sure the status can be resolved in the event of a reconnect.
 */
@Test(timeout = 10000)
public void testOnlyOneWriteAtATime() throws ConnectionFailedException, SegmentSealedException {
    MockConnectionFactoryImpl connectionFactory = new MockConnectionFactoryImpl();
    MockController controller = new MockController("localhost", 0, connectionFactory, true);
    ConditionalOutputStreamFactory factory = new ConditionalOutputStreamFactoryImpl(controller, connectionFactory);
    Segment segment = new Segment("scope", "testWrite", 1);       
    ConditionalOutputStream cOut = factory.createConditionalOutputStream(segment,
            DelegationTokenProviderFactory.create("token", controller, segment), EventWriterConfig.builder().build());
    ByteBuffer data = ByteBuffer.allocate(10);
    ClientConnection mock = Mockito.mock(ClientConnection.class);
    PravegaNodeUri location = new PravegaNodeUri("localhost", 0);
    connectionFactory.provideConnection(location, mock);
    setupAppend(connectionFactory, segment, mock, location);
    LinkedBlockingQueue<Boolean> replies = new LinkedBlockingQueue<>();
    Mockito.doAnswer(new Answer<Void>() {
        @Override
        public Void answer(InvocationOnMock invocation) throws Throwable {
            ConditionalAppend argument = (ConditionalAppend) invocation.getArgument(0);
            ReplyProcessor processor = connectionFactory.getProcessor(location);
            if (replies.take()) {                    
                processor.process(new WireCommands.DataAppended(argument.getRequestId(), argument.getWriterId(), argument.getEventNumber(), 0, -1));
            } else {
                processor.process(new WireCommands.ConditionalCheckFailed(argument.getWriterId(), argument.getEventNumber(),
                                                                          argument.getRequestId()));
            }
            return null;
        }
    }).when(mock).sendAsync(any(ConditionalAppend.class), any(ClientConnection.CompletedCallback.class));
    replies.add(true);
    replies.add(false);
    assertTrue(cOut.write(data, 0));
    assertFalse(cOut.write(data, 1));
    AssertExtensions.assertBlocks(() -> {
        assertTrue(cOut.write(data, 2));
    }, () -> {
        replies.add(true);
    });
    AssertExtensions.assertBlocks(() -> {
        assertFalse(cOut.write(data, 3));
    }, () -> {
        replies.add(false);
    });
    AssertExtensions.assertBlocks(() -> {
        assertTrue(cOut.write(data, 4));
    }, () -> {
        AssertExtensions.assertBlocks(() -> {
            assertFalse(cOut.write(data, 5));
        }, () -> {
            replies.add(true);
            replies.add(false);
        });
    });
    AssertExtensions.assertBlocks(() -> {
        assertFalse(cOut.write(data, 6));
    }, () -> {
        AssertExtensions.assertBlocks(() -> {
            assertTrue(cOut.write(data, 7));
        }, () -> {
            replies.add(false);
            replies.add(true);
        });
    });
}