下面列出了java.util.concurrent.LinkedBlockingQueue#offer ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void backpressureScheduledDelay(){
captured= "";
diff = System.currentTimeMillis();
LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
blockingQueue.add("10");
blockingQueue.offer("10");
ReactiveSeq.range(0, Integer.MAX_VALUE)
.limit(2)
.peek(v-> diff = System.currentTimeMillis())
.peek(i->System.out.println("diff is " +diff))
.map(i -> i.toString())
.scheduleFixedDelay(1l, scheduled)
.connect(blockingQueue)
.onePer(1, TimeUnit.SECONDS)
.peek(i->System.out.println("BQ " + blockingQueue))
.peek(System.out::println)
.forEach(c->captured=c);
assertThat(System.currentTimeMillis() - diff,greaterThan(1500l));
}
@Test
public void backpressureScheduledRate(){
captured= "";
diff = System.currentTimeMillis();
LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
blockingQueue.add("10");
blockingQueue.offer("10");
ReactiveSeq.range(0, Integer.MAX_VALUE)
.limit(2)
.peek(v-> diff = System.currentTimeMillis())
.map(i -> i.toString())
.scheduleFixedRate(1l, scheduled)
.connect(blockingQueue)
.onePer(1, TimeUnit.SECONDS)
.peek(i->System.out.println("BQ " + blockingQueue))
.peek(System.out::println)
.forEach(c->captured=c);
assertThat(System.currentTimeMillis() - diff,greaterThan(1500l));
}
@Test
public void backpressureScheduledCron(){
captured= "";
diff = System.currentTimeMillis();
LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
blockingQueue.add("10");
blockingQueue.offer("10");
ReactiveSeq.range(0, Integer.MAX_VALUE)
.limit(2)
.peek(v-> diff = System.currentTimeMillis())
.map(i -> i.toString())
.schedule("* * * * * ?", scheduled)
.connect(blockingQueue)
.onePer(2, TimeUnit.SECONDS)
.peek(i->System.out.println("BQ " + blockingQueue))
.peek(System.out::println)
.forEach(c->captured=c);
assertThat(System.currentTimeMillis() - diff,greaterThan(1500l));
}
/**
* Parse a GC Log line-by-line using the provided reader.
*/
public int parse(final LineNumberReader br) throws Exception {
final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(1000);
// fire up a producer thread that feeds lines from the reader into the queue the parser uses to read lines from
final Thread feeder = new Thread() {
public void run() {
String line = null;
try {
while ((line = br.readLine()) != null) {
try {
queue.offer(line, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.interrupted();
log.error(e);
}
}
} catch (IOException ioExc) {
log.error(ioExc);
}
keepPolling = false; // this tells the queue to stop polling once it sees a null from poll
}
};
feeder.start();
return parse(queue);
}
/**
* open the valve to keep on dump data
*/
public void keepDump() {
LinkedBlockingQueue<Object> thq = throttler;
if (thq != null) {
thq.offer(object);
}
}
/**
* 将消息 发往队列
*
* @param msg
*/
public void offer(IMessage msg) {
LinkedBlockingQueue<IMessage> queue = this.queue;
if (queue != null) {
queue.offer(msg);
}
}
/**
* Creates a pool of ByteBuffers with the given size.
*
* @param poolSize the number of buffers to initialize the pool with
* @param bufferSize the size of each buffer
* @return a blocking queue with size equal to poolSize and each buffer equal to bufferSize
*/
protected BlockingQueue<ByteBuffer> createBufferPool(final int poolSize, final int bufferSize) {
final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize; i++) {
bufferPool.offer(ByteBuffer.allocate(bufferSize));
}
return bufferPool;
}
public void returnBuffer(ByteBuffer byteBuffer)
{
LinkedBlockingQueue<ByteBuffer> buffers = bufferPool.get(byteBuffer.capacity());
if (buffers == null) {
throw new IllegalStateException("`buffers` shouldn't be null");
}
byteBuffer.position(0);
byteBuffer.limit(byteBuffer.capacity());
buffers.offer(byteBuffer);
}
/**
* Creates a pool of ByteBuffers with the given size.
*
* @param poolSize the number of buffers to initialize the pool with
* @param bufferSize the size of each buffer
* @return a blocking queue with size equal to poolSize and each buffer equal to bufferSize
*/
protected BlockingQueue<ByteBuffer> createBufferPool(final int poolSize, final int bufferSize) {
final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize; i++) {
bufferPool.offer(ByteBuffer.allocate(bufferSize));
}
return bufferPool;
}
/**
* 准备启动指定插件组件
*
* @param mContext 主工程Context
* @param mConnection bindService时需要的ServiceConnection,如果不是bindService的方式启动组件,传入Null
* @param mIntent 需要启动组件的Intent
* @param needAddCache 是否需要缓存Intnet,true:如果插件没有初始化,那么会缓存起来,等插件加载完毕再执行此Intent
* false:如果插件没初始化,则直接抛弃此Intent
*/
public static boolean readyToStartSpecifyPlugin(Context mContext,
ServiceConnection mConnection,
Intent mIntent,
boolean needAddCache) {
PluginDebugLog.runtimeLog(TAG, "readyToStartSpecifyPlugin launchIntent: " + mIntent);
String packageName = tryParsePkgName(mContext, mIntent);
PluginLoadedApk mLoadedApk = getPluginLoadedApkByPkgName(packageName);
if (mLoadedApk == null) {
deliver(mContext, false, packageName, ErrorType.ERROR_PLUGIN_NOT_LOADED, "pluginLoadedApk not ready");
PluginDebugLog.runtimeLog(TAG, packageName + "readyToStartSpecifyPlugin launchIntent exception, plugin loaded apk not exist");
PActivityStackSupervisor.clearLoadingIntent(packageName);
return false;
}
LinkedBlockingQueue<IntentRequest> cacheIntents =
PActivityStackSupervisor.getCachedIntent(packageName);
if (cacheIntents == null) {
cacheIntents = new LinkedBlockingQueue<IntentRequest>();
PActivityStackSupervisor.addCachedIntent(packageName, cacheIntents);
}
// 避免重复添加Intent请求到队列中,尤其是第一次初始化时在enterProxy中已经添加了一次
IntentRequest request = new IntentRequest(mIntent, mConnection);
if (!cacheIntents.contains(request) && needAddCache) {
PluginDebugLog.runtimeLog(TAG, "readyToStartSpecifyPlugin launchIntent add to cacheIntent....");
cacheIntents.offer(request); // 添加到队列
} else {
PluginDebugLog.runtimeLog(TAG, "readyToStartSpecifyPlugin launchIntent no need add to cacheIntent....needAddCache:" + needAddCache);
}
PluginDebugLog.runtimeLog(TAG, "readyToStartSpecifyPlugin launchIntent_cacheIntents: " + cacheIntents);
if (!mLoadedApk.hasLaunchIngIntent()) {
IntentRequest firstRequest = cacheIntents.poll(); //处理队首的Intent
if (firstRequest != null && firstRequest.getIntent() != null) {
PluginDebugLog.runtimeFormatLog(TAG, "readyToStartSpecifyPlugin, no launching intent for pkgName: %s, " +
"ready to process first intent in queue!", packageName);
doRealLaunch(mContext, mLoadedApk, firstRequest.getIntent(), firstRequest.getServiceConnection());
}
} else {
PluginDebugLog.runtimeFormatLog(TAG, "readyToStartSpecifyPlugin, has launching intent for pkgName %s " +
"waiting other intent process over", packageName);
}
return true;
}
/**
* Evict a batch (blocking put, but spins to look for an error in the
* <i>writerFuture</i> to avoid a deadlock if the writer fails).
*
* @param writerFuture
* The {@link Future} of the {@link WriterTask} (required).
* @param queue
* The queue onto which the batches are being transferred
* (required).
* @param batch
* A batch (required).
*
* @throws InterruptedException
*/
private static void putOnQueue(final Future<?> writerFuture, final LinkedBlockingQueue<Batch> queue,
final Batch batch) throws InterruptedException {
while (!writerFuture.isDone()) {
if (queue.offer(batch, 100L, TimeUnit.MILLISECONDS)) {
return;
}
}
if (writerFuture.isDone()) {
/*
* This is most likely to indicate either an error or interrupt in the writer.
*/
throw new RuntimeException("Writer is done, but reader still working?");
}
}