org.springframework.data.domain.SliceImpl#org.apache.ignite.cache.query.QueryCursor源码实例Demo

下面列出了org.springframework.data.domain.SliceImpl#org.apache.ignite.cache.query.QueryCursor 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: ignite   文件: OdbcUtils.java
/**
 * Extracts the affected-rows counter from the cursor of an executed statement.
 *
 * @param qryCur Cursor; must be a {@code QueryCursorImpl}.
 * @return Number of table rows affected, if the query is DML, and -1 otherwise.
 */
public static long rowsAffected(QueryCursor<List<?>> qryCur) {
    QueryCursorImpl<List<?>> cursorImpl = (QueryCursorImpl<List<?>>)qryCur;

    // The cursor reports itself as a query, so there is no update counter to read.
    if (cursorImpl.isQuery())
        return -1;

    Iterator<List<?>> rows = cursorImpl.iterator();

    if (!rows.hasNext())
        return 0;

    List<?> firstRow = rows.next();

    if (firstRow.isEmpty())
        return 0;

    // The counter is read from the first column of the first row.
    Long cnt = (Long)firstRow.get(0);

    return cnt == null ? 0L : cnt;
}
 
源代码2 项目: ignite   文件: IgniteDbPutGetAbstractTest.java
/**
 * Runs a scan query against the "non-primitive" cache on every grid node and checks that each entry's
 * key matches its value and that the number of distinct keys equals the expected total.
 *
 * @param total Expected total entries.
 */
private void checkScan(int total) {
    for (int i = 0; i < gridCount(); i++) {
        Set<DbKey> allKeys = new HashSet<>();

        Ignite ignite0 = grid(i);

        IgniteCache<DbKey, DbValue> cache0 = ignite0.cache("non-primitive");

        ScanQuery<DbKey, DbValue> qry = new ScanQuery<>();

        // An assertion may fail in the middle of the iteration; try-with-resources guarantees that
        // the partially read cursor is still closed in that case.
        try (QueryCursor<Cache.Entry<DbKey, DbValue>> cur = cache0.query(qry)) {
            for (Cache.Entry<DbKey, DbValue> e : cur) {
                allKeys.add(e.getKey());
                assertEquals(e.getKey().val, e.getValue().iVal);
            }
        }

        assertEquals(total, allKeys.size());
    }
}
 
源代码3 项目: ignite   文件: FunctionalQueryTest.java
/**
 * Runs a fields query selecting {@code id} and {@code name} of persons whose ID is not less than
 * {@code minId} and compares the fetched rows with the expected persons.
 *
 * @param cache Cache.
 * @param minId Minimal ID.
 * @param pageSize Page size.
 * @param expSize The size of the expected results.
 * @param exp Expected results.
 * @param lazy Lazy mode flag.
 */
private void checkSqlFieldsQuery(ClientCache<Integer, Person> cache, int minId, int pageSize, int expSize,
    Map<Integer, Person> exp, boolean lazy) {
    SqlFieldsQuery fieldsQry = new SqlFieldsQuery("select id, name from Person where id >= ?");

    fieldsQry.setArgs(minId);
    fieldsQry.setPageSize(pageSize);
    fieldsQry.setLazy(lazy);

    try (QueryCursor<List<?>> cursor = cache.query(fieldsQry)) {
        List<List<?>> rows = cursor.getAll();

        assertEquals(expSize, rows.size());

        // Column 0 is the person ID, column 1 is the person name.
        Map<Integer, Person> actual = rows.stream().collect(Collectors.toMap(
            row -> Integer.parseInt(row.get(0).toString()),
            row -> {
                int id = Integer.parseInt(row.get(0).toString());

                return new Person(id, row.get(1).toString());
            }
        ));

        assertEquals(exp, actual);
    }
}
 
源代码4 项目: ignite   文件: IgniteSqlSplitterSelfTest.java
/**
 * Verifies the results of aggregate functions when no rows are selected: {@code count} must be zero
 * and {@code sum}, {@code avg}, {@code min}, {@code max} must be {@code null}.
 *
 * @throws Exception If failed.
 */
@Test
public void testEmptyCacheAggregates() throws Exception {
    final String cacheName = "ints";

    IgniteCache<Integer, Value> cache = ignite(0).getOrCreateCache(cacheConfig(cacheName, true,
        Integer.class, Value.class));

    try (QueryCursor<List<?>> cur = cache.query(new SqlFieldsQuery(
        "SELECT count(fst), sum(snd), avg(snd), min(snd), max(snd) FROM Value"))) {
        List<List<?>> rows = cur.getAll();

        assertEquals(1, rows.size());

        List<?> aggregates = rows.get(0);

        assertEquals("count", 0L, ((Number)aggregates.get(0)).longValue());

        // Columns 1..4 hold the aggregates that must be null for an empty data set.
        String[] nullAggregates = {"sum", "avg", "min", "max"};

        for (int i = 0; i < nullAggregates.length; i++)
            assertEquals(nullAggregates[i], null, aggregates.get(i + 1));
    }
    finally {
        cache.destroy();
    }
}
 
/**
 * Executes the given SQL with distributed joins enabled, logs its plan and asserts the result size.
 * The full result is additionally logged when its size differs from the expected one.
 *
 * @param sql SQL.
 * @param cache Cache.
 * @param expSize Expected results size.
 * @param args Arguments.
 * @return Results.
 */
private List<List<?>> checkQuery(String sql,
    IgniteCache<Object, Object> cache,
    int expSize,
    Object... args) {
    SqlFieldsQuery fieldsQry = new SqlFieldsQuery(sql)
        .setDistributedJoins(true)
        .setArgs(args);

    log.info("Plan: " + queryPlan(cache, fieldsQry));

    List<List<?>> rows = cache.query(fieldsQry).getAll();

    int actualSize = rows.size();

    // Dump the rows before failing to simplify the investigation.
    if (actualSize != expSize)
        log.info("Results: " + rows);

    assertEquals(expSize, actualSize);

    return rows;
}
 
源代码6 项目: ignite   文件: IgniteSqlSplitterSelfTest.java
/**
 * Simple query mixing plain and distinct aggregates over the {@code Value} table.
 *
 * @param cache Cache to query.
 */
private void checkSimpleQueryWithAggrMixed(IgniteCache<Integer, Value> cache) {
    String sql =
        "SELECT count(fst), sum(snd), avg(snd), min(snd), max(snd)," +
            "count(distinct fst), sum(distinct snd), avg(distinct snd), min(distinct snd), max(distinct snd)  " +
            "FROM Value";

    try (QueryCursor<List<?>> cur = cache.query(new SqlFieldsQuery(sql))) {
        List<List<?>> rows = cur.getAll();

        assertEquals(1, rows.size());

        List<?> agg = rows.get(0);

        // Columns 0..4: plain aggregates.
        assertEquals("count", 15L, ((Number)agg.get(0)).longValue());
        assertEquals("sum", 30L, ((Number)agg.get(1)).longValue());
        assertEquals("avg", 2, (int)(Integer)agg.get(2));
        assertEquals("min", 1, (int)(Integer)agg.get(3));
        assertEquals("max", 3, (int)(Integer)agg.get(4));

        // Columns 5..9: the same aggregates over distinct values.
        assertEquals("count distinct", 6L, ((Number)agg.get(5)).longValue());
        assertEquals("sum distinct", 6L, ((Number)agg.get(6)).longValue());
        assertEquals("avg distinct", 2, (int)(Integer)agg.get(7));
        assertEquals("min distinct", 1, (int)(Integer)agg.get(8));
        assertEquals("max distinct", 3, (int)(Integer)agg.get(9));
    }
}
 
源代码7 项目: ignite   文件: IgniteQueryDedicatedPoolTest.java
/**
 * Checks that an SPI query returns a single entry whose value equals {@code GridIoPolicy.QUERY_POOL},
 * i.e. that SPI queries are executed in the dedicated pool.
 *
 * @throws Exception If failed.
 */
@Test
public void testSpiQueryUsesDedicatedThreadPool() throws Exception {
    startGrid("server");

    try (Ignite client = startClientGrid("client")) {
        IgniteCache<Byte, Byte> cache = client.cache(CACHE_NAME);

        byte key = 0;

        while (key < Byte.MAX_VALUE) {
            cache.put(key, key);

            ++key;
        }

        QueryCursor<Cache.Entry<Byte, Byte>> cur = cache.query(new SpiQuery<Byte, Byte>());

        List<Cache.Entry<Byte, Byte>> entries = cur.getAll();

        assertEquals(1, entries.size());

        byte plc = entries.get(0).getValue();

        assertEquals(GridIoPolicy.QUERY_POOL, plc);

        cur.close();
    }
}
 
/**
 * Starts a continuous query on the cache whose local listener counts events carrying
 * a non-negative integer value.
 *
 * @param cache Cache.
 * @return Pair of the event counter and the cursor of the started continuous query.
 */
private T2<AtomicInteger, QueryCursor> startListener(IgniteCache<Object, Object> cache) {
    final AtomicInteger cntr = new AtomicInteger();

    ContinuousQuery<Object, Object> contQry = new ContinuousQuery<>();

    contQry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> e : evts) {
                assertNotNull(e.getKey());

                Object val = e.getValue();

                assertNotNull(val);

                // Only events with non-negative values are counted.
                if ((Integer)val >= 0)
                    cntr.incrementAndGet();
            }
        }
    });

    QueryCursor cursor = cache.query(contQry);

    return new T2<>(cntr, cursor);
}
 
/**
 * Registers a continuous query whose remote filter comes from {@code AlwaysFalseFilterFactory}
 * and, while it is open, puts every key 100 times through the grid nodes in round-robin order.
 *
 * @throws Exception If failed.
 */
@Test
public void testBackupQueue() throws Exception {
    final CacheEventListener locLsnr = new CacheEventListener();

    ContinuousQuery<Object, Object> contQry = new ContinuousQuery<>();

    contQry.setLocalListener(locLsnr);
    contQry.setRemoteFilterFactory(new AlwaysFalseFilterFactory());

    try (QueryCursor<?> ignore = grid(0).cache(CACHE_NAME).query(contQry)) {
        for (int key = 0; key < KEYS_COUNT; key++) {
            log.info("Put key: " + key);

            int attempt = 0;

            while (attempt < 100) {
                // Each update carries a 50 KB payload and goes through the next node in turn.
                grid(attempt % GRID_COUNT).cache(CACHE_NAME).put(key, new byte[1024 * 50]);

                attempt++;
            }
        }

        log.info("Finish.");
    }
}
 
/**
 * {@inheritDoc}
 * <p>
 * Runs a filtered scan query over every partition mapped to the local node in {@code cachePart}
 * and reads each cursor to the end, discarding the entries.
 */
@Override public void run() {
    IgniteCache targetCache = node.cache(cacheName);

    // Getting a list of the partitions owned by this node.
    List<Integer> locParts = cachePart.get(node.cluster().localNode().id());

    for (Integer partId : locParts) {
        ScanQuery partScan = new ScanQuery();

        partScan.setPartition(partId);
        partScan.setFilter(igniteBiPred);

        try (QueryCursor cur = targetCache.query(partScan)) {
            // Entries are fetched only to drive the scan; their content is irrelevant.
            for (Object ignored : cur) {
                // No-op.
            }
        }
    }
}
 
源代码11 项目: ignite   文件: SqlQueriesExample.java
/**
 * Example of an SQL query with aggregation: prints the average salary of the persons
 * working for the 'ApacheIgnite' organization.
 */
private static void sqlQueryWithAggregation() {
    IgniteCache<AffinityKey<Long>, Person> personCache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE);

    // Average salary over all persons of the organization;
    // the Organization cache is joined in to filter by the organization name.
    String avgSalarySql =
        "select avg(salary) " +
        "from Person, \"" + ORG_CACHE + "\".Organization as org " +
        "where Person.orgId = org.id " +
        "and lower(org.name) = lower(?)";

    SqlFieldsQuery qry = new SqlFieldsQuery(avgSalarySql).setArgs("ApacheIgnite");

    QueryCursor<List<?>> cur = personCache.query(qry);

    // Print the calculated average salary.
    print("Average salary for 'ApacheIgnite' employees: ", cur.getAll());
}
 
/**
 * Starts one more continuous query on the cache and remembers its cursor in the parameters map
 * under the cache name. When {@code CONTINUOUS_QUERY_PER_CACHE} cursors are already registered,
 * a randomly chosen one is removed and closed first.
 *
 * @param cache Ignite cache.
 * @param map Parameters map; holds the list of open cursors keyed by the cache name.
 * @throws Exception If failed.
 */
private void doContinuousQuery(IgniteCache<Object, Object> cache, Map<Object, Object> map) throws Exception {
    // Any List implementation stored in the map is acceptable, not only ArrayList.
    List<QueryCursor> cursors = (List<QueryCursor>)map.get(cache.getName());

    if (cursors == null) {
        cursors = new ArrayList<>(CONTINUOUS_QUERY_PER_CACHE);
        map.put(cache.getName(), cursors);
    }

    if (cursors.size() == CONTINUOUS_QUERY_PER_CACHE) {
        // Remove by index: this drops exactly the chosen cursor without an equality-based search,
        // and the list never keeps a reference to an already closed cursor.
        QueryCursor evicted = cursors.remove(nextRandom(cursors.size()));

        evicted.close();
    }

    ContinuousQuery qry = new ContinuousQuery();

    qry.setLocalListener(new ContinuousQueryUpdater());

    qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new ContinuousQueryFilter()));

    cursors.add(cache.query(qry));
}
 
/**
 * Fetches all integers with a page size of 20 and checks the number and the width of the rows.
 *
 * @throws Exception If failed.
 */
@Test
public void testPagination() throws Exception {
    // Query with page size 20.
    QueryCursor<List<?>> cur =
        intCache.query(sqlFieldsQuery("select * from Integer").setPageSize(20));

    // Copy the result into a mutable list so that it can be de-duplicated and sorted.
    List<List<?>> rows = new ArrayList<>(cur.getAll());

    dedup(rows);

    // Order rows by their first column.
    Collections.sort(rows, (left, right) -> ((Integer)left.get(0)).compareTo((Integer)right.get(0)));

    assertEquals(200, rows.size());

    for (List<?> r : rows)
        assertEquals("Wrong row size: " + r, 2, r.size());
}
 
/**
 * Scan query with explicit {@code setLocal(true)} and no partition specified
 * should be performed on the local node.
 *
 * @throws Exception If failed.
 */
@Test
public void testScanLocalExplicitNoPart() throws Exception {
    // Partitioned cache without backups.
    cacheMode = CacheMode.PARTITIONED;
    backups = 0;
    commSpiFactory = new TestLocalCommunicationSpiFactory();

    try {
        Ignite node = startGrids(GRID_CNT);

        IgniteCacheProxy<Integer, Integer> filledCache = fillCache(node);

        QueryCursor<Cache.Entry<Integer, Integer>> locCur =
            filledCache.query(new ScanQuery<Integer, Integer>().setLocal(true));

        // The local scan is expected to return at least one entry.
        assertFalse(locCur.getAll().isEmpty());
    }
    finally {
        stopAllGrids();
    }
}
 
/**
 * Starts a continuous query with a remote filter that records every event it sees in {@code FILTERED}
 * and verifies the listener and filter results while the query is open.
 *
 * @param setLocLsnr Whether the local listener should be set.
 * @param bypassFilter Value returned by the remote filter for every event.
 * @throws Exception If failed.
 */
private void doQueryWithRemoteFilter(boolean setLocLsnr, boolean bypassFilter) throws Exception {
    FILTERED.clear();

    ContinuousQuery<Integer, Integer> contQry = new ContinuousQuery<>();

    // Events delivered to the local listener, if one is registered.
    Map<Integer, Integer> received = new ConcurrentHashMap<>();

    if (setLocLsnr) {
        contQry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts)
                    received.put(e.getKey(), e.getValue());
            }
        });
    }

    contQry.setRemoteFilter(evt -> {
            FILTERED.put(evt.getKey(), evt.getValue());

            return bypassFilter;
        });

    try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = grid(0).cache(DEFAULT_CACHE_NAME).query(contQry)) {
        checkLsnrAndFilterResults(setLocLsnr, bypassFilter, received);
    }
}
 
源代码16 项目: ignite   文件: IgniteCacheAbstractQuerySelfTest.java
/**
 * Checks that the cursor of a fields query exposes metadata for both selected columns.
 *
 * @throws Exception If failed.
 */
@Test
public void testFieldsQueryMetadata() throws Exception {
    IgniteCache<UUID, Person> cache = jcache(UUID.class, Person.class);

    for (int i = 0; i < 100; i++)
        cache.put(UUID.randomUUID(), new Person("name-" + i, (i + 1) * 100));

    // The result set is never read here, so the cursor is not closed implicitly:
    // try-with-resources releases it explicitly.
    try (QueryCursor<List<?>> cur = cache.query(
        new SqlFieldsQuery("select name, salary from Person where name like ?").setArgs("name-"))) {
        assertTrue(cur instanceof QueryCursorEx);

        QueryCursorEx<List<?>> curEx = (QueryCursorEx<List<?>>)cur;

        List<GridQueryFieldMetadata> meta = curEx.fieldsMeta();

        assertNotNull(meta);
        assertEquals(2, meta.size());
    }
}
 
源代码17 项目: ignite   文件: SqlQueriesExample.java
/**
 * Example of an SQL-based fields query that returns only the required
 * fields instead of whole key-value pairs.
 */
private static void sqlFieldsQuery() {
    IgniteCache<Long, Person> personCache = Ignition.ignite().cache(PERSON_CACHE);

    // Query selecting the full names of all employees.
    SqlFieldsQuery namesQry = new SqlFieldsQuery(
        "select concat(firstName, ' ', lastName) from Person");

    QueryCursor<List<?>> cur = personCache.query(namesQry);

    // Every row consists of a single element: the full name of an employee.
    List<List<?>> names = cur.getAll();

    // Print names.
    print("Names of all employees:", names);
}
 
源代码18 项目: alcor   文件: IgniteCache.java
/**
 * Reads the whole cache with a scan query and returns its entries as a map.
 *
 * @return Map of all keys to their values.
 * @throws CacheException If the scan or the conversion fails; carries the message of the original error.
 */
@Override
public Map<K, V> getAll() throws CacheException {
    Query<Cache.Entry<K, V>> scanAll = new ScanQuery<K, V>();

    try {
        return cache.query(scanAll)
                .getAll()
                .stream()
                .collect(Collectors.toMap(Cache.Entry::getKey, Cache.Entry::getValue));
    } catch (Exception e) {
        String errMsg = e.getMessage();

        logger.log(Level.WARNING, "IgniteCache getAll operation error:" + errMsg);
        throw new CacheException(errMsg);
    }
}
 
源代码19 项目: ignite   文件: GridCacheQueryJdbcTask.java
/**
 * Creates a holder that bundles a query cursor with its iteration state.
 *
 * @param cursor Cursor.
 * @param iter Iterator.
 * @param totalCnt Total row count already fetched.
 * @param lastAccessTime Last cursor access timestamp.
 */
private Cursor(QueryCursor<List<?>> cursor, Iterator<List<?>> iter, int totalCnt, long lastAccessTime) {
    // Query handles.
    this.iter = iter;
    this.cursor = cursor;

    // Bookkeeping.
    this.lastAccessTime = lastAccessTime;
    this.totalCnt = totalCnt;
}
 
源代码20 项目: ignite   文件: IgniteCacheAbstractQuerySelfTest.java
/**
 * Puts ten integers and queries all of them; for non-partitioned caches additionally checks
 * that the entries come back in key order.
 *
 * @throws Exception If failed.
 */
@Test
public void testOrderByOnly() throws Exception {
    IgniteCache<Integer, Integer> cache = jcache(Integer.class, Integer.class);

    for (int k = 0; k < 10; k++)
        cache.put(k, k);

    QueryCursor<Cache.Entry<Integer, Integer>> cur =
        cache.query(new SqlQuery<Integer, Integer>(Integer.class, "_key >= 0"));

    Collection<Cache.Entry<Integer, Integer>> entries = cur.getAll();

    assertEquals(10, entries.size());

    // The order of the entries is verified for non-partitioned caches only.
    if (cacheMode() == PARTITIONED)
        return;

    Iterator<Cache.Entry<Integer, Integer>> iter = entries.iterator();

    Integer expected = 0;

    while (expected < 10) {
        assertTrue(iter.hasNext());

        Cache.Entry<Integer, Integer> entry = iter.next();

        assertEquals(expected, entry.getKey());
        assertEquals(expected, entry.getValue());

        expected++;
    }
}
 
源代码21 项目: ignite   文件: IndexingSpiQuerySelfTest.java
/**
 * Runs an SPI query and a removal against a cache backed by {@code MyIndexingSpi} while
 * the {@code IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI} system property is set to {@code "true"}.
 *
 * @throws Exception If failed.
 */
@Test
public void testNonBinaryIndexingSpi() throws Exception {
    System.setProperty(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI, "true");

    try {
        indexingSpi = new MyIndexingSpi();

        Ignite node = startGrid(0);

        CacheConfiguration<PersonKey, Person> cacheCfg = cacheConfiguration(DEFAULT_CACHE_NAME);

        IgniteCache<PersonKey, Person> personCache = node.createCache(cacheCfg);

        for (int id = 0; id < 10; id++)
            personCache.put(new PersonKey(id), new Person("John Doe " + id));

        QueryCursor<Cache.Entry<PersonKey, Person>> cur = personCache.query(
            new SpiQuery<PersonKey, Person>().setArgs(new PersonKey(2), new PersonKey(5)));

        // Print every entry returned by the SPI query.
        cur.forEach(System.out::println);

        personCache.remove(new PersonKey(9));
    }
    finally {
        // Restore the system property regardless of the test outcome.
        System.clearProperty(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI);
    }
}
 
源代码22 项目: ignite   文件: AsyncChannelTest.java
/**
 * Test multiple concurrent async queries.
 * <p>
 * Fills the cache with 10 entries, then lets {@code THREADS_CNT} threads run scan queries with
 * page size 1 through the same client cache and checks that every cursor returns as many entries
 * as the cache size reported for primary copies.
 *
 * @throws Exception If failed.
 */
@Test
public void testConcurrentQueries() throws Exception {
    try (IgniteClient client = startClient(0)) {
        ClientCache<Integer, Integer> clientCache = client.cache(CACHE_NAME);

        clientCache.clear();

        for (int i = 0; i < 10; i++)
            clientCache.put(i, i);

        // One party per worker thread: the threads are released together once all of them arrive.
        CyclicBarrier barrier = new CyclicBarrier(THREADS_CNT);

        GridTestUtils.runMultiThreaded(() -> {
            try {
                barrier.await();
            }
            catch (Exception e) {
                // A broken or interrupted barrier makes the test fail.
                fail();
            }

            for (int i = 0; i < 10; i++) {
                // Page size 1 — presumably so that fetching the result takes several requests; TODO confirm.
                Query<Cache.Entry<Integer, String>> qry = new ScanQuery<Integer, String>().setPageSize(1);

                try (QueryCursor<Cache.Entry<Integer, String>> cur = clientCache.query(qry)) {
                    int cacheSize = clientCache.size(CachePeekMode.PRIMARY);
                    int curSize = cur.getAll().size();

                    assertEquals(cacheSize, curSize);
                }
            }
        }, THREADS_CNT, "thin-client-thread");
    }
}
 
源代码23 项目: ignite   文件: CacheBasedLabelPairCursor.java
/**
 * Runs a scan query over the given cache, optionally restricted by a filter.
 *
 * @param upstreamCache Ignite cache with {@code upstream} data.
 * @param filter Filter for {@code upstream} data. If {@code null} then all entries will be returned.
 * @return Query cursor.
 */
private QueryCursor<Cache.Entry<K, V>> query(IgniteCache<K, V> upstreamCache, IgniteBiPredicate<K, V> filter) {
    ScanQuery<K, V> scan = new ScanQuery<>();

    // Without a filter the query is left untouched, so this code stays correct
    // even if the behaviour of setFilter(null) changes.
    if (filter == null)
        return upstreamCache.query(scan);

    scan.setFilter(filter);

    return upstreamCache.query(scan);
}
 
源代码24 项目: ignite   文件: StaticCacheDdlTest.java
/**
 * Reads the cursor to the end and logs the number of rows it produced.
 *
 * @param cur Cursor to consume.
 */
private void consume(QueryCursor<List<?>> cur) {
    int processed = 0;

    // Row content is irrelevant, rows are only counted.
    for (List<?> ignored : cur)
        ++processed;

    info(processed + " rows processed");
}
 
源代码25 项目: ignite   文件: IgniteCacheAbstractQuerySelfTest.java
/**
 * Queries a complex value type through a keep-binary cache projection and checks the key field
 * and the deserialized value of the first returned entry. The check is performed only when
 * no marshaller is configured or the configured one is a {@code BinaryMarshaller}.
 *
 * @throws Exception In case of error.
 */
@Test
public void testComplexTypeKeepBinary() throws Exception {
    boolean binaryMarsh = ignite().configuration().getMarshaller() == null
        || ignite().configuration().getMarshaller() instanceof BinaryMarshaller;

    if (!binaryMarsh)
        return;

    IgniteCache<Key, GridCacheQueryTestValue> cache = jcache(Key.class, GridCacheQueryTestValue.class);

    // The value the query is expected to find.
    GridCacheQueryTestValue matching = new GridCacheQueryTestValue();

    matching.setField1("field1");
    matching.setField2(1);
    matching.setField3(1L);

    // A second value with different field values.
    GridCacheQueryTestValue other = new GridCacheQueryTestValue();

    other.setField1("field2");
    other.setField2(2);
    other.setField3(2L);
    other.setField6(null);

    cache.put(new Key(100500), matching);
    cache.put(new Key(100501), other);

    SqlQuery<BinaryObject, BinaryObject> sqlQry = new SqlQuery<BinaryObject, BinaryObject>(
        GridCacheQueryTestValue.class,
        "fieldName='field1' and field2=1 and field3=1 and id=100500 and embeddedField2=11 and x=3");

    QueryCursor<Cache.Entry<BinaryObject, BinaryObject>> cur = cache.withKeepBinary().query(sqlQry);

    Cache.Entry<BinaryObject, BinaryObject> first = F.first(cur.getAll());

    assertNotNull(first);
    assertEquals(Long.valueOf(100500L), first.getKey().field("id"));
    assertEquals(matching, first.getValue().deserialize());
}
 
源代码26 项目: ignite   文件: UpdatePlan.java
/**
 * Stores the arguments and obtains the iterator of the given cursor.
 *
 * @param cur Query cursor.
 * @param plan Update plan.
 * @param op Operation.
 */
private AbstractIterator(QueryCursor<List<?>> cur, UpdatePlan plan,
    EnlistOperation op) {
    this.op = op;
    this.plan = plan;
    this.cur = cur;

    // The iterator is requested from the cursor once, at construction time.
    it = cur.iterator();
}
 
源代码27 项目: ignite   文件: ReliabilityTest.java
/**
 * Test that failover doesn't lead to silent query inconsistency.
 * <p>
 * Starts iterating a scan query with page size 1, drops all thin client connections after
 * the first iteration step and expects {@code ClientReconnectedException} instead of
 * a silently completed iteration.
 *
 * @throws Exception If failed.
 */
@Test
public void testQueryConsistencyOnFailover() throws Exception {
    int CLUSTER_SIZE = 2;

    try (LocalIgniteCluster cluster = LocalIgniteCluster.start(CLUSTER_SIZE);
         IgniteClient client = Ignition.startClient(new ClientConfiguration()
             .setAddresses(cluster.clientAddresses().toArray(new String[CLUSTER_SIZE])))
    ) {
        ClientCache<Integer, Integer> cache = client.createCache("cache");

        cache.put(0, 0);
        cache.put(1, 1);

        // Page size 1 — presumably so that the second entry needs another request to the server; TODO confirm.
        Query<Cache.Entry<Integer, String>> qry = new ScanQuery<Integer, String>().setPageSize(1);

        try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
            int cnt = 0;

            // The iterator is advanced in the update clause, i.e. after the loop body has run.
            for (Iterator<Cache.Entry<Integer, String>> it = cur.iterator(); it.hasNext(); it.next()) {
                cnt++;

                // Break the connections to every node of the cluster on the first pass only.
                if (cnt == 1) {
                    for (int i = 0; i < CLUSTER_SIZE; i++)
                        dropAllThinClientConnections(Ignition.allGrids().get(i));
                }
            }

            // Reaching this point means the iteration finished without noticing the failover.
            fail("ClientReconnectedException must be thrown");
        }
        catch (ClientReconnectedException expected) {
            // No-op.
        }
    }
}
 
/**
 * Fills the cache with 2000 persons and runs the same SQL query 100 times in each of 16 threads,
 * checking that every execution returns all persons.
 *
 * @throws Exception If failed.
 */
@Test
public void testQuery() throws Exception {
    final IgniteCache<Integer, Person> personCache = grid(0).cache(DEFAULT_CACHE_NAME);

    personCache.clear();

    for (int id = 0; id < 2000; id++)
        personCache.put(id, new Person(id));

    GridTestUtils.runMultiThreaded(new Callable<Void>() {
        @Override public Void call() throws Exception {
            int iter = 0;

            while (iter < 100) {
                QueryCursor<Cache.Entry<Integer, Person>> cur =
                    personCache.query(new SqlQuery<Integer, Person>("Person", "age >= 0"));

                int fetched = 0;

                // Entries are only counted, their content is not inspected.
                for (Cache.Entry<Integer, Person> ignored : cur)
                    ++fetched;

                assertEquals(2000, fetched);

                iter++;
            }

            return null;
        }
    }, 16, "test");
}
 
/**
 * Starts a continuous query whose remote filter is supplied through a factory; the filter records
 * every event it sees in {@code FILTERED}. Listener and filter results are verified while the query is open.
 *
 * @param setLocLsnr Whether the local listener should be set.
 * @param bypassFilter Value returned by the remote filter for every event.
 * @throws Exception If failed.
 */
private void doQueryWithRemoteFilterFactory(boolean setLocLsnr, boolean bypassFilter) throws Exception {
    FILTERED.clear();

    ContinuousQuery<Integer, Integer> contQry = new ContinuousQuery<>();

    // Events delivered to the local listener, if one is registered.
    Map<Integer, Integer> received = new ConcurrentHashMap<>();

    if (setLocLsnr) {
        contQry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts)
                    received.put(e.getKey(), e.getValue());
            }
        });
    }

    contQry.setRemoteFilterFactory(
        FactoryBuilder.factoryOf((CacheEntryEventSerializableFilter<Integer, Integer>)evt -> {
            FILTERED.put(evt.getKey(), evt.getValue());

            return bypassFilter;
        }));

    try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = grid(0).cache(DEFAULT_CACHE_NAME).query(contQry)) {
        checkLsnrAndFilterResults(setLocLsnr, bypassFilter, received);
    }
}
 
源代码30 项目: ignite   文件: IgniteQueryDedicatedPoolTest.java
/**
 * Checks that the SQL function {@code currentPolicy()} invoked by a query returns
 * {@code GridIoPolicy.QUERY_POOL}, i.e. that SQL queries involving actual network IO
 * are executed in the dedicated pool.
 *
 * @throws Exception If failed.
 */
@Test
public void testSqlQueryUsesDedicatedThreadPool() throws Exception {
    startGrid("server");

    try (Ignite client = startClientGrid("client")) {
        IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME);

        // A single entry yields a single result row - the function is called once per each row of result.
        cache.put(1, 1);

        // The cache has to be referred to explicitly in the query in order for it to be executed
        // in a non-local distributed manner.
        SqlFieldsQuery plcQry = new SqlFieldsQuery("select currentPolicy() from Integer");

        QueryCursor<List<?>> cur = cache.query(plcQry);

        List<List<?>> rows = cur.getAll();

        cur.close();

        assertEquals(1, rows.size());

        Byte plc = (Byte)rows.get(0).get(0);

        assertNotNull(plc);
        assertEquals(GridIoPolicy.QUERY_POOL, plc.byteValue());
    }
}