java.util.concurrent.ConcurrentLinkedQueue#size() Source Code Examples

Listed below are example usages of java.util.concurrent.ConcurrentLinkedQueue#size() taken from open-source projects; the full source of each example is available in the corresponding project on GitHub.
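A point worth keeping in mind while reading the examples below: unlike most collections, ConcurrentLinkedQueue.size() is not a constant-time operation. Because of the asynchronous nature of the queue it traverses the nodes (O(n)), and the returned value may already be stale if other threads are concurrently adding or removing elements. For a pure emptiness check, isEmpty() is cheaper. A minimal sketch (class and variable names are illustrative only):

import java.util.concurrent.ConcurrentLinkedQueue;

public class SizeDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("a");
        queue.add("b");

        // size() walks the whole linked structure and is only a snapshot under concurrency
        System.out.println("size = " + queue.size());    // prints: size = 2

        // prefer isEmpty() over size() == 0; it only needs to inspect the head
        if (!queue.isEmpty()) {
            System.out.println("head = " + queue.peek()); // prints: head = a
        }
    }
}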

Example 1 | Project: GOAi | File: Okexv3WebSocketExchange.java
@Override
public void noDepth(String pushId) {
    WebSocketInfo info = Okexv3WebSocketExchange.CONSUMERS.get(pushId);
    if (null != info) {
        String symbol = info.getSymbol();
        ConcurrentLinkedQueue<WebSocketInfo<Depth>> list = Okexv3WebSocketExchange.DEPTH
                .getOrDefault(symbol, null);
        if (null != list) {
            if (list.size() <= 1) {
                // this is the last subscription, so unsubscribe
                Okexv3WebSocketClient client = Okexv3WebSocketExchange.CLIENTS.get(symbol);
                if (null != client) {
                    client.noDepth();
                }
            }
            list.remove(info);
        }
        Okexv3WebSocketExchange.CONSUMERS.remove(pushId);
    }
}
 
Example 2 | Project: GOAi | File: Okexv3WebSocketExchange.java
@Override
public void noTrades(String pushId) {
    WebSocketInfo info = Okexv3WebSocketExchange.CONSUMERS.get(pushId);
    if (null != info) {
        String symbol = info.getSymbol();
        ConcurrentLinkedQueue<WebSocketInfo<Trades>> list = Okexv3WebSocketExchange.TRADES
                .getOrDefault(symbol, null);
        if (null != list) {
            if (list.size() <= 1) {
                // this is the last subscription, so unsubscribe
                Okexv3WebSocketClient client = Okexv3WebSocketExchange.CLIENTS.get(symbol);
                if (null != client) {
                    client.noTrades();
                }
            }
            list.remove(info);
        }
        Okexv3WebSocketExchange.CONSUMERS.remove(pushId);
    }
}
 
Example 3 | Project: james-project | File: ConcurrentTestRunnerTest.java
@Test
void closeShouldPreventPerformAllOperations() throws IOException, InterruptedException {
    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    int maxItems = 200000;
    Closeable closeable = ConcurrentTestRunner.builder()
        .operation((threadNumber, step) -> queue.add(threadNumber + ":" + step))
        .threadCount(2)
        .operationCount(maxItems)
        .run();
    closeable.close();
    TimeUnit.SECONDS.sleep(1);
    int stabilizedItemCount = queue.size();
    assertThat(stabilizedItemCount).isLessThanOrEqualTo(maxItems * 2);
    TimeUnit.SECONDS.sleep(1);
    assertThat(queue).hasSize(stabilizedItemCount);
}
 
Example 4 | Project: joyqueue | File: ConcurrentConsumption.java
/**
 * Add an expired, un-acknowledged partition segment to the expire queue.
 * (Thread safe: both the periodic cleanup of expired un-acknowledged partitionSegments and the
 * session-disconnect listener move segments into the expire queue, so concurrent access can occur.)
 *
 * @param consumePartition consume partition
 * @param partitionSegment partition segment
 */
private void addToExpireQueue(ConsumePartition consumePartition, PartitionSegment partitionSegment) {
    ConcurrentLinkedQueue<PartitionSegment> queue = expireQueueMap.get(consumePartition);
    if (queue == null) {
        queue = new ConcurrentLinkedQueue<>();
        ConcurrentLinkedQueue<PartitionSegment> pre = expireQueueMap.putIfAbsent(consumePartition, queue);
        if (pre != null) {
            queue = pre;
        }
    }
    if (queue.contains(partitionSegment)) {
        return;
    }
    queue.offer(partitionSegment);

    // log the state of the expired (un-acknowledged) queue
    long size = queue.size();
    logger.debug("add expire queue, partition: {}, size: {}, start: {}, end: {}", partitionSegment.getPartition(), size, partitionSegment.getStartIndex(), partitionSegment.getEndIndex());
    logger.debug("expire queue size is:[{}], partitionInfo:[{}], ", size, consumePartition);
    if (queue.size() > 10000) {
        logger.info("expire queue size is:[{}], partitionInfo:[{}], ", size, consumePartition);
    }
}
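Since each call to size() is an O(n) traversal, the snippet above walks the queue twice when logging (once for the debug line and once for the 10000 threshold). Where the length is needed frequently, one possible alternative, shown here only as a sketch with a hypothetical CountedQueue wrapper, is to keep an AtomicInteger counter next to the queue so the length check is O(1):

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: tracks the approximate length in an adjacent counter.
class CountedQueue<E> {
    private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger count = new AtomicInteger();

    boolean offer(E e) {
        boolean added = queue.offer(e);  // ConcurrentLinkedQueue.offer always succeeds
        if (added) {
            count.incrementAndGet();
        }
        return added;
    }

    E poll() {
        E e = queue.poll();
        if (e != null) {
            count.decrementAndGet();
        }
        return e;
    }

    int size() {
        return count.get();              // O(1); approximate while writers are active
    }
}

If blocking semantics are acceptable, java.util.concurrent.LinkedBlockingQueue already maintains its element count internally, so its size() does not traverse the queue.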
 
Example 5 | Project: Mycat-Balance | File: SendRunnable.java
public void recordFailMsg(Packet packet, String failReason)
{
    if (channelContext.isNeedRecordSendFailMsg())
    {
        ConcurrentLinkedQueue<PacketVo> sendFailQueue = getSendFailQueue(true);
        if (sendFailQueue != null)
        {
            PacketVo packetPojo = PacketVo.createPacketVo(packet, SystemTimer.currentTimeMillis(), failReason);
            if (sendFailQueue.size() >= channelContext.getCountOfRecordSendFail())
            {
                sendFailQueue.poll();
            }
            sendFailQueue.add(packetPojo);
        }
    }

}
 
Example 6 | Project: dble | File: PhysicalDataSource.java
/**
 * check whether a connection has not been used for a while and, if so, run a connection heartbeat
 *
 * @param linkedQueue  the queue of idle backend connections
 * @param hearBeatTime connections idle since before this time receive a heartbeat
 */
private void longIdleHeartBeat(ConcurrentLinkedQueue<BackendConnection> linkedQueue, long hearBeatTime) {
    long length = linkedQueue.size();
    for (int i = 0; i < length; i++) {
        BackendConnection con = linkedQueue.poll();
        if (con == null) {
            break;
        } else if (con.isClosed()) {
            continue;
        } else if (con.getLastTime() < hearBeatTime) { //if the connection is idle for a long time
            con.setBorrowed(true);
            new ConnectionHeartBeatHandler().doHeartBeat(con);
        } else {
            linkedQueue.offer(con);
            break;
        }
    }
}
 
Example 7 | Project: light_drtc | File: LogParser.java
public void test(){
	ConcurrentLinkedQueue<String> dataQu = new ConcurrentLinkedQueue<String>();
	dataQu.add("light,yl20160401,view,1234");
	dataQu.add("light,yl20160402,view,1234");
	dataQu.add("light,yl20160403,view,1234");
	dataQu.add("light,yl20160404,view,1234");
	
	dataQu.add("taotao,yl20160402,view,1234");
	dataQu.add("taotao,yl20160403,view,1234");
	dataQu.add("taotao,tu20160404,collect,1234");
	dataQu.add("taotao,yl20160405,view,1234");
	
	dataQu.add("momo,ty20160404,view,1234");
	dataQu.add("momo,ty20160404,share,1234");
	dataQu.add("momo,ty20160405,view,1234");
	dataQu.add("momo,yl20160405,view,1234");
	dataQu.add("momo,yl20160407,view,1234");
	
	int curNum = dataQu.size();
	this.parseLogs(dataQu, curNum);
	
}
 
Example 8 | Project: incubator-crail | File: RpcDispatcher.java
public RpcDispatcher(ConcurrentLinkedQueue<RpcConnection> connectionList) {
	connections = new RpcConnection[connectionList.size()];
	for (int i = 0; i < connections.length; i++){
		connections[i] = connectionList.poll();
	}
	this.setBlockIndex = 0;
	this.getDataNodeIndex = 0;
}
 
Example 9 | Project: feeyo-redisproxy | File: DefaultArrayBucket.java
@Override
public int getQueueSize() {
	int size = 0;
	for (ConcurrentLinkedQueue<ByteBuffer> cq : queueArray) {
		size = size + cq.size();
	}
	return size;
}
 
Example 10

/**
 * Calculate the average geometric center of a Queue that contains cartesian coordinates
 * Reference: http://stackoverflow.com/questions/6671183/calculate-the-center-point-of-multiple-latitude-longitude-coordinate-pairs
 * Reference: http://stackoverflow.com/questions/1185408/converting-from-longitude-latitude-to-cartesian-coordinates
 * Reference: http://en.wikipedia.org/wiki/Spherical_coordinate_system
 * @param queue The location buffer queue
 * @return Returns a Coordinate object
 */
public static Coordinate getGeographicCenter(final ConcurrentLinkedQueue<Coordinate> queue){
    double x = 0;
    double y = 0;
    double z = 0;
    float accuracy = 0;

    for(final Coordinate coordinate : queue){
        accuracy += coordinate.accuracy;

        // Convert latitude and longitude to radians
        final double latRad = Math.PI * coordinate.latitude / 180;
        final double lonRad = Math.PI * coordinate.longitude / 180;

        // Convert to cartesian coords
        x += _radiusKM * Math.cos(latRad) * Math.cos(lonRad);
        y += _radiusKM * Math.cos(latRad) * Math.sin(lonRad);
        z += _radiusKM * Math.sin(latRad);
    }

    // Get our averages
    final double xAvg = x / queue.size();
    final double yAvg = y / queue.size();
    final double zAvg = z / queue.size();
    final float accuracyAvg = accuracy / queue.size();

    // Convert cartesian back to radians
    final double sphericalLatRads = Math.asin(zAvg / _radiusKM);
    final double sphericalLonRads = Math.atan2(yAvg, xAvg);

    final Coordinate centerPoint = new Coordinate();
    centerPoint.latitude = sphericalLatRads * (180 / Math.PI);
    centerPoint.longitude = sphericalLonRads * (180 / Math.PI);
    centerPoint.accuracy = accuracyAvg;

    return centerPoint;
}
 
Example 11 | Project: ignite | File: GridDebug.java
/**
 * Dumps given number of last events.
 *
 * @param n Number of last elements to dump.
 */
public static void dumpLastAndStop(int n) {
    ConcurrentLinkedQueue<Item> q = que.getAndSet(null);

    if (q == null)
        return;

    int size = q.size();

    while (size-- > n)
        q.poll();

    dump(q);
}
 
Example 12 | Project: james-project | File: RabbitMQTest.java
@Test
void rabbitMQShouldSupportTheExclusiveWorkQueueCase() throws Exception {
    channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
    channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of());
    channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);

    IntStream.range(0, 10)
            .mapToObj(String::valueOf)
            .map(RabbitMQTest.this::asBytes)
            .forEach(Throwing.<byte[]>consumer(
                    bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());

    String dyingConsumerTag = "dyingConsumer";
    ImmutableMap<String, Object> arguments = ImmutableMap.of();
    ConcurrentLinkedQueue<Integer> receivedMessages = new ConcurrentLinkedQueue<>();
    CancelCallback doNothingOnCancel = consumerTag -> { };
    DeliverCallback ackFirstMessageOnly = (consumerTag, message) -> {
        if (receivedMessages.size() == 0) {
            receivedMessages.add(Integer.valueOf(new String(message.getBody(), StandardCharsets.UTF_8)));
            channel2.basicAck(message.getEnvelope().getDeliveryTag(), !MULTIPLE);
        } else {
            channel2.basicNack(message.getEnvelope().getDeliveryTag(), !MULTIPLE, REQUEUE);
        }
    };
    channel2.basicConsume(WORK_QUEUE, !AUTO_ACK, dyingConsumerTag, !NO_LOCAL, EXCLUSIVE, arguments, ackFirstMessageOnly, doNothingOnCancel);

    awaitAtMostOneMinute.until(() -> receivedMessages.size() == 1);

    channel2.basicCancel(dyingConsumerTag);

    InMemoryConsumer fallbackConsumer = new InMemoryConsumer(channel3);
    channel3.basicConsume(WORK_QUEUE, AUTO_ACK, "fallbackConsumer", !NO_LOCAL, EXCLUSIVE, arguments, fallbackConsumer);

    awaitAtMostOneMinute.until(() -> countReceivedMessages(fallbackConsumer) >= 1);

    assertThat(receivedMessages).containsExactly(0);
    assertThat(fallbackConsumer.getConsumedMessages()).contains(1, 2).doesNotContain(0);
}
 
Example 13 | Project: hbase | File: TableOverAsyncTable.java
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results, Callback<R> callback)
    throws IOException, InterruptedException {
  ConcurrentLinkedQueue<ThrowableWithExtraContext> errors = new ConcurrentLinkedQueue<>();
  CountDownLatch latch = new CountDownLatch(actions.size());
  AsyncTableRegionLocator locator = conn.getRegionLocator(getName());
  List<CompletableFuture<R>> futures = table.<R> batch(actions);
  for (int i = 0, n = futures.size(); i < n; i++) {
    final int index = i;
    FutureUtils.addListener(futures.get(i), (r, e) -> {
      if (e != null) {
        errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
          "Error when processing " + actions.get(index)));
        if (!ArrayUtils.isEmpty(results)) {
          results[index] = e;
        }
        latch.countDown();
      } else {
        if (!ArrayUtils.isEmpty(results)) {
          results[index] = r;
        }
        FutureUtils.addListener(locator.getRegionLocation(actions.get(index).getRow()),
          (l, le) -> {
            if (le != null) {
              errors.add(new ThrowableWithExtraContext(le, EnvironmentEdgeManager.currentTime(),
                "Error when finding the region for row " +
                  Bytes.toStringBinary(actions.get(index).getRow())));
            } else {
              callback.update(l.getRegion().getRegionName(), actions.get(index).getRow(), r);
            }
            latch.countDown();
          });
      }
    });
  }
  latch.await();
  if (!errors.isEmpty()) {
    throw new RetriesExhaustedException(errors.size(),
      errors.stream().collect(Collectors.toList()));
  }
}
 
Example 14 | Project: titan1withtp3.1 | File: IDAuthorityTest.java
@Test
public void testMultiIDAcquisition() throws Throwable {
    final int numPartitions = MAX_NUM_PARTITIONS;
    final int numAcquisitionsPerThreadPartition = 100;
    final IDBlockSizer blockSizer = new InnerIDBlockSizer();
    for (int i = 0; i < CONCURRENCY; i++) idAuthorities[i].setIDBlockSizer(blockSizer);
    final List<ConcurrentLinkedQueue<IDBlock>> ids = new ArrayList<ConcurrentLinkedQueue<IDBlock>>(numPartitions);
    for (int i = 0; i < numPartitions; i++) {
        ids.add(new ConcurrentLinkedQueue<IDBlock>());
    }

    final int maxIterations = numAcquisitionsPerThreadPartition * numPartitions * 2;
    final Collection<Future<?>> futures = new ArrayList<Future<?>>(CONCURRENCY);
    ExecutorService es = Executors.newFixedThreadPool(CONCURRENCY);

    Set<String> uids = new HashSet<String>(CONCURRENCY);
    for (int i = 0; i < CONCURRENCY; i++) {
        final IDAuthority idAuthority = idAuthorities[i];
        final IDStressor stressRunnable = new IDStressor(
                numAcquisitionsPerThreadPartition, numPartitions,
                maxIterations, idAuthority, ids);
        uids.add(idAuthority.getUniqueID());
        futures.add(es.submit(stressRunnable));
    }

    // If this fails, it's likely to be a bug in the test rather than the
    // IDAuthority (the latter is technically possible, just less likely)
    assertEquals(CONCURRENCY, uids.size());

    for (Future<?> f : futures) {
        try {
            f.get();
        } catch (ExecutionException e) {
            throw e.getCause();
        }
    }

    for (int i = 0; i < numPartitions; i++) {
        ConcurrentLinkedQueue<IDBlock> list = ids.get(i);
        assertEquals(numAcquisitionsPerThreadPartition * CONCURRENCY, list.size());
        LongSet idset = new LongHashSet((int)blockSize*list.size());
        for (IDBlock block : list) checkBlock(block,idset);
    }

    es.shutdownNow();
}
 
Example 15 | Project: flink | File: PubSubConsumingTest.java
private static <T> void awaitRecordCount(ConcurrentLinkedQueue<T> queue, int count) throws Exception {
	Deadline deadline  = Deadline.fromNow(Duration.ofSeconds(10));
	while (deadline.hasTimeLeft() && queue.size() < count) {
		Thread.sleep(10);
	}
}
 
Example 16 | Project: flink | File: FlinkKinesisConsumerTest.java
private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count) throws Exception {
	Deadline deadline  = Deadline.fromNow(Duration.ofSeconds(10));
	while (deadline.hasTimeLeft() && queue.size() < count) {
		Thread.sleep(10);
	}
}
 
Example 17 | Project: hortonmachine | File: OmsRescaledDistance.java
@Execute
public void process() throws Exception {
    if (!concatOr(outRescaled == null, doReset)) {
        return;
    }
    checkNull(inFlow, inNet);
    RegionMap regionMap = CoverageUtilities.getRegionParamsFromGridCoverage(inFlow);
    int cols = regionMap.getCols();
    int rows = regionMap.getRows();
    xRes = regionMap.getXres();
    yRes = regionMap.getYres();

    RenderedImage flowRI = inFlow.getRenderedImage();
    RandomIter flowIter = RandomIterFactory.create(flowRI, null);

    RenderedImage netRI = inNet.getRenderedImage();
    RandomIter netIter = RandomIterFactory.create(netRI, null);

    if (inElev != null) {
        RenderedImage elevRI = inElev.getRenderedImage();
        elevIter = RandomIterFactory.create(elevRI, null);
    }

    WritableRaster rescaledWR = CoverageUtilities.createWritableRaster(cols, rows, Float.class, null, floatNovalue);
    WritableRandomIter rescaledIter = RandomIterFactory.createWritable(rescaledWR, null);

    try {
        pm.beginTask("Find outlets...", rows * cols); //$NON-NLS-1$
        ConcurrentLinkedQueue<FlowNode> exitsList = new ConcurrentLinkedQueue<>();
        processGrid(cols, rows, ( c, r ) -> {
            if (pm.isCanceled())
                return;
            int netValue = netIter.getSample(c, r, 0);
            if (isNovalue(netValue)) {
                // we make sure that we pick only outlets that are on the net
                return;
            }
            FlowNode flowNode = new FlowNode(flowIter, cols, rows, c, r);
            if (flowNode.isHeadingOutside()) {
                exitsList.add(flowNode);
            }
            pm.worked(1);
        });
        pm.done();

        if (exitsList.size() == 0) {
            throw new ModelsIllegalargumentException("No exits found in the map of flowdirections.", this);
        }

        pm.beginTask("Calculate rescaled distance...", exitsList.size());
        exitsList.parallelStream().forEach(exitNode -> {
            if (pm.isCanceled())
                return;
            calculateRescaledDistance(exitNode, (float) xRes, rescaledIter, elevIter, netIter);
            pm.worked(1);
        });
        pm.done();
    } finally {
        rescaledIter.done();
        netIter.done();
        if (elevIter != null)
            elevIter.done();
    }

    outRescaled = CoverageUtilities.buildCoverage("OmsRescaledDistance", rescaledWR, regionMap,
            inFlow.getCoordinateReferenceSystem());
}
 
Example 18 | Project: hortonmachine | File: OmsExtractBasin.java
@Execute
public void process() throws Exception {
    if (!concatOr(outBasin == null, doReset)) {
        return;
    }
    checkNull(inFlow);

    crs = inFlow.getCoordinateReferenceSystem();

    RegionMap regionMap = CoverageUtilities.getRegionParamsFromGridCoverage(inFlow);
    ncols = regionMap.getCols();
    nrows = regionMap.getRows();
    double xRes = regionMap.getXres();
    double yRes = regionMap.getYres();
    double north = regionMap.getNorth();
    double west = regionMap.getWest();
    double south = regionMap.getSouth();
    double east = regionMap.getEast();

    if (pNorth == -1 || pEast == -1) {
        throw new ModelsIllegalargumentException("No outlet coordinates were supplied.", this.getClass().getSimpleName(), pm);
    }
    if (pNorth > north || pNorth < south || pEast > east || pEast < west) {
        throw new ModelsIllegalargumentException("The outlet point lies outside the map region.",
                this.getClass().getSimpleName(), pm);
    }

    Coordinate snapOutlet = snapOutlet();
    if (snapOutlet != null) {
        pEast = snapOutlet.x;
        pNorth = snapOutlet.y;
    }

    RandomIter flowIter = CoverageUtilities.getRandomIterator(inFlow);
    WritableRaster basinWR = CoverageUtilities.createWritableRaster(ncols, nrows, Short.class, null, shortNovalue);
    WritableRandomIter basinIter = RandomIterFactory.createWritable(basinWR, null);

    try {
        Coordinate outlet = new Coordinate(pEast, pNorth);

        int[] outletColRow = CoverageUtilities.colRowFromCoordinate(outlet, inFlow.getGridGeometry(), null);

        int outletFlow = flowIter.getSample(outletColRow[0], outletColRow[1], 0);
        if (isNovalue(outletFlow)) {
            throw new IllegalArgumentException("The chosen outlet point doesn't have a valid value.");
        }

        FlowNode runningNode = new FlowNode(flowIter, ncols, nrows, outletColRow[0], outletColRow[1]);
        runningNode.setIntValueInMap(basinIter, 1);
        outArea++;

        ConcurrentLinkedQueue<FlowNode> enteringNodes = new ConcurrentLinkedQueue<>(runningNode.getEnteringNodes());
        pm.beginTask(msg.message("wateroutlet.extracting"), -1);
        while( enteringNodes.size() > 0 ) {
            if (pm.isCanceled()) {
                return;
            }

            ConcurrentLinkedQueue<FlowNode> newEnteringNodes = new ConcurrentLinkedQueue<>();
            enteringNodes.parallelStream().forEach(flowNode -> {
                if (pm.isCanceled()) {
                    return;
                }
                if (!alreadyWarned && flowNode.touchesBound()) {
                    pm.errorMessage(MessageFormat.format(
                            "WARNING: touched boundaries in col/row = {0}/{1}. You might consider to review your processing region.",
                            flowNode.col, flowNode.row));
                    alreadyWarned = true;
                }
                flowNode.setIntValueInMap(basinIter, 1);
                outArea++;

                List<FlowNode> newEntering = flowNode.getEnteringNodes();
                newEnteringNodes.addAll(newEntering);
            });
            enteringNodes = newEnteringNodes;
        }
        pm.done();

        outArea = outArea * xRes * yRes;
        outBasin = CoverageUtilities.buildCoverage("basin", basinWR, regionMap, crs);

        extractVectorBasin();
    } finally {
        flowIter.done();
        basinIter.done();
    }
}
 
Example 19 | Project: flink | File: TwoInputStreamTaskTest.java
/**
 * This test verifies that checkpoint barriers are correctly forwarded.
 */
@Test
public void testCheckpointBarriers() throws Exception {

	final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
			new TwoInputStreamTaskTestHarness<>(
					TwoInputStreamTask::new,
					2, 2, new int[] {1, 2},
					BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
	testHarness.setupOutputForSingletonOperatorChain();

	StreamConfig streamConfig = testHarness.getStreamConfig();
	CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<>(new IdentityMap());
	streamConfig.setStreamOperator(coMapOperator);
	streamConfig.setOperatorID(new OperatorID());

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
	long initialTime = 0L;

	testHarness.invoke();
	testHarness.waitForTaskRunning();

	testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);

	// This one should go through
	testHarness.processElement(new StreamRecord<>("Ciao-0-0", initialTime), 0, 1);
	expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));

	testHarness.waitForInputProcessing();

	// These elements should be forwarded, since we did not yet receive a checkpoint barrier
	// on that input, only add to same input, otherwise we would not know the ordering
	// of the output since the Task might read the inputs in any order
	testHarness.processElement(new StreamRecord<>(11, initialTime), 1, 1);
	testHarness.processElement(new StreamRecord<>(111, initialTime), 1, 1);
	expectedOutput.add(new StreamRecord<>("11", initialTime));
	expectedOutput.add(new StreamRecord<>("111", initialTime));

	testHarness.waitForInputProcessing();

	// Wait to allow input to end up in the output.
	// TODO Use count down latches instead as a cleaner solution
	for (int i = 0; i < 20; ++i) {
		if (testHarness.getOutput().size() >= expectedOutput.size()) {
			break;
		} else {
			Thread.sleep(100);
		}
	}

	// we should not yet see the barrier, only the two elements from non-blocked input
	TestHarnessUtil.assertOutputEquals("Output was not correct.",
		expectedOutput,
		testHarness.getOutput());

	testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
	testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
	testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);

	testHarness.waitForInputProcessing();
	testHarness.endInput();
	testHarness.waitForTaskCompletion();

	// now we should see the barrier
	expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation()));

	TestHarnessUtil.assertOutputEquals("Output was not correct.",
			expectedOutput,
			testHarness.getOutput());

	List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
	Assert.assertEquals(3, resultElements.size());
}
 
Example 20 | Project: AvatarMQ | File: SendMessageController.java
public Void call() {
    int period = MessageSystemConfig.SendMessageControllerPeriodTimeValue;
    int commitNumber = MessageSystemConfig.SendMessageControllerTaskCommitValue;
    int sleepTime = MessageSystemConfig.SendMessageControllerTaskSleepTimeValue;

    ConcurrentLinkedQueue<MessageDispatchTask> queue = requestCacheList.get();
    SendMessageCache ref = SendMessageCache.getInstance();

    while (!stoped) {
        SemaphoreCache.acquire(MessageSystemConfig.NotifyTaskSemaphoreValue);
        MessageDispatchTask task = MessageTaskQueue.getInstance().getTask();

        queue.add(task);

        if (queue.size() == 0) {
            try {
                Thread.sleep(sleepTime);
                continue;
            } catch (InterruptedException ex) {
                Logger.getLogger(SendMessageController.class.getName()).log(Level.SEVERE, null, ex);
            }
        }

        if (queue.size() > 0 && (queue.size() % commitNumber == 0 || flushTask.get() == true)) {
            ref.commit(queue);
            queue.clear();
            flushTask.compareAndSet(true, false);
        }

        timer.scheduleAtFixedRate(new TimerTask() {

            public void run() {
                try {
                    flushTask.compareAndSet(false, true);
                } catch (Exception e) {
                    System.out.println("SendMessageTaskMonitor happen exception");
                }
            }
        }, 1000 * 1, period);
    }
    return null;
}