下面列出了怎么用java.util.concurrent.atomic.AtomicIntegerArray的API类实例代码及写法,或者点击链接到github查看源代码。
public PartitionSenderRootExec(RootFragmentContext context,
RecordBatch incoming,
HashPartitionSender operator,
boolean closeIncoming) throws OutOfMemoryException {
super(context, context.newOperatorContext(operator, null), operator);
this.incoming = incoming;
this.operator = operator;
this.closeIncoming = closeIncoming;
this.context = context;
outGoingBatchCount = operator.getDestinations().size();
popConfig = operator;
remainingReceivers = new AtomicIntegerArray(outGoingBatchCount);
remaingReceiverCount = new AtomicInteger(outGoingBatchCount);
stats.setLongStat(Metric.N_RECEIVERS, outGoingBatchCount);
// Algorithm to figure out number of threads to parallelize output
// numberOfRows/sliceTarget/numReceivers/threadfactor
this.cost = operator.getChild().getCost().getOutputRowCount();
final OptionManager optMgr = context.getOptions();
long sliceTarget = optMgr.getOption(ExecConstants.SLICE_TARGET).num_val;
int threadFactor = optMgr.getOption(PlannerSettings.PARTITION_SENDER_THREADS_FACTOR.getOptionName()).num_val.intValue();
int tmpParts = 1;
if ( sliceTarget != 0 && outGoingBatchCount != 0 ) {
tmpParts = (int) Math.round((((cost / (sliceTarget*1.0)) / (outGoingBatchCount*1.0)) / (threadFactor*1.0)));
if ( tmpParts < 1) {
tmpParts = 1;
}
}
final int imposedThreads = optMgr.getOption(PlannerSettings.PARTITION_SENDER_SET_THREADS.getOptionName()).num_val.intValue();
if (imposedThreads > 0 ) {
this.numberPartitions = imposedThreads;
} else {
this.numberPartitions = Math.min(tmpParts, optMgr.getOption(PlannerSettings.PARTITION_SENDER_MAX_THREADS.getOptionName()).num_val.intValue());
}
logger.info("Preliminary number of sending threads is: " + numberPartitions);
this.actualPartitions = outGoingBatchCount > numberPartitions ? numberPartitions : outGoingBatchCount;
this.stats.setLongStat(Metric.SENDING_THREADS_COUNT, actualPartitions);
this.stats.setDoubleStat(Metric.COST, this.cost);
}
/**
* get returns the last value set at index
*/
public void testGetSet() {
AtomicIntegerArray aa = new AtomicIntegerArray(SIZE);
for (int i = 0; i < SIZE; i++) {
aa.set(i, 1);
assertEquals(1, aa.get(i));
aa.set(i, 2);
assertEquals(2, aa.get(i));
aa.set(i, -3);
assertEquals(-3, aa.get(i));
}
}
/**
* compareAndExchange succeeds in changing value if equal to
* expected else fails
*/
public void testCompareAndExchange() {
AtomicIntegerArray aa = new AtomicIntegerArray(SIZE);
for (int i = 0; i < SIZE; i++) {
aa.set(i, 1);
assertEquals(1, aa.compareAndExchange(i, 1, 2));
assertEquals(2, aa.compareAndExchange(i, 2, -4));
assertEquals(-4, aa.get(i));
assertEquals(-4, aa.compareAndExchange(i,-5, 7));
assertEquals(-4, aa.get(i));
assertEquals(-4, aa.compareAndExchange(i, -4, 7));
assertEquals(7, aa.get(i));
}
}
/**
* AtomicIntegerArray getAndAccumulate returns previous value and updates
* with supplied function.
*/
public void testIntArrayGetAndAccumulate() {
AtomicIntegerArray a = new AtomicIntegerArray(1);
a.set(0, 1);
assertEquals(1, a.getAndAccumulate(0, 2, Integer::sum));
assertEquals(3, a.getAndAccumulate(0, 3, Integer::sum));
assertEquals(6, a.get(0));
}
public static void main(String[] args) {
System.out.println(31 - Integer.numberOfLeadingZeros(8));
System.out.println(32 - Integer.numberOfLeadingZeros(8 - 1));
int[] array = {1, 2, 3, 4};
long[] longArray = {1, 2, 3, 4};
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(array);
atomicIntegerArray.getAndIncrement(1);
AtomicLongArray atomicLongArray = new AtomicLongArray(longArray);
atomicLongArray.getAndIncrement(1);
Object[] objects = {new Object(), new Object()};
AtomicReferenceArray atomicReferenceArray = new AtomicReferenceArray(objects);
atomicReferenceArray.compareAndSet(1, new Object(), new Object());
}
public ThreadStorage(int intSize, int longSize) {
this.owner = Thread.currentThread();
if (intSize > 0) {
this.intStore = new AtomicIntegerArray(intSize);
} else {
this.intStore = null;
}
if (longSize > 0) {
this.longStore = new AtomicLongArray(longSize);
} else {
this.longStore = null;
}
}
/**
* a deserialized serialized array holds same values
*/
public void testSerialization() throws Exception {
AtomicIntegerArray x = new AtomicIntegerArray(SIZE);
for (int i = 0; i < SIZE; i++)
x.set(i, -i);
AtomicIntegerArray y = serialClone(x);
assertNotSame(x, y);
assertEquals(x.length(), y.length());
for (int i = 0; i < SIZE; i++) {
assertEquals(x.get(i), y.get(i));
}
}
/**
* repeated weakCompareAndSetAcquire succeeds in changing value when equal
* to expected
*/
public void testWeakCompareAndSetAcquire() {
AtomicIntegerArray aa = new AtomicIntegerArray(SIZE);
for (int i = 0; i < SIZE; i++) {
aa.set(i, 1);
do {} while (!aa.weakCompareAndSetAcquire(i, 1, 2));
do {} while (!aa.weakCompareAndSetAcquire(i, 2, -4));
assertEquals(-4, aa.get(i));
do {} while (!aa.weakCompareAndSetAcquire(i, -4, 7));
assertEquals(7, aa.get(i));
}
}
/**
* AtomicIntegerArray updateAndGet updates with supplied function and
* returns result.
*/
public void testIntArrayUpdateAndGet() {
AtomicIntegerArray a = new AtomicIntegerArray(1);
a.set(0, 1);
assertEquals(18, a.updateAndGet(0, Atomic8Test::addInt17));
assertEquals(35, a.updateAndGet(0, Atomic8Test::addInt17));
assertEquals(35, a.get(0));
}
/**
* Creates a new statistics instance of the given type
*
* @param type
* A description of the statistics
* @param textId
* Text that identifies this statistic when it is monitored
* @param numericId
* A number that displayed when this statistic is monitored
* @param uniqueId
* A number that uniquely identifies this instance
* @param system
* The distributed system that determines whether or not these
* statistics are stored (and collected) in GemFire shared memory or
* in the local VM
*/
public Atomic60StatisticsImpl(StatisticsType type, String textId,
long numericId, long uniqueId, StatisticsManager system) {
super(type, calcTextId(system, textId), calcNumericId(system, numericId),
uniqueId, 0);
this.dSystem = system;
StatisticsTypeImpl realType = (StatisticsTypeImpl)type;
int intCount = realType.getIntStatCount();
int longCount = realType.getLongStatCount();
int doubleCount = realType.getDoubleStatCount();
if (intCount > 0) {
this.intStorage = new AtomicIntegerArray(intCount);
}
else {
this.intStorage = null;
}
if (longCount > 0) {
this.longStorage = new AtomicLongArray(longCount);
}
else {
this.longStorage = null;
}
if (doubleCount > 0) {
this.doubleStorage = new AtomicLongArray(doubleCount);
}
else {
this.doubleStorage = null;
}
}
/**
* getAndDecrement returns previous value and decrements
*/
public void testGetAndDecrement() {
AtomicIntegerArray aa = new AtomicIntegerArray(SIZE);
for (int i = 0; i < SIZE; i++) {
aa.set(i, 1);
assertEquals(1, aa.getAndDecrement(i));
assertEquals(0, aa.getAndDecrement(i));
assertEquals(-1, aa.getAndDecrement(i));
}
}
static void test_2vi_off(AtomicIntegerArray a, AtomicIntegerArray b, int c, int d) {
for (int i = 0; i < ARRLEN-OFFSET; i+=1) {
a.lazySet((i+OFFSET), c);
b.lazySet((i+OFFSET), d);
}
}
static void test_cp_off(AtomicIntegerArray a, AtomicIntegerArray b) {
for (int i = 0; i < ARRLEN-OFFSET; i+=1) {
a.compareAndSet((i+OFFSET), -123, b.get(i+OFFSET));
}
}
static void test_2ci_inv(AtomicIntegerArray a, AtomicIntegerArray b, int k) {
for (int i = 0; i < ARRLEN-k; i+=1) {
a.compareAndSet((i+k), 123, -123);
b.compareAndSet((i+k), 123, -103);
}
}
static void test_vi_scl(AtomicIntegerArray a, int b, int old) {
for (int i = 0; i*SCALE < ARRLEN; i+=1) {
a.lazySet((i*SCALE), b);
}
}
static void test_2vi_off(AtomicIntegerArray a, AtomicIntegerArray b, int c, int d) {
for (int i = 0; i < ARRLEN-OFFSET; i+=1) {
a.compareAndSet((i+OFFSET), -123, c);
b.compareAndSet((i+OFFSET), -103, d);
}
}
static void test_2ci(AtomicIntegerArray a, AtomicIntegerArray b) {
for (int i = 0; i < ARRLEN; i+=1) {
a.set(i, -123);
b.set(i, -103);
}
}
static void test_ci_inv(AtomicIntegerArray a, int k, int old) {
for (int i = 0; i < ARRLEN-k; i+=1) {
a.compareAndSet((i+k), old, -123);
}
}
/**
* toString returns current value.
*/
public void testToString() {
int[] a = { 17, 3, -42, 99, -7 };
AtomicIntegerArray aa = new AtomicIntegerArray(a);
assertEquals(Arrays.toString(a), aa.toString());
}
static void test_2vi_unaln(AtomicIntegerArray a, AtomicIntegerArray b, int c, int d) {
for (int i = 0; i < ARRLEN-UNALIGN_OFF; i+=1) {
a.set(i, c);
b.set((i+UNALIGN_OFF), d);
}
}
static void test_cp_neg(AtomicIntegerArray a, AtomicIntegerArray b) {
for (int i = ARRLEN-1; i >= 0; i-=1) {
a.set(i, b.get(i));
}
}
static void test_cp_oppos(AtomicIntegerArray a, AtomicIntegerArray b) {
int limit = ARRLEN-1;
for (int i = 0; i < ARRLEN; i+=1) {
a.lazySet(i, b.get(limit-i));
}
}
static void test_2vi_neg(AtomicIntegerArray a, AtomicIntegerArray b, int c, int d) {
for (int i = ARRLEN-1; i >= 0; i-=1) {
a.lazySet(i, c);
b.lazySet(i, d);
}
}
static void test_vi(AtomicIntegerArray a, int b, int old) {
for (int i = 0; i < ARRLEN; i+=1) {
a.lazySet(i, b);
}
}
static void test_2vi(AtomicIntegerArray a, AtomicIntegerArray b, int c, int d) {
for (int i = 0; i < ARRLEN; i+=1) {
a.lazySet(i, c);
b.lazySet(i, d);
}
}
static void test_ci_off(AtomicIntegerArray a, int old) {
for (int i = 0; i < ARRLEN-OFFSET; i+=1) {
a.lazySet((i+OFFSET), -123);
}
}
static void test_vi_oppos(AtomicIntegerArray a, int b, int old) {
int limit = ARRLEN-1;
for (int i = limit; i >= 0; i-=1) {
a.set((limit-i), b);
}
}
static void test_vi(AtomicIntegerArray a, int b, int old) {
for (int i = 0; i < ARRLEN; i+=1) {
a.set(i, b);
}
}
static void test_vi_neg(AtomicIntegerArray a, int b, int old) {
for (int i = ARRLEN-1; i >= 0; i-=1) {
a.set(i, b);
}
}
static void test_vi_off(AtomicIntegerArray a, int b, int old) {
for (int i = 0; i < ARRLEN-OFFSET; i+=1) {
a.set((i+OFFSET), b);
}
}