Listed below are example usages of java.util.concurrent.atomic.AtomicInteger#getAndAdd(), drawn from open-source projects on GitHub.
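As a quick orientation before the collected examples, here is a minimal, self-contained sketch (the class and variable names are illustrative, not taken from any project below) of the getAndAdd contract: the delta is added atomically and the method returns the value the counter held before the addition.
import java.util.concurrent.atomic.AtomicInteger;

public class GetAndAddDemo {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(5);

        // Atomically adds 10 and returns the value held *before* the addition.
        int previous = counter.getAndAdd(10);
        System.out.println(previous);       // 5
        System.out.println(counter.get());  // 15

        // A negative delta subtracts; the return value is still the old value.
        int beforeDecrease = counter.getAndAdd(-3);
        System.out.println(beforeDecrease); // 15
        System.out.println(counter.get());  // 12
    }
}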
@Test
public void testSingle() {
final AtomicInteger i = new AtomicInteger(0);
final AtomicInteger cnt = new AtomicInteger(0);
final BlockingEventHandler<Integer> h = new BlockingEventHandler<>(1, new EventHandler<Iterable<Integer>>() {
@Override
public void onNext(final Iterable<Integer> value) {
for (final int x : value) {
i.getAndAdd(x);
cnt.incrementAndGet();
}
}
});
h.onNext(0xBEEF);
assertEquals(0xBEEF, i.get());
assertEquals(1, cnt.get());
}
public static void main(String[] args) {
int temValue = 0;
AtomicInteger value = new AtomicInteger(0);
temValue = value.getAndSet(3);
// getAndSet first reads the current value of value (0) and assigns it to temValue, then stores the new value 3, so value is now 3
System.out.println("temValue = " + temValue + " value = " + value);
temValue = value.getAndIncrement(); // returns 3, value becomes 4
System.out.println("temValue = " + temValue + " value = " + value);
temValue = value.getAndDecrement(); // returns 4, value becomes 3
System.out.println("temValue = " + temValue + " value = " + value);
temValue = value.getAndAdd(10); // returns 3, value becomes 13
System.out.println("temValue = " + temValue + " value = " + value);
}
@Test
public void test() throws InterruptedException {
final AtomicInteger atomic = new AtomicInteger();
Runnable task = new Runnable() {
@Override
public void run() {
int i = atomic.getAndAdd(1);
String encrypt = EncryptUtil.encrypt("" + i);
System.out.println(EncryptUtil.decrypt(encrypt));
}
};
ExecutorService executor = Executors.newFixedThreadPool(20);
for (int i = 0; i < 20; i++) {
executor.execute(task);
}
Thread.sleep(3000);
}
private void processStreamsByTopic(String topicKeys, List<KafkaStream<byte[], byte[]>> streamList) {
// init stream thread pool
ExecutorService streamPool = Executors.newFixedThreadPool(partitions);
String[] topics = StringUtils.split(topicKeys, ",");
if (log.isDebugEnabled())
log.debug("准备处理消息流集合 KafkaStreamList,topic count={},topics={}, partitions/topic={}", topics.length, topicKeys, partitions);
//遍历stream
AtomicInteger index = new AtomicInteger(0);
for (KafkaStream<byte[], byte[]> stream : streamList) {
Thread streamThread = new Thread() {
@Override
public void run() {
int i = index.getAndAdd(1);
if (log.isDebugEnabled())
log.debug("处理消息流KafkaStream -- No.={}, partitions={}", i, partitions + ":" + i);
ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator();
processStreamByConsumer(topicKeys, consumerIterator);
}
};
streamPool.execute(streamThread);
}
}
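// Advances the shared sequence counter by the configured increment; the value returned by
// getAndAdd (the value before the add) is formatted and published on the entity's sensors.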
private Integer sequence(Entity entity) {
String format = config().get(SEQUENCE_FORMAT);
Integer increment = config().get(SEQUENCE_INCREMENT);
AtomicInteger state = sensors().get(SEQUENCE_STATE);
Integer current = state.getAndAdd(increment);
String string = String.format(format, current);
AttributeSensor<Integer> valueSensor = config().get(SEQUENCE_VALUE_SENSOR);
AttributeSensor<String> stringSensor = config().get(SEQUENCE_STRING_SENSOR);
entity.sensors().set(valueSensor, current);
entity.sensors().set(stringSensor, string);
LOG.debug("Sequence on {} set to to {}", entity, current);
sensors().set(SEQUENCE_CURRENT, entity);
LOG.debug("Sequence for {} incremented to {}", this, state.get());
return current;
}
@Test
@DisplayName("perform action with value if non-empty else perform empty action")
@Tag("PASSING")
@Order(6)
public void ifPresentConsumeOrElseOtherAction() {
AtomicInteger nonEmptyValueCounter = new AtomicInteger(0);
Consumer<Integer> nonEmptyValueAction = x -> nonEmptyValueCounter.getAndAdd(x);
Runnable alternateAction = nonEmptyValueCounter::getAndDecrement;
Optional<Integer> nonEmptyIntegerOptional = Optional.of(10);
/*
* DONE:
* Add an ifPresentOrElse call to run either the nonEmptyValueAction or alternateAction
* (depending on whether the optional has a value or not)
* Check API: java.util.Optional.ifPresentOrElse(?, ?)
*/
nonEmptyIntegerOptional.ifPresentOrElse(nonEmptyValueAction, alternateAction);
assertEquals(10, nonEmptyValueCounter.get(), "");
Optional<Integer> emptyIntegerOptional = Optional.ofNullable(null);
/*
* DONE:
* Add an ifPresentOrElse call to run either the nonEmptyValueAction or alternateAction
* (depending on whether the optional has a value or not)
* Check API: java.util.Optional.ifPresentOrElse(?, ?)
*/
emptyIntegerOptional.ifPresentOrElse(nonEmptyValueAction, alternateAction);
assertEquals(9, nonEmptyValueCounter.get(), "");
}
@Test
@DisplayName("perform action with value if non-empty else perform empty action")
@Tag("TODO")
@Order(6)
public void ifPresentConsumeOrElseOtherAction() {
AtomicInteger nonEmptyValueCounter = new AtomicInteger(0);
Consumer<Integer> nonEmptyValueAction = x -> nonEmptyValueCounter.getAndAdd(x);
Runnable alternateAction = nonEmptyValueCounter::getAndDecrement;
Optional<Integer> nonEmptyIntegerOptional = Optional.of(10);
/*
* TODO:
* Add an ifPresentOrElse call to run either the nonEmptyValueAction or alternateAction
* (depending on whether the optional has a value or not)
* Check API: java.util.Optional.ifPresentOrElse(?, ?)
*/
// nonEmptyIntegerOptional.???;
assertEquals(10, nonEmptyValueCounter.get(), "");
Optional<Integer> emptyIntegerOptional = Optional.ofNullable(null);
/*
* TODO:
* Add an ifPresentOrElse call to run either the nonEmptyValueAction or alternateAction
* (depending on whether the optional has a value or not)
* Check API: java.util.Optional.ifPresentOrElse(?, ?)
*/
// emptyIntegerOptional.???
assertEquals(9, nonEmptyValueCounter.get(), "");
}
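// On construction, a background thread scans the cache directory for files with the cache
// prefix and seeds the size and count counters via getAndAdd, so initialization does not
// block the caller.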
private DiskCacheManager(final File cacheDir, final long sizeLimit, final int countLimit) {
this.cacheDir = cacheDir;
this.sizeLimit = sizeLimit;
this.countLimit = countLimit;
cacheSize = new AtomicLong();
cacheCount = new AtomicInteger();
mThread = new Thread(new Runnable() {
@Override
public void run() {
int size = 0;
int count = 0;
final File[] cachedFiles = cacheDir.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.startsWith(CACHE_PREFIX);
}
});
if (cachedFiles != null) {
for (File cachedFile : cachedFiles) {
size += cachedFile.length();
count += 1;
lastUsageDates.put(cachedFile, cachedFile.lastModified());
}
cacheSize.getAndAdd(size);
cacheCount.getAndAdd(count);
}
}
});
mThread.start();
}
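// Feeds the bytes into the digest while honouring the configured DKIM body limit: only the
// remaining allowance is hashed and the running total is advanced with getAndAdd; returns
// false once the limit has been exhausted.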
private boolean digest(MessageDigest md, byte[] bytes, AtomicInteger written) {
if (this.dkimSignOptions.getBodyLimit() > 0) {
int left = this.dkimSignOptions.getBodyLimit() - written.get();
if (left > 0) {
int len = Math.min(left, bytes.length);
md.update(bytes, 0, len);
written.getAndAdd(len);
} else {
return false;
}
} else {
md.update(bytes);
}
return true;
}
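// Classifies the next pattern character at pos (path separator, "..." wildcard, "%%n"
// parameter, "*" wildcard, or a literal character), assigns a parameter number where
// applicable using the shared counters, and advances pos; returns false at end of input.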
public boolean set(char[] p, AtomicInteger pos, AtomicInteger nStars, AtomicInteger nDots, int caseMode) {
this.caseMode = caseMode;
if (pos.get() == p.length || p[pos.get()] == '\0') {
cc = MapCharClass.cEOS;
this.c = '\0';
return false;
}
this.c = p[pos.get()];
if (p[pos.get()] == '/') {
cc = MapCharClass.cSLASH;
pos.incrementAndGet();
} else if (pos.get() <= p.length - 3 && p[pos.get()] == '.' && p[pos.get() + 1] == '.' && p[pos.get() + 2] == '.') {
cc = MapCharClass.cDOTS;
paramNumber = PARAM_BASE_DOTS + nDots.getAndAdd(1);
pos.addAndGet(3);
} else if (pos.get() <= p.length - 3 && p[pos.get()] == '%' && p[pos.get() + 1] == '%' && p[pos.get() + 2] >= '0' && p[pos.get() + 2] <= '9') {
cc = MapCharClass.cPERC;
paramNumber = PARAM_BASE_PERCENT + (p[pos.get() + 2] - '0');
pos.addAndGet(3);
} else if (p[pos.get()] == '*') {
cc = MapCharClass.cSTAR;
paramNumber = PARAM_BASE_STARS + nStars.getAndAdd(1);
pos.incrementAndGet();
} else {
cc = MapCharClass.cCHAR;
pos.incrementAndGet();
}
return true;
}
@Test
public void testTwoStreams() {
final AtomicInteger i = new AtomicInteger(0);
final AtomicInteger cnt = new AtomicInteger(0);
final int num = 1000;
final BlockingEventHandler<Integer> h = new BlockingEventHandler<>(2 * num, new EventHandler<Iterable<Integer>>() {
@Override
public void onNext(final Iterable<Integer> value) {
for (final int x : value) {
i.getAndAdd(x);
cnt.incrementAndGet();
}
}
});
final Runnable r = new Runnable() {
@Override
public void run() {
for (int i = 0; i < num; i++) {
h.onNext(i);
}
}
};
final Thread a = new Thread(r);
final Thread b = new Thread(r);
a.start();
b.start();
try {
a.join();
b.join();
} catch (final InterruptedException e) {
fail(e.toString());
}
assertEquals(2 * (num - 1) * num / 2, i.get());
assertEquals(2 * num, cnt.get());
}
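// Runs the table JDBC source for a single randomly sized batch, compares the records read
// against the expected slice, and accumulates the running total with getAndAdd before
// capturing the committed offsets.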
private void runSourceAndUpdateOffset(
final Map<String, String> lastOffset,
final AtomicInteger totalNoOfRecordsRead
) throws Exception {
int batchSize = (RANDOM.nextInt(5) + 1) * 10;
TableConfigBeanImpl tableConfigBean = new TableJdbcSourceTestBuilder.TableConfigBeanTestBuilder()
.tablePattern(TABLE_NAME)
.schema(database)
.build();
TableJdbcSource tableJdbcSource = new TableJdbcSourceTestBuilder(JDBC_URL, true, USER_NAME, PASSWORD)
.tableConfigBeans(ImmutableList.of(tableConfigBean))
.maxBatchSize(batchSize)
.build();
PushSourceRunner runner = new PushSourceRunner.Builder(TableJdbcDSource.class, tableJdbcSource)
.addOutputLane("a").build();
JdbcPushSourceTestCallback callback = new JdbcPushSourceTestCallback(runner, 1);
runner.runInit();
try {
runner.runProduce(lastOffset, batchSize, callback);
List<Record> actualRecords = callback.waitForAllBatchesAndReset().get(0);
LOGGER.info("Read {} records", actualRecords.size());
if (totalNoOfRecordsRead.get() >= expectedRecords.size()) {
Assert.assertEquals(0, actualRecords.size());
} else {
checkRecords(
expectedRecords.subList(totalNoOfRecordsRead.get(), totalNoOfRecordsRead.get() + actualRecords.size()),
actualRecords
);
totalNoOfRecordsRead.getAndAdd(actualRecords.size());
}
lastOffset.clear();
lastOffset.putAll(runner.getOffsets());
} finally {
runner.setStop();
runner.runDestroy();
}
}
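// Reads a 16-byte value from the buffer at the current offset, advances the offset by 16,
// and returns it as a BigInteger.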
protected static BigInteger decodeBigInteger(ByteBuf byteBuf, AtomicInteger offset) {
byte[] idBytes = new byte[16];
byteBuf.getBytes(offset.get(), idBytes, 0, 16);
offset.getAndAdd(16);
return new BigInteger(idBytes);
}
@Test
public void testMultipleResets() {
final AtomicInteger i = new AtomicInteger(0);
final AtomicInteger cnt = new AtomicInteger(0);
final AtomicInteger cntcall = new AtomicInteger(0);
final int num = 1000;
final int events = 10;
final BlockingEventHandler<Integer> h = new BlockingEventHandler<>(2 * num, new EventHandler<Iterable<Integer>>() {
@Override
public void onNext(final Iterable<Integer> value) {
for (final int x : value) {
i.getAndAdd(x);
cnt.incrementAndGet();
}
cntcall.incrementAndGet();
}
});
final Runnable r = new Runnable() {
@Override
public void run() {
for (int i = 0; i < num * events; i++) {
h.onNext(i);
}
}
};
final Thread a = new Thread(r);
final Thread b = new Thread(r);
a.start();
b.start();
try {
a.join();
b.join();
} catch (final InterruptedException e) {
fail(e.toString());
}
assertEquals(2 * (num * events - 1) * (num * events) / 2, i.get());
assertEquals(2 * num * events, cnt.get());
assertEquals(events, cntcall.get());
}
public Thread getOrderDetailsInSeparateThreadAndTransaction(
final Semaphore semaphore,
final int orderItemId,
final int threadNumber,
final AtomicInteger indicator) throws InterruptedException
{
final Runnable updater = new Runnable()
{
@Override
public void run()
{
LOGGER.debug("Thread " + threadNumber + " started");
AuditedOrderItem detached = AuditedOrderItemFinder.findOne(AuditedOrderItemFinder.id().eq(orderItemId)).getDetachedCopy();
MithraTransaction tx = MithraManager.getInstance().startOrContinueTransaction();
AuditedOrderFinder.setTransactionModeReadCacheWithOptimisticLocking(tx);
AuditedOrderItemFinder.setTransactionModeReadCacheWithOptimisticLocking(tx);
AuditedManufacturerFinder.setTransactionModeReadCacheWithOptimisticLocking(tx);
tx.setTransactionName("txn" + threadNumber);
LOGGER.debug("thread" + threadNumber + " - started transaction : " + tx.getTransactionName());
LOGGER.debug("Thread " + threadNumber + " - waiting to acquire semaphore");
try
{
semaphore.acquire();
LOGGER.debug("Thread " + threadNumber + " - semaphore acquired");
//go through relationship to order
LOGGER.debug("Thread " + threadNumber + " start of second fetch of product code");
String manufacturerName = detached.getOrder().getItems().get(0).getManufacturer().getName();
LOGGER.debug("Thread " + threadNumber + " - product code : " + manufacturerName);
LOGGER.debug("Thread " + threadNumber + "sleeping");
sleep(100);
LOGGER.debug("Thread " + threadNumber + "awake");
}
catch (RuntimeException re)
{
LOGGER.error("Unexpected:", re);
indicator.getAndAdd(1);
}
catch (InterruptedException ie)
{
LOGGER.error("Unexpected:", ie);
}
tx.commit();
LOGGER.debug("Thread " + threadNumber + " - transaction committed : " + tx.getTransactionName());
}
};
Thread anotherThread = new Thread(updater, "Thread number: " + threadNumber + " order ID : " + orderItemId);
anotherThread.start();
return anotherThread;
}
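// Resolves the given paths asynchronously: cache hits are served locally and removed from
// the request set, while the remaining paths are fetched in the background, decoded with
// the transcoder, and counted into the bytes total via getAndAdd.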
private <T> Map<String, T> getAsyncThrows(
final String pathNameForLogs,
final Collection<String> paths,
final Transcoder<T> transcoder,
final Optional<ZkCache<T>> cache
)
throws Exception {
final Map<String, T> objects = new HashMap<>(paths.size());
if (cache.isPresent()) {
for (Iterator<String> itr = paths.iterator(); itr.hasNext();) {
final String path = itr.next();
final Optional<T> fromCache = cache.get().get(path);
if (fromCache.isPresent()) {
objects.put(path, fromCache.get());
itr.remove();
}
}
}
if (paths.isEmpty()) {
return objects;
}
final Map<String, T> synchronizedObjects = Collections.synchronizedMap(objects);
final CountDownLatch latch = new CountDownLatch(paths.size());
final AtomicInteger bytes = new AtomicInteger();
final BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event)
throws Exception {
try {
if (event.getData() == null || event.getData().length == 0) {
LOG.trace("Expected active node {} but it wasn't there", event.getPath());
return;
}
bytes.getAndAdd(event.getData().length);
final T object = transcoder.fromBytes(event.getData());
synchronizedObjects.put(event.getPath(), object);
if (cache.isPresent()) {
cache.get().set(event.getPath(), object);
}
} finally {
latch.countDown();
}
}
};
return queryAndReturnResultsThrows(
objects,
paths,
callback,
latch,
pathNameForLogs,
bytes,
CuratorQueryMethod.GET_DATA
);
}
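// When translated to OpenCL this maps to the atomic_add built-in (per the mapping
// annotation); when executed on the JVM it falls back to AtomicInteger#getAndAdd, so both
// paths return the value held before the addition.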
@OpenCLMapping(atomic32 = true, mapTo = "atomic_add")
protected final int atomicAdd(AtomicInteger p, int val) {
return p.getAndAdd(val);
}
protected <T> List<T> getAsyncNestedChildrenAsListThrows(
final String pathNameForLogs,
final List<String> parentPaths,
final Transcoder<T> transcoder
)
throws Exception {
final List<String> allPaths = new ArrayList<>();
for (String parent : parentPaths) {
for (String child : getChildren(parent)) {
allPaths.add(ZKPaths.makePath(parent, child));
}
}
final List<T> results = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(allPaths.size());
final AtomicInteger bytes = new AtomicInteger();
final BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event)
throws Exception {
try {
if (event.getData() == null || event.getData().length == 0) {
LOG.trace("Expected active node {} but it wasn't there", event.getPath());
return;
}
bytes.getAndAdd(event.getData().length);
final T object = transcoder.fromBytes(event.getData());
results.add(object);
} finally {
latch.countDown();
}
}
};
return queryAndReturnResultsThrows(
results,
allPaths,
callback,
latch,
pathNameForLogs,
bytes,
CuratorQueryMethod.GET_DATA
);
}
/**
* Test thin client in multi-thread environment.
*/
@Test
public void testMultithreading() throws Exception {
final int THREAD_CNT = 8;
final int ITERATION_CNT = 20;
final int BATCH_SIZE = 1000;
final int PAGE_CNT = 3;
IgniteConfiguration srvCfg = Config.getServerConfiguration();
// No peer class loading from thin clients: we need the server to know about this class to deserialize
// ScanQuery filter.
srvCfg.setBinaryConfiguration(new BinaryConfiguration().setTypeConfigurations(Arrays.asList(
new BinaryTypeConfiguration(getClass().getName()),
new BinaryTypeConfiguration(SerializedLambda.class.getName())
)));
try (Ignite ignored = Ignition.start(srvCfg);
IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER))
) {
ClientCache<Integer, String> cache = client.createCache("testMultithreading");
AtomicInteger cnt = new AtomicInteger(1);
AtomicReference<Throwable> error = new AtomicReference<>();
Runnable assertion = () -> {
try {
int rangeStart = cnt.getAndAdd(BATCH_SIZE);
int rangeEnd = rangeStart + BATCH_SIZE;
Map<Integer, String> data = IntStream.range(rangeStart, rangeEnd).boxed()
.collect(Collectors.toMap(i -> i, i -> String.format("String %s", i)));
cache.putAll(data);
Query<Cache.Entry<Integer, String>> qry = new ScanQuery<Integer, String>()
.setPageSize(data.size() / PAGE_CNT)
.setFilter((i, s) -> i >= rangeStart && i < rangeEnd);
try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
List<Cache.Entry<Integer, String>> res = cur.getAll();
assertEquals("Unexpected number of entries", data.size(), res.size());
Map<Integer, String> act = res.stream()
.collect(Collectors.toMap(Cache.Entry::getKey, Cache.Entry::getValue));
assertEquals("Unexpected entries", data, act);
}
}
catch (Throwable ex) {
error.set(ex);
}
};
CountDownLatch complete = new CountDownLatch(THREAD_CNT);
Runnable manyAssertions = () -> {
for (int i = 0; i < ITERATION_CNT && error.get() == null; i++)
assertion.run();
complete.countDown();
};
ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_CNT);
IntStream.range(0, THREAD_CNT).forEach(t -> threadPool.submit(manyAssertions));
assertTrue("Timeout", complete.await(180, TimeUnit.SECONDS));
String errMsg = error.get() == null ? "" : error.get().getMessage();
assertNull(errMsg, error.get());
}
}