类 java.util.concurrent.ConcurrentLinkedQueue 源码实例 Demo

下面列出了 java.util.concurrent.ConcurrentLinkedQueue 类的 API 用法示例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: codebuff   文件: LocalCache.java
/**
 * Creates a segment bound to the given cache.
 *
 * <p>Stores the owning map, the weight limit and the (non-null) stats counter,
 * installs a fresh entry table sized from {@code initialCapacity}, and then picks
 * a queue implementation for each bookkeeping concern depending on which features
 * the owning map reports as enabled. Disabled features get either {@code null}
 * (reference queues) or the shared discarding queue (recency/write/access queues).
 *
 * @param map the cache this segment belongs to
 * @param initialCapacity capacity passed to {@code newEntryArray}
 * @param maxSegmentWeight weight limit recorded for this segment
 * @param statsCounter stats sink; must not be null
 */
Segment(
     LocalCache<K, V> map,
     int initialCapacity,
     long maxSegmentWeight,
     StatsCounter statsCounter) {
  this.map = map;
  this.maxSegmentWeight = maxSegmentWeight;
  this.statsCounter = checkNotNull(statsCounter);
  initTable(newEntryArray(initialCapacity));

  // Reference queues exist only when keys/values are held through references.
  if (map.usesKeyReferences()) {
    keyReferenceQueue = new ReferenceQueue<K>();
  } else {
    keyReferenceQueue = null;
  }
  if (map.usesValueReferences()) {
    valueReferenceQueue = new ReferenceQueue<V>();
  } else {
    valueReferenceQueue = null;
  }

  // Buffer of recently read entries; only needed when access order is tracked.
  if (map.usesAccessQueue()) {
    recencyQueue = new ConcurrentLinkedQueue<ReferenceEntry<K, V>>();
  } else {
    recencyQueue = LocalCache.<ReferenceEntry<K, V>>discardingQueue();
  }

  if (map.usesWriteQueue()) {
    writeQueue = new WriteQueue<K, V>();
  } else {
    writeQueue = LocalCache.<ReferenceEntry<K, V>>discardingQueue();
  }

  if (map.usesAccessQueue()) {
    accessQueue = new AccessQueue<K, V>();
  } else {
    accessQueue = LocalCache.<ReferenceEntry<K, V>>discardingQueue();
  }
}
 
源代码2 项目: reactive-grpc   文件: BackpressureChunkingTest.java
/**
 * Checks which request sizes reach the upstream observer when a fusion-enabled
 * source is consumed through {@code observeOn(Schedulers.trampoline())}, and that
 * the source reports its output as fused.
 */
@Test
public void chunkOperatorCorrectlyChunksInfiniteRequestFusion() {
    int chunkSize = DEFAULT_CHUNK_SIZE;

    int partOfChunk = TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE;
    // Two full chunks; passed to the range observer below (presumably the element count — confirm).
    int num = chunkSize * 2;

    // Source backed by a ConcurrentLinkedQueue; the second constructor argument is left null.
    AbstractStreamObserverAndPublisher<Long> source =
            new TestStreamObserverAndPublisherWithFusion<Long>(new ConcurrentLinkedQueue<Long>(), null);
    // NOTE(review): the single-thread executor created here is never shut down in this method.
    AsyncRangeCallStreamObserver observer = new AsyncRangeCallStreamObserver(Executors.newSingleThreadExecutor(), source, num);
    source.onSubscribe(observer);
    TestSubscriber<Long> testSubscriber = Flowable.fromPublisher(source)
                                                  .observeOn(Schedulers.trampoline())
                                                  .test();


    // Block until the stream terminates, then require normal completion.
    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertComplete();

    // Expected request pattern: one full chunk followed by three partial (two-thirds) chunks.
    assertThat(observer.requestsQueue).containsExactly(chunkSize, partOfChunk, partOfChunk, partOfChunk);
    assertThat(source.outputFused).isTrue();
}
 
源代码3 项目: crail   文件: CrailBenchmark.java
/**
 * Writes {@code operations} buffers to a throw-away file and deletes it afterwards.
 *
 * <p>The file name is {@code filename} with a random integer appended. Each
 * iteration takes a buffer from {@code bufferList}, clears it, writes it to the
 * warm-up stream and returns it to the queue. When {@code operations} is not
 * positive only the log line is printed.
 *
 * @param filename base name for the temporary warm-up file
 * @param operations number of buffer writes to perform
 * @param bufferList pool of buffers that is cycled through
 * @throws Exception if any file-system or stream operation fails
 */
private void warmUp(String filename, int operations, ConcurrentLinkedQueue<CrailBuffer> bufferList) throws Exception {
	String warmupFilename = filename + new Random().nextInt();
	System.out.println("warmUp, warmupFile " + warmupFilename + ", operations " + operations);
	if (operations <= 0){
		return;
	}

	CrailFile scratchFile = fs.create(warmupFilename, CrailNodeType.DATAFILE, CrailStorageClass.DEFAULT, CrailLocationClass.DEFAULT).get().asFile();
	CrailBufferedOutputStream scratchStream = scratchFile.getBufferedOutputStream(0);
	for (int remaining = operations; remaining > 0; remaining--){
		// Borrow a buffer, write it out, and hand it straight back to the pool.
		CrailBuffer borrowed = bufferList.poll();
		borrowed.clear();
		scratchStream.write(borrowed.getByteBuffer());
		bufferList.add(borrowed);
	}
	scratchStream.purge().get();
	scratchStream.close();
	fs.delete(warmupFilename, false).get().syncDir();
}
 
/**
 * Builds the single-element flow chain for a datalake cluster upgrade trigger.
 *
 * <p>The most recent stack status for the event's resource is looked up; when none
 * exists a placeholder with {@code DetailedStackStatus.UNKNOWN} is used. If that
 * status is {@code CLUSTER_UPGRADE_FAILED} the chain carries the
 * {@code CLUSTER_MANAGER_UPGRADE_FINISHED_EVENT} selector, otherwise the
 * {@code CLUSTER_MANAGER_UPGRADE_EVENT} selector.
 *
 * @param event the incoming upgrade trigger
 * @return a queue holding exactly one follow-up trigger event
 */
@Override
public Queue<Selectable> createFlowTriggerEventQueue(DatalakeClusterUpgradeTriggerEvent event) {
    Optional<StackStatus> latestStatus = stackStatusService.findFirstByStackIdOrderByCreatedDesc(event.getResourceId());

    // Fallback used when the stack has no recorded status yet.
    StackStatus placeholder = new StackStatus();
    placeholder.setDetailedStackStatus(DetailedStackStatus.UNKNOWN);

    DetailedStackStatus currentDetail = latestStatus.orElse(placeholder).getDetailedStackStatus();
    boolean lastUpgradeFailed = DetailedStackStatus.CLUSTER_UPGRADE_FAILED.equals(currentDetail);

    Queue<Selectable> flowChain = new ConcurrentLinkedQueue<>();
    if (!lastUpgradeFailed) {
        flowChain.add(new DatalakeClusterUpgradeTriggerEvent(
                CLUSTER_MANAGER_UPGRADE_EVENT.event(), event.getResourceId(), event.accepted(), event.getTargetImage()));
    } else {
        flowChain.add(new DatalakeClusterUpgradeTriggerEvent(
                CLUSTER_MANAGER_UPGRADE_FINISHED_EVENT.event(), event.getResourceId(), event.accepted(), event.getTargetImage()));
    }
    return flowChain;
}
 
源代码5 项目: codebase   文件: ConnectedComponents.java
/**
 * Collects the connected component reachable from {@code v} with a breadth-first
 * walk and appends it to {@code this.components}.
 *
 * <p>For every vertex taken from the work queue, all of its edges are added to the
 * fragment. Each adjacent vertex is enqueued the first time it is seen and is
 * removed from {@code toVisit} every time it is encountered.
 *
 * @param v start vertex of the component
 * @param toVisit collection from which every reached neighbour is removed
 */
private void constructConnectedComponent(V v, Collection<V> toVisit) {
	IFragment<E,V> component = new Fragment<E,V>(this.g);
	Set<V> seen = new HashSet<V>();
	Queue<V> frontier = new ConcurrentLinkedQueue<V>();
	seen.add(v);
	frontier.add(v);

	// The queue never holds null, so a null poll() means the frontier is exhausted.
	for (V current = frontier.poll(); current != null; current = frontier.poll()) {
		component.addAll(this.g.getEdges(current));

		for (V neighbour : this.g.getAdjacent(current)) {
			// Set.add reports whether the vertex is new, replacing a contains/add pair.
			if (seen.add(neighbour)) {
				frontier.add(neighbour);
			}
			toVisit.remove(neighbour);
		}
	}

	this.components.add(component);
}
 
源代码6 项目: Mi365Locker   文件: MainActivity.java
/**
 * Stops the running BLE scan and resets the activity's scan-related state.
 *
 * Disposes every tracked device connection, stops the scanner, then re-creates
 * the BLE client, list adapter, Bluetooth manager/adapter/scanner handles and the
 * device bookkeeping collections before refreshing the UI.
 */
private void stopScan() {
    // Release every connection that was opened during the scan.
    for (Map.Entry<String, DeviceConnection> device_entry: this.devices_connections.entrySet())
    {
        device_entry.getValue().dispose();
    }
    bluetoothLeScanner.stopScan(this.mLeScanCallback);

    // Start over with a fresh client and an empty device list in the UI.
    this.rxBleClient = RxBleClient.create(getApplicationContext());
    this.devicesAdapter = new DeviceAdapter(this, R.layout.list_device_item, new ArrayList<>());
    this.lv_scan.setAdapter(this.devicesAdapter);


    // Re-acquire the Bluetooth manager, adapter and scanner handles.
    this.btManager= (BluetoothManager) this.getSystemService(Context.BLUETOOTH_SERVICE);
    mBTAdapter = this.btManager.getAdapter();

    bluetoothLeScanner = this.mBTAdapter.getBluetoothLeScanner();


    // Drop all previously discovered devices and pending work.
    this.devices_connections = new HashMap<>();
    this.devices_to_attack = new ConcurrentLinkedQueue<>();
    this.scanning = false;
    this.updateStatus();

    this.devicesAdapter.notifyDataSetChanged();
}
 
源代码7 项目: Diorite   文件: YamlCollectionCreator.java
/**
 * Registers, via {@code safePut}, a size-taking factory for each supported
 * collection and map type.
 *
 * Factories written as {@code x -> new ...()} ignore the requested size.
 * Several requested types are deliberately mapped to a different implementation
 * (for example {@code HashSet} and {@code HashMap} map to their insertion-ordered
 * {@code Linked*} variants).
 *
 * NOTE(review): the exact semantics of {@code safePut} (e.g. whether it overwrites
 * existing entries) are not visible here.
 *
 * @param map registry from requested type to factory
 */
static void putAllCollections(Map<Class<?>, IntFunction<?>> map)
{
    // Concrete collection classes.
    safePut(map, ArrayList.class, ArrayList::new);
    safePut(map, HashSet.class, LinkedHashSet::new);
    safePut(map, Properties.class, x -> new Properties());
    safePut(map, Hashtable.class, Hashtable::new);

    // Collection interfaces and their default implementations.
    safePut(map, Collection.class, ArrayList::new);
    safePut(map, Set.class, LinkedHashSet::new);
    safePut(map, List.class, ArrayList::new);
    safePut(map, SortedSet.class, x -> new TreeSet<>());
    safePut(map, Queue.class, x -> new ConcurrentLinkedQueue<>());
    safePut(map, Deque.class, x -> new ConcurrentLinkedDeque<>());
    safePut(map, BlockingQueue.class, x -> new LinkedBlockingQueue<>());
    safePut(map, BlockingDeque.class, x -> new LinkedBlockingDeque<>());


    // Concrete map classes.
    safePut(map, HashMap.class, LinkedHashMap::new);
    safePut(map, LinkedHashMap.class, LinkedHashMap::new);
    safePut(map, ConcurrentHashMap.class, ConcurrentHashMap::new);

    // Map interfaces and their default implementations.
    safePut(map, Map.class, LinkedHashMap::new);
    safePut(map, ConcurrentMap.class, x -> new ConcurrentSkipListMap<>());
    safePut(map, ConcurrentNavigableMap.class, x -> new ConcurrentSkipListMap<>());
    safePut(map, SortedMap.class, i -> new TreeMap<>());
}
 
源代码8 项目: openpojo   文件: DefaultPojoClassLookupService.java
/**
 * Collects the classes of a package and of every package beneath it.
 *
 * <p>Packages are processed from a work queue seeded with the named package; each
 * processed package contributes its sub-packages to the queue and its classes
 * (filtered through the final filter chain) to the result.
 *
 * @param packageName name of the root package
 * @param pojoClassFilter caller-supplied filter, combined via {@code getFinalFilterChain}
 * @return the matching classes of the root package and all of its descendants
 */
public List<PojoClass> getPojoClassesRecursively(final String packageName, final PojoClassFilter pojoClassFilter) {
  final List<PojoClass> collected = new LinkedList<PojoClass>();
  final PojoClassFilter effectiveFilter = getFinalFilterChain(pojoClassFilter);

  final Queue<PojoPackage> worklist = new ConcurrentLinkedQueue<PojoPackage>();
  worklist.add(PojoPackageFactory.getPojoPackage(packageName));

  // The queue rejects nulls, so poll() returning null means no packages are left.
  PojoPackage current;
  while ((current = worklist.poll()) != null) {
    worklist.addAll(current.getPojoSubPackages());
    collected.addAll(current.getPojoClasses(effectiveFilter));
  }
  return collected;
}
 
源代码9 项目: openjdk-jdk8u   文件: RemovePollRace.java
/**
 * Returns one instance of each concurrent queue implementation under test,
 * in random order.
 *
 * <p>The bounded array-backed queues are created with capacity {@code count},
 * once with the fairness flag off and once with it on.
 */
Collection<Queue<Boolean>> concurrentQueues() {
    final List<Queue<Boolean>> candidates = new ArrayList<>();

    // Linked concurrent implementations.
    candidates.add(new ConcurrentLinkedDeque<Boolean>());
    candidates.add(new ConcurrentLinkedQueue<Boolean>());

    // Bounded array-backed queues: fairness flag false, then true.
    candidates.add(new ArrayBlockingQueue<Boolean>(count, false));
    candidates.add(new ArrayBlockingQueue<Boolean>(count, true));

    // Linked blocking and transfer queues.
    candidates.add(new LinkedBlockingQueue<Boolean>());
    candidates.add(new LinkedBlockingDeque<Boolean>());
    candidates.add(new LinkedTransferQueue<Boolean>());

    // Following additional implementations are available from:
    // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
    // candidates.add(new SynchronizedLinkedListQueue<Boolean>());

    // Randomise the order to avoid the "first fast, second slow" benchmark effect.
    Collections.shuffle(candidates);
    return candidates;
}
 
源代码10 项目: tutorials   文件: TestConcurrentLinkedQueue.java
/**
 * Offers one element from a producer task and verifies that a consumer task
 * polls the same element from the shared {@link ConcurrentLinkedQueue}.
 *
 * <p>The consumer is submitted only after the producer's future has completed,
 * so the hand-off does not depend on a fixed sleep. The executor is shut down
 * before waiting for termination; waiting first would always consume the full
 * timeout because nothing had asked the pool to stop.
 */
@Test
public void givenProducerOffersElementInQueue_WhenConsumerPollsQueue_ThenItRetrievesElement() throws Exception {
    int element = 1;

    ExecutorService executorService = Executors.newFixedThreadPool(2);
    try {
        ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
        Runnable offerTask = () -> concurrentLinkedQueue.offer(element);

        // poll() already returns null on an empty queue, so no peek/loop is needed.
        Callable<Integer> pollTask = concurrentLinkedQueue::poll;

        // Wait for the producer to finish instead of sleeping for a fixed time.
        executorService.submit(offerTask).get(1, TimeUnit.SECONDS);

        Future<Integer> returnedElement = executorService.submit(pollTask);
        assertThat(returnedElement.get(1, TimeUnit.SECONDS)
            .intValue(), is(equalTo(element)));
    } finally {
        // Shut down first, then wait; also runs when the assertion above fails.
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.SECONDS);
    }
}
 
源代码11 项目: streams   文件: InstagramAbstractProvider.java
/**
 * Initialises the provider: creates the output queue and completion flag,
 * validates the configuration and obtains the Instagram client.
 *
 * NOTE(review): {@code configurationObject} is not read in this method; the
 * already-present {@code config} field is used instead.
 *
 * @param configurationObject unused here
 * @throws NullPointerException if the config, any required OAuth setting, the
 *         threads-per-provider setting or the resulting client is null
 */
@Override
public void prepare(Object configurationObject) {
  this.dataQueue = new ConcurrentLinkedQueue<>();
  this.isCompleted = new AtomicBoolean(false);

  // Fail fast on any missing mandatory configuration value.
  Objects.requireNonNull(config);
  Objects.requireNonNull(config.getOauth());
  Objects.requireNonNull(config.getOauth().getClientId());
  Objects.requireNonNull(config.getOauth().getClientSecret());
  Objects.requireNonNull(config.getOauth().getAccessToken());
  Objects.requireNonNull(config.getThreadsPerProvider());

  try {
    client = Instagram.getInstance(this.config);
  } catch (InstantiationException ex) {
    // Only logged here; if client is still null the check below throws.
    LOGGER.error("InstantiationException", ex);
  }

  Objects.requireNonNull(client);

}
 
源代码12 项目: xDrip-plus   文件: PendiqService.java
/**
 * Schedules a check that retries {@code item} when no incoming data has been
 * processed within the wait window.
 *
 * <p>The scheduled task does nothing if data was processed within the last
 * 3000 ms. Otherwise it logs the missing reply, increments the item's retry
 * counter and, while the counter does not exceed {@code MAX_QUEUE_RETRIES},
 * writes the item to the queue again.
 *
 * @param queue queue the item is rewritten to on retry
 * @param item the queue item awaiting a reply
 */
private void expectReply(final ConcurrentLinkedQueue<QueueItem> queue, final QueueItem item) {
    final long replyWindowMs = 3000;
    final Runnable retryIfSilent = new Runnable() {
        @Override
        public void run() {
            // Something arrived recently enough: nothing to do.
            if (JoH.msSince(lastProcessedIncomingData) <= replyWindowMs) {
                return;
            }
            UserError.Log.d(TAG, "GOT NO REPLY FOR: " + item.description + " @ " + item.retries);
            item.retries++;
            if (item.retries > MAX_QUEUE_RETRIES) {
                return;
            }
            UserError.Log.d(TAG, "Retrying due to no reply: " + item.description);
            writeQueueItem(queue, item);
        }
    };
    Inevitable.task("pendiq-expect-reply-" + item.description, replyWindowMs, retryIfSilent);
}
 
源代码13 项目: openjdk-jdk8u-backup   文件: RemovePollRace.java
/**
 * Returns one instance of each concurrent queue implementation under test,
 * in random order.
 *
 * <p>The bounded array-backed queues are created with capacity {@code count},
 * once with the fairness flag off and once with it on.
 */
Collection<Queue<Boolean>> concurrentQueues() {
    final List<Queue<Boolean>> candidates = new ArrayList<>();

    // Linked concurrent implementations.
    candidates.add(new ConcurrentLinkedDeque<Boolean>());
    candidates.add(new ConcurrentLinkedQueue<Boolean>());

    // Bounded array-backed queues: fairness flag false, then true.
    candidates.add(new ArrayBlockingQueue<Boolean>(count, false));
    candidates.add(new ArrayBlockingQueue<Boolean>(count, true));

    // Linked blocking and transfer queues.
    candidates.add(new LinkedBlockingQueue<Boolean>());
    candidates.add(new LinkedBlockingDeque<Boolean>());
    candidates.add(new LinkedTransferQueue<Boolean>());

    // Following additional implementations are available from:
    // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
    // candidates.add(new SynchronizedLinkedListQueue<Boolean>());

    // Randomise the order to avoid the "first fast, second slow" benchmark effect.
    Collections.shuffle(candidates);
    return candidates;
}
 
源代码14 项目: g4proxy   文件: ProxyToServerConnection.java
/**
 * Create a new ProxyToServerConnection.
 *
 * <p>If the proxy server has a chained-proxy manager, it is asked to fill a queue
 * of candidate chained proxies for the initial request. The first candidate is
 * handed to the connection as its current chained proxy and the remaining queue
 * is passed along as well. Without a manager the queue stays empty and the first
 * chained proxy passed to the connection is {@code null}.
 *
 * @param proxyServer the proxy server that owns the connection
 * @param clientConnection the client-side connection this server connection serves
 * @param serverHostAndPort target server address in {@code host:port} form
 * @param initialFilters filters associated with the initial request
 * @param initialHttpRequest the request used to look up chained proxies
 * @param globalTrafficShapingHandler traffic-shaping handler passed through to the connection
 * @return the new connection, or {@code null} if a chained-proxy manager is
 *         configured but returned no proxies for the request
 * @throws UnknownHostException declared by this factory; presumably raised while
 *         the connection is being constructed (not visible in this method)
 */
static ProxyToServerConnection create(DefaultHttpProxyServer proxyServer,
        ClientToProxyConnection clientConnection,
        String serverHostAndPort,
        HttpFilters initialFilters,
        HttpRequest initialHttpRequest,
        GlobalTrafficShapingHandler globalTrafficShapingHandler)
        throws UnknownHostException {
    Queue<ChainedProxy> chainedProxies = new ConcurrentLinkedQueue<ChainedProxy>();
    ChainedProxyManager chainedProxyManager = proxyServer
            .getChainProxyManager();
    if (chainedProxyManager != null) {
        chainedProxyManager.lookupChainedProxies(initialHttpRequest,
                chainedProxies);
        // isEmpty() rather than size() == 0: ConcurrentLinkedQueue.size()
        // traverses the whole queue and is not a constant-time operation.
        if (chainedProxies.isEmpty()) {
            // ChainedProxyManager returned no proxies, can't connect
            return null;
        }
    }
    return new ProxyToServerConnection(proxyServer,
            clientConnection,
            serverHostAndPort,
            chainedProxies.poll(),
            chainedProxies,
            initialFilters,
            globalTrafficShapingHandler);
}
 
源代码15 项目: stratio-cassandra   文件: SmartThriftClient.java
/**
 * Returns a client connected to a node chosen for the given partition key.
 *
 * The replicas for the key are scanned starting at a round-robin position; the
 * first replica whose address is accepted by {@code whiteset} (or any replica if
 * {@code whiteset} is null) is chosen. If no replica qualifies, a random entry of
 * {@code whitelist} is used. A cached client for that address is reused when one
 * is available; otherwise a new client is created.
 *
 * @param pk partition key used to look up replicas
 * @return a pooled client for the chosen address, or a newly created one
 */
private Client get(ByteBuffer pk)
{
    Set<Host> hosts = metadata.getReplicas(metadata.quote(keyspace), pk);
    InetAddress address = null;
    if (hosts.size() > 0)
    {
        int pos = roundrobin.incrementAndGet() % hosts.size();
        // Try each replica at most once, starting from the round-robin position.
        for (int i = 0 ; address == null && i < hosts.size() ; i++)
        {
            // The remainder is negative when the counter value is negative; flip it to a valid offset.
            if (pos < 0)
                pos = -pos;
            Host host = Iterators.get(hosts.iterator(), (pos + i) % hosts.size());
            if (whiteset == null || whiteset.contains(host.getAddress()))
                address = host.getAddress();
        }
    }
    // No acceptable replica: fall back to a random whitelisted address.
    // NOTE(review): assumes whitelist is non-null and non-empty on this path — confirm.
    if (address == null)
        address = whitelist.get(ThreadLocalRandom.current().nextInt(whitelist.size()));
    ConcurrentLinkedQueue<Client> q = cache.get(address);
    if (q == null)
    {
        // Install a queue for this address, keeping whichever one was stored first.
        ConcurrentLinkedQueue<Client> newQ = new ConcurrentLinkedQueue<Client>();
        q = cache.putIfAbsent(address, newQ);
        if (q == null)
            q = newQ;
    }
    // Reuse a pooled client if one is available, otherwise open a new one.
    Client tclient = q.poll();
    if (tclient != null)
        return tclient;
    return new Client(settings.getRawThriftClient(address.getHostAddress()), address);
}
 
源代码16 项目: MHAP   文件: FastaData.java
/**
 * Creates an instance backed by a copy of an already loaded sequence queue.
 *
 * No file reader is attached, the data is marked as fully read, and the
 * processed counter starts at the number of copied sequences.
 *
 * @param seqList sequences to copy into this instance's own queue
 */
private FastaData(ConcurrentLinkedQueue<Sequence> seqList)
{
	// Copy the elements so this instance has its own queue.
	this.sequenceList = new ConcurrentLinkedQueue<Sequence>(seqList);
	this.fileReader = null;
	this.lastLine = null;
	this.readFullFile = true;
	this.numberProcessed = new AtomicLong(this.sequenceList.size());
	this.offset = 0;
}
 
源代码17 项目: adblib   文件: AdbStream.java
/**
 * Creates a new AdbStream object on the specified AdbConnection
 * with the given local ID.
 *
 * The stream starts with an empty read queue, not ready for writing,
 * and not closed.
 *
 * @param adbConn AdbConnection that this stream is running on
 * @param localId Local ID of the stream
 */
public AdbStream(AdbConnection adbConn, int localId) {
    this.adbConn = adbConn;
    this.localId = localId;
    // Queue of received payloads (byte arrays) for this stream.
    this.readQueue = new ConcurrentLinkedQueue<byte[]>();
    // Write-readiness flag, initially false.
    this.writeReady = new AtomicBoolean(false);
    this.isClosed = false;
}
 
源代码18 项目: ignite   文件: GridDebug.java
/**
 * Swaps the current debug queue for a fresh, empty one.
 *
 * <p>Nothing happens when no queue is installed (the stopped state). The swap is
 * a single compare-and-set against the queue that was read, so it is skipped if
 * the reference has changed in the meantime.
 */
public static void reset() {
    final ConcurrentLinkedQueue<Item> current = que.get();

    if (current == null) {
        // Stopped: there is no queue to replace.
        return;
    }

    que.compareAndSet(current, new ConcurrentLinkedQueue<Item>());
}
 
/**
 * Issues a request with {@code overridingStrategy} and checks, at each observed
 * signal (request-n on the request payload, response metadata, response payload
 * onNext and onComplete), that {@code isInvalidThread()} is false.
 *
 * Violations are collected in a thread-safe queue because the callbacks may run
 * on different threads; the test fails if any were recorded.
 */
@Test
public void requestRespectsDisable() throws Exception {
    ConcurrentLinkedQueue<AssertionError> errors = new ConcurrentLinkedQueue<>();
    // Hook the request payload so demand (request-n) from an invalid thread is recorded.
    StreamingHttpRequest req = client.get("/").transformPayloadBody(p -> p.beforeRequest(__ -> {
        if (isInvalidThread()) {
            errors.add(new AssertionError("Invalid thread called request-n. Thread: "
                    + currentThread()));
        }
    }));
    client.request(overridingStrategy, req)
            .beforeOnSuccess(__ -> {
                if (isInvalidThread()) {
                    errors.add(new AssertionError("Invalid thread called response metadata. " +
                            "Thread: " + currentThread()));
                }
            })
            .flatMapPublisher(StreamingHttpResponse::payloadBody)
            .beforeOnNext(__ -> {
                if (isInvalidThread()) {
                    errors.add(new AssertionError("Invalid thread called response payload onNext. " +
                            "Thread: " + currentThread()));
                }
            })
            .beforeOnComplete(() -> {
                if (isInvalidThread()) {
                    errors.add(new AssertionError("Invalid thread called response payload onComplete. " +
                            "Thread: " + currentThread()));
                }
            }).toFuture().get(); // block until the whole response has been consumed

    assertThat("Unexpected errors: " + errors, errors, hasSize(0));
}
 
/**
 * Runs the generic constructor checks of {@code ConstructorTestBuilder} against
 * {@code PublisherWindowStartEnd}, supplying a reference value for each named
 * constructor argument.
 */
@Test
public void constructors() {
    ConstructorTestBuilder ctb = new ConstructorTestBuilder(PublisherWindowStartEnd.class);
    
    // Reference values registered under the constructor's parameter names.
    ctb.addRef("source", PublisherNever.instance());
    ctb.addRef("start", PublisherNever.instance());
    ctb.addRef("end", (Function<Object, Publisher<Object>>)o -> PublisherNever.instance());
    ctb.addRef("drainQueueSupplier", (Supplier<Queue<Object>>)() -> new ConcurrentLinkedQueue<>());
    ctb.addRef("processorQueueSupplier", (Supplier<Queue<Object>>)() -> new ConcurrentLinkedQueue<>());
    
    ctb.test();
}
 
源代码21 项目: AvatarMQ   文件: MessageCache.java
/**
 * Copies the pending messages into a private list, clears the cache and
 * dispatches the copied batch if it is not empty.
 *
 * <p>NOTE(review): the method clears {@code cache}, not the {@code messages}
 * parameter. If both refer to the same queue, elements added between the copy
 * and the clear would be dropped without being dispatched — confirm against callers.
 *
 * @param messages queue whose current contents are dispatched
 */
private void commitMessage(ConcurrentLinkedQueue<T> messages) {
    // Snapshot first so dispatching works on a stable batch.
    LinkedList<T> batch = new LinkedList<T>(messages);
    cache.clear();

    if (!batch.isEmpty()) {
        parallelDispatch(batch);
        batch.clear();
    }
}
 
/**
 * Builds the three-step flow chain triggered by a stack event: external database
 * creation, stack creation, then cluster creation.
 *
 * <p>The first two events carry the incoming event's {@code accepted()} value;
 * the cluster-creation event is built from the selector and resource id only.
 *
 * @param event the triggering stack event
 * @return the ordered queue of follow-up events
 */
@Override
public Queue<Selectable> createFlowTriggerEventQueue(StackEvent event) {
    StackEvent externalDatabaseStep =
            new StackEvent(START_EXTERNAL_DATABASE_CREATION_EVENT.event(), event.getResourceId(), event.accepted());
    StackEvent stackCreationStep =
            new StackEvent(START_CREATION_EVENT.event(), event.getResourceId(), event.accepted());
    StackEvent clusterCreationStep =
            new StackEvent(CLUSTER_CREATION_EVENT.event(), event.getResourceId());

    // Insertion order is the execution order of the chain.
    Queue<Selectable> chain = new ConcurrentLinkedQueue<>();
    chain.add(externalDatabaseStep);
    chain.add(stackCreationStep);
    chain.add(clusterCreationStep);
    return chain;
}
 
源代码23 项目: binlake   文件: BinlogWorker.java
/**
 * Finds the last committed position in the leading run of committed log
 * positions, removes that run from the queue and returns the position.
 *
 * The queue is walked from its head; the walk stops at the first position that
 * is not committed or once {@code THROTTLE_QUEUE_SIZE} positions have been
 * examined. Every committed position seen is handed to
 * {@code removeQueueWithLock}. The last one is merged with {@code originPos},
 * refreshed and returned.
 *
 * @return the latest committed position, or {@code null} if there is no queue or
 *         the head of the queue is not committed
 */
@Override
public LogPosition getLatestLogPosWithRm() {
    // Work on a local reference to the queue field.
    ConcurrentLinkedQueue<LogPosition> logPositions = this.logPositions;
    if (logPositions == null) {
        return null;
    }

    // Do not enable the work log lightly
    debugLogPosition(logPositions);

    LogPosition curr;
    LogPosition pre = null;
    int len = 0;

    List<LogPosition> rms = new LinkedList<>();
    // Bound the number of iterations to avoid an endless loop; the queue cannot grow
    // too long because log-position truncation keeps it in check
    Iterator<LogPosition> iter = logPositions.iterator();
    while (iter.hasNext() && (curr = iter.next()) != null
            && len++ < THROTTLE_QUEUE_SIZE) {
        if (curr.isCommit()) {
            rms.add(curr); // add to the removal list
            pre = curr;
            continue;
        }
        break; // stop at the first position that is not committed
    }

    removeQueueWithLock(logPositions, rms);

    if (pre != null) {
        pre.mergeOriginLogPos(originPos);
        pre.refresh();
        return pre;
    }
    return null;
}
 
源代码24 项目: amidst   文件: FragmentQueueProcessor.java
/**
 * Creates a processor over the three fragment queues it is handed.
 *
 * The constructor only stores its arguments; the queues are supplied by the
 * caller rather than created here.
 *
 * @param availableQueue queue stored as {@code availableQueue}
 * @param loadingQueue queue stored as {@code loadingQueue}
 * @param recycleQueue queue stored as {@code recycleQueue}
 * @param cache fragment cache stored for later use
 * @param layerManager layer manager stored for later use
 * @param dimensionSetting setting holding the selected dimension
 */
@CalledByAny
public FragmentQueueProcessor(
		ConcurrentLinkedQueue<Fragment> availableQueue,
		ConcurrentLinkedQueue<Fragment> loadingQueue,
		ConcurrentLinkedQueue<Fragment> recycleQueue,
		FragmentCache cache,
		LayerManager layerManager,
		Setting<Dimension> dimensionSetting) {
	this.availableQueue = availableQueue;
	this.loadingQueue = loadingQueue;
	this.recycleQueue = recycleQueue;
	this.cache = cache;
	this.layerManager = layerManager;
	this.dimensionSetting = dimensionSetting;
}
 
源代码25 项目: titan1withtp3.1   文件: FulgoraMapEmitter.java
/**
 * Records one key/value pair produced by the map phase.
 *
 * <p>Without a reduce step the pair is appended to {@code mapQueue} as a
 * {@code KeyValue}. With a reduce step the value is appended to the per-key queue
 * in {@code reduceMap}, which is created on first use.
 *
 * @param key emitted key
 * @param value emitted value
 */
@Override
public void emit(K key, V value) {
    if (!this.doReduce) {
        this.mapQueue.add(new KeyValue<>(key, value));
        return;
    }
    // Group values by key for the later reduce step.
    this.reduceMap.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(value);
}
 
源代码26 项目: amidst   文件: FragmentQueueProcessor.java
/**
 * Creates a processor over the three fragment queues it is handed.
 *
 * The constructor only stores its arguments; the queues are supplied by the
 * caller rather than created here.
 *
 * @param availableQueue queue stored as {@code availableQueue}
 * @param loadingQueue queue stored as {@code loadingQueue}
 * @param recycleQueue queue stored as {@code recycleQueue}
 * @param cache fragment cache stored for later use
 * @param layerManager layer manager stored for later use
 * @param dimensionSetting setting holding the selected dimension
 */
@CalledByAny
public FragmentQueueProcessor(
		ConcurrentLinkedQueue<Fragment> availableQueue,
		ConcurrentLinkedQueue<Fragment> loadingQueue,
		ConcurrentLinkedQueue<Fragment> recycleQueue,
		FragmentCache cache,
		LayerManager layerManager,
		Setting<Dimension> dimensionSetting) {
	this.availableQueue = availableQueue;
	this.loadingQueue = loadingQueue;
	this.recycleQueue = recycleQueue;
	this.cache = cache;
	this.layerManager = layerManager;
	this.dimensionSetting = dimensionSetting;
}
 
/**
 * Verifies that {@code FunctionWithSideOutput} routes records to the broadcast
 * and non-broadcast side outputs, and that each emitted string carries the
 * watermark and timestamp expected for it.
 */
@Test
public void testSideOutput() throws Exception {
	// try-with-resources closes the harness even if an assertion fails.
	try (
			TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = getInitializedTestHarness(
					new FunctionWithSideOutput(), STATE_DESCRIPTOR)
	) {

		// Advance both inputs to watermark 10, then send one element on input 2.
		testHarness.processWatermark1(new Watermark(10L));
		testHarness.processWatermark2(new Watermark(10L));
		testHarness.processElement2(new StreamRecord<>(5, 12L));

		// Advance both inputs to watermark 40, then send two elements on input 1.
		testHarness.processWatermark1(new Watermark(40L));
		testHarness.processWatermark2(new Watermark(40L));
		testHarness.processElement1(new StreamRecord<>("6", 13L));
		testHarness.processElement1(new StreamRecord<>("6", 15L));

		testHarness.processWatermark1(new Watermark(50L));
		testHarness.processWatermark2(new Watermark(50L));

		// Expected content of the broadcast side output (from the input-2 element).
		ConcurrentLinkedQueue<StreamRecord<String>> expectedBr = new ConcurrentLinkedQueue<>();
		expectedBr.add(new StreamRecord<>("BR:5 WM:10 TS:12", 12L));

		// Expected content of the non-broadcast side output (from the input-1 elements).
		ConcurrentLinkedQueue<StreamRecord<String>> expectedNonBr = new ConcurrentLinkedQueue<>();
		expectedNonBr.add(new StreamRecord<>("NON-BR:6 WM:40 TS:13", 13L));
		expectedNonBr.add(new StreamRecord<>("NON-BR:6 WM:40 TS:15", 15L));

		ConcurrentLinkedQueue<StreamRecord<String>> brSideOutput = testHarness.getSideOutput(FunctionWithSideOutput.BROADCAST_TAG);
		ConcurrentLinkedQueue<StreamRecord<String>> nonBrSideOutput = testHarness.getSideOutput(FunctionWithSideOutput.NON_BROADCAST_TAG);

		TestHarnessUtil.assertOutputEquals("Wrong Side Output", expectedBr, brSideOutput);
		TestHarnessUtil.assertOutputEquals("Wrong Side Output", expectedNonBr, nonBrSideOutput);
	}
}
 
源代码28 项目: litchi   文件: SQLQueueComponent.java
/**
 * Returns the queue registered for the table's name, creating and registering
 * one (together with the table's info) on first use.
 *
 * <p>Lookup and registration both run while holding {@code syncLock}.
 *
 * @param table table whose queue is requested
 * @return the existing queue for the table name, or the newly created one
 */
protected ConcurrentLinkedQueue<DbQueueModel> getQueue(Table<?> table) {
    synchronized (syncLock) {
        ConcurrentLinkedQueue<DbQueueModel> registered = TABLE_QUEUE.get(table.tableName());
        if (registered != null) {
            return registered;
        }

        // First request for this table: register a new queue and the table metadata.
        ConcurrentLinkedQueue<DbQueueModel> created = new ConcurrentLinkedQueue<>();
        TABLE_QUEUE.putIfAbsent(table.tableName(), created);
        TABLE_INFO.putIfAbsent(table.tableName(), table.getTableInfo());
        return created;
    }
}
 
源代码29 项目: flink   文件: LegacyKeyedCoProcessOperatorTest.java
/**
 * Verifies that we don't have leakage between different keys.
 *
 * Elements for keys 17, 13 and "42" are sent at different processing times; the
 * second element for key 13 removes that key's timer again, so only the timers of
 * 17 and 42 are expected to emit {@code STATE:} records.
 */
@Test
public void testProcessingTimeTimerWithState() throws Exception {

	LegacyKeyedCoProcessOperator<String, Integer, String, String> operator =
			new LegacyKeyedCoProcessOperator<>(new ProcessingTimeTriggeringStatefulProcessFunction());

	TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
			new KeyedTwoInputStreamOperatorTestHarness<>(
					operator,
					new IntToStringKeySelector<>(),
					new IdentityKeySelector<String>(),
					BasicTypeInfo.STRING_TYPE_INFO);

	testHarness.setup();
	testHarness.open();

	testHarness.setProcessingTime(1);
	testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6
	testHarness.processElement1(new StreamRecord<>(13)); // should set timer for 6

	testHarness.setProcessingTime(2);
	testHarness.processElement1(new StreamRecord<>(13)); // should delete timer again
	testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7

	// Advance processing time past both remaining timers.
	testHarness.setProcessingTime(6);
	testHarness.setProcessingTime(7);

	// Three input echoes, then one STATE record per key whose timer survived (17 and 42).
	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	expectedOutput.add(new StreamRecord<>("INPUT1:17"));
	expectedOutput.add(new StreamRecord<>("INPUT1:13"));
	expectedOutput.add(new StreamRecord<>("INPUT2:42"));
	expectedOutput.add(new StreamRecord<>("STATE:17"));
	expectedOutput.add(new StreamRecord<>("STATE:42"));

	TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

	// NOTE(review): not reached if the assertion above fails, leaving the harness open.
	testHarness.close();
}
 
/**
 * Sends two elements whose timestamps (0 and 1) are not ahead of the current
 * watermark (1) and checks that the harness output contains only the two
 * watermarks, i.e. no record was emitted for either element.
 */
@Test
public void testLateElements() throws Exception {
    testHarness.processWatermark(1L);
    assertEquals(1L, operator.lastWatermark);
    // Both elements arrive after watermark 1 has already been processed.
    testHarness.processElement(record(K1, 0L));
    testHarness.processElement(record(K1, 1L));
    testHarness.processWatermark(2L);
    assertEquals(2L, operator.lastWatermark);

    Queue<Object> actual = testHarness.getOutput();
    // Only the forwarded watermarks are expected in the output.
    Queue<Object> expected = new ConcurrentLinkedQueue<>();
    expected.add(watermark(1L));
    expected.add(watermark(2L));
    TestHarnessUtil.assertOutputEquals("Unexpected output", expected, actual);
}
 
 类所在包
 同包方法