下面列出了java.util.concurrent.atomic.AtomicLong#addAndGet() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Builds one synthetic ERROR log record for the given host index and returns it as JSON.
 *
 * <p>Host indexes below 3 produce a "NOVA" record; all others produce a "NEUTRON" record whose
 * host number is {@code hostIndex - 3}. The record's timestamp is the value of {@code base1}
 * on entry; {@code base1} is then advanced by 1000.
 *
 * @param base1 timestamp source; advanced by 1000 as a side effect
 * @param hostIndex index selecting the component, host name and message
 * @return a pair of the advanced {@code base1} value and the JSON form of the record
 */
private static Pair<Long, String> createLogEntity(AtomicLong base1, int hostIndex) {
    // TODO: add randomization
    final boolean novaHost = hostIndex < 3;
    final LogEntity entry = new LogEntity();
    entry.component = novaHost ? "NOVA" : "NEUTRON";
    // NOTE(review): the second host prefix is spelled "neturon" - confirm whether that is intended.
    entry.host = novaHost
            ? "nova.000-" + hostIndex + ".datacenter.corp.com"
            : "neturon.000-" + (hostIndex - 3) + ".datacenter.corp.com";
    entry.message = novaHost
            ? "RabbitMQ Exception - MQ not connectable!"
            : "DNS Exception - Fail to connect to DNS!";
    entry.instanceUuid = nextUuid;
    entry.logLevel = "ERROR";
    entry.reqId = nextReqId;
    entry.timestamp = base1.get();
    // Simply some interval between consecutive records.
    base1.addAndGet(1000);
    final long advanced = base1.get();
    return Pair.of(advanced, JsonUtils.writeValueAsString(entry));
}
/**
 * Appends a key/value pair to the calling thread's write batch and writes the batch to the
 * database when asked to or when it has grown past {@code MAX_BATCH_SIZE}.
 *
 * <p>A {@code null} key or value adds nothing and forces the pending batch to be written with
 * a synchronous write. Does nothing at all when no database is open.
 *
 * @param key key bytes, or {@code null} to force a flush
 * @param value value bytes, or {@code null} to force a flush
 * @throws IOException declared by the original signature (presumably raised by the batch/db calls)
 */
private void store(byte[] key, byte[] value) throws IOException {
    if (this.db == null) {
        return;
    }

    final WriteBatch pending = perThreadWriteBatch.get();
    final AtomicLong pendingBytes = perThreadWriteBatchSize.get();
    final boolean flushRequested = key == null || value == null;

    // A forced flush, or a sync rate of exactly 1.0, always syncs.
    WriteOptions writeOptions = new WriteOptions().sync(flushRequested || 1.0 == syncrate);

    if (!flushRequested) {
        pending.put(key, value);
        pendingBytes.addAndGet(key.length + value.length);
    }

    if (!flushRequested && pendingBytes.get() <= MAX_BATCH_SIZE) {
        // Batch is still small enough; keep accumulating.
        return;
    }

    if (syncwrites && !writeOptions.sync()) {
        // Sync only a random fraction of the writes, as dictated by syncrate.
        writeOptions = new WriteOptions().sync(Math.random() < syncrate);
    }

    this.db.write(pending, writeOptions);
    pendingBytes.set(0L);
    perThreadWriteBatch.remove();
    // The batch is closed only after a successful write; if anything above throws, it is left
    // open and stays registered for this thread.
    pending.close();
}
/**
 * Folds this instance's {@code count} into the shared {@code COUNTS} entry for its
 * {@code procId}, creating the entry if it does not exist yet, and logs before and after.
 */
@Override
public void cleanUp() {
    LOGGER.debug("Clean up {}", this.procId);
    // The lookup and the insert must happen under one lock so two instances with the same
    // procId cannot both install a fresh counter.
    synchronized (COUNTS) {
        final AtomicLong existing = COUNTS.get(this.procId);
        if (existing != null) {
            existing.addAndGet(this.count);
        } else {
            COUNTS.put(this.procId, new AtomicLong(this.count));
        }
    }
    LOGGER.debug("{}\t{}", this.procId, this.count);
}
/**
 * Runs the main operation (narrowed to one shard when {@code shardId} is non-null, and further
 * by any additional per-shard operation), then streams the matches through a cursor and hands
 * them to {@code queueResultsWithoutDeepFetch} in batches of {@code batchSize}.
 *
 * @param shardId source-attribute value identifying the shard, or {@code null} for no shard filter
 * @param listQueue queue passed through to {@code queueResultsWithoutDeepFetch}
 * @param total running count of queued results; increased by the size of every batch handed off
 */
protected void queueWithOp(final Object shardId, final LinkedBlockingQueue<TL> listQueue, final AtomicLong total)
{
    Operation shardOp = (shardId == null)
            ? mainOperation
            : mainOperation.and(finderInstance.getSourceAttribute().nonPrimitiveEq(shardId));

    final Operation extraOp = additionalPerShardRetrievalOperations.get(shardId);
    if (extraOp != null)
    {
        shardOp = shardOp.and(extraOp);
    }

    final List pending = FastList.newList(batchSize);
    final MithraList matches = ((RelatedFinder) finderInstance).findMany(shardOp);
    matches.forEachWithCursor(new DoWhileProcedure()
    {
        @Override
        public boolean execute(Object obj)
        {
            pending.add((T) obj);
            if (pending.size() == batchSize)
            {
                // A full batch: hand it off, account for it, and start a new one.
                queueResultsWithoutDeepFetch(pending, listQueue, shardId);
                total.addAndGet(pending.size());
                pending.clear();
            }
            // Always continue with the next cursor row.
            return true;
        }
    });

    // Hand off whatever is left over after the cursor is exhausted.
    if (!pending.isEmpty())
    {
        queueResultsWithoutDeepFetch(pending, listQueue, shardId);
        total.addAndGet(pending.size());
    }
}
/**
 * Adds {@code delta} to the custom counter registered under {@code metric}, creating the
 * counter (starting at 0) the first time the metric is seen.
 *
 * @param metric name of the custom metric
 * @param delta amount to add; may be negative
 */
@Override
public void incrementCustom(String metric, long delta) {
    AtomicLong counter = customMetrics.get(metric);
    if (counter == null) {
        // Use the value returned by putIfAbsent instead of re-reading the map: this saves a
        // lookup and cannot observe null if the entry is removed between the two calls.
        final AtomicLong created = new AtomicLong(0);
        final AtomicLong raced = customMetrics.putIfAbsent(metric, created);
        counter = (raced != null) ? raced : created;
    }
    counter.addAndGet(delta);
    if (log.isDebugEnabled()) {
        log.debug("[{}:{}] Incremented by {}", System.identityHashCode(customMetrics), metric, delta);
    }
}
/**
 * Increments or decrements the counter stored under {@code key}, creating it on first use.
 * Calls {@code fail} when asked to decrement a key that has no counter yet.
 *
 * @param xs Map from key to its counter; a new counter is inserted when the key is missing.
 * @param key Key whose counter is adjusted.
 * @param inc {@code true} to add 1, {@code false} to subtract 1.
 */
private static void add(Map<Integer,AtomicLong> xs, int key, boolean inc) {
    AtomicLong counter = xs.get(key);

    if (counter == null) {
        if (!inc) {
            fail("Nothing to decrement.");
        }

        counter = new AtomicLong();
        xs.put(key, counter);
    }

    final long delta = inc ? 1 : -1;
    counter.addAndGet(delta);
}
/**
 * Reads a segment from {@code expectedCurrentOffset} up to {@code segmentLength} and checks every
 * returned entry against {@code expectedData}.
 *
 * @param segmentName name of the segment to read; also used in assertion messages
 * @param expectedCurrentOffset offset to start reading at; advanced by the length of each entry read
 * @param segmentLength total segment length; the read covers the bytes between the offset and this value
 * @param store store to read from
 * @param expectedData bytes the segment is expected to contain, indexed by segment offset
 * @throws Exception if waiting for an entry's content fails or times out
 */
private void checkSegmentReads(String segmentName, AtomicLong expectedCurrentOffset, long segmentLength, StreamSegmentStore store, byte[] expectedData) throws Exception {
@Cleanup
ReadResult readResult = store.read(segmentName, expectedCurrentOffset.get(), (int) (segmentLength - expectedCurrentOffset.get()), TIMEOUT).join();
Assert.assertTrue("Empty read result for segment " + segmentName, readResult.hasNext());
// A more thorough read check is done in StreamSegmentContainerTests; here we just check if the data was merged correctly.
while (readResult.hasNext()) {
ReadResultEntry readEntry = readResult.next();
AssertExtensions.assertGreaterThan("getRequestedReadLength should be a positive integer for segment " + segmentName,
0, readEntry.getRequestedReadLength());
// Each entry must start exactly where the previous one ended.
Assert.assertEquals("Unexpected value from getStreamSegmentOffset for segment " + segmentName,
expectedCurrentOffset.get(), readEntry.getStreamSegmentOffset());
// Content that is not available yet has to be requested explicitly before waiting on it.
if (!readEntry.getContent().isDone()) {
readEntry.requestContent(TIMEOUT);
}
readEntry.getContent().get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
Assert.assertNotEquals("Unexpected value for isEndOfStreamSegment for non-sealed segment " + segmentName,
ReadResultEntryType.EndOfStreamSegment, readEntry.getType());
BufferView readEntryContents = readEntry.getContent().join();
byte[] actualData = readEntryContents.getCopy();
// Compare the bytes read with the slice of expectedData that starts at the current offset.
AssertExtensions.assertArrayEquals("Unexpected data read from segment " + segmentName + " at offset " + expectedCurrentOffset,
expectedData, (int) expectedCurrentOffset.get(), actualData, 0, readEntryContents.getLength());
expectedCurrentOffset.addAndGet(readEntryContents.getLength());
}
Assert.assertTrue("ReadResult was not closed post-full-consumption for segment" + segmentName, readResult.isClosed());
}
/**
 * Runs {@code concurrent} threads that call {@code logger.stat} in a loop for roughly
 * {@code seconds} seconds, then prints the number of calls, the mean time per call in
 * nanoseconds ("RT") and the calls per second ("TPS").
 *
 * @param concurrent number of worker threads to start
 * @param seconds how long to let the workers run before stopping them
 * @throws InterruptedException if the calling thread is interrupted while sleeping or joining
 */
private static void perfTest(final int concurrent, final int seconds) throws InterruptedException {
    final Random rand = new Random();
    final AtomicBoolean exit = new AtomicBoolean(false);
    final AtomicLong counter = new AtomicLong();
    final AtomicLong latency = new AtomicLong();
    Thread[] threads = new Thread[concurrent];
    for (int j = 0; j < concurrent; j++) {
        threads[j] = new Thread() {
            @Override
            public void run() {
                while (!exit.get()) {
                    String db = "db_" + rand.nextInt(256);
                    String key = "key_" + (rand.nextInt(64));
                    String status = "status_" + (rand.nextInt(8));
                    // Drop the sign bit instead of using Math.abs: Math.abs(Long.MIN_VALUE) is
                    // still negative and would yield a delay below 1000.
                    final long delay = 1000 + (rand.nextLong() >>> 1) % 2000;
                    final long nanos = System.nanoTime();
                    logger.stat(db, key, status, delay);
                    latency.addAndGet(System.nanoTime() - nanos);
                    counter.incrementAndGet();
                    LockSupport.parkNanos(1000);
                }
            }
        };
        threads[j].start();
    }
    // Multiply as long so a large 'seconds' cannot overflow int.
    Thread.sleep(seconds * 1000L);
    // Stop the workers before reading the totals so the reported numbers are consistent with
    // each other instead of being sampled from still-moving counters.
    exit.set(true);
    for (Thread thread : threads) {
        thread.join();
    }
    final long number = counter.get();
    final long totalNanos = latency.get();
    // Guard both divisions: no completed call, or a zero-second run, must not throw.
    final long rt = (number == 0) ? 0 : totalNanos / number;
    final long tps = (seconds == 0) ? 0 : (number * 100 / seconds) / 100;
    System.out.println("concurrent: " + concurrent + ", seconds: " + seconds + ", number: " + number
            + ", RT: " + rt + ", TPS: " + tps);
}
/**
 * Creates a listener that counts the events of every notification and adds that number to
 * the given counter.
 *
 * @param cntr Received event counter; increased by the number of events in each batch.
 * @return Local listener.
 */
protected CacheEntryUpdatedListener<Integer, Integer> localListener(final AtomicLong cntr) {
    return new CacheEntryUpdatedListener<Integer, Integer>() {
        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> events)
            throws CacheEntryListenerException {
            // Only the number of events matters; the events themselves are not inspected.
            int received = 0;

            for (CacheEntryEvent<? extends Integer, ? extends Integer> ignored : events) {
                received++;
            }

            cntr.addAndGet(received);
        }
    };
}
/**
 * Two readers share a group over two segments: segment 0 holds three events with value 1 and
 * segment 1 holds three events with value 2. After reader2 is closed and the fake clock is
 * advanced by {@code UPDATE_WINDOW}, reader1 is expected to return the remaining events of
 * both segments.
 */
@Test
public void testReaderClose() throws SegmentSealedException {
String scope = "scope";
String stream = "stream";
// Fake clock shared by the state synchronizer and both readers.
AtomicLong clock = new AtomicLong();
MockSegmentStreamFactory segmentStreamFactory = new MockSegmentStreamFactory();
PravegaNodeUri endpoint = new PravegaNodeUri("localhost", -1);
@Cleanup
MockConnectionFactoryImpl connectionFactory = new MockConnectionFactoryImpl();
@Cleanup
MockController controller = new MockController(endpoint.getEndpoint(), endpoint.getPort(), connectionFactory, false);
//Mock for the two SegmentInputStreams.
Segment segment1 = new Segment(scope, stream, 0);
@Cleanup
SegmentOutputStream stream1 = segmentStreamFactory.createOutputStreamForSegment(segment1, segmentSealedCallback, writerConfig, DelegationTokenProviderFactory.createWithEmptyToken());
writeInt(stream1, 1);
writeInt(stream1, 1);
writeInt(stream1, 1);
Segment segment2 = new Segment(scope, stream, 1);
@Cleanup
SegmentOutputStream stream2 = segmentStreamFactory.createOutputStreamForSegment(segment2, segmentSealedCallback, writerConfig, DelegationTokenProviderFactory.createWithEmptyToken());
writeInt(stream2, 2);
writeInt(stream2, 2);
writeInt(stream2, 2);
StateSynchronizer<ReaderGroupState> sync = createStateSynchronizerForReaderGroup(connectionFactory, controller,
segmentStreamFactory,
Stream.of(scope, stream),
"reader1", clock, 2);
@Cleanup
EventStreamReaderImpl<byte[]> reader1 = createReader(controller, segmentStreamFactory, "reader1", sync, clock);
@Cleanup
EventStreamReaderImpl<byte[]> reader2 = createReader(controller, segmentStreamFactory, "reader2", sync, clock);
// Initially each reader returns events from a different segment.
assertEquals(1, readInt(reader1.readNextEvent(0)));
assertEquals(2, readInt(reader2.readNextEvent(0)));
reader2.close();
// Advance the fake clock by one update window (presumably so reader1 re-evaluates which
// segments it owns - confirm against ReaderGroupStateManager).
clock.addAndGet(ReaderGroupStateManager.UPDATE_WINDOW.toNanos());
// reader1 now returns the remaining two events of each segment, alternating between them.
assertEquals(1, readInt(reader1.readNextEvent(0)));
assertEquals(2, readInt(reader1.readNextEvent(0)));
assertEquals(1, readInt(reader1.readNextEvent(0)));
assertEquals(2, readInt(reader1.readNextEvent(0)));
}
/**
 * Drives a {@code Top} of size 1, configured to reset positions every 3 seconds in 3 chunks,
 * with a mock clock and checks which entry it reports after each update and clock step.
 * The trailing comment on every clock step is the resulting mock time in milliseconds.
 */
@Test
public void test_size_1() throws Exception {
// Mock time source; the test moves it forward explicitly.
AtomicLong currentTimeMillis = new AtomicLong(0L);
Clock clock = Clock.mock(currentTimeMillis);
Top top = Top.builder(1)
.resetPositionsPeriodicallyByChunks(Duration.ofSeconds(3), 3)
.withSnapshotCachingDuration(Duration.ZERO)
.withClock(clock)
.withBackgroundExecutor(MockExecutor.INSTANCE)
.build();
TopTestUtil.assertEmpty(top);
TopTestUtil.update(top, TestData.fifth);
TopTestUtil.checkOrder(top, TestData.fifth);
currentTimeMillis.addAndGet(500L); //500
TopTestUtil.checkOrder(top, TestData.fifth);
currentTimeMillis.addAndGet(500L); //1000
TopTestUtil.checkOrder(top, TestData.fifth);
TopTestUtil.update(top, TestData.fourth);
TopTestUtil.checkOrder(top, TestData.fifth);
TopTestUtil.checkOrder(top, TestData.fifth);
currentTimeMillis.addAndGet(1L); //1001
TopTestUtil.update(top, TestData.first);
TopTestUtil.checkOrder(top, TestData.fifth);
currentTimeMillis.addAndGet(1000L); //2001
TopTestUtil.checkOrder(top, TestData.fifth);
TopTestUtil.update(top, TestData.first);
TopTestUtil.update(top, TestData.second);
TopTestUtil.update(top, TestData.third);
TopTestUtil.checkOrder(top, TestData.fifth);
currentTimeMillis.addAndGet(999L); //3000
TopTestUtil.checkOrder(top, TestData.fifth);
currentTimeMillis.addAndGet(1L); //3001
TopTestUtil.update(top, TestData.second);
TopTestUtil.checkOrder(top, TestData.fifth);
// From here on no clock step is preceded by a larger update, and the reported entry
// steps down from fifth to first, one per second, until the top is empty.
currentTimeMillis.addAndGet(999L); //4000
TopTestUtil.update(top, TestData.first);
TopTestUtil.checkOrder(top, TestData.fourth);
currentTimeMillis.addAndGet(1000L); //5000
TopTestUtil.checkOrder(top, TestData.third);
currentTimeMillis.addAndGet(1000L); //6000
TopTestUtil.checkOrder(top, TestData.second);
currentTimeMillis.addAndGet(1000L); //7000
TopTestUtil.checkOrder(top, TestData.first);
currentTimeMillis.addAndGet(1000L); //8000
TopTestUtil.assertEmpty(top);
currentTimeMillis.addAndGet(2999L); //10_999
TopTestUtil.assertEmpty(top);
// A single update after a long idle period is still reported 3000 ms later and gone 1 ms after that.
TopTestUtil.update(top, TestData.second);
TopTestUtil.checkOrder(top, TestData.second);
currentTimeMillis.addAndGet(3000L); //13_999
TopTestUtil.checkOrder(top, TestData.second);
currentTimeMillis.addAndGet(1L); //14_000
TopTestUtil.assertEmpty(top);
}
/**
 * Inserts one row with {@code f2 = 5000}, then starts 100 threads that each open their own
 * connection and run the decrementing UPDATE 50 times, and finally prints the average time a
 * thread spent on its updates.
 *
 * <p>All connections and statements are opened with try-with-resources so they are released
 * even when a statement fails or the calling thread is interrupted while joining.
 *
 * @throws Exception if the setup connection, table creation or insert fails, or if the calling
 *         thread is interrupted while waiting for the workers
 */
public static void updateRow() throws Exception {
    final int count = 5000;
    final int n = 100;
    final int loop = count / n;
    final AtomicLong total = new AtomicLong();

    // Auto-commit is left at the connection's default; its value is printed for reference.
    try (Connection conn = getConnection(url)) {
        System.out.println("isAutoCommit: " + conn.getAutoCommit());
        createTable(conn);
        System.out.println("isAutoCommit: " + conn.getAutoCommit());

        try (Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("INSERT INTO DeadLockTest(f1, f2, f3) VALUES(100, " + count + ", 30)");

            Thread[] threads = new Thread[n];
            for (int i = 0; i < n; i++) {
                threads[i] = new Thread(() -> {
                    // Each worker uses its own connection, with default auto-commit and isolation.
                    try (Connection conn2 = getConnection(url);
                         Statement stmt2 = conn2.createStatement()) {
                        long t1 = System.currentTimeMillis();
                        for (int j = 0; j < loop; j++) {
                            stmt2.executeUpdate("UPDATE DeadLockTest SET f2=f2-1 where f1=100 and f2>0");
                        }
                        total.addAndGet(System.currentTimeMillis() - t1);
                    } catch (Exception e) {
                        // A failed worker is reported but does not abort the others.
                        e.printStackTrace();
                    }
                });
            }
            for (int i = 0; i < n; i++) {
                threads[i].start();
            }
            for (int i = 0; i < n; i++) {
                threads[i].join();
            }
        }
    }
    System.out.println("count: " + count + ", threads: " + n + ", avg time: " + total.get() / n + " ms");
}
/**
 * Benchmark body: creates a fresh {@link AtomicLong}, adds 1 to it with {@code addAndGet},
 * and returns the counter object.
 *
 * @return the counter that was just incremented
 */
@Benchmark
public Object testAtomicLongPerformance() {
    final AtomicLong counter = new AtomicLong();
    counter.addAndGet(1);
    return counter;
}
/**
 * Checks that, while a checkpoint is outstanding, readers neither acquire new segments nor
 * release the ones they hold, and that both become possible again once every reader has
 * checkpointed.
 */
@Test(timeout = 10000)
public void testSegmentsCannotBeReleasedWithoutCheckpoint() throws ReaderNotInReaderGroupException {
String scope = "scope";
String stream = "stream";
PravegaNodeUri endpoint = new PravegaNodeUri("localhost", SERVICE_PORT);
MockConnectionFactoryImpl connectionFactory = new MockConnectionFactoryImpl();
SegmentWithRange segment0 = new SegmentWithRange(new Segment(scope, stream, 0), 0.0, 0.33);
SegmentWithRange segment1 = new SegmentWithRange(new Segment(scope, stream, 1), 0.33, 0.66);
SegmentWithRange segment2 = new SegmentWithRange(new Segment(scope, stream, 2), 0.66, 1.0);
MockController controller = new MockControllerWithSuccessors(endpoint.getEndpoint(), endpoint.getPort(),
connectionFactory, new StreamSegmentsWithPredecessors(ImmutableMap.of(), ""));
createScopeAndStream(scope, stream, controller);
MockSegmentStreamFactory streamFactory = new MockSegmentStreamFactory();
@Cleanup
SynchronizerClientFactory clientFactory = new ClientFactoryImpl(scope, controller, connectionFactory, streamFactory,
streamFactory, streamFactory, streamFactory);
SynchronizerConfig config = SynchronizerConfig.builder().build();
@Cleanup
StateSynchronizer<ReaderGroupState> stateSynchronizer = createState(stream, clientFactory, config);
// Fake clock handed to both reader state managers.
AtomicLong clock = new AtomicLong();
Map<SegmentWithRange, Long> segments = ImmutableMap.of(segment0, 0L, segment1, 1L, segment2, 2L);
stateSynchronizer.initialize(new ReaderGroupState.ReaderGroupStateInit(ReaderGroupConfig.builder().stream(Stream.of(scope, stream)).build(),
segments, Collections.emptyMap()));
val readerState1 = new ReaderGroupStateManager("reader1", stateSynchronizer, controller, clock::get);
readerState1.initializeReader(0);
val readerState2 = new ReaderGroupStateManager("reader2", stateSynchronizer, controller, clock::get);
readerState2.initializeReader(0);
assertEquals(segments, stateSynchronizer.getState().getUnassignedSegments());
// With CP1 outstanding, neither reader is given any segment.
stateSynchronizer.updateStateUnconditionally(new CreateCheckpoint("CP1"));
stateSynchronizer.fetchUpdates();
assertEquals("CP1", readerState1.getCheckpoint());
assertEquals(Collections.emptyMap(), readerState1.acquireNewSegmentsIfNeeded(1, new PositionImpl(Collections.emptyMap())));
assertEquals(Collections.emptyMap(), readerState2.acquireNewSegmentsIfNeeded(2, new PositionImpl(Collections.emptyMap())));
assertEquals("CP1", readerState2.getCheckpoint());
// Once both readers have checkpointed CP1, the checkpoint is complete and acquisition works.
readerState1.checkpoint("CP1", new PositionImpl(Collections.emptyMap()));
readerState2.checkpoint("CP1", new PositionImpl(Collections.emptyMap()));
assertEquals(ImmutableMap.of(segment0.getSegment(), 0L, segment1.getSegment(), 1L, segment2.getSegment(), 2L), stateSynchronizer.getState().getPositionsForCompletedCheckpoint("CP1"));
Map<SegmentWithRange, Long> segments1 = readerState1.acquireNewSegmentsIfNeeded(1, new PositionImpl(Collections.emptyMap()));
Map<SegmentWithRange, Long> segments2 = readerState2.acquireNewSegmentsIfNeeded(2, new PositionImpl(Collections.emptyMap()));
assertFalse(segments1.isEmpty());
assertFalse(segments2.isEmpty());
assertEquals(0, stateSynchronizer.getState().getNumberOfUnassignedSegments());
//Induce imbalance
// Move every segment held by reader1 over to reader2 by editing the shared state directly.
for (Entry<SegmentWithRange, Long> entry : segments1.entrySet()) {
stateSynchronizer.updateStateUnconditionally(new ReaderGroupState.ReleaseSegment("reader1", entry.getKey().getSegment(), entry.getValue()));
stateSynchronizer.updateStateUnconditionally(new ReaderGroupState.AcquireSegment("reader2", entry.getKey().getSegment()));
}
// With CP2 outstanding, no reader proposes a segment to release and an explicit release is refused.
stateSynchronizer.updateStateUnconditionally(new CreateCheckpoint("CP2"));
stateSynchronizer.fetchUpdates();
clock.addAndGet(ReaderGroupStateManager.UPDATE_WINDOW.toNanos());
assertNull(readerState1.findSegmentToReleaseIfRequired());
assertNull(readerState2.findSegmentToReleaseIfRequired());
clock.addAndGet(ReaderGroupStateManager.UPDATE_WINDOW.toNanos());
assertFalse(readerState2.releaseSegment(segments2.keySet().iterator().next().getSegment(), 20, 2, new PositionImpl(segments)));
clock.addAndGet(ReaderGroupStateManager.UPDATE_WINDOW.toNanos());
// After both readers checkpoint CP2, reader2 can find and release a segment.
readerState1.checkpoint("CP2", new PositionImpl(Collections.emptyMap()));
readerState2.checkpoint("CP2", new PositionImpl(segments));
assertEquals(ImmutableMap.of(segment0.getSegment(), 0L, segment1.getSegment(), 1L, segment2.getSegment(), 2L), stateSynchronizer.getState().getPositionsForCompletedCheckpoint("CP2"));
Segment toRelease = readerState2.findSegmentToReleaseIfRequired();
assertNotNull(toRelease);
assertTrue(readerState2.releaseSegment(toRelease, 10, 1, new PositionImpl(segments)));
assertEquals(1, stateSynchronizer.getState().getNumberOfUnassignedSegments());
}
/**
 * Runs 3 to 6 threads that together push random byte counts through one shared
 * {@code SimpleRateLimiter} and checks that the achieved throughput does not exceed the
 * randomly chosen target (10 to 30 MB/sec) by more than 10%.
 */
public void testThreads() throws Exception {
double targetMBPerSec = 10.0 + 20 * random().nextDouble();
final SimpleRateLimiter limiter = new SimpleRateLimiter(targetMBPerSec);
// Released once all threads are started so that they begin their work together.
final CountDownLatch startingGun = new CountDownLatch(1);
Thread[] threads = new Thread[TestUtil.nextInt(random(), 3, 6)];
final AtomicLong totBytes = new AtomicLong();
for(int i=0;i<threads.length;i++) {
threads[i] = new Thread() {
@Override
public void run() {
try {
startingGun.await();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
// Bytes accounted for since this thread last called limiter.pause().
long bytesSinceLastPause = 0;
for(int i=0;i<500;i++) {
long numBytes = TestUtil.nextInt(random(), 1000, 10000);
totBytes.addAndGet(numBytes);
bytesSinceLastPause += numBytes;
// Only consult the limiter after enough bytes have accumulated.
if (bytesSinceLastPause > limiter.getMinPauseCheckBytes()) {
limiter.pause(bytesSinceLastPause);
bytesSinceLastPause = 0;
}
}
}
};
threads[i].start();
}
long startNS = System.nanoTime();
startingGun.countDown();
for(Thread thread : threads) {
thread.join();
}
long endNS = System.nanoTime();
// Total bytes converted to MB, divided by the elapsed time converted from nanoseconds to seconds.
double actualMBPerSec = (totBytes.get()/1024/1024.)/((endNS-startNS)/1000000000.0);
// TODO: this may false trip .... could be we can only assert that it never exceeds the max, so slow jenkins doesn't trip:
double ratio = actualMBPerSec/targetMBPerSec;
// Only enforce that it wasn't too fast; if machine is bogged down (can't schedule threads / sleep properly) then it may falsely be too slow:
assumeTrue("actualMBPerSec=" + actualMBPerSec + " targetMBPerSec=" + targetMBPerSec, 0.9 <= ratio);
assertTrue("targetMBPerSec=" + targetMBPerSec + " actualMBPerSec=" + actualMBPerSec, ratio <= 1.1);
}
/**
 * Creates a block for the given claim whose output stream writes into a managed in-memory
 * buffer, adds any growth of that buffer to {@code repoSizeCounter}, and redirects the write
 * to the switching stream's alternate target when the in-memory write throws an
 * {@link IOException}.
 *
 * @param claim content claim this block belongs to
 * @param repoSizeCounter counter that is increased by the number of buffer bytes added
 */
public ContentBlock(final ContentClaim claim, final AtomicLong repoSizeCounter) {
    this.claim = claim;
    this.repoSizeCounter = repoSizeCounter;

    out = new ClaimSwitchingOutputStream(new ArrayManagedOutputStream(memoryManager) {
        @Override
        public void write(int b) throws IOException {
            try {
                final long lengthBefore = getBufferLength();
                super.write(b);
                final long growth = getBufferLength() - lengthBefore;
                if (growth > 0) {
                    repoSizeCounter.addAndGet(growth);
                }
            } catch (final IOException e) {
                // The in-memory write failed: replay this single byte through the redirect path.
                redirect(new byte[] {(byte) (b & 0xFF)}, 0, 1);
            }
        }

        @Override
        public void write(byte[] b) throws IOException {
            write(b, 0, b.length);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            try {
                final long lengthBefore = getBufferLength();
                super.write(b, off, len);
                final long growth = getBufferLength() - lengthBefore;
                if (growth > 0) {
                    repoSizeCounter.addAndGet(growth);
                }
            } catch (final IOException e) {
                // The in-memory write failed: replay the same range through the redirect path.
                redirect(b, off, len);
            }
        }

        // Switches the outer stream to its alternate target and writes the given bytes there.
        private void redirect(byte[] b, int off, int len) throws IOException {
            logger.debug("Redirecting {}", claim);
            out.redirect();
            out.write(b, off, len);
        }
    });
}
/**
 * Checks that when a reader is shut down with a {@code null} position, another reader in the
 * group can acquire both of its segments once the clock has advanced by the group refresh time.
 */
@Test(timeout = 10000)
public void testRemoveReaderWithNullPosition() throws ReaderNotInReaderGroupException {
String scope = "scope";
String stream = "stream";
SynchronizerConfig synchronizerConfig = SynchronizerConfig.builder().build();
PravegaNodeUri endpoint = new PravegaNodeUri("localhost", SERVICE_PORT);
// Fake clock handed to both reader state managers.
AtomicLong clock = new AtomicLong();
SegmentWithRange segment0 = new SegmentWithRange(new Segment(scope, stream, 0), 0, 0.5);
SegmentWithRange segment1 = new SegmentWithRange(new Segment(scope, stream, 1), 0.5, 1.0);
Map<SegmentWithRange, Long> segmentMap = ImmutableMap.<SegmentWithRange, Long>builder()
.put(segment0, 123L)
.put(segment1, 456L)
.build();
ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder().stream(Stream.of(scope, stream)).build();
// Setup mocks
MockConnectionFactoryImpl connectionFactory = new MockConnectionFactoryImpl();
MockController controller = new MockController(endpoint.getEndpoint(), endpoint.getPort(), connectionFactory, false);
createScopeAndStream(scope, stream, controller);
MockSegmentStreamFactory streamFactory = new MockSegmentStreamFactory();
@Cleanup
SynchronizerClientFactory clientFactory = new ClientFactoryImpl(scope, controller, connectionFactory, streamFactory, streamFactory, streamFactory, streamFactory);
// Create Reader Group State corresponding to testReader1.
@Cleanup
StateSynchronizer<ReaderGroupState> stateSynchronizer1 = createState(stream, clientFactory, synchronizerConfig);
stateSynchronizer1.initialize(new ReaderGroupState.ReaderGroupStateInit(readerGroupConfig, segmentMap, Collections.emptyMap()));
ReaderGroupStateManager readerState1 = new ReaderGroupStateManager("testReader1",
stateSynchronizer1,
controller,
clock::get);
readerState1.initializeReader(0); // Initialize readerState1 from stateSynchronizer1
// Validations.
assertNull(readerState1.findSegmentToReleaseIfRequired()); // No segments to release.
// Acquire Segments and update StateSynchronizer stream.
Map<SegmentWithRange, Long> newSegments = readerState1.acquireNewSegmentsIfNeeded(0, new PositionImpl(Collections.emptyMap()));
assertFalse(newSegments.isEmpty());
assertEquals(2, newSegments.size()); // Verify testReader1 has acquired the segments.
// Create ReaderGroupState corresponding to testReader2
@Cleanup
StateSynchronizer<ReaderGroupState> stateSynchronizer2 = createState(stream, clientFactory, synchronizerConfig);
ReaderGroupStateManager readerState2 = new ReaderGroupStateManager("testReader2",
stateSynchronizer2,
controller,
clock::get);
readerState2.initializeReader(0); // Initialize readerState2 from stateSynchronizer2.
// Try acquiring segments for testReader2.
newSegments = readerState2.acquireNewSegmentsIfNeeded(0, new PositionImpl(segmentMap));
assertTrue(newSegments.isEmpty()); // No new segments are acquired since testReader1 already owns it and release timer did not complete.
// Trigger testReader1 shutdown.
// The position argument is deliberately null: this is the case the test is named after.
ReaderGroupStateManager.readerShutdown("testReader1", null, stateSynchronizer1);
// Advance clock by ReaderGroup refresh time.
clock.addAndGet(TimeUnit.MILLISECONDS.toNanos(readerGroupConfig.getGroupRefreshTimeMillis()));
// Try acquiring segments for testReader2, we should acquire the segments owned by testReader1.
newSegments = readerState2.acquireNewSegmentsIfNeeded(0, new PositionImpl(Collections.emptyMap()));
assertFalse(newSegments.isEmpty());
assertEquals(2, newSegments.size());
}
/**
 * Checks how the computed acquire and release times of two readers compare as segments move
 * between them: the reader holding more segments is expected to have the longer acquire time
 * and the shorter release time.
 */
@Test(timeout = 5000)
public void testReleaseAndAcquireTimes() throws ReaderNotInReaderGroupException {
String scope = "scope";
String stream = "stream";
PravegaNodeUri endpoint = new PravegaNodeUri("localhost", SERVICE_PORT);
MockConnectionFactoryImpl connectionFactory = new MockConnectionFactoryImpl();
MockController controller = new MockController(endpoint.getEndpoint(), endpoint.getPort(), connectionFactory, false);
createScopeAndStream(scope, stream, controller);
MockSegmentStreamFactory streamFactory = new MockSegmentStreamFactory();
@Cleanup
SynchronizerClientFactory clientFactory = new ClientFactoryImpl(scope, controller, connectionFactory, streamFactory, streamFactory, streamFactory, streamFactory);
SynchronizerConfig config = SynchronizerConfig.builder().build();
@Cleanup
StateSynchronizer<ReaderGroupState> state = createState(stream, clientFactory, config);
// Fake clock handed to both reader state managers.
AtomicLong clock = new AtomicLong();
Map<SegmentWithRange, Long> segments = new HashMap<>();
segments.put(new SegmentWithRange(new Segment(scope, stream, 0), 0.0, 0.25), 0L);
segments.put(new SegmentWithRange(new Segment(scope, stream, 1), 0.25, 0.5), 1L);
segments.put(new SegmentWithRange(new Segment(scope, stream, 2), 0.5, 0.75), 2L);
// NOTE(review): this range starts at 0.65 and so overlaps the previous segment's 0.5-0.75 - confirm whether 0.75 was intended.
segments.put(new SegmentWithRange(new Segment(scope, stream, 3), 0.65, 1.0), 3L);
state.initialize(new ReaderGroupState.ReaderGroupStateInit(ReaderGroupConfig.builder().stream(Stream.of(scope, stream)).build(),
segments, Collections.emptyMap()));
ReaderGroupStateManager reader1 = new ReaderGroupStateManager("reader1", state, controller, clock::get);
reader1.initializeReader(100);
ReaderGroupStateManager reader2 = new ReaderGroupStateManager("reader2", state, controller, clock::get);
reader2.initializeReader(100);
// Before the clock has moved, neither reader acquires anything.
Map<SegmentWithRange, Long> newSegments = reader1.acquireNewSegmentsIfNeeded(123, new PositionImpl(Collections.emptyMap()));
assertEquals(0, newSegments.size());
newSegments = reader2.acquireNewSegmentsIfNeeded(123, new PositionImpl(Collections.emptyMap()));
assertEquals(0, newSegments.size());
clock.addAndGet(ReaderGroupStateManager.UPDATE_WINDOW.toNanos());
// After one update window reader1 acquires two segments while reader2 still holds none.
newSegments = reader1.acquireNewSegmentsIfNeeded(123, new PositionImpl(Collections.emptyMap()));
assertEquals(2, newSegments.size());
Duration r1aqt = ReaderGroupStateManager.calculateAcquireTime("reader1", state.getState());
Duration r2aqt = ReaderGroupStateManager.calculateAcquireTime("reader2", state.getState());
assertTrue(r1aqt.toMillis() > r2aqt.toMillis());
Duration r1rlt = ReaderGroupStateManager.calculateReleaseTime("reader1", state.getState());
Duration r2rlt = ReaderGroupStateManager.calculateReleaseTime("reader2", state.getState());
assertTrue(r1rlt.toMillis() < r2rlt.toMillis());
// reader1 gives one segment back and reader2 then acquires two, which reverses both comparisons.
reader1.releaseSegment(newSegments.keySet().iterator().next().getSegment(), 0, 123, new PositionImpl(Collections.emptyMap()));
newSegments = reader2.acquireNewSegmentsIfNeeded(123, new PositionImpl(Collections.emptyMap()));
assertEquals(2, newSegments.size());
r1aqt = ReaderGroupStateManager.calculateAcquireTime("reader1", state.getState());
r2aqt = ReaderGroupStateManager.calculateAcquireTime("reader2", state.getState());
assertTrue(r1aqt.toMillis() < r2aqt.toMillis());
r1rlt = ReaderGroupStateManager.calculateReleaseTime("reader1", state.getState());
r2rlt = ReaderGroupStateManager.calculateReleaseTime("reader2", state.getState());
assertTrue(r1rlt.toMillis() > r2rlt.toMillis());
}
/**
 * Registers a function counter backed by an {@link AtomicLong} in an Atlas registry with a
 * one-minute step and checks the value reported after each step: the expected values equal
 * the amount added during the step divided by 60.
 */
@Issue("#2094")
@Test
void functionCounter() {
AtomicLong count = new AtomicLong();
MockClock clock = new MockClock();
AtlasMeterRegistry registry = new AtlasMeterRegistry(new AtlasConfig() {
// Returning null for every key leaves all other settings at whatever AtlasConfig defaults to.
@Nullable
@Override
public String get(String k) {
return null;
}
@Override
public Duration step() {
return Duration.ofMinutes(1);
}
@Override
public Duration lwcStep() {
return step();
}
}, clock);
FunctionCounter.builder("test", count, AtomicLong::doubleValue).register(registry);
// Polls the meters, advances the mock clock by one minute, then returns the first
// measurement named "test" (NaN when there is none).
Supplier<Double> valueSupplier = () -> {
AtlasRegistry r = (AtlasRegistry) registry.getSpectatorRegistry();
PolledMeter.update(r);
clock.add(Duration.ofMinutes(1));
return r.measurements()
.filter(m -> m.id().name().equals("test"))
.findFirst()
.map(Measurement::value)
.orElse(Double.NaN);
};
// 60 added over a one-minute step is expected to be reported as 1.0.
count.addAndGet(60);
assertThat(valueSupplier.get()).isEqualTo(1.0);
// 120 more is expected to be reported as 2.0 (only the increase since the last poll counts).
count.addAndGet(120);
assertThat(valueSupplier.get()).isEqualTo(2.0);
// 90 more is expected to be reported as 1.5.
count.addAndGet(90);
assertThat(valueSupplier.get()).isEqualTo(1.5);
}
/**
 * Checks that only one automatic checkpoint is handed out while another reader in the group
 * has not passed it: reader1 reads 100 events with the fake clock advancing one second per
 * read, reader2 never reads, and exactly one checkpoint event is expected.
 */
@Test(timeout = 30000)
public void testOnlyOneOutstanding() throws ReinitializationRequiredException, DurableDataLogException {
String endpoint = "localhost";
String streamName = "abc";
String readerGroup = "group";
int port = TestUtils.getAvailableListenPort();
String testString = "Hello world: ";
String scope = "Scope1";
@Cleanup
ServiceBuilder serviceBuilder = ServiceBuilder.newInMemoryBuilder(ServiceBuilderConfig.getDefaultConfig());
serviceBuilder.initialize();
StreamSegmentStore store = serviceBuilder.createStreamSegmentService();
@Cleanup
PravegaConnectionListener server = new PravegaConnectionListener(false, port, store, mock(TableStore.class),
serviceBuilder.getLowPriorityExecutor());
server.startListening();
@Cleanup
MockStreamManager streamManager = new MockStreamManager(scope, endpoint, port);
MockClientFactory clientFactory = streamManager.getClientFactory();
// Automatic checkpoints are requested every 1000 ms.
ReaderGroupConfig groupConfig = ReaderGroupConfig.builder()
.automaticCheckpointIntervalMillis(1000)
.stream(Stream.of(scope, streamName))
.build();
streamManager.createScope(scope);
streamManager.createStream(scope, streamName, null);
streamManager.createReaderGroup(readerGroup, groupConfig);
JavaSerializer<String> serializer = new JavaSerializer<>();
populateEvents(streamName, testString, clientFactory, serializer);
// Fake time source in nanoseconds; each reader gets it both as-is and divided by NANOS_PER_SECOND.
AtomicLong fakeClock = new AtomicLong(0);
@Cleanup
EventStreamReader<String> reader1 = clientFactory.createReader("reader1", readerGroup, serializer,
ReaderConfig.builder().build(),
() -> fakeClock.get(),
() -> fakeClock.get() / NANOS_PER_SECOND);
// reader2 joins the group but is never read from in this test.
@Cleanup
EventStreamReader<String> reader2 = clientFactory.createReader("reader2", readerGroup, serializer,
ReaderConfig.builder().build(),
() -> fakeClock.get(),
() -> fakeClock.get() / NANOS_PER_SECOND);
int numRead = 0;
int checkpointCount = 0;
while (numRead < 100) {
// One second of fake time passes before every read, which matches the checkpoint interval.
fakeClock.addAndGet(NANOS_PER_SECOND);
EventRead<String> event = reader1.readNextEvent(1000);
if (event.isCheckpoint()) {
checkpointCount++;
} else {
// Data events must arrive in order, numbered from 0.
String message = event.getEvent();
assertEquals(testString + numRead, message);
numRead++;
}
}
assertEquals("As there is a second reader that does not pass the checkpoint, only one should occur", 1,
checkpointCount);
}