java.util.concurrent.LinkedBlockingQueue#offer() 源码实例 Demo

下面列出了 java.util.concurrent.LinkedBlockingQueue#offer() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: cyclops   文件: ConnectableTest.java
@Test
public void backpressureScheduledDelay(){

    captured= "";

       diff =  System.currentTimeMillis();
      LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
      blockingQueue.add("10");
      blockingQueue.offer("10");
      ReactiveSeq.range(0, Integer.MAX_VALUE)
          .limit(2)
          .peek(v-> diff = System.currentTimeMillis())
          .peek(i->System.out.println("diff is "  +diff))
          .map(i -> i.toString())
          .scheduleFixedDelay(1l, scheduled)
          .connect(blockingQueue)
          .onePer(1, TimeUnit.SECONDS)
          .peek(i->System.out.println("BQ " + blockingQueue))
          .peek(System.out::println)
          .forEach(c->captured=c);

      assertThat(System.currentTimeMillis() - diff,greaterThan(1500l));
}
 
源代码2 项目: cyclops   文件: ConnectableTest.java
@Test
public void backpressureScheduledRate(){

    captured= "";

       diff =  System.currentTimeMillis();
      LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
      blockingQueue.add("10");
      blockingQueue.offer("10");
      ReactiveSeq.range(0, Integer.MAX_VALUE)
          .limit(2)
          .peek(v-> diff = System.currentTimeMillis())
          .map(i -> i.toString())
          .scheduleFixedRate(1l, scheduled)
          .connect(blockingQueue)
          .onePer(1, TimeUnit.SECONDS)
          .peek(i->System.out.println("BQ " + blockingQueue))
          .peek(System.out::println)
          .forEach(c->captured=c);

      assertThat(System.currentTimeMillis() - diff,greaterThan(1500l));
}
 
源代码3 项目: cyclops   文件: ConnectableTest.java
@Test
public void backpressureScheduledCron(){

    captured= "";

       diff =  System.currentTimeMillis();
      LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
      blockingQueue.add("10");
      blockingQueue.offer("10");
      ReactiveSeq.range(0, Integer.MAX_VALUE)
          .limit(2)
          .peek(v-> diff = System.currentTimeMillis())
          .map(i -> i.toString())
          .schedule("* * * * * ?", scheduled)
          .connect(blockingQueue)
          .onePer(2, TimeUnit.SECONDS)
          .peek(i->System.out.println("BQ " + blockingQueue))
          .peek(System.out::println)
          .forEach(c->captured=c);

      assertThat(System.currentTimeMillis() - diff,greaterThan(1500l));
}
 
源代码4 项目: gc-log-analyzer   文件: GCLog.java
/**
 * Parse a GC Log line-by-line using the provided reader.
 */
public int parse(final LineNumberReader br) throws Exception {
  final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(1000);
  // fire up a producer thread that feeds lines from the reader into the queue the parser uses to read lines from
  final Thread feeder = new Thread() {
    public void run() {
      String line = null;
      try {
        while ((line = br.readLine()) != null) {
          try {
            queue.offer(line, 10, TimeUnit.SECONDS);
          } catch (InterruptedException e) {
            Thread.interrupted();
            log.error(e);
          }
        }
      } catch (IOException ioExc) {
        log.error(ioExc);
      }
      keepPolling = false; // this tells the queue to stop polling once it sees a null from poll
    }
  };
  feeder.start();
  return parse(queue);
}
 
源代码5 项目: binlake   文件: BinlogWorker.java
/**
 * open the valve to keep on dump data
 */
public void keepDump() {
    LinkedBlockingQueue<Object> thq = throttler;
    if (thq != null) {
        thq.offer(object);
    }
}
 
源代码6 项目: binlake   文件: BinlogHandler.java
/**
 * 将消息 发往队列
 *
 * @param msg
 */
public void offer(IMessage msg) {
    LinkedBlockingQueue<IMessage> queue = this.queue;
    if (queue != null) {
        queue.offer(msg);
    }
}
 
/**
 * Creates a pool of ByteBuffers with the given size.
 *
 * @param poolSize the number of buffers to initialize the pool with
 * @param bufferSize the size of each buffer
 * @return a blocking queue with size equal to poolSize and each buffer equal to bufferSize
 */
protected BlockingQueue<ByteBuffer> createBufferPool(final int poolSize, final int bufferSize) {
    final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(poolSize);
    for (int i = 0; i < poolSize; i++) {
        bufferPool.offer(ByteBuffer.allocate(bufferSize));
    }
    return bufferPool;
}
 
源代码8 项目: fluency   文件: BufferPool.java
public void returnBuffer(ByteBuffer byteBuffer)
{
    LinkedBlockingQueue<ByteBuffer> buffers = bufferPool.get(byteBuffer.capacity());
    if (buffers == null) {
        throw new IllegalStateException("`buffers` shouldn't be null");
    }

    byteBuffer.position(0);
    byteBuffer.limit(byteBuffer.capacity());
    buffers.offer(byteBuffer);
}
 
源代码9 项目: nifi   文件: AbstractListenEventProcessor.java
/**
 * Creates a pool of ByteBuffers with the given size.
 *
 * @param poolSize the number of buffers to initialize the pool with
 * @param bufferSize the size of each buffer
 * @return a blocking queue with size equal to poolSize and each buffer equal to bufferSize
 */
protected BlockingQueue<ByteBuffer> createBufferPool(final int poolSize, final int bufferSize) {
    final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(poolSize);
    for (int i = 0; i < poolSize; i++) {
        bufferPool.offer(ByteBuffer.allocate(bufferSize));
    }
    return bufferPool;
}
 
源代码10 项目: Neptune   文件: PluginManager.java
/**
 * 准备启动指定插件组件
 *
 * @param mContext     主工程Context
 * @param mConnection  bindService时需要的ServiceConnection,如果不是bindService的方式启动组件,传入Null
 * @param mIntent      需要启动组件的Intent
 * @param needAddCache 是否需要缓存Intnet,true:如果插件没有初始化,那么会缓存起来,等插件加载完毕再执行此Intent
 *                     false:如果插件没初始化,则直接抛弃此Intent
 */
public static boolean readyToStartSpecifyPlugin(Context mContext,
                                                ServiceConnection mConnection,
                                                Intent mIntent,
                                                boolean needAddCache) {
    PluginDebugLog.runtimeLog(TAG, "readyToStartSpecifyPlugin launchIntent: " + mIntent);
    String packageName = tryParsePkgName(mContext, mIntent);
    PluginLoadedApk mLoadedApk = getPluginLoadedApkByPkgName(packageName);
    if (mLoadedApk == null) {
        deliver(mContext, false, packageName, ErrorType.ERROR_PLUGIN_NOT_LOADED, "pluginLoadedApk not ready");
        PluginDebugLog.runtimeLog(TAG, packageName + "readyToStartSpecifyPlugin launchIntent exception, plugin loaded apk not exist");
        PActivityStackSupervisor.clearLoadingIntent(packageName);
        return false;
    }

    LinkedBlockingQueue<IntentRequest> cacheIntents =
            PActivityStackSupervisor.getCachedIntent(packageName);
    if (cacheIntents == null) {
        cacheIntents = new LinkedBlockingQueue<IntentRequest>();
        PActivityStackSupervisor.addCachedIntent(packageName, cacheIntents);
    }
    // 避免重复添加Intent请求到队列中,尤其是第一次初始化时在enterProxy中已经添加了一次
    IntentRequest request = new IntentRequest(mIntent, mConnection);
    if (!cacheIntents.contains(request) && needAddCache) {
        PluginDebugLog.runtimeLog(TAG, "readyToStartSpecifyPlugin launchIntent add to cacheIntent....");
        cacheIntents.offer(request);  // 添加到队列
    } else {
        PluginDebugLog.runtimeLog(TAG, "readyToStartSpecifyPlugin launchIntent no need add to cacheIntent....needAddCache:" + needAddCache);
    }

    PluginDebugLog.runtimeLog(TAG, "readyToStartSpecifyPlugin launchIntent_cacheIntents: " + cacheIntents);
    if (!mLoadedApk.hasLaunchIngIntent()) {
        IntentRequest firstRequest = cacheIntents.poll(); //处理队首的Intent
        if (firstRequest != null && firstRequest.getIntent() != null) {
            PluginDebugLog.runtimeFormatLog(TAG, "readyToStartSpecifyPlugin, no launching intent for pkgName: %s, " +
                    "ready to process first intent in queue!", packageName);
            doRealLaunch(mContext, mLoadedApk, firstRequest.getIntent(), firstRequest.getServiceConnection());
        }
    } else {
        PluginDebugLog.runtimeFormatLog(TAG, "readyToStartSpecifyPlugin, has launching intent for pkgName %s " +
                "waiting other intent process over", packageName);
    }
    return true;
}
 
源代码11 项目: database   文件: AbstractKeyArrayIndexProcedure.java
/**
 * Evict a batch (blocking put, but spins to look for an error in the
 * <i>writerFuture</i> to avoid a deadlock if the writer fails).
 * 
 * @param writerFuture
 *            The {@link Future} of the {@link WriterTask} (required).
 * @param queue
 *            The queue onto which the batches are being transferred
 *            (required).
 * @param batch
 *            A batch (required).
 * 
 * @throws InterruptedException
 */
private static void putOnQueue(final Future<?> writerFuture, final LinkedBlockingQueue<Batch> queue,
		final Batch batch) throws InterruptedException {

	while (!writerFuture.isDone()) {

		if (queue.offer(batch, 100L, TimeUnit.MILLISECONDS)) {
			return;
		}

	}
	
	if (writerFuture.isDone()) {

		/*
		 * This is most likely to indicate either an error or interrupt in the writer.
		 */

		throw new RuntimeException("Writer is done, but reader still working?");

	}
	
}