下面列出了org.springframework.data.domain.SliceImpl#org.apache.ignite.cache.query.QueryCursor 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Get affected rows for statement.
* @param qryCur Cursor.
* @return Number of table rows affected, if the query is DML, and -1 otherwise.
*/
public static long rowsAffected(QueryCursor<List<?>> qryCur) {
QueryCursorImpl<List<?>> qryCur0 = (QueryCursorImpl<List<?>>)qryCur;
if (qryCur0.isQuery())
return -1;
Iterator<List<?>> iter = qryCur0.iterator();
if (iter.hasNext()) {
List<?> res = iter.next();
if (!res.isEmpty()) {
Long affected = (Long) res.get(0);
if (affected != null)
return affected;
}
}
return 0;
}
/**
* @param total Expected total entries.
*/
private void checkScan(int total) {
for (int i = 0; i < gridCount(); i++) {
Set<DbKey> allKeys = new HashSet<>();
Ignite ignite0 = grid(i);
IgniteCache<DbKey, DbValue> cache0 = ignite0.cache("non-primitive");
ScanQuery<DbKey, DbValue> qry = new ScanQuery<>();
QueryCursor<Cache.Entry<DbKey, DbValue>> cur = cache0.query(qry);
for (Cache.Entry<DbKey, DbValue> e : cur) {
allKeys.add(e.getKey());
assertEquals(e.getKey().val, e.getValue().iVal);
}
assertEquals(total, allKeys.size());
}
}
/**
* @param cache Cache.
* @param minId Minimal ID.
* @param pageSize Page size.
* @param expSize The size of the expected results.
* @param exp Expected results.
* @param lazy Lazy mode flag.
*/
private void checkSqlFieldsQuery(ClientCache<Integer, Person> cache, int minId, int pageSize, int expSize,
Map<Integer, Person> exp, boolean lazy) {
SqlFieldsQuery qry = new SqlFieldsQuery("select id, name from Person where id >= ?")
.setArgs(minId)
.setPageSize(pageSize)
.setLazy(lazy);
try (QueryCursor<List<?>> cur = cache.query(qry)) {
List<List<?>> res = cur.getAll();
assertEquals(expSize, res.size());
Map<Integer, Person> act = res.stream().collect(Collectors.toMap(
r -> Integer.parseInt(r.get(0).toString()),
r -> new Person(Integer.parseInt(r.get(0).toString()), r.get(1).toString())
));
assertEquals(exp, act);
}
}
/**
* Check results of aggregate functions if no rows are selected.
*
* @throws Exception If failed,
*/
@Test
public void testEmptyCacheAggregates() throws Exception {
final String cacheName = "ints";
IgniteCache<Integer, Value> cache = ignite(0).getOrCreateCache(cacheConfig(cacheName, true,
Integer.class, Value.class));
try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
"SELECT count(fst), sum(snd), avg(snd), min(snd), max(snd) FROM Value"))) {
List<List<?>> result = qry.getAll();
assertEquals(1, result.size());
List<?> row = result.get(0);
assertEquals("count", 0L, ((Number)row.get(0)).longValue());
assertEquals("sum", null, row.get(1));
assertEquals("avg", null, row.get(2));
assertEquals("min", null, row.get(3));
assertEquals("max", null, row.get(4));
}
finally {
cache.destroy();
}
}
/**
* @param sql SQL.
* @param cache Cache.
* @param expSize Expected results size.
* @param args Arguments.
* @return Results.
*/
private List<List<?>> checkQuery(String sql,
IgniteCache<Object, Object> cache,
int expSize,
Object... args) {
SqlFieldsQuery qry = new SqlFieldsQuery(sql);
qry.setDistributedJoins(true);
qry.setArgs(args);
log.info("Plan: " + queryPlan(cache, qry));
QueryCursor<List<?>> cur = cache.query(qry);
List<List<?>> res = cur.getAll();
if (expSize != res.size())
log.info("Results: " + res);
assertEquals(expSize, res.size());
return res;
}
/** Simple query with distinct aggregates */
private void checkSimpleQueryWithAggrMixed(IgniteCache<Integer, Value> cache) {
try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
"SELECT count(fst), sum(snd), avg(snd), min(snd), max(snd)," +
"count(distinct fst), sum(distinct snd), avg(distinct snd), min(distinct snd), max(distinct snd) " +
"FROM Value"))) {
List<List<?>> result = qry.getAll();
assertEquals(1, result.size());
List<?> row = result.get(0);
assertEquals("count", 15L, ((Number)row.get(0)).longValue());
assertEquals("sum", 30L, ((Number)row.get(1)).longValue());
assertEquals("avg", 2, ((Integer)row.get(2)).intValue());
assertEquals("min", 1, ((Integer)row.get(3)).intValue());
assertEquals("max", 3, ((Integer)row.get(4)).intValue());
assertEquals("count distinct", 6L, ((Number)row.get(5)).longValue());
assertEquals("sum distinct", 6L, ((Number)row.get(6)).longValue());
assertEquals("avg distinct", 2, ((Integer)row.get(7)).intValue());
assertEquals("min distinct", 1, ((Integer)row.get(8)).intValue());
assertEquals("max distinct", 3, ((Integer)row.get(9)).intValue());
}
}
/**
* Tests that SPI queries are executed in dedicated pool
* @throws Exception If failed.
*/
@Test
public void testSpiQueryUsesDedicatedThreadPool() throws Exception {
startGrid("server");
try (Ignite client = startClientGrid("client")) {
IgniteCache<Byte, Byte> cache = client.cache(CACHE_NAME);
for (byte b = 0; b < Byte.MAX_VALUE; ++b)
cache.put(b, b);
QueryCursor<Cache.Entry<Byte, Byte>> cursor = cache.query(new SpiQuery<Byte, Byte>());
List<Cache.Entry<Byte, Byte>> all = cursor.getAll();
assertEquals(1, all.size());
assertEquals(GridIoPolicy.QUERY_POOL, (byte)all.get(0).getValue());
cursor.close();
}
}
/**
* @param cache Cache.
* @return Event counter.
*/
private T2<AtomicInteger, QueryCursor> startListener(IgniteCache<Object, Object> cache) {
final AtomicInteger evtCnt = new AtomicInteger();
ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
@Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
for (CacheEntryEvent evt : evts) {
assertNotNull(evt.getKey());
assertNotNull(evt.getValue());
if ((Integer)evt.getValue() >= 0)
evtCnt.incrementAndGet();
}
}
});
QueryCursor cur = cache.query(qry);
return new T2<>(evtCnt, cur);
}
/**
* @throws Exception If failed.
*/
@Test
public void testBackupQueue() throws Exception {
final CacheEventListener lsnr = new CacheEventListener();
ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
qry.setLocalListener(lsnr);
qry.setRemoteFilterFactory(new AlwaysFalseFilterFactory());
try (QueryCursor<?> ignore = grid(0).cache(CACHE_NAME).query(qry)) {
for (int i = 0; i < KEYS_COUNT; i++) {
log.info("Put key: " + i);
for (int j = 0; j < 100; j++)
grid(j % GRID_COUNT).cache(CACHE_NAME).put(i, new byte[1024 * 50]);
}
log.info("Finish.");
}
}
/** {@inheritDoc} */
@Override public void run() {
IgniteCache cache = node.cache(cacheName);
// Getting a list of the partitions owned by this node.
List<Integer> myPartitions = cachePart.get(node.cluster().localNode().id());
for (Integer part : myPartitions) {
ScanQuery scanQry = new ScanQuery();
scanQry.setPartition(part);
scanQry.setFilter(igniteBiPred);
try (QueryCursor cursor = cache.query(scanQry)) {
for (Object obj : cursor) {
// No-op.
}
}
}
}
/**
* Example for SQL queries to calculate average salary for a specific organization.
*/
private static void sqlQueryWithAggregation() {
IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE);
// Calculate average of salary of all persons in ApacheIgnite.
// Note that we also join on Organization cache as well.
String sql =
"select avg(salary) " +
"from Person, \"" + ORG_CACHE + "\".Organization as org " +
"where Person.orgId = org.id " +
"and lower(org.name) = lower(?)";
QueryCursor<List<?>> cursor = cache.query(new SqlFieldsQuery(sql).setArgs("ApacheIgnite"));
// Calculate average salary for a specific organization.
print("Average salary for 'ApacheIgnite' employees: ", cursor.getAll());
}
/**
* @param cache Ignite cache.
* @param map Parameters map.
* @throws Exception If failed.
*/
private void doContinuousQuery(IgniteCache<Object, Object> cache, Map<Object, Object> map) throws Exception {
List<QueryCursor> cursors = (ArrayList<QueryCursor>)map.get(cache.getName());
if (cursors == null) {
cursors = new ArrayList<>(CONTINUOUS_QUERY_PER_CACHE);
map.put(cache.getName(), cursors);
}
if (cursors.size() == CONTINUOUS_QUERY_PER_CACHE) {
QueryCursor cursor = cursors.get(nextRandom(cursors.size()));
cursor.close();
cursors.remove(cursor);
}
ContinuousQuery qry = new ContinuousQuery();
qry.setLocalListener(new ContinuousQueryUpdater());
qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new ContinuousQueryFilter()));
cursors.add(cache.query(qry));
}
/** @throws Exception If failed. */
@Test
public void testPagination() throws Exception {
// Query with page size 20.
QueryCursor<List<?>> qry =
intCache.query(sqlFieldsQuery("select * from Integer").setPageSize(20));
List<List<?>> res = new ArrayList<>(qry.getAll());
dedup(res);
Collections.sort(res, new Comparator<List<?>>() {
@Override public int compare(List<?> r1, List<?> r2) {
return ((Integer)r1.get(0)).compareTo((Integer)r2.get(0));
}
});
assertEquals(200, res.size());
for (List<?> row : res)
assertEquals("Wrong row size: " + row, 2, row.size());
}
/**
* Scan (with explicit {@code setLocal(true)}, no partition specified) should perform on the local node.
*
* @throws Exception If failed.
*/
@Test
public void testScanLocalExplicitNoPart() throws Exception {
cacheMode = CacheMode.PARTITIONED;
backups = 0;
commSpiFactory = new TestLocalCommunicationSpiFactory();
try {
Ignite ignite = startGrids(GRID_CNT);
IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite);
QueryCursor<Cache.Entry<Integer, Integer>> qry =
cache.query(new ScanQuery<Integer, Integer>().setLocal(true));
assertFalse(qry.getAll().isEmpty());
}
finally {
stopAllGrids();
}
}
/**
* @throws Exception if failed.
* @param bypassFilter Whether remote filter should be bypassed.
* @param setLocLsnr Whether local listner should be setted.
*/
private void doQueryWithRemoteFilter(boolean setLocLsnr, boolean bypassFilter) throws Exception {
FILTERED.clear();
ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
Map<Integer, Integer> listened = new ConcurrentHashMap<>();
if (setLocLsnr) {
qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
@Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
evts.forEach(event -> listened.put(event.getKey(), event.getValue()));
}
});
}
qry.setRemoteFilter(evt -> {
FILTERED.put(evt.getKey(), evt.getValue());
return bypassFilter;
});
try (QueryCursor<Cache.Entry<Integer, Integer>> qryCursor = grid(0).cache(DEFAULT_CACHE_NAME).query(qry)) {
checkLsnrAndFilterResults(setLocLsnr, bypassFilter, listened);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testFieldsQueryMetadata() throws Exception {
IgniteCache<UUID, Person> cache = jcache(UUID.class, Person.class);
for (int i = 0; i < 100; i++)
cache.put(UUID.randomUUID(), new Person("name-" + i, (i + 1) * 100));
QueryCursor<List<?>> cur = cache.query(new SqlFieldsQuery("select name, salary from Person where name like ?")
.setArgs("name-"));
assertTrue(cur instanceof QueryCursorEx);
QueryCursorEx<List<?>> curEx = (QueryCursorEx<List<?>>)cur;
List<GridQueryFieldMetadata> meta = curEx.fieldsMeta();
assertNotNull(meta);
assertEquals(2, meta.size());
}
/**
* Example for SQL-based fields queries that return only required
* fields instead of whole key-value pairs.
*/
private static void sqlFieldsQuery() {
IgniteCache<Long, Person> cache = Ignition.ignite().cache(PERSON_CACHE);
// Execute query to get names of all employees.
QueryCursor<List<?>> cursor = cache.query(new SqlFieldsQuery(
"select concat(firstName, ' ', lastName) from Person"));
// In this particular case each row will have one element with full name of an employees.
List<List<?>> res = cursor.getAll();
// Print names.
print("Names of all employees:", res);
}
@Override
public Map<K, V> getAll() throws CacheException {
Query<Cache.Entry<K, V>> qry = new ScanQuery<K, V>();
try {
QueryCursor<Cache.Entry<K, V>> cur = cache.query(qry);
return cur.getAll().stream().collect(Collectors
.toMap(Cache.Entry::getKey, Cache.Entry::getValue));
} catch (Exception e) {
logger.log(Level.WARNING, "IgniteCache getAll operation error:" + e.getMessage());
throw new CacheException(e.getMessage());
}
}
/**
* @param cursor Cursor.
* @param iter Iterator.
* @param totalCnt Total row count already fetched.
* @param lastAccessTime Last cursor access timestamp.
*/
private Cursor(QueryCursor<List<?>> cursor, Iterator<List<?>> iter, int totalCnt, long lastAccessTime) {
this.cursor = cursor;
this.iter = iter;
this.totalCnt = totalCnt;
this.lastAccessTime = lastAccessTime;
}
/**
* @throws Exception If failed.
*/
@Test
public void testOrderByOnly() throws Exception {
IgniteCache<Integer, Integer> cache = jcache(Integer.class, Integer.class);
for (int i = 0; i < 10; i++)
cache.put(i, i);
QueryCursor<Cache.Entry<Integer, Integer>> q =
cache.query(new SqlQuery<Integer, Integer>(Integer.class, "_key >= 0"));
Collection<Cache.Entry<Integer, Integer>> res = q.getAll();
assertEquals(10, res.size());
if (cacheMode() != PARTITIONED) {
Iterator<Cache.Entry<Integer, Integer>> it = res.iterator();
for (Integer i = 0; i < 10; i++) {
assertTrue(it.hasNext());
Cache.Entry<Integer, Integer> e = it.next();
assertEquals(i, e.getKey());
assertEquals(i, e.getValue());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testNonBinaryIndexingSpi() throws Exception {
System.setProperty(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI, "true");
try {
indexingSpi = new MyIndexingSpi();
Ignite ignite = startGrid(0);
CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME);
IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg);
for (int i = 0; i < 10; i++) {
PersonKey key = new PersonKey(i);
cache.put(key, new Person("John Doe " + i));
}
QueryCursor<Cache.Entry<PersonKey, Person>> cursor = cache.query(
new SpiQuery<PersonKey, Person>().setArgs(new PersonKey(2), new PersonKey(5)));
for (Cache.Entry<PersonKey, Person> entry : cursor)
System.out.println(entry);
cache.remove(new PersonKey(9));
}
finally {
System.clearProperty(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI);
}
}
/**
* Test multiple concurrent async queries.
*/
@Test
public void testConcurrentQueries() throws Exception {
try (IgniteClient client = startClient(0)) {
ClientCache<Integer, Integer> clientCache = client.cache(CACHE_NAME);
clientCache.clear();
for (int i = 0; i < 10; i++)
clientCache.put(i, i);
CyclicBarrier barrier = new CyclicBarrier(THREADS_CNT);
GridTestUtils.runMultiThreaded(() -> {
try {
barrier.await();
}
catch (Exception e) {
fail();
}
for (int i = 0; i < 10; i++) {
Query<Cache.Entry<Integer, String>> qry = new ScanQuery<Integer, String>().setPageSize(1);
try (QueryCursor<Cache.Entry<Integer, String>> cur = clientCache.query(qry)) {
int cacheSize = clientCache.size(CachePeekMode.PRIMARY);
int curSize = cur.getAll().size();
assertEquals(cacheSize, curSize);
}
}
}, THREADS_CNT, "thin-client-thread");
}
}
/**
* Queries the specified cache using the specified filter.
*
* @param upstreamCache Ignite cache with {@code upstream} data.
* @param filter Filter for {@code upstream} data. If {@code null} then all entries will be returned.
* @return Query cursor.
*/
private QueryCursor<Cache.Entry<K, V>> query(IgniteCache<K, V> upstreamCache, IgniteBiPredicate<K, V> filter) {
ScanQuery<K, V> qry = new ScanQuery<>();
if (filter != null) // This section was added to keep code correct of qry.setFilter(null) behaviour will changed.
qry.setFilter(filter);
return upstreamCache.query(qry);
}
/**
* @param cur Cursor to consume.
*/
private void consume(QueryCursor<List<?>> cur) {
int rows = 0;
for (List<?> ignore : cur)
rows++;
info(rows + " rows processed");
}
/**
* @throws Exception In case of error.
*/
@Test
public void testComplexTypeKeepBinary() throws Exception {
if (ignite().configuration().getMarshaller() == null || ignite().configuration().getMarshaller() instanceof BinaryMarshaller) {
IgniteCache<Key, GridCacheQueryTestValue> cache = jcache(Key.class, GridCacheQueryTestValue.class);
GridCacheQueryTestValue val1 = new GridCacheQueryTestValue();
val1.setField1("field1");
val1.setField2(1);
val1.setField3(1L);
GridCacheQueryTestValue val2 = new GridCacheQueryTestValue();
val2.setField1("field2");
val2.setField2(2);
val2.setField3(2L);
val2.setField6(null);
cache.put(new Key(100500), val1);
cache.put(new Key(100501), val2);
QueryCursor<Cache.Entry<BinaryObject, BinaryObject>> qry = cache.withKeepBinary()
.query(new SqlQuery<BinaryObject, BinaryObject>(GridCacheQueryTestValue.class,
"fieldName='field1' and field2=1 and field3=1 and id=100500 and embeddedField2=11 and x=3"));
Cache.Entry<BinaryObject, BinaryObject> entry = F.first(qry.getAll());
assertNotNull(entry);
assertEquals(Long.valueOf(100500L), entry.getKey().field("id"));
assertEquals(val1, entry.getValue().deserialize());
}
}
/**
* @param cur Query cursor.
* @param plan Update plan.
* @param op Operation.
*/
private AbstractIterator(QueryCursor<List<?>> cur, UpdatePlan plan,
EnlistOperation op) {
this.cur = cur;
this.plan = plan;
this.op = op;
it = cur.iterator();
}
/**
* Test that failover doesn't lead to silent query inconsistency.
*/
@Test
public void testQueryConsistencyOnFailover() throws Exception {
int CLUSTER_SIZE = 2;
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(CLUSTER_SIZE);
IgniteClient client = Ignition.startClient(new ClientConfiguration()
.setAddresses(cluster.clientAddresses().toArray(new String[CLUSTER_SIZE])))
) {
ClientCache<Integer, Integer> cache = client.createCache("cache");
cache.put(0, 0);
cache.put(1, 1);
Query<Cache.Entry<Integer, String>> qry = new ScanQuery<Integer, String>().setPageSize(1);
try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
int cnt = 0;
for (Iterator<Cache.Entry<Integer, String>> it = cur.iterator(); it.hasNext(); it.next()) {
cnt++;
if (cnt == 1) {
for (int i = 0; i < CLUSTER_SIZE; i++)
dropAllThinClientConnections(Ignition.allGrids().get(i));
}
}
fail("ClientReconnectedException must be thrown");
}
catch (ClientReconnectedException expected) {
// No-op.
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testQuery() throws Exception {
final IgniteCache<Integer, Person> cache = grid(0).cache(DEFAULT_CACHE_NAME);
cache.clear();
for (int i = 0; i < 2000; i++)
cache.put(i, new Person(i));
GridTestUtils.runMultiThreaded(new Callable<Void>() {
@Override public Void call() throws Exception {
for (int i = 0; i < 100; i++) {
QueryCursor<Cache.Entry<Integer, Person>> qry =
cache.query(new SqlQuery<Integer, Person>("Person", "age >= 0"));
int cnt = 0;
for (Cache.Entry<Integer, Person> e : qry)
cnt++;
assertEquals(2000, cnt);
}
return null;
}
}, 16, "test");
}
/**
* @throws Exception if failed.
* @param bypassFilter Whether remote filter should be bypassed.
* @param setLocLsnr Whether local listner should be setted.
*/
private void doQueryWithRemoteFilterFactory(boolean setLocLsnr, boolean bypassFilter) throws Exception {
FILTERED.clear();
ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
Map<Integer, Integer> listened = new ConcurrentHashMap<>();
if (setLocLsnr) {
qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
@Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
evts.forEach(event -> listened.put(event.getKey(), event.getValue()));
}
});
}
qry.setRemoteFilterFactory(
FactoryBuilder.factoryOf((CacheEntryEventSerializableFilter<Integer, Integer>)evt -> {
FILTERED.put(evt.getKey(), evt.getValue());
return bypassFilter;
}));
try (QueryCursor<Cache.Entry<Integer, Integer>> qryCursor = grid(0).cache(DEFAULT_CACHE_NAME).query(qry)) {
checkLsnrAndFilterResults(setLocLsnr, bypassFilter, listened);
}
}
/**
* Tests that SQL queries involving actual network IO are executed in dedicated pool.
* @throws Exception If failed.
*/
@Test
public void testSqlQueryUsesDedicatedThreadPool() throws Exception {
startGrid("server");
try (Ignite client = startClientGrid("client")) {
IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME);
// We do this in order to have 1 row in results of select - function is called once per each row of result.
cache.put(1, 1);
// We have to refer to a cache explicitly in the query in order for it to be executed
// in non local distributed manner (yes, there's a "local distributed" manner too - see link above...)
QueryCursor<List<?>> cursor = cache.query(new SqlFieldsQuery("select currentPolicy() from Integer"));
List<List<?>> result = cursor.getAll();
cursor.close();
assertEquals(1, result.size());
Byte plc = (Byte)result.get(0).get(0);
assertNotNull(plc);
assertEquals(GridIoPolicy.QUERY_POOL, (byte)plc);
}
}