下面列出了org.springframework.data.domain.SliceImpl#org.apache.ignite.internal.processors.cache.CacheEntryImpl 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Runs {@code query} against the cache in keep-binary mode and returns the window of results
 * selected by {@code pagination}.
 * <p>
 * The complete result set is fetched with {@code getAll()} and sliced in memory. The number of
 * fetched entries is always written back into {@code pagination} as its total record count.
 *
 * @param query SQL query to execute.
 * @param pagination Requested window (start offset and page size); its total record count is updated.
 * @return Page result carrying {@code pagination}. The deserialized values of the requested window are
 *      set as its result unless the start offset lies beyond the number of fetched entries.
 */
public <K, T> PageResult<T> pageQuery(SqlQuery<K, T> query, Pagination pagination) {
    PageResult<T> pageResult = new PageResult<>();
    pageResult.setPagination(pagination);

    List<CacheEntryImpl> list = cache.withKeepBinary().query(query).getAll();

    int start = pagination.getStart();
    int pageSize = pagination.getSize();
    int totalRecord = list.size();

    // Report the total even when the requested page lies beyond the last record.
    pagination.setTotalRecord(totalRecord);

    if (totalRecord < start)
        return pageResult;

    // Add in long arithmetic so that a very large page size cannot wrap around to a negative end index.
    int end = (int)Math.min((long)start + pageSize, totalRecord);

    List<CacheEntryImpl> window = list.subList(start, end);

    List<T> data = new ArrayList<>(window.size());

    for (CacheEntryImpl cursor : window)
        data.add(((BinaryObject)cursor.getValue()).deserialize());

    pageResult.setResult(data);

    return pageResult;
}
/**
 * Builds, for every given product entry, a list of order entries created via {@code generateRandomOrder}.
 *
 * @param products Product entries; each entry key is passed to {@code generateRandomOrder}.
 * @param ordersPerProductCount Number of orders to create per product entry.
 * @return Map from product entry key to the list of order entries created for it.
 */
public static Map<Long, List<CacheEntryImpl<Long, ProductOrder>>> generateOrdersPerProductEntries(
    Collection<CacheEntryImpl<Long, Product>> products, int ordersPerProductCount) {
    Map<Long, List<CacheEntryImpl<Long, ProductOrder>>> ordersByProduct = new HashMap<>();

    for (CacheEntryImpl<Long, Product> product : products) {
        List<CacheEntryImpl<Long, ProductOrder>> productOrders = new LinkedList<>();

        for (int n = 0; n < ordersPerProductCount; n++) {
            ProductOrder generated = generateRandomOrder(product.getKey());

            // Each order entry is keyed by the order's own id.
            productOrders.add(new CacheEntryImpl<>(generated.getId(), generated));
        }

        ordersByProduct.put(product.getKey(), productOrders);
    }

    return ordersByProduct;
}
/**
 * Checks that every entry of {@code col} holds a value equal to the one stored in {@code map} under the same key.
 *
 * @param map Expected values by key.
 * @param col Entries to verify.
 * @param primitiveFieldsOnly If {@code true} values are compared with {@code equalsPrimitiveFields},
 *      otherwise with {@code equals}.
 * @return {@code false} if either argument is {@code null}, the sizes differ or some entry does not match;
 *      {@code true} otherwise.
 */
public static <K> boolean checkSimplePersonCollectionsEqual(Map<K, SimplePerson> map, Collection<CacheEntryImpl<K, SimplePerson>> col,
    boolean primitiveFieldsOnly) {
    if (map == null || col == null)
        return false;

    if (map.size() != col.size())
        return false;

    for (CacheEntryImpl<K, SimplePerson> candidate : col) {
        SimplePerson actual = candidate.getValue();
        SimplePerson expected = map.get(candidate.getKey());

        boolean matches;

        if (primitiveFieldsOnly)
            matches = actual.equalsPrimitiveFields(expected);
        else
            matches = actual.equals(expected);

        if (!matches)
            return false;
    }

    return true;
}
/**
 * Verifies that each entry of {@code col} matches the value that {@code map} holds for the entry key.
 *
 * @param map Expected persons by key.
 * @param col Entries to verify.
 * @param primitiveFieldsOnly Selects {@code equalsPrimitiveFields} ({@code true}) or {@code equals}
 *      ({@code false}) for the comparison.
 * @return {@code true} only if both arguments are non-null, have the same size and every entry matches.
 */
public static <K> boolean checkPersonCollectionsEqual(Map<K, Person> map, Collection<CacheEntryImpl<K, Person>> col,
    boolean primitiveFieldsOnly) {
    boolean comparable = map != null && col != null && map.size() == col.size();

    if (!comparable)
        return false;

    for (CacheEntryImpl<K, Person> item : col) {
        Person actual = item.getValue();
        Person expected = map.get(item.getKey());

        if (primitiveFieldsOnly) {
            if (!actual.equalsPrimitiveFields(expected))
                return false;
        }
        else if (!actual.equals(expected))
            return false;
    }

    return true;
}
/**
 * {@inheritDoc}
 * <p>
 * Treats the first two elements of {@code params} as the bounds passed to {@code idx.subMap} and
 * returns the entries of that sub-map.
 */
@Override public Iterator<Cache.Entry<?, ?>> query(@Nullable String cacheName, Collection<Object> params,
    @Nullable IndexingQueryFilter filters) throws IgniteSpiException {
    if (params.size() < 2)
        throw new IgniteSpiException("Range parameters required.");

    Iterator<Object> bounds = params.iterator();

    Object lower = bounds.next();
    Object upper = bounds.next();

    // Bounds that arrive as binary objects are deserialized before the lookup.
    if (lower instanceof BinaryObject)
        lower = ((BinaryObject)lower).deserialize();

    if (upper instanceof BinaryObject)
        upper = ((BinaryObject)upper).deserialize();

    SortedMap<Object, Object> range = idx.subMap(lower, upper);

    Collection<Cache.Entry<?, ?>> found = new ArrayList<>(range.size());

    for (Map.Entry<Object, Object> hit : range.entrySet())
        found.add(new CacheEntryImpl<>(hit.getKey(), hit.getValue()));

    return found.iterator();
}
/**
 * Simple store test: writes two entries, checks loads of present and absent keys, deletes one
 * entry and checks that only that entry is no longer loadable.
 *
 * @param writeCoalescing Write coalescing flag.
 * @throws Exception If failed.
 */
private void testSimpleStore(boolean writeCoalescing) throws Exception {
    initStore(2, writeCoalescing);

    try {
        store.write(new CacheEntryImpl<>(1, "v1"));
        store.write(new CacheEntryImpl<>(2, "v2"));

        assertEquals("v1", store.load(1));
        assertEquals("v2", store.load(2));
        assertNull(store.load(3));

        // Expected value goes first so that a failure message reports expected/actual correctly.
        assertEquals(0, store.loadAll(Arrays.asList(3, 4, 5)).size());

        store.delete(1);

        assertNull(store.load(1));
        assertEquals(0, store.loadAll(Arrays.asList(1)).size());
        assertEquals("v2", store.load(2));
        assertNull(store.load(3));
        assertEquals(0, store.loadAll(Arrays.asList(3)).size());
    }
    finally {
        shutdownStore();
    }
}
/**
 * Tests that all values will be written to the underlying store
 * right in the same order as they were put into the store.
 *
 * @param writeCoalescing Write coalescing flag.
 * @throws Exception If failed.
 */
private void testBatchApply(boolean writeCoalescing) throws Exception {
    // The backing map turns clear() into a no-op (presumably so its contents stay inspectable below).
    delegate = new GridCacheTestStore(new ConcurrentLinkedHashMap<Integer, String>() {
        @Override public void clear() {
            // No-op.
        }
    });

    initStore(1, writeCoalescing);

    List<Integer> writtenKeys = new ArrayList<>(CACHE_SIZE);

    try {
        int key = 0;

        while (key < CACHE_SIZE) {
            store.write(new CacheEntryImpl<>(key, "val" + key));

            writtenKeys.add(key);

            key++;
        }
    }
    finally {
        shutdownStore();
    }

    Map<Integer, String> stored = delegate.getMap();

    // Keys in the delegate's map must appear in exactly the order they were written.
    assertTrue("Store map key set: " + stored.keySet(), F.eqOrdered(stored.keySet(), writtenKeys));
}
/**
 * Stubs the mocks so that the alerts cache is returned by both {@code ignite} and the alerts store,
 * and any {@code SqlQuery} run against it yields a cursor holding a single alert entry.
 */
@Before
public void setUp() {
    Cache.Entry alertEntry = new CacheEntryImpl("serviceId", AlertEntry.builder().errorCode("errorCode").build());

    // Query result: a cursor whose getAll() returns just the entry above.
    when(queryCursor.getAll()).thenReturn(Arrays.asList(alertEntry));
    when(igniteCache.query(any(SqlQuery.class))).thenReturn(queryCursor);

    // Both ways of obtaining the alerts cache lead to the same mock.
    when(igniteAlertsStore.getAlertsCache()).thenReturn(igniteCache);
    when(ignite.cache(CacheNames.Alerts.name())).thenReturn(igniteCache);
}
/**
 * {@inheritDoc}
 * <p>
 * Returns a single entry whose key and value are both {@code GridIoPolicy.QUERY_POOL} when
 * {@code idx} contains that key, and an empty iterator otherwise.
 */
@Override public Iterator<Cache.Entry<?, ?>> query(@Nullable String cacheName, Collection<Object> params,
    @Nullable IndexingQueryFilter filters) {
    if (!idx.containsKey(GridIoPolicy.QUERY_POOL))
        return Collections.<Cache.Entry<?, ?>>emptyList().iterator();

    return Collections.<Cache.Entry<?, ?>>singletonList(
        new CacheEntryImpl<>(GridIoPolicy.QUERY_POOL, GridIoPolicy.QUERY_POOL)).iterator();
}
/**
 * Extracts the keys of the given entries, preserving iteration order.
 *
 * @param entries Entries to read keys from.
 * @return Keys of {@code entries}, one per entry.
 */
public static <K, V> Collection<K> getKeys(Collection<CacheEntryImpl<K, V>> entries) {
    List<K> keys = new LinkedList<>();

    for (CacheEntryImpl<K, V> item : entries) {
        K key = item.getKey();

        keys.add(key);
    }

    return keys;
}
/**
 * Creates {@code cnt} entries with keys {@code 0 .. cnt - 1}; each value is its key plus 123.
 *
 * @param cnt Number of entries to create.
 * @return Created entries in ascending key order.
 */
public static Collection<CacheEntryImpl<Long, Long>> generateLongsEntries(int cnt) {
    Collection<CacheEntryImpl<Long, Long>> result = new LinkedList<>();

    long key = 0;

    while (key < cnt) {
        result.add(new CacheEntryImpl<>(key, key + 123));

        key++;
    }

    return result;
}
/**
 * Creates {@code cnt} entries keyed by the decimal strings {@code "0" .. cnt - 1}, each with a
 * value obtained from {@code randomString(5)}.
 *
 * @param cnt Number of entries to create.
 * @return Created entries in ascending index order.
 */
public static Collection<CacheEntryImpl<String, String>> generateStringsEntries(int cnt) {
    Collection<CacheEntryImpl<String, String>> result = new LinkedList<>();

    for (int idx = 0; idx < cnt; idx++) {
        String key = Integer.toString(idx);
        String val = randomString(5);

        result.add(new CacheEntryImpl<>(key, val));
    }

    return result;
}
/**
 * Creates {@code BULK_OPERATION_SIZE} entries keyed {@code 0 .. BULK_OPERATION_SIZE - 1}, each with a
 * person obtained from {@code generateRandomPerson} for that key.
 *
 * @return Created entries in ascending key order.
 */
public static Collection<CacheEntryImpl<Long, Person>> generateLongsPersonsEntries() {
    Collection<CacheEntryImpl<Long, Person>> result = new LinkedList<>();

    long id = 0;

    while (id < BULK_OPERATION_SIZE) {
        result.add(new CacheEntryImpl<>(id, generateRandomPerson(id)));

        id++;
    }

    return result;
}
/**
 * Creates {@code cnt} entries that map a {@code SimplePersonId} to a {@code SimplePerson}.
 * Each pair is derived from one id returned by {@code generateRandomPersonId}.
 *
 * @param cnt Number of entries to create.
 * @return Created entries.
 */
public static Collection<CacheEntryImpl<SimplePersonId, SimplePerson>> generateSimplePersonIdsPersonsEntries(int cnt) {
    Collection<CacheEntryImpl<SimplePersonId, SimplePerson>> result = new LinkedList<>();

    for (int n = 0; n < cnt; n++) {
        PersonId personId = generateRandomPersonId();

        // Key and value are built from the same generated id.
        SimplePersonId key = new SimplePersonId(personId);
        SimplePerson val = new SimplePerson(generateRandomPerson(personId.getPersonNumber()));

        result.add(new CacheEntryImpl<>(key, val));
    }

    return result;
}
/**
 * Creates {@code cnt} entries that map an id returned by {@code generateRandomPersonId} to a person
 * obtained from {@code generateRandomPerson} for that id's person number.
 *
 * @param cnt Number of entries to create.
 * @return Created entries.
 */
public static Collection<CacheEntryImpl<PersonId, Person>> generatePersonIdsPersonsEntries(int cnt) {
    Collection<CacheEntryImpl<PersonId, Person>> result = new LinkedList<>();

    int remaining = cnt;

    while (remaining-- > 0) {
        PersonId personId = generateRandomPersonId();

        result.add(new CacheEntryImpl<>(personId, generateRandomPerson(personId.getPersonNumber())));
    }

    return result;
}
/**
 * Creates {@code BULK_OPERATION_SIZE} entries keyed {@code 0 .. BULK_OPERATION_SIZE - 1}, each with a
 * product obtained from {@code generateRandomProduct} for that key.
 *
 * @return Created entries in ascending key order.
 */
public static List<CacheEntryImpl<Long, Product>> generateProductEntries() {
    List<CacheEntryImpl<Long, Product>> result = new LinkedList<>();

    long id = 0;

    while (id < BULK_OPERATION_SIZE) {
        result.add(new CacheEntryImpl<>(id, generateRandomProduct(id)));

        id++;
    }

    return result;
}
/**
 * Extracts the keys of the given product entries, preserving iteration order.
 *
 * @param entries Product entries.
 * @return Keys of {@code entries}, one per entry.
 */
public static Collection<Long> getProductIds(Collection<CacheEntryImpl<Long, Product>> entries) {
    List<Long> productIds = new LinkedList<>();

    for (CacheEntryImpl<Long, Product> product : entries) {
        Long id = product.getKey();

        productIds.add(id);
    }

    return productIds;
}
/**
 * Creates {@code BULK_OPERATION_SIZE} order entries. Each order comes from {@code generateRandomOrder}
 * called with the loop index and is keyed by the order's own id.
 *
 * @return Created entries.
 */
public static Collection<CacheEntryImpl<Long, ProductOrder>> generateOrderEntries() {
    Collection<CacheEntryImpl<Long, ProductOrder>> result = new LinkedList<>();

    long idx = 0;

    while (idx < BULK_OPERATION_SIZE) {
        ProductOrder generated = generateRandomOrder(idx);

        result.add(new CacheEntryImpl<>(generated.getId(), generated));

        idx++;
    }

    return result;
}
/**
 * Collects the keys of all order entries contained in the given map of entry lists.
 *
 * @param orders Order entries grouped under map keys; the map keys themselves are not used.
 * @return Set of distinct keys of all order entries.
 */
public static Collection<Long> getOrderIds(Map<Long, List<CacheEntryImpl<Long, ProductOrder>>> orders) {
    Set<Long> ids = new HashSet<>();

    // Only the values are needed, so iterate them directly instead of looking every key up again.
    for (List<CacheEntryImpl<Long, ProductOrder>> group : orders.values()) {
        for (CacheEntryImpl<Long, ProductOrder> entry : group)
            ids.add(entry.getKey());
    }

    return ids;
}
/**
 * Checks that every entry of {@code col} holds a value equal to the one stored in {@code map} under the same key.
 *
 * @param map Expected values by key.
 * @param col Entries to verify.
 * @return {@code false} if either argument is {@code null}, the sizes differ or some entry value is not
 *      {@code equals} to the mapped value; {@code true} otherwise.
 */
public static <K, V> boolean checkCollectionsEqual(Map<K, V> map, Collection<CacheEntryImpl<K, V>> col) {
    if (map == null || col == null)
        return false;

    if (map.size() != col.size())
        return false;

    for (CacheEntryImpl<K, V> candidate : col) {
        V actual = candidate.getValue();
        V expected = map.get(candidate.getKey());

        if (!actual.equals(expected))
            return false;
    }

    return true;
}
/**
 * Verifies that each product entry of {@code col} equals the product that {@code map} holds for the entry key.
 *
 * @param map Expected products by key.
 * @param col Product entries to verify.
 * @return {@code true} only if both arguments are non-null, have the same size and every entry value is
 *      {@code equals} to the mapped product.
 */
public static <K> boolean checkProductCollectionsEqual(Map<K, Product> map, Collection<CacheEntryImpl<K, Product>> col) {
    boolean comparable = map != null && col != null && map.size() == col.size();

    if (!comparable)
        return false;

    for (CacheEntryImpl<K, Product> item : col) {
        Product actual = item.getValue();

        if (!actual.equals(map.get(item.getKey())))
            return false;
    }

    return true;
}
/**
 * Verifies that each order entry of {@code col} equals the order that {@code map} holds for the entry key.
 *
 * @param map Expected orders by key.
 * @param col Order entries to verify.
 * @return {@code false} if either argument is {@code null}, the sizes differ or some entry value is not
 *      {@code equals} to the mapped order; {@code true} otherwise.
 */
public static <K> boolean checkOrderCollectionsEqual(Map<K, ProductOrder> map, Collection<CacheEntryImpl<K, ProductOrder>> col) {
    if (map == null)
        return false;

    if (col == null)
        return false;

    if (map.size() != col.size())
        return false;

    for (CacheEntryImpl<K, ProductOrder> item : col) {
        ProductOrder actual = item.getValue();
        ProductOrder expected = map.get(item.getKey());

        if (!actual.equals(expected))
            return false;
    }

    return true;
}
/**
 * Runs {@code process} for a single entry and calls {@code updateMetrics(1)} on success.
 * Any {@code Throwable} is logged and reported through {@code updateErrorMetrics(1)} instead of
 * being propagated to the caller.
 *
 * @param entry Entry to process.
 */
private void doWork(CacheEntryImpl entry) {
    try {
        process(cacheStore, entry);

        updateMetrics(1);
    }
    catch (Throwable failure) {
        // Caught broadly on purpose: a failed operation is logged and counted, not rethrown.
        log.error("Failed to perform single operation", failure);

        updateErrorMetrics(1);
    }
}
/**
 * Runs {@code process} for a batch of entries and calls {@code updateMetrics} with the batch size on success.
 * Any {@code Throwable} is logged and reported through {@code updateErrorMetrics} with the batch size
 * instead of being propagated to the caller.
 *
 * @param entries Batch of entries to process.
 */
private void doWork(Collection<CacheEntryImpl> entries) {
    try {
        process(cacheStore, entries);

        updateMetrics(entries.size());
    }
    catch (Throwable failure) {
        // Caught broadly on purpose: the whole batch is logged and counted as failed, not rethrown.
        log.error("Failed to perform batch operation", failure);

        updateErrorMetrics(entries.size());
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Refills the {@code keys} collection with the keys of {@code entries} and passes it to
 * {@code cacheStore.loadAll}.
 */
@SuppressWarnings("unchecked")
@Override protected void process(CacheStore cacheStore, Collection<CacheEntryImpl> entries) {
    // Drop whatever the collection held before this call.
    keys.clear();

    for (CacheEntryImpl item : entries) {
        Object key = item.getKey();

        keys.add(key);
    }

    cacheStore.loadAll(keys);
}
/**
 * {@inheritDoc}
 * <p>
 * Returns this object when {@code cls} is assignable from its runtime class. Otherwise returns a new
 * {@code CacheEntryImpl} or {@code CacheEntryImplEx} built from this entry's key and value when
 * {@code cls} is assignable from the respective type.
 *
 * @throws IllegalArgumentException If {@code cls} is {@code null} or none of the above applies.
 */
@SuppressWarnings("unchecked")
@Override public <T> T unwrap(Class<T> cls) {
    // Reject null explicitly: the checks below dereference cls and would otherwise fail with an NPE.
    if (cls == null)
        throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);

    if (cls.isAssignableFrom(getClass()))
        return cls.cast(this);

    if (cls.isAssignableFrom(CacheEntryImpl.class))
        return (T)new CacheEntryImpl<>(getKey(), getValue());

    if (cls.isAssignableFrom(CacheEntry.class))
        return (T)new CacheEntryImplEx<>(getKey(), getValue(), null);

    throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);
}
/**
 * {@inheritDoc}
 * <p>
 * Copies every entry returned by {@code cache().localEntries(peekModes)} into a new
 * {@code CacheEntryImpl} and returns the copies.
 */
@Override public Iterable<Entry<K, V>> call() throws Exception {
    Collection<Entry<K, V>> copies = new ArrayList<>();

    for (Entry<K, V> src : cache().localEntries(peekModes)) {
        K key = src.getKey();
        V val = src.getValue();

        copies.add(new CacheEntryImpl<>(key, val));
    }

    return copies;
}
/**
 * Tests correct store shutdown when underlying store fails.
 * <p>
 * The scenario runs via {@code multithreadedAsync}; an exception raised inside it is captured and
 * rethrown from this method once the asynchronous run has completed.
 *
 * @param writeCoalescing Write coalescing flag.
 * @throws Exception If failed.
 */
private void testShutdownWithFailure(final boolean writeCoalescing) throws Exception {
    final AtomicReference<Exception> failure = new AtomicReference<>();

    Runnable scenario = new Runnable() {
        @Override public void run() {
            try {
                // The delegate is switched to failing mode before the store is initialized.
                delegate.setShouldFail(true);

                initStore(2, writeCoalescing);

                try {
                    store.write(new CacheEntryImpl<>(1, "val1"));
                    store.write(new CacheEntryImpl<>(2, "val2"));
                }
                finally {
                    // Shutdown happens while the delegate is still failing; the flag is reset afterwards.
                    shutdownStore();

                    delegate.setShouldFail(false);
                }
            }
            catch (Exception ex) {
                failure.set(ex);
            }
        }
    };

    multithreadedAsync(scenario, 1).get();

    Exception captured = failure.get();

    if (captured != null)
        throw captured;
}
/**
 * Checks that write behind cache flush frequency was correctly adjusted to nanos expecting putAllCnt to be
 * less or equal than elapsed time divided by flush frequency.
 *
 * @throws Exception If failed.
 */
@Test
public void testSimpleStoreFlushFrequencyWithoutCoalescing() throws Exception {
    initStore(1, false);

    // Long literals force 64-bit multiplication, so the millis-to-nanos conversion cannot overflow int.
    long writeBehindFlushFreqNanos = FLUSH_FREQUENCY * 1000L * 1000L;

    int threshold = store.getWriteBehindStoreBatchSize() / 10;

    try {
        long start = System.nanoTime();

        // First half of the writes, followed by a pause longer than the flush frequency.
        for (int i = 0; i < threshold / 2; i++)
            store.write(new CacheEntryImpl<>(i, "v" + i));

        U.sleep(FLUSH_FREQUENCY + 300);

        // Second half of the writes.
        for (int i = threshold / 2; i < threshold; i++)
            store.write(new CacheEntryImpl<>(i, "v" + i));

        long elapsed = System.nanoTime() - start;

        U.sleep(FLUSH_FREQUENCY + 300);

        int expFlushOps = (int)(1 + elapsed / writeBehindFlushFreqNanos);

        assertTrue(delegate.getPutAllCount() <= expFlushOps);
    }
    finally {
        shutdownStore();
    }
}
/**
 * Creates an entry whose key and value come from {@code TestsHelper.generateLoadTestsKey} and
 * {@code TestsHelper.generateLoadTestsValue} for the given index.
 *
 * @param i Index passed to both generator methods.
 * @return New entry holding the generated key and value.
 */
@SuppressWarnings("unchecked")
public static CacheEntryImpl generateLoadTestsEntry(long i) {
    Object key = TestsHelper.generateLoadTestsKey(i);
    Object val = TestsHelper.generateLoadTestsValue(i);

    return new CacheEntryImpl(key, val);
}