java.util.concurrent.atomic.AtomicInteger#get()源码实例Demo

下面列出了 java.util.concurrent.atomic.AtomicInteger#get() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: signalfx-java   文件: SfxMetrics.java
/**
 * Unregisters a group of metrics with one call.
 *
 * @param metricsToRemove
 *         The metric instances that should no longer be reported.
 * @return How many registry entries matched one of the given metrics.
 */
public int unregister(Metric... metricsToRemove) {
    final Set<Metric> doomed = ImmutableSet.copyOf(metricsToRemove);
    final AtomicInteger removedCount = new AtomicInteger(0);
    final MetricFilter removeAndCount = new MetricFilter() {
        @Override
        public boolean matches(String name, Metric metric) {
            if (!doomed.contains(metric)) {
                return false;
            }
            // The anonymous class can only capture effectively-final locals, hence the
            // AtomicInteger as a mutable counter.
            removedCount.incrementAndGet();
            return true;
        }
    };
    metricRegistry.removeMatching(removeAndCount);
    return removedCount.get();
}
 
源代码2 项目: flink   文件: S3Committer.java
/**
 * Commits the multi-part upload for this object when at least one byte was written;
 * otherwise only logs that there is nothing to commit.
 *
 * @throws IOException declared for failures of the underlying commit call
 */
@Override
public void commit() throws IOException {
	if (totalLength <= 0L) {
		LOG.debug("No data to commit for file: {}", objectName);
		return;
	}

	LOG.info("Committing {} with MPU ID {}", objectName, uploadId);

	// The helper reports through this counter; its value is logged as the number of retries.
	final AtomicInteger retries = new AtomicInteger();
	s3AccessHelper.commitMultiPartUpload(objectName, uploadId, parts, totalLength, retries);

	final int retryCount = retries.get();
	if (retryCount != 0) {
		LOG.debug("Successfully committed {} with MPU ID {} after {} retries.", objectName, uploadId, retryCount);
	} else {
		LOG.debug("Successfully committed {} with MPU ID {}", objectName, uploadId);
	}
}
 
源代码3 项目: Java8CN   文件: LinkedBlockingQueue.java
/**
 * Removes and returns the head of the queue, waiting up to the given timeout while
 * the element count is zero.
 *
 * @param timeout how long to wait before giving up, in units of {@code unit}
 * @param unit the time unit of {@code timeout}
 * @return the dequeued element, or {@code null} if the remaining wait time reached
 *         zero while the count was still zero
 * @throws InterruptedException if interrupted while acquiring the lock or waiting
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    // Count observed before the decrement; stays -1 if nothing was dequeued.
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // Re-check the count after every wake-up; awaitNanos returns the time still left.
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        // More than one element was present before this take, so another waiter on
        // notEmpty can proceed as well.
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // The count was at capacity before this take; signalled after releasing takeLock.
    if (c == capacity)
        signalNotFull();
    return x;
}
 
源代码4 项目: hbase   文件: HBaseFsck.java
/**
 * Prints the number of tables returned by {@code getTables} and, when {@code details}
 * is set, the number of skipped ("in flux") tables plus one line per table.
 *
 * TODO -- need to add tests for this.
 */
private void reportTablesInFlux() {
  AtomicInteger skippedCount = new AtomicInteger(0);
  TableDescriptor[] tables = getTables(skippedCount);
  errors.print("Number of Tables: " + tables.length);
  if (!details) {
    return;
  }
  if (skippedCount.get() > 0) {
    errors.detail("Number of Tables in flux: " + skippedCount.get());
  }
  for (TableDescriptor table : tables) {
    // One tab-separated row per table: name, access mode, META marker, family count.
    StringBuilder row = new StringBuilder("  Table: ");
    row.append(table.getTableName()).append("\t");
    row.append(table.isReadOnly() ? "ro" : "rw").append("\t");
    row.append(table.isMetaRegion() ? "META" : "    ").append("\t");
    row.append(" families: ").append(table.getColumnFamilyCount());
    errors.detail(row.toString());
  }
}
 
源代码5 项目: excelastic   文件: CSVParser.java
/**
 * Finishes the column currently held in {@code buffer}: bumps the column counter,
 * verifies it does not exceed the number of headers, stores the parsed value in
 * {@code json} under the next header (unless this is a dry run) and resets the buffer.
 *
 * @param columnsRead running count of columns seen in the current row; incremented here
 * @param json        target object that receives the parsed column value
 * @throws ColumnsHeadersMismatchException if the row has more columns than headers
 */
private void process(AtomicInteger columnsRead, JsonObject json) {
    columnsRead.incrementAndGet();

    if (columnsRead.get() > headers.size()) {
        throw new ColumnsHeadersMismatchException(columnsRead.get(), headers.size(), row + 1);
    }

    if (dryRun) {
        // Format verification only: advance the header iterator, skip parsing.
        header.next();
    } else {
        int length = buffer.position();
        // One extra byte so the copied column can be NUL-terminated.
        byte[] column = new byte[length + 1];

        // NOTE(review): the (Buffer) casts presumably keep the calls bound to
        // java.nio.Buffer for older runtimes - confirm before removing.
        ((Buffer) buffer).position(0);
        buffer.get(column, 0, length);
        column[length] = '\0';

        json.put(header.next(), parseDatatype(column));
    }
    ((Buffer) buffer).clear();
}
 
源代码6 项目: JDKSourceCode1.8   文件: LinkedBlockingQueue.java
/**
 * Removes and returns the head of the queue without waiting.
 *
 * @return the dequeued element, or {@code null} if the count is zero
 */
public E poll() {
    final AtomicInteger count = this.count;
    // Fast path: skip taking the lock when the count already reads zero.
    if (count.get() == 0)
        return null;
    E x = null;
    // Count observed before the decrement; stays -1 if nothing was dequeued.
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // Re-check under the lock; the count may have changed since the unlocked read.
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            // More than one element was present before this take, so another waiter
            // on notEmpty can proceed as well.
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    // The count was at capacity before this take; signalled after releasing takeLock.
    if (c == capacity)
        signalNotFull();
    return x;
}
 
/**
 * Appends statistics for every source/target mimetype pair to {@code sb} and, when no
 * extension filter was given and more than one entry was counted, inserts a
 * per-transformer summary ahead of those entries.
 *
 * @param sourceExtension          source extension filter, or {@code null} for all
 * @param targetExtension          target extension filter, or {@code null} for all
 * @param sb                       buffer the report is written to
 * @param transformer              transformer being reported on
 * @param sourceMimetypes          source mimetypes to iterate over
 * @param targetMimetypes          target mimetypes to iterate over
 * @param includeSystemWideSummary passed through to the per-pair overload
 */
private void getTransformationStatistics(String sourceExtension, String targetExtension,
        StringBuilder sb, ContentTransformer transformer, Collection<String> sourceMimetypes,
        Collection<String> targetMimetypes, boolean includeSystemWideSummary)
{
    final AtomicInteger reported = new AtomicInteger(0);
    final int startLength = sb.length();

    for (String sourceMimetype: sourceMimetypes)
    {
        for (String targetMimetype: targetMimetypes)
        {
            getTransformationStatistics(sb, transformer, sourceMimetype, targetMimetype, reported, includeSystemWideSummary);
        }
    }

    // A summary is only worthwhile when everything was requested and there is
    // more than one entry to summarise.
    final boolean allRequested = sourceExtension == null && targetExtension == null;
    if (!allRequested || reported.get() <= 1)
    {
        return;
    }

    final StringBuilder summary = new StringBuilder();
    getTransformationStatistics(summary, transformer, null, null, reported, includeSystemWideSummary);
    summary.append('\n');

    // Insert at the very start of an empty buffer, otherwise two characters past
    // the position where this call began writing.
    final int insertAt = (startLength == 0) ? 0 : startLength + 2;
    sb.insert(insertAt, summary);
}
 
/**
 * Adds the given cache entry to {@code mEvictionQueue} when its counter in
 * {@code mCachedEntries} reads zero, and grows {@code mEvictionQueueSize} by the
 * size of the entry's value. The entry must have a counter in {@code mCachedEntries}
 * and must not be queued already.
 */
private synchronized void maybeAddToEvictionQueue(final CacheEntry<K, V> cacheEntry) {
  final AtomicInteger inUseCount = mCachedEntries.get(cacheEntry);
  Preconditions.checkNotNull(inUseCount);
  final boolean alreadyQueued = mEvictionQueue.contains(cacheEntry);
  Preconditions.checkArgument(!alreadyQueued);
  if (inUseCount.get() != 0) {
    // Counter is non-zero: leave the entry out of the queue.
    return;
  }
  mEvictionQueueSize += mValueInfoCallback.getSizeInBytes(cacheEntry.value.get());
  mEvictionQueue.add(cacheEntry);
}
 
源代码9 项目: codebuff   文件: ConcurrentHashMultiset.java
/**
 * Removes exactly {@code occurrences} occurrences of {@code element}, or leaves the
 * multiset untouched when fewer than that many are present.
 *
 * <p>Unlike {@link #remove(Object, int)}, this method never performs a partial removal.
 *
 * @param element the element to remove
 * @param occurrences the number of occurrences of {@code element} to remove
 * @return {@code true} if the removal happened (always the case when {@code occurrences}
 *     is zero), {@code false} if the element count was too small
 * @throws IllegalArgumentException if {@code occurrences} is negative
 */

@CanIgnoreReturnValue
public boolean removeExactly(@Nullable Object element, int occurrences) {
  if (occurrences == 0) {
    return true;
  }
  CollectPreconditions.checkPositive(occurrences, "occurences");
  final AtomicInteger counter = Maps.safeGet(countMap, element);
  if (counter == null) {
    return false;
  }
  // Compare-and-set retry loop: re-read the count whenever another thread wins the race.
  for (int current = counter.get(); current >= occurrences; current = counter.get()) {
    final int remaining = current - occurrences;
    if (!counter.compareAndSet(current, remaining)) {
      continue;
    }
    if (remaining == 0) {
      // The count just dropped to 0; remove the mapping to keep the map clean. A failed
      // removal means another thread already swapped in a new counter, which is fine.
      countMap.remove(element, counter);
    }
    return true;
  }
  return false;
}
 
源代码10 项目: p4ic4idea   文件: ClientSystemFileMatchCommands.java
/**
 * Computes the "..." wildcard path that should be reported for {@code file}.
 *
 * @param file file being examined
 * @param cwd  current working directory
 * @param dirs directories located in {@code cwd} on the workspace
 * @param idx  shared cursor into {@code dirs}; advanced past every entry that does not match
 * @param skip when {@code true}, report the wildcard for {@code cwd} without inspecting {@code file}
 * @return the wildcard path, or {@code null} when the file's parent directory differs from
 *         {@code cwd} or no remaining entry of {@code dirs} matches
 */
private String sendDir( File file, File cwd, List<String> dirs, AtomicInteger idx, boolean skip )
{
    // Skip printing file in current directory and just report subdirectory
    if( skip ) {
        return FilePathHelper.getLocal( cwd.getAbsolutePath(), "..." );
    }

    // Only files whose parent directory compares equal to cwd are considered further.
    final File parent = file.getParentFile();
    if( sysCompare( parent.getAbsolutePath(), cwd.getAbsolutePath() ) != 0 ) {
        return null;
    }

    // Walk 'dirs' from the shared cursor. On a match the cursor is left pointing at
    // the matching entry; otherwise it moves on to the next one.
    while( idx.get() < dirs.size() ) {
        final String candidate = dirs.get( idx.get() );
        if( parent.getAbsolutePath().startsWith( new File( candidate ).getAbsolutePath() ) ) {
            return FilePathHelper.getLocal( candidate, "..." );
        }
        idx.getAndIncrement();
    }

    return null;
}
 
源代码11 项目: lucene-solr   文件: TestDocCollectionWatcher.java
/**
 * Checks that a state watcher whose predicate returns {@code false} stays registered:
 * the predicate asks for at least 3 replicas, the collection is grown from 1 to 2 to 3
 * replicas, and the watcher must still be registered after the second replica and be
 * gone once the third replica satisfies it.
 */
@Test
public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception {

  CloudSolrClient client = cluster.getSolrClient();
  CollectionAdminRequest.createCollection("falsepredicate", "config", 1, 1)
    .processAndWait(client, MAX_WAIT_TIMEOUT);

  // wait until the collection (1 shard, 1 replica) is fully active...
  client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
                      (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));

  // set watcher waiting for at least 3 replicas (will fail initially)
  final AtomicInteger runCount = new AtomicInteger(0);
  final Future<Boolean> future = waitInBackground
    ("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
     (collectionState) -> {
      // count every predicate invocation so the test can tell the watcher ran
      runCount.incrementAndGet();
      int replicas = 0;
      for (Slice slice : collectionState) {
        for (Replica replica : slice) {
          replicas++;
        }
      }
      return 3 <= replicas;
    });

  // add a 2nd replica...
  CollectionAdminRequest.addReplicaToShard("falsepredicate", "shard1")
    .processAndWait(client, MAX_WAIT_TIMEOUT);
  client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
                      (n, c) -> DocCollection.isFullyActive(n, c, 1, 2));

  // confirm watcher has run at least once and has been retained...
  final int runCountSnapshot = runCount.get();
  assertTrue(0 < runCountSnapshot);
  assertEquals(1, client.getZkStateReader().getStateWatchers("falsepredicate").size());
  
  // now add a 3rd replica...
  CollectionAdminRequest.addReplicaToShard("falsepredicate", "shard1")
    .processAndWait(client, MAX_WAIT_TIMEOUT);

  // now confirm watcher is invoked & removed
  assertTrue("watcher never succeeded", future.get());
  // the predicate must have run again after the snapshot was taken
  assertTrue(runCountSnapshot < runCount.get());
  waitFor("DocCollectionWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
          () -> client.getZkStateReader().getStateWatchers("falsepredicate").size() == 0);
  
}
 
源代码12 项目: antsdb   文件: Synchronizer.java
/**
 * Runs one synchronization pass from the current space pointer ({@code this.sp}) up to
 * the frozen space pointer, then records the new end pointer in the storage and
 * optionally checkpoints it.
 *
 * <p>The {@code finally} block always updates {@code this.state}: "idling" on success,
 * "waiting for cache space" after {@code OutOfMinkeSpace}, and "failed" otherwise.
 *
 * <p>NOTE(review): the early {@code return 0} paths inside the {@code try} leave
 * {@code success} false, so they also end with state "failed" even though nothing went
 * wrong - confirm whether that is intended.
 *
 * @param checkpoint whether to call {@code checkpoint()} on the storage after syncing
 * @param batchSize  passed through to the inner sync; values {@code <= 0} return 0 immediately
 * @return the value of the update counter filled in by the inner sync call (0 when nothing was done)
 * @throws Exception any failure other than {@code OutOfMinkeSpace}, which is caught here
 */
public synchronized int sync(boolean checkpoint, long batchSize) throws Exception {
    if (batchSize <= 0) {
        return 0;
    }
    _log.trace("starting synchronization from {}", UberFormatter.hex(this.sp));
    boolean success = false;
    // first space pointer of this pass; used for logging and as the inner sync start
    long spStart = this.sp + 1;
    AtomicInteger count = new AtomicInteger();
    boolean cacheOutOfSpace = false;
    try (HumpbackSession session=this.session.open()) {
        long spSystem = this.humpback.getSpaceManager().getAllocationPointer();
        long spActive = findActiveSp();
        long spFrozen = findFrozenSp(spActive);
        if (spFrozen == 0) {
            // nothing is frozen
            return 0;
        }
        if (spFrozen <= this.sp) {
            // nothing to sync
            return 0;
        }
        if (spFrozen >= spSystem) {
            // concurrency 
            return 0;
        }
        sync(spStart, spFrozen, count, batchSize);
        // advance the pointer only after the inner sync returned normally
        this.sp = spFrozen;
        getStorage().setEndSpacePointer(this.sp);
        if (checkpoint) {
            getStorage().checkpoint();
        }
        success = true;
        return count.get();
    }
    catch (OutOfMinkeSpace x) {
        // partial progress: this.sp is not advanced, but the updates counted so far are reported
        _log.warn("cache is filled up, synchronization is partitally ended");
        cacheOutOfSpace = true;
        return count.get();
    }
    finally {
        // log only when at least one update was counted
        if (count.get() != 0) {
            if (success) {
                _log.debug(
                        "synchronization has finished with {} updates from {} to {}", 
                        count.get(), 
                        hex(spStart), 
                        hex(this.sp));
            }
            else {
                _log.debug("synchronization has failed with {} updates from {} to {}", 
                        count.get(), 
                        hex(spStart), 
                        hex(this.sp));
            }
        }
        // publish the outcome of this pass through the state field
        if (success) {
            this.state = "idling";
        }
        else {
            if (cacheOutOfSpace) {
                this.state = "waiting for cache space";
            }
            else {
                this.state = "failed";
            }
        }
        _log.trace("synchronization has finished");
    }
}
 
源代码13 项目: x-pipe   文件: ClusterShardCounter.java
/**
 * Sums the sizes of the shard collections of all clusters.
 *
 * @return the total number of shards across every entry of {@code clusters}
 */
public int getShardCount() {
    // A lambda can only capture effectively-final locals, so an AtomicInteger
    // serves as the mutable accumulator.
    final AtomicInteger total = new AtomicInteger();
    clusters.forEach((clusterName, shards) -> {
        total.addAndGet(shards.size());
    });
    return total.get();
}
 
源代码14 项目: my_curd   文件: LoginController.java
/**
 * Login form submission endpoint.
 *
 * <p>Validates the submitted username and password, enforces the wrong-password retry
 * limit, rejects disabled users, optionally sets "remember me" cookies, records the
 * login time and redirects to the dashboard. Every failure re-renders {@code login.ftl}
 * with an {@code errMsg} attribute.
 *
 * <p>NOTE(review): passwords are compared as plain SHA-1 digests and the digest is also
 * stored in a cookie. Both are weak by current standards (no salt, no slow hash); this
 * is left unchanged here because fixing it requires a data migration.
 */
public void action() {
    String username = getPara("username");
    String password = getPara("password");

    /* Reject a blank username or password. */
    if (StrKit.isBlank(username)) {
        setAttr("errMsg", "请填写用户名。");
        render("login.ftl");
        return;
    }
    if (StrKit.isBlank(password)) {
        setAttr("errMsg", "请填写密码。");
        render("login.ftl");
        return;
    }
    SysUser sysUser = SysUser.dao.findByUsername(username);
    if (sysUser == null) {
        setAttr("errMsg", username + " 用户不存在。");
        render("login.ftl");
        return;
    }

    // After RETRY_LIMIT wrong passwords the account is locked for LOCK_TIME minutes.
    BaseCache<String, AtomicInteger> retryCache = CacheContainer.getLoginRetryLimitCache();
    AtomicInteger retryTimes = retryCache.getCache(username);
    if (retryTimes == null) {
        // No counter cached for this user yet: start from zero instead of failing
        // with a NullPointerException below.
        retryTimes = new AtomicInteger();
        retryCache.put(username, retryTimes);
    }
    if (retryTimes.get() >= LoginRetryLimitCache.RETRY_LIMIT) {
        setAttr("username", username);
        setAttr("errMsg", " 账号已被锁定, " + LoginRetryLimitCache.LOCK_TIME + "分钟后可自动解锁。 ");
        render("login.ftl");
        return;
    }
    password = HashKit.sha1(password);
    // Call equals on the freshly computed digest so a null stored password counts as
    // a mismatch rather than throwing.
    if (!password.equals(sysUser.getPassword())) {
        int nowRetryTimes = retryTimes.incrementAndGet();  // one more failed attempt
        int remaining = LoginRetryLimitCache.RETRY_LIMIT - nowRetryTimes;
        setAttr("username", username);
        // "<= 0" rather than "== 0": concurrent failed attempts can push the counter
        // past the limit, and a negative remaining count must not be shown.
        if (remaining <= 0) {
            setAttr("errMsg", " 账号已被锁定, " + LoginRetryLimitCache.LOCK_TIME + "分钟后可自动解锁。 ");
        } else {
            setAttr("errMsg", " 密码错误, 再错误 "
                    + remaining + " 次账号将被锁定" + LoginRetryLimitCache.LOCK_TIME + "分钟。");
        }
        render("login.ftl");
        return;
    }
    retryCache.put(username, new AtomicInteger()); // correct password: reset the failure counter

    // Constant-first comparison tolerates a null user state.
    if ("1".equals(sysUser.getUserState())) {
        setAttr("errMsg", username + " 用户被禁用,请联系管理员。");
        render("login.ftl");
        return;
    }

    /* Username and password are valid from here on. */

    // "Remember me" was ticked and no cookie exists yet: create the cookies.
    String remember = getPara("remember");
    if ("on".equals(remember) && getCookie(USERNAME_KEY) == null) {
        setCookie(USERNAME_KEY, username, 60 * 60 * 24);  // 1 day
        setCookie(PASSWORD_KEY, password, 60 * 60 * 24);
    }

    sysUser.setLastLoginTime(new Date());
    sysUser.update();

    afterLogin(sysUser);

    // NOTE(review): the original comment here read "login log", but no logging is
    // visible in this method; only the redirect follows.
    redirect("/dashboard");
}
 
源代码15 项目: nyzoVerifier   文件: ChainInitializationManager.java
/**
 * Decides whether the local frozen edge is too far behind and, if so, asks the trusted
 * entry points for bootstrap responses, waits for a consensus response and fetches the
 * consensus block when required.
 *
 * @param trustedEntryPoints the entry points that bootstrap requests are sent to
 */
public static void initializeFrozenEdge(List<TrustedEntryPoint> trustedEntryPoints) {

        // Only continue with the edge-initialization process if we might need to fetch a new frozen edge. If the
        // open edge is fewer than 20 blocks ahead of the local frozen edge, and we have a complete cycle in the
        // block manager, we can treat the restart as a temporary outage and use standard recovery mechanisms.
        long openEdgeHeight = BlockManager.openEdgeHeight(false);
        if (BlockManager.isCycleComplete() && openEdgeHeight < BlockManager.getFrozenEdgeHeight() + 20) {
            LogUtil.println("skipping frozen-edge consensus process due to closeness to open edge and complete cycle");
        } else {

            System.out.println("entering frozen-edge consensus process because open edge, " + openEdgeHeight +
                    ", is " + (openEdgeHeight - BlockManager.getFrozenEdgeHeight()) + " past frozen edge, " +
                    BlockManager.getFrozenEdgeHeight() + " and cycleComplete=" + BlockManager.isCycleComplete());

            // Attempt to jump into the blockchain. This should succeed on the first attempt, but it may take
            // longer if we are starting a new mesh.
            BootstrapResponseV2 consensusBootstrapResponse = null;
            while (consensusBootstrapResponse == null && !UpdateUtil.shouldTerminate()) {

                // Wait for the incoming message queue to clear. This will prevent a potential problem where this
                // loop continues to pile on more and more requests while not getting responses in time because the
                // queue is overfilled.
                MessageQueue.blockThisThreadUntilClear();

                // Counts non-null responses for this round only; a fresh counter is created on every iteration.
                AtomicInteger numberOfResponsesReceived = new AtomicInteger(0);

                // Send bootstrap requests to all trusted entry points.
                Message bootstrapRequest = new Message(MessageType.BootstrapRequestV2_35, new BootstrapRequest());
                for (TrustedEntryPoint entryPoint : trustedEntryPoints) {

                    System.out.println("sending Bootstrap request to " + entryPoint);
                    Message.fetchTcp(entryPoint.getHost(), entryPoint.getPort(), bootstrapRequest,
                            new MessageCallback() {
                                @Override
                                public void responseReceived(Message message) {
                                    if (message == null) {
                                        System.out.println("Bootstrap response is null");
                                    } else {
                                        numberOfResponsesReceived.incrementAndGet();
                                        processBootstrapResponseMessage(message);
                                    }
                                }
                            });
                }

                // Wait up to 5 seconds (20 polls of 250 ms) for requests to return; stop early once every
                // entry point has answered.
                for (int i = 0; i < 20 && numberOfResponsesReceived.get() < trustedEntryPoints.size(); i++) {
                    ThreadUtil.sleep(250L);
                }

                // Get the consensus response. If this can be determined, we can move to the next step.
                consensusBootstrapResponse = winningResponse();
                System.out.println("consensus bootstrap response: " + consensusBootstrapResponse);
            }

            // If the consensus frozen edge is more than 20 past the local frozen edge, and we are not in the cycle,
            // fetch the consensus frozen edge. If the consensus frozen edge is more than the cycle length past the
            // frozen edge, it does not matter if we were in the cycle. We have lost our place, and the frozen edge
            // needs to be fetched. If we do not fetch the frozen edge here, the recovery mechanisms will attempt to
            // catch us back up in time to verify a block.
            if (consensusBootstrapResponse != null) {

                long consensusFrozenEdge = consensusBootstrapResponse.getFrozenEdgeHeight();
                boolean fetchRequiredNotInCycle = consensusFrozenEdge > BlockManager.getFrozenEdgeHeight() + 20 &&
                        !Verifier.inCycle();
                boolean fetchRequiredInCycle = consensusFrozenEdge > BlockManager.getFrozenEdgeHeight() +
                        BlockManager.currentCycleLength();

                System.out.println("local frozen edge: " + BlockManager.getFrozenEdgeHeight() +
                        ", consensus frozen edge: " + consensusFrozenEdge + ", fetch required (not-in-cycle): " +
                        fetchRequiredNotInCycle + ", fetch required (in-cycle): " + fetchRequiredInCycle);

                // An incomplete local cycle also forces a fetch, regardless of the distance checks above.
                if (fetchRequiredNotInCycle || fetchRequiredInCycle || !BlockManager.isCycleComplete()) {
                    System.out.println("fetching block based on bootstrap response");
                    fetchBlock(consensusBootstrapResponse);
                }
            }
        }
    }
 
源代码16 项目: codebuff   文件: ConcurrentHashMultiset.java
/**
 * Adds a number of occurrences of the specified element to this multiset.
 *
 * <p>The update is lock-free: it retries with {@code putIfAbsent}, {@code compareAndSet}
 * and {@code replace} until one of them succeeds.
 *
 * @param element the element to add
 * @param occurrences the number of occurrences to add
 * @return the previous count of the element before the operation; possibly zero
 * @throws IllegalArgumentException if {@code occurrences} is negative, or if
 *     the resulting amount would exceed {@link Integer#MAX_VALUE}
 */

@CanIgnoreReturnValue
@Override
public int add(E element, int occurrences) {
  checkNotNull(element);
  if (occurrences == 0) {
    // Nothing to add; just report the current count.
    return count(element);
  }
  CollectPreconditions.checkPositive(occurrences, "occurences");
  // Outer loop: (re)locate the counter for this element.
  while (true) {
    AtomicInteger existingCounter = Maps.safeGet(countMap, element);
    if (existingCounter == null) {
      existingCounter = countMap.putIfAbsent(element, new AtomicInteger(occurrences));
      if (existingCounter == null) {
        // Our new counter was installed, so the previous count was zero.
        return 0;
      }
      // existingCounter != null: fall through to operate against the existing AtomicInteger
    }
    // Inner loop: try to bump the located counter with compare-and-set.
    while (true) {
      int oldValue = existingCounter.get();
      if (oldValue != 0) {
        try {
          int newValue = IntMath.checkedAdd(oldValue, occurrences);
          if (existingCounter.compareAndSet(oldValue, newValue)) {
            // newValue can't == 0, so no need to check & remove
            return oldValue;
          }
        } catch (ArithmeticException overflow) {
          throw new IllegalArgumentException("Overflow adding " + occurrences + " occurrences to a count of " + oldValue);
        }
      } else {
        // In the case of a concurrent remove, we might observe a zero value, which means another
        // thread is about to remove (element, existingCounter) from the map. Rather than wait,
        // we can just do that work here.
        AtomicInteger newCounter = new AtomicInteger(occurrences);
        if ((countMap.putIfAbsent(element, newCounter) == null)
            || countMap.replace(element, existingCounter, newCounter)) {
          return 0;
        }
        // Neither call installed our counter; leave the inner loop and look the counter up again.
        break;
      }
    }

    // If we're still here, there was a race, so just try again.
  }
}
 
源代码17 项目: ignite   文件: CacheSerializableTransactionsTest.java
/**
 * Runs 10 rounds in which 5 threads each write the same 500 keys inside an
 * OPTIMISTIC/SERIALIZABLE transaction and commit after meeting at a barrier, then
 * asserts that at least one transaction committed in every round.
 *
 * @throws Exception If failed.
 */
@Test
public void testConflictResolution() throws Exception {
    final Ignite ignite = ignite(0);

    final String cacheName =
        ignite.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName();

    try {
        // Every thread writes this same key set, so the transactions conflict with each other.
        final Map<Integer, Integer> keys = new HashMap<>();

        for (int i = 0; i < 500; i++)
            keys.put(i, i);

        final int THREADS = 5;

        for (int i = 0; i < 10; i++) {
            final CyclicBarrier barrier = new CyclicBarrier(THREADS);

            // Number of transactions that committed in this round.
            final AtomicInteger commitCntr = new AtomicInteger(0);

            GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
                @Override public Void call() throws Exception {
                    IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);

                    IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();

                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
                        cache.putAll(keys);

                        // Hold every thread here until all of them have written, so the commits race.
                        barrier.await();

                        tx.commit();

                        commitCntr.incrementAndGet();
                    }
                    catch (TransactionOptimisticException e) {
                        // A losing transaction is expected; it is logged and not counted.
                        log.info("Optimistic error: " + e);
                    }

                    return null;
                }
            }, THREADS, "update-thread").get();

            int commits = commitCntr.get();

            log.info("Iteration [iter=" + i + ", commits=" + commits + ']');

            assertTrue(commits > 0);
        }
    }
    finally {
        destroyCache(cacheName);
    }
}
 
源代码18 项目: crate   文件: PublicationTests.java
/**
 * Publishes a cluster state from node 1 to three nodes, marks node 2 as faulty at a
 * randomly chosen point while commits are being handled, and asserts that the
 * publication still completes and commits while the ack listener reports exactly one
 * error, for node 2.
 */
public void testClusterStatePublishingWithFaultyNodeAfterCommit() throws InterruptedException {
    VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId()));
    initializeCluster(singleNodeConfig);

    AssertingAckListener ackListener = new AssertingAckListener(nodes.size());
    DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build();

    boolean publicationDidNotMakeItToNode2 = randomBoolean();
    // Counts down once per pending commit handled below; starts at 2 when node 2 is left out, else 3.
    AtomicInteger remainingActions = new AtomicInteger(publicationDidNotMakeItToNode2 ? 2 : 3);
    int injectFaultAt = randomInt(remainingActions.get() - 1);
    logger.info("Injecting fault at: {}, publicationDidNotMakeItToNode2: {}", injectFaultAt, publicationDidNotMakeItToNode2);

    MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L,
        discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, Collections.emptySet());

    // Answer the publish requests in shuffled order, skipping node 2 when the publication must not reach it.
    publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> {
        if (e.getKey().equals(n2) == false || publicationDidNotMakeItToNode2 == false) {
            PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest(
                publication.publishRequest);
            e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty()));
        }
    });

    publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> {
        if (e.getKey().equals(n2)) {
            // we must fail node before committing for the node, otherwise failing the node is ignored
            publication.onFaultyNode(n2);
        }
        // Inject the fault when the countdown reaches the randomly chosen position.
        if (remainingActions.decrementAndGet() == injectFaultAt) {
            publication.onFaultyNode(n2);
        }
        if (e.getKey().equals(n2) == false || randomBoolean()) {
            nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit);
            e.getValue().onResponse(TransportResponse.Empty.INSTANCE);
        }
    });

    // we need to complete publication by failing the node
    if (publicationDidNotMakeItToNode2 && remainingActions.get() > injectFaultAt) {
        publication.onFaultyNode(n2);
    }

    assertTrue(publication.completed);
    assertTrue(publication.committed);

    publication.onFaultyNode(randomFrom(n1, n3)); // has no influence

    List<Tuple<DiscoveryNode, Throwable>> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS);
    assertThat(errors.size(), equalTo(1));
    assertThat(errors.get(0).v1(), equalTo(n2));
    assertThat(errors.get(0).v2().getMessage(), containsString("faulty node"));
}
 
/**
 * Raises the counter to {@code value} if the counter is currently smaller. Whatever
 * the result, the counter is at least {@code value} when this method returns.
 * 
 * @param counter
 *            the counter to raise
 * @param value
 *            the minimum value the counter should end up with
 * @return {@code true} if this call changed the counter, {@code false} if the
 *         counter was already greater than or equal to {@code value}
 */
private static boolean updateIfSmaller(AtomicInteger counter, int value) {
	int observed = counter.get();
	while (observed < value) {
		if (counter.compareAndSet(observed, value)) {
			return true;
		}
		// Another thread changed the counter in between; re-read and re-check.
		observed = counter.get();
	}
	return false;
}
 
/**
 * Tracks how many times the message at the given position has failed.
 *
 * <p>The first failure of a message stores a counter starting at 1. Later failures
 * increment it while it is still below {@code retryCount}; once it is not, the counter
 * is dropped and 0 is returned.
 *
 * @param topic     the topic of the failed message
 * @param partition the partition of the failed message
 * @param offset    the offset of the failed message
 * @return the updated failure count, or 0 when retries are disabled
 *         ({@code retryCount == 0}) or the counter was just dropped
 */
protected int errorMessageCount(String topic, int partition, long offset) {
    if (retryCount == 0) {
        return 0;
    }

    final String errorKey = errorMessageKey(topic, partition, offset);

    AtomicInteger attempts = errorMessageCount.get(errorKey);
    if (attempts == null) {
        attempts = errorMessageCount.putIfAbsent(errorKey, new AtomicInteger(1));
        if (attempts == null) {
            // Our fresh counter (already at 1) was installed: first recorded failure.
            return 1;
        }
        // Otherwise another thread installed a counter first; continue with that one.
    }

    if (retryCount > attempts.get()) {
        return attempts.incrementAndGet();
    }

    errorMessageCount.remove(errorKey);
    return 0;
}