下面列出了java.util.concurrent.LinkedBlockingQueue#poll ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Closes this OutputView, closing the underlying writer and returning all memory segments.
*
* @return A list containing all memory segments originally supplied to this view.
* @throws IOException Thrown, if the underlying writer could not be properly closed.
*/
public List<MemorySegment> close() throws IOException
{
// send off set last segment
writeSegment(getCurrentSegment(), getCurrentPositionInSegment(), true);
clear();
// close the writer and gather all segments
final LinkedBlockingQueue<MemorySegment> queue = this.writer.getReturnQueue();
this.writer.close();
// re-collect all memory segments
ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(this.numSegments);
for (int i = 0; i < this.numSegments; i++) {
final MemorySegment m = queue.poll();
if (m == null) {
// we get null if the queue is empty. that should not be the case if the reader was properly closed.
throw new RuntimeException("ChannelWriterOutputView: MemorySegments have been taken from return queue by different actor.");
}
list.add(m);
}
return list;
}
/**
* Closes this OutputView, closing the underlying writer and returning all memory segments.
*
* @return A list containing all memory segments originally supplied to this view.
* @throws IOException Thrown, if the underlying writer could not be properly closed.
*/
public List<MemorySegment> close() throws IOException
{
// send off set last segment
writeSegment(getCurrentSegment(), getCurrentPositionInSegment(), true);
clear();
// close the writer and gather all segments
final LinkedBlockingQueue<MemorySegment> queue = this.writer.getReturnQueue();
this.writer.close();
// re-collect all memory segments
ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(this.numSegments);
for (int i = 0; i < this.numSegments; i++) {
final MemorySegment m = queue.poll();
if (m == null) {
// we get null if the queue is empty. that should not be the case if the reader was properly closed.
throw new RuntimeException("ChannelWriterOutputView: MemorySegments have been taken from return queue by different actor.");
}
list.add(m);
}
return list;
}
/**
* Closes this InputView, closing the underlying reader and returning all memory segments.
*
* @return A list containing all memory segments originally supplied to this view.
* @throws IOException Thrown, if the underlying reader could not be properly closed.
*/
public List<MemorySegment> close() throws IOException {
if (this.closed) {
throw new IllegalStateException("Already closed.");
}
this.closed = true;
// re-collect all memory segments
ArrayList<MemorySegment> list = this.freeMem;
final MemorySegment current = getCurrentSegment();
if (current != null) {
list.add(current);
}
clear();
// close the writer and gather all segments
final LinkedBlockingQueue<MemorySegment> queue = this.reader.getReturnQueue();
this.reader.close();
while (list.size() < this.numSegments) {
final MemorySegment m = queue.poll();
if (m == null) {
// we get null if the queue is empty. that should not be the case if the reader was properly closed.
throw new RuntimeException("Bug in ChannelReaderInputView: MemorySegments lost.");
}
list.add(m);
}
return list;
}
/**
* Closes this InputView, closing the underlying reader and returning all memory segments.
*
* @return A list containing all memory segments originally supplied to this view.
* @throws IOException Thrown, if the underlying reader could not be properly closed.
*/
@Override
public List<MemorySegment> close() throws IOException {
if (this.closed) {
throw new IllegalStateException("Already closed.");
}
this.closed = true;
// re-collect all memory segments
ArrayList<MemorySegment> list = this.freeMem;
final MemorySegment current = getCurrentSegment();
if (current != null) {
list.add(current);
}
clear();
// close the writer and gather all segments
final LinkedBlockingQueue<MemorySegment> queue = this.reader.getReturnQueue();
this.reader.close();
while (list.size() < this.numSegments) {
final MemorySegment m = queue.poll();
if (m == null) {
// we get null if the queue is empty. that should not be the case if the reader was properly closed.
throw new RuntimeException("Bug in ChannelReaderInputView: MemorySegments lost.");
}
list.add(m);
}
return list;
}
public JSONObject readReply(int id)
{
LinkedBlockingQueue<JSONObject> q = getQueue(id);
try
{
return q.poll(REPLY_TIMEOUT_SEC, TimeUnit.SECONDS);
}
catch(InterruptedException e){throw new RuntimeException(e);}
}
/**
* contains(x) reports true when elements added but not yet removed
*/
public void testContains() {
LinkedBlockingQueue q = populatedQueue(SIZE);
for (int i = 0; i < SIZE; ++i) {
assertTrue(q.contains(new Integer(i)));
q.poll();
assertFalse(q.contains(new Integer(i)));
}
}
@Test
public void verifyConcurrentBacklogConsumption() throws Exception {
final LinkedBlockingQueue<Request> t = new LinkedBlockingQueue<>();
topics.put("t", t);
publisher = Publisher.builder()
.project("test")
.pubsub(pubsub)
.listener(listener)
.concurrency(2)
.batchSize(2)
.queueSize(100)
.build();
// Saturate concurrency with two messages
final Message m0a = Message.of("0a");
final CompletableFuture<String> f0a = publisher.publish("t", m0a);
final Request r0a = t.take();
final Message m0b = Message.of("0b");
final CompletableFuture<String> f0b = publisher.publish("t", m0b);
final Request r0b = t.take();
// Enqueue enough for at least two more batches
final List<Message> m1 = range(0, 4).mapToObj(String::valueOf).map(Message::of).collect(toList());
final List<CompletableFuture<String>> f1 = m1.stream().map(m -> publisher.publish("t", m)).collect(toList());
// Complete the first two requests
r0a.future.succeed(singletonList("0a"));
r0b.future.succeed(singletonList("0b"));
// Verify that two batches kicked off concurrently and that we got all four messages in the two batches
final Request r1a = t.poll(30, SECONDS);
final Request r1b = t.poll(30, SECONDS);
assertThat(r1a, is(notNullValue()));
assertThat(r1b, is(notNullValue()));
final Set<Message> r1received = ImmutableSet.copyOf(concat(r1a.messages, r1b.messages));
assertThat(r1received, is(ImmutableSet.copyOf(m1)));
}
/**
* Closes this InputView, closing the underlying reader and returning all memory segments.
*
* @return A list containing all memory segments originally supplied to this view.
* @throws IOException Thrown, if the underlying reader could not be properly closed.
*/
@Override
public List<MemorySegment> close() throws IOException {
if (this.closed) {
throw new IllegalStateException("Already closed.");
}
this.closed = true;
// re-collect all memory segments
ArrayList<MemorySegment> list = this.freeMem;
final MemorySegment current = getCurrentSegment();
if (current != null) {
list.add(current);
}
clear();
// close the writer and gather all segments
final LinkedBlockingQueue<MemorySegment> queue = this.reader.getReturnQueue();
this.reader.close();
while (list.size() < this.numSegments) {
final MemorySegment m = queue.poll();
if (m == null) {
// we get null if the queue is empty. that should not be the case if the reader was properly closed.
throw new RuntimeException("Bug in ChannelReaderInputView: MemorySegments lost.");
}
list.add(m);
}
return list;
}
private void assertContainsEventWithMetadata(Int128 expectedEventId, String expectedMetadata, LinkedBlockingQueue<SerializedEvent> events) throws InterruptedException {
long now = System.currentTimeMillis();
long deadline = now + 10 * 1000;
while (System.currentTimeMillis() < deadline) {
SerializedEvent event = events.poll(100, TimeUnit.MILLISECONDS);
if (event != null && event.getId().equals(expectedEventId)) {
assertEquals(Optional.of(expectedMetadata), event.getMetadata());
return;
}
}
fail("could not find");
}
public static InetSocketAddress getNameNodeAddress() {
StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
LinkedBlockingQueue<URI> namenodes = new LinkedBlockingQueue<URI>();
while(tupleTokenizer.hasMoreTokens()){
String address = tupleTokenizer.nextToken();
URI uri = URI.create(address);
namenodes.add(uri);
}
URI master = namenodes.poll();
InetSocketAddress nnAddr = createSocketAddrForHost(master.getHost(), master.getPort());
return nnAddr;
}
/**
* Closes this InputView, closing the underlying reader and returning all memory segments.
*
* @return A list containing all memory segments originally supplied to this view.
* @throws IOException Thrown, if the underlying reader could not be properly closed.
*/
public List<MemorySegment> close() throws IOException
{
if (this.closed) {
throw new IllegalStateException("Already closed.");
}
this.closed = true;
// re-collect all memory segments
ArrayList<MemorySegment> list = this.freeMem;
final MemorySegment current = getCurrentSegment();
if (current != null) {
list.add(current);
}
clear();
// close the writer and gather all segments
final LinkedBlockingQueue<MemorySegment> queue = this.reader.getReturnQueue();
this.reader.close();
while (list.size() < this.numSegments) {
final MemorySegment m = queue.poll();
if (m == null) {
// we get null if the queue is empty. that should not be the case if the reader was properly closed.
throw new RuntimeException("Bug in ChannelReaderInputView: MemorySegments lost.");
}
list.add(m);
}
return list;
}
protected String pollForNextLine(LinkedBlockingQueue<String> queue) {
String line = null;
try {
do {
line = queue.poll(250, TimeUnit.MILLISECONDS);
} while (line == null && keepPolling);
} catch (InterruptedException ie) {
Thread.interrupted();
this.keepPolling = false;
}
return line;
}
public ByteBuffer acquireBuffer(int bufferSize)
{
int normalizedBufferSize = initialBufferSize;
while (normalizedBufferSize < bufferSize) {
normalizedBufferSize *= 2;
}
LinkedBlockingQueue<ByteBuffer> buffers;
synchronized (bufferPool) {
buffers = bufferPool.get(normalizedBufferSize);
if (buffers == null) {
buffers = new LinkedBlockingQueue<>();
bufferPool.put(normalizedBufferSize, buffers);
}
}
ByteBuffer buffer = buffers.poll();
if (buffer != null) {
return buffer;
}
/*
synchronized (allocatedSize) {
if (allocatedSize.get() + normalizedBufferSize > maxBufferSize) {
return null; // `null` means the buffer is full.
}
allocatedSize.addAndGet(normalizedBufferSize);
return ByteBuffer.allocateDirect(normalizedBufferSize);
}
*/
while (true) {
long currentAllocatedSize = allocatedSize.get();
if (currentAllocatedSize + normalizedBufferSize > maxBufferSize) {
releaseBuffers();
return null; // `null` means the buffer is full.
}
if (currentAllocatedSize == allocatedSize.getAndAdd(normalizedBufferSize)) {
ByteBuffer buf;
if (jvmHeapBufferMode) {
buf = ByteBuffer.allocate(normalizedBufferSize);
}
else {
buf = ByteBuffer.allocateDirect(normalizedBufferSize);
}
return buf;
}
allocatedSize.getAndAdd(-normalizedBufferSize);
}
}
@Test(timeout = TEST_TIMEOUT)
public void deregisterNode() throws Exception {
LinkedBlockingQueue<String> nodeAddedQueue = new LinkedBlockingQueue<>();
LinkedBlockingQueue<String> nodeRemovedQueue = new LinkedBlockingQueue<>();
LinkedBlockingQueue<Exception> exceptionsQueue = new LinkedBlockingQueue<>();
CuratorFramework client2 = CuratorFrameworkFactory.builder()
.connectString(zkUrl)
.retryPolicy(new ExponentialBackoffRetry(
RETRY_SLEEP_MS, MAX_RETRY))
.namespace(CLUSTER_NAME_2)
.build();
@Cleanup
Cluster clusterListener = new ClusterZKImpl(client2, ClusterType.HOST);
clusterListener.addListener((eventType, host) -> {
switch (eventType) {
case HOST_ADDED:
nodeAddedQueue.offer(host.getIpAddr());
break;
case HOST_REMOVED:
nodeRemovedQueue.offer(host.getIpAddr());
break;
case ERROR:
exceptionsQueue.offer(new RuntimeException("Encountered error"));
break;
default:
exceptionsQueue.offer(new RuntimeException("Unhandled case"));
break;
}
});
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkUrl)
.retryPolicy(new ExponentialBackoffRetry(
RETRY_SLEEP_MS, MAX_RETRY))
.namespace(CLUSTER_NAME_2)
.build();
//Create Add a node to the cluster.
@Cleanup
Cluster clusterZKInstance1 = new ClusterZKImpl(client, ClusterType.HOST);
clusterZKInstance1.registerHost(new Host(HOST_1, PORT, null));
assertEquals(HOST_1, nodeAddedQueue.poll(5, TimeUnit.SECONDS));
clusterZKInstance1.deregisterHost(new Host(HOST_1, PORT, null));
assertEquals(HOST_1, nodeRemovedQueue.poll(5, TimeUnit.SECONDS));
Exception exception = exceptionsQueue.poll();
if (exception != null) {
throw exception;
}
}
/**
* 准备启动指定插件组件
*
* @param mContext 主工程Context
* @param mConnection bindService时需要的ServiceConnection,如果不是bindService的方式启动组件,传入Null
* @param mIntent 需要启动组件的Intent
* @param needAddCache 是否需要缓存Intnet,true:如果插件没有初始化,那么会缓存起来,等插件加载完毕再执行此Intent
* false:如果插件没初始化,则直接抛弃此Intent
*/
public static boolean readyToStartSpecifyPlugin(Context mContext,
ServiceConnection mConnection,
Intent mIntent,
boolean needAddCache) {
PluginDebugLog.runtimeLog(TAG, "readyToStartSpecifyPlugin launchIntent: " + mIntent);
String packageName = tryParsePkgName(mContext, mIntent);
PluginLoadedApk mLoadedApk = getPluginLoadedApkByPkgName(packageName);
if (mLoadedApk == null) {
deliver(mContext, false, packageName, ErrorType.ERROR_PLUGIN_NOT_LOADED, "pluginLoadedApk not ready");
PluginDebugLog.runtimeLog(TAG, packageName + "readyToStartSpecifyPlugin launchIntent exception, plugin loaded apk not exist");
PActivityStackSupervisor.clearLoadingIntent(packageName);
return false;
}
LinkedBlockingQueue<IntentRequest> cacheIntents =
PActivityStackSupervisor.getCachedIntent(packageName);
if (cacheIntents == null) {
cacheIntents = new LinkedBlockingQueue<IntentRequest>();
PActivityStackSupervisor.addCachedIntent(packageName, cacheIntents);
}
// 避免重复添加Intent请求到队列中,尤其是第一次初始化时在enterProxy中已经添加了一次
IntentRequest request = new IntentRequest(mIntent, mConnection);
if (!cacheIntents.contains(request) && needAddCache) {
PluginDebugLog.runtimeLog(TAG, "readyToStartSpecifyPlugin launchIntent add to cacheIntent....");
cacheIntents.offer(request); // 添加到队列
} else {
PluginDebugLog.runtimeLog(TAG, "readyToStartSpecifyPlugin launchIntent no need add to cacheIntent....needAddCache:" + needAddCache);
}
PluginDebugLog.runtimeLog(TAG, "readyToStartSpecifyPlugin launchIntent_cacheIntents: " + cacheIntents);
if (!mLoadedApk.hasLaunchIngIntent()) {
IntentRequest firstRequest = cacheIntents.poll(); //处理队首的Intent
if (firstRequest != null && firstRequest.getIntent() != null) {
PluginDebugLog.runtimeFormatLog(TAG, "readyToStartSpecifyPlugin, no launching intent for pkgName: %s, " +
"ready to process first intent in queue!", packageName);
doRealLaunch(mContext, mLoadedApk, firstRequest.getIntent(), firstRequest.getServiceConnection());
}
} else {
PluginDebugLog.runtimeFormatLog(TAG, "readyToStartSpecifyPlugin, has launching intent for pkgName %s " +
"waiting other intent process over", packageName);
}
return true;
}
void createFile(String filename, int loop) throws Exception, InterruptedException {
System.out.println("createFile, filename " + filename + ", loop " + loop);
//warmup
ConcurrentLinkedQueue<CrailBuffer> bufferQueue = new ConcurrentLinkedQueue<CrailBuffer>();
CrailBuffer buf = fs.allocateBuffer();
bufferQueue.add(buf);
warmUp(filename, warmup, bufferQueue);
fs.freeBuffer(buf);
//benchmark
System.out.println("starting benchmark...");
fs.getStatistics().reset();
LinkedBlockingQueue<String> pathQueue = new LinkedBlockingQueue<String>();
fs.create(filename, CrailNodeType.DIRECTORY, CrailStorageClass.DEFAULT, CrailLocationClass.DEFAULT, true).get().syncDir();
int filecounter = 0;
for (int i = 0; i < loop; i++){
String name = "" + filecounter++;
String f = filename + "/" + name;
pathQueue.add(f);
}
double ops = 0;
long start = System.currentTimeMillis();
while(!pathQueue.isEmpty()){
String path = pathQueue.poll();
fs.create(path, CrailNodeType.DATAFILE, CrailStorageClass.DEFAULT, CrailLocationClass.DEFAULT, true).get().syncDir();
}
long end = System.currentTimeMillis();
double executionTime = ((double) (end - start)) / 1000.0;
double latency = 0.0;
if (executionTime > 0) {
latency = 1000000.0 * executionTime / ops;
}
System.out.println("execution time " + executionTime);
System.out.println("ops " + ops);
System.out.println("latency " + latency);
fs.getStatistics().print("close");
}
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void testMultipleTopicSubscribersOnSameSession(String port,
String adminUsername,
String adminPassword,
String brokerHostname)
throws NamingException, JMSException, InterruptedException {
String queueName = "testMultipleTopicSubscribersOnSameSession";
InitialContext initialContext = ClientHelper
.getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
.withTopic(queueName)
.build();
TopicConnectionFactory connectionFactory
= (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.start();
TopicSession subscriberSession = connection.createTopicSession(false, TopicSession.CLIENT_ACKNOWLEDGE);
Topic topic = (Topic) initialContext.lookup(queueName);
int numberOfConsumers = 3;
int messagesPerConsumer = 1000;
int maxNumberOfMessages = numberOfConsumers * messagesPerConsumer;
LinkedBlockingQueue<MessageResult> receiveQueue = new LinkedBlockingQueue<>(maxNumberOfMessages);
TopicSubscriber consumers[] = new TopicSubscriber[numberOfConsumers];
int messageCount[] = new int[numberOfConsumers];
for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
consumers[consumerIndex] = subscriberSession.createSubscriber(topic);
int finalConsumerIndex = consumerIndex;
consumers[consumerIndex].setMessageListener(message -> {
messageCount[finalConsumerIndex]++;
try {
message.acknowledge();
} catch (JMSException e) {
LOGGER.error("Message acknowledging failed.", e);
}
receiveQueue.offer(new MessageResult(message, finalConsumerIndex));
});
}
// publish messages with property.
TopicSession producerSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
TopicPublisher producer = producerSession.createPublisher(topic);
TextMessage textMessage;
String consumerMessage = "testMessage";
for (int i = 0; i < messagesPerConsumer; i++) {
textMessage = producerSession.createTextMessage(consumerMessage);
producer.send(textMessage);
}
for (int i = 0; i < maxNumberOfMessages; i++) {
MessageResult result = receiveQueue.poll(5, TimeUnit.SECONDS);
if (result == null) {
StringBuilder countSummary = new StringBuilder();
for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
countSummary.append("Consumer ")
.append(consumerIndex)
.append(" received ")
.append(messageCount[consumerIndex])
.append(" messages, ");
}
Assert.fail("Messages stopped receiving after " + i + " iterations. " + countSummary.toString());
} else {
TextMessage textMessage1 = (TextMessage) result.getMessage();
Assert.assertEquals(textMessage1.getText(),
consumerMessage,
"Incorrect message received for consumer " + result.getConsumerId());
}
}
for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
Assert.assertEquals(messageCount[consumerIndex],
messagesPerConsumer,
"Message " + messageCount[consumerIndex]
+ " received for consumer " + consumerIndex + ".");
}
producer.close();
for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
consumers[consumerIndex].close();
}
connection.close();
}
/**
* Multithreaded version of {@link #resultant()}.
*
* @return <code>(rho, res)</code> satisfying <code>res = rho*this + t*(x^n-1)</code> for some integer <code>t</code>.
*/
public Resultant resultantMultiThread()
{
int N = coeffs.length;
// upper bound for resultant(f, g) = ||f, 2||^deg(g) * ||g, 2||^deg(f) = squaresum(f)^(N/2) * 2^(deg(f)/2) because g(x)=x^N-1
// see http://jondalon.mathematik.uni-osnabrueck.de/staff/phpages/brunsw/CompAlg.pdf chapter 3
BigInteger max = squareSum().pow((N + 1) / 2);
max = max.multiply(BigInteger.valueOf(2).pow((degree() + 1) / 2));
BigInteger max2 = max.multiply(BigInteger.valueOf(2));
// compute resultants modulo prime numbers
BigInteger prime = BigInteger.valueOf(10000);
BigInteger pProd = Constants.BIGINT_ONE;
LinkedBlockingQueue<Future<ModularResultant>> resultantTasks = new LinkedBlockingQueue<Future<ModularResultant>>();
Iterator<BigInteger> primes = BIGINT_PRIMES.iterator();
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
while (pProd.compareTo(max2) < 0)
{
if (primes.hasNext())
{
prime = primes.next();
}
else
{
prime = prime.nextProbablePrime();
}
Future<ModularResultant> task = executor.submit(new ModResultantTask(prime.intValue()));
resultantTasks.add(task);
pProd = pProd.multiply(prime);
}
// Combine modular resultants to obtain the resultant.
// For efficiency, first combine all pairs of small resultants to bigger resultants,
// then combine pairs of those, etc. until only one is left.
ModularResultant overallResultant = null;
while (!resultantTasks.isEmpty())
{
try
{
Future<ModularResultant> modRes1 = resultantTasks.take();
Future<ModularResultant> modRes2 = resultantTasks.poll();
if (modRes2 == null)
{
// modRes1 is the only one left
overallResultant = modRes1.get();
break;
}
Future<ModularResultant> newTask = executor.submit(new CombineTask(modRes1.get(), modRes2.get()));
resultantTasks.add(newTask);
}
catch (Exception e)
{
throw new IllegalStateException(e.toString());
}
}
executor.shutdown();
BigInteger res = overallResultant.res;
BigIntPolynomial rhoP = overallResultant.rho;
BigInteger pProd2 = pProd.divide(BigInteger.valueOf(2));
BigInteger pProd2n = pProd2.negate();
if (res.compareTo(pProd2) > 0)
{
res = res.subtract(pProd);
}
if (res.compareTo(pProd2n) < 0)
{
res = res.add(pProd);
}
for (int i = 0; i < N; i++)
{
BigInteger c = rhoP.coeffs[i];
if (c.compareTo(pProd2) > 0)
{
rhoP.coeffs[i] = c.subtract(pProd);
}
if (c.compareTo(pProd2n) < 0)
{
rhoP.coeffs[i] = c.add(pProd);
}
}
return new Resultant(rhoP, res);
}
@Test(timeout = 60000)
public void progressingWatermarkWithWriterTimeouts() throws Exception {
String scope = "Timeout";
String streamName = "Timeout";
int numSegments = 1;
URI controllerUri = URI.create("tcp://localhost:" + controllerPort);
ClientConfig clientConfig = ClientConfig.builder().controllerURI(controllerUri).build();
@Cleanup
StreamManager streamManager = StreamManager.create(controllerUri);
assertNotNull(streamManager);
streamManager.createScope(scope);
streamManager.createStream(scope, streamName, StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(numSegments))
.build());
@Cleanup
EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, clientConfig);
@Cleanup
SynchronizerClientFactory syncClientFactory = SynchronizerClientFactory.withScope(scope, clientConfig);
String markStream = NameUtils.getMarkStreamForStream(streamName);
RevisionedStreamClient<Watermark> watermarkReader = syncClientFactory.createRevisionedStreamClient(markStream,
new WatermarkSerializer(),
SynchronizerConfig.builder().build());
LinkedBlockingQueue<Watermark> watermarks = new LinkedBlockingQueue<>();
AtomicBoolean stopFlag = new AtomicBoolean(false);
fetchWatermarks(watermarkReader, watermarks, stopFlag);
// create two writers and write two sevent and call note time for each writer.
@Cleanup
EventStreamWriter<String> writer1 = clientFactory.createEventWriter(streamName,
new JavaSerializer<>(),
EventWriterConfig.builder().build());
writer1.writeEvent("1").get();
writer1.noteTime(100L);
@Cleanup
EventStreamWriter<String> writer2 = clientFactory.createEventWriter(streamName,
new JavaSerializer<>(),
EventWriterConfig.builder().build());
writer2.writeEvent("2").get();
writer2.noteTime(102L);
// writer0 should timeout. writer1 and writer2 should result in two more watermarks with following times:
// 1: 100L-101L 2: 101-101
// then first writer should timeout and be discarded. But second writer should continue to be active as its time
// is higher than first watermark. This should result in a second watermark to be emitted.
AssertExtensions.assertEventuallyEquals(true, () -> watermarks.size() == 2, 100000);
Watermark watermark1 = watermarks.poll();
Watermark watermark2 = watermarks.poll();
assertEquals(100L, watermark1.getLowerTimeBound());
assertEquals(102L, watermark1.getUpperTimeBound());
assertEquals(102L, watermark2.getLowerTimeBound());
assertEquals(102L, watermark2.getUpperTimeBound());
// stream cut should be same
assertTrue(watermark2.getStreamCut().entrySet().stream().allMatch(x -> watermark1.getStreamCut().get(x.getKey()).equals(x.getValue())));
// bring back writer1 and post an event with note time smaller than current watermark
writer1.writeEvent("3").get();
writer1.noteTime(101L);
// no watermark should be emitted.
Watermark nullMark = watermarks.poll(10, TimeUnit.SECONDS);
assertNull(nullMark);
}
void createFile(String filename, int loop) throws Exception, InterruptedException {
System.out.println("createFile, filename " + filename + ", loop " + loop);
//warmup
ConcurrentLinkedQueue<CrailBuffer> bufferQueue = new ConcurrentLinkedQueue<CrailBuffer>();
CrailBuffer buf = fs.allocateBuffer();
bufferQueue.add(buf);
warmUp(filename, warmup, bufferQueue);
fs.freeBuffer(buf);
//benchmark
System.out.println("starting benchmark...");
fs.getStatistics().reset();
LinkedBlockingQueue<String> pathQueue = new LinkedBlockingQueue<String>();
fs.create(filename, CrailNodeType.DIRECTORY, CrailStorageClass.DEFAULT, CrailLocationClass.DEFAULT).get().syncDir();
int filecounter = 0;
for (int i = 0; i < loop; i++){
String name = "" + filecounter++;
String f = filename + "/" + name;
pathQueue.add(f);
}
double ops = 0;
long start = System.currentTimeMillis();
while(!pathQueue.isEmpty()){
String path = pathQueue.poll();
fs.create(path, CrailNodeType.DATAFILE, CrailStorageClass.DEFAULT, CrailLocationClass.DEFAULT).get().syncDir();
}
long end = System.currentTimeMillis();
double executionTime = ((double) (end - start)) / 1000.0;
double latency = 0.0;
if (executionTime > 0) {
latency = 1000000.0 * executionTime / ops;
}
System.out.println("execution time " + executionTime);
System.out.println("ops " + ops);
System.out.println("latency " + latency);
fs.getStatistics().print("close");
}