下面列出了io.reactivex.functions.LongConsumer#org.redisson.api.RFuture 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Traced paging variant of {@code entryRangeAsync}.
 *
 * <p>Opens a span named {@code "entryRangeAsync"}, tags it with every argument and passes the
 * span together with the delegated call on {@code set} to
 * {@code tracingRedissonHelper.prepareRFuture}.
 *
 * @param startScore lower score bound
 * @param startScoreInclusive whether {@code startScore} itself is included
 * @param endScore upper score bound
 * @param endScoreInclusive whether {@code endScore} itself is included
 * @param offset forwarded unchanged to the delegate
 * @param count forwarded unchanged to the delegate
 * @return the future produced by {@code prepareRFuture} for the delegated call
 */
@Override
public RFuture<Collection<ScoredEntry<V>>> entryRangeAsync(
    double startScore,
    boolean startScoreInclusive,
    double endScore,
    boolean endScoreInclusive,
    int offset,
    int count) {
    final Span tracingSpan = tracingRedissonHelper.buildSpan("entryRangeAsync", set);

    // Record each argument on the span before the call is issued.
    tracingSpan.setTag("startScore", startScore);
    tracingSpan.setTag("startScoreInclusive", startScoreInclusive);
    tracingSpan.setTag("endScore", endScore);
    tracingSpan.setTag("endScoreInclusive", endScoreInclusive);
    tracingSpan.setTag("offset", offset);
    tracingSpan.setTag("count", count);

    return tracingRedissonHelper.prepareRFuture(
        tracingSpan,
        () -> set.entryRangeAsync(
            startScore, startScoreInclusive, endScore, endScoreInclusive, offset, count));
}
/**
 * Sends {@code DEL} for the given keys through {@code writeBatchedAsync} and reports the sum
 * of the values handed to the slot callback.
 *
 * @param keys names of the keys to delete
 * @return future holding the accumulated total of all per-slot results
 */
@Override
public RFuture<Long> deleteAsync(String... keys) {
    SlotCallback<Long, Long> summingCallback = new SlotCallback<Long, Long>() {
        // Running total of every result passed to onSlotResult.
        private final AtomicLong total = new AtomicLong();

        @Override
        public void onSlotResult(Long result) {
            total.addAndGet(result);
        }

        @Override
        public Long onFinish() {
            return total.get();
        }
    };
    return commandExecutor.writeBatchedAsync(null, RedisCommands.DEL, summingCallback, keys);
}
/**
 * Checks whether the window {@code [fromIndex, toIndex - 1]} of the backing list contains
 * every element of {@code c}.
 *
 * <p>The script receives {@code fromIndex} and {@code toIndex - 1} as the first two ARGV
 * entries, followed by the encoded elements of {@code c}. It pops the two indexes, reads the
 * window with {@code LRANGE} and removes from ARGV every candidate that matches a list item;
 * the result is true when no candidate is left.
 *
 * <p>Fixes relative to the previous script:
 * <ul>
 *   <li>{@code toIndex} is popped from position 1. After {@code fromIndex} has been removed
 *       the array has shifted down, so popping position 2 consumed the first candidate and
 *       left {@code toIndex} behind in ARGV.</li>
 *   <li>The candidate loop runs from the end towards the start, so removing an entry no
 *       longer causes the element that shifted into its place to be skipped.</li>
 * </ul>
 *
 * @param c elements to look for
 * @return future resolving to {@code true} when every element of {@code c} is in the window
 */
@Override
public RFuture<Boolean> containsAllAsync(Collection<?> c) {
    List<Object> params = new ArrayList<Object>();
    params.add(fromIndex);
    params.add(toIndex.get() - 1);
    encode(params, c);
    return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
            "local fromIndex = table.remove(ARGV, 1);" +
            "local toIndex = table.remove(ARGV, 1);" +
            "local items = redis.call('lrange', KEYS[1], tonumber(fromIndex), tonumber(toIndex)) " +
            "for i=1, #items do " +
                "for j = #ARGV, 1, -1 do " +
                    "if items[i] == ARGV[j] then " +
                        "table.remove(ARGV, j) " +
                    "end " +
                "end " +
            "end " +
            "return #ARGV == 0 and 1 or 0",
            Collections.<Object>singletonList(getName()), params.toArray());
}
/**
 * Sets an absolute expiration time on this object and on its companion keys.
 *
 * <p>KEYS are, in order: the object itself, the timeout set, the idle set, the last-access-time
 * set and the options hash. ARGV[1] is {@code timestamp}. When the options hash holds a
 * non-zero {@code max-size}, the options hash and the last-access-time set are expired too.
 * Before a sorted set is expired, a {@code redisson__expiretag} member is added to it
 * (presumably so the key exists and the expiration can be applied — confirm).
 *
 * <p>Fix: every key is now expired with {@code PEXPIREAT}. Previously the options hash, the
 * last-access-time set and the idle set used {@code PEXPIRE}, which interprets its argument
 * as a relative TTL; given an absolute timestamp those keys would outlive the main key by
 * decades.
 *
 * @param timestamp absolute expiration time, passed to {@code PEXPIREAT}
 * @return future with the result of {@code PEXPIREAT} on the main key
 */
@Override
public RFuture<Boolean> expireAtAsync(long timestamp) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "local maxSize = tonumber(redis.call('hget', KEYS[5], 'max-size')); " +
            "if maxSize ~= nil and maxSize ~= 0 then " +
            " redis.call('pexpireat', KEYS[5], ARGV[1]); " +
            " redis.call('zadd', KEYS[4], 92233720368547758, 'redisson__expiretag'); " +
            " redis.call('pexpireat', KEYS[4], ARGV[1]); " +
            "end; " +
            "redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag'); " +
            "redis.call('pexpireat', KEYS[2], ARGV[1]); " +
            "redis.call('zadd', KEYS[3], 92233720368547758, 'redisson__expiretag'); " +
            "redis.call('pexpireat', KEYS[3], ARGV[1]); " +
            "return redis.call('pexpireat', KEYS[1], ARGV[1]); ",
            Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName()),
            timestamp);
}
/**
 * Tries the next node address from {@code iterator} in order to refresh the cluster state.
 *
 * <p>When the iterator is exhausted, the last recorded failure (if any) is logged and the next
 * check is scheduled via {@code scheduleClusterChangeCheck}. Otherwise a connection to the
 * next address is requested; on failure the error is stored, the shutdown latch is released
 * and the method calls itself for the following address, on success the connection is handed
 * to {@code updateClusterState}.
 *
 * @param cfg cluster configuration passed through to the helpers
 * @param iterator remaining node addresses to try
 * @param lastException holder for the most recent connection failure
 */
private void checkClusterState(ClusterServersConfig cfg, Iterator<RedisURI> iterator, AtomicReference<Throwable> lastException) {
    boolean exhausted = !iterator.hasNext();
    if (exhausted) {
        Throwable recorded = lastException.get();
        if (recorded != null) {
            log.error("Can't update cluster state", recorded);
        }
        scheduleClusterChangeCheck(cfg, null);
        return;
    }

    // Nothing to do when the latch cannot be acquired.
    boolean acquired = getShutdownLatch().acquire();
    if (!acquired) {
        return;
    }

    RedisURI nodeUri = iterator.next();
    RFuture<RedisConnection> pendingConnection = connectToNode(cfg, nodeUri, null, configEndpointHostName);
    pendingConnection.onComplete((conn, failure) -> {
        if (failure == null) {
            updateClusterState(cfg, conn, iterator, nodeUri, lastException);
        } else {
            lastException.set(failure);
            getShutdownLatch().release();
            checkClusterState(cfg, iterator, lastException);
        }
    });
}
/**
 * Subscribes to every given channel and then passes each subscription future to
 * {@code syncSubscription}.
 *
 * <p>Each listener forwards a message to {@code getListener()} only when the channel name of
 * the incoming message equals the channel the listener was registered for.
 *
 * @param channels raw channel names to subscribe to
 */
@Override
protected void doSubscribe(byte[]... channels) {
    List<RFuture<?>> pending = new ArrayList<>();
    for (byte[] channel : channels) {
        BaseRedisPubSubListener forwarder = new BaseRedisPubSubListener() {
            @Override
            public void onMessage(CharSequence ch, Object message) {
                byte[] incomingChannel = ((ChannelName) ch).getName();
                if (Arrays.equals(incomingChannel, channel)) {
                    DefaultMessage msg = new DefaultMessage(((ChannelName) ch).getName(), (byte[]) message);
                    getListener().onMessage(msg, null);
                }
            }
        };
        RFuture<PubSubConnectionEntry> subscription =
                subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), forwarder);
        pending.add(subscription);
    }

    // All subscriptions are started first; only afterwards is each one synchronised.
    for (RFuture<?> subscription : pending) {
        connectionManager.getCommandExecutor().syncSubscription(subscription);
    }
}
/**
 * Invokes {@code service} with the value and returns a promise that mirrors the outcome of the
 * remote promise's add-future.
 *
 * @param value element passed to {@code service.invoke}
 * @return promise that succeeds with {@code null} or fails with the add-future's error
 */
@Override
public RFuture<Void> putAsync(V value) {
    RemotePromise<Void> remoteCall = (RemotePromise<Void>) service.invoke(value);
    RPromise<Void> outcome = new RedissonPromise<>();
    remoteCall.getAddFuture().onComplete((ignored, error) -> {
        if (error == null) {
            outcome.trySuccess(null);
        } else {
            outcome.tryFailure(error);
        }
    });
    return outcome;
}
/**
 * Sorts this collection and stores the result under {@code destName}.
 *
 * <p>The command arguments are assembled in this order: own name, optional {@code BY}
 * clause, optional {@code LIMIT} clause, one {@code GET} clause per pattern, the sort order,
 * and finally {@code STORE destName}.
 *
 * <p>Changes relative to the previous version:
 * <ul>
 *   <li>{@code offset} and {@code count} are emitted only together with the {@code LIMIT}
 *       keyword. Previously a lone offset or count was appended as a bare number, which
 *       yields a syntactically invalid command; such a partial limit is now ignored.</li>
 *   <li>A {@code null} {@code getPatterns} is treated as "no GET clauses", in line with how a
 *       {@code null} {@code byPattern} is already handled.</li>
 * </ul>
 *
 * @param destName key that receives the sorted result
 * @param byPattern pattern for the {@code BY} clause, or {@code null} for none
 * @param getPatterns patterns for {@code GET} clauses, may be {@code null} or empty
 * @param order sort order argument appended to the command
 * @param offset first operand of {@code LIMIT}, or {@code -1} for no limit
 * @param count second operand of {@code LIMIT}, or {@code -1} for no limit
 * @return future with the command's integer result
 */
@Override
public RFuture<Integer> sortToAsync(String destName, String byPattern, List<String> getPatterns, SortOrder order, int offset, int count) {
    List<Object> params = new ArrayList<Object>();
    params.add(getName());
    if (byPattern != null) {
        params.add("BY");
        params.add(byPattern);
    }
    // LIMIT takes exactly two operands, so the clause is written as one unit or not at all.
    if (offset != -1 && count != -1) {
        params.add("LIMIT");
        params.add(offset);
        params.add(count);
    }
    if (getPatterns != null) {
        for (String pattern : getPatterns) {
            params.add("GET");
            params.add(pattern);
        }
    }
    params.add(order);
    params.add("STORE");
    params.add(destName);
    return commandExecutor.writeAsync(getName(), codec, RedisCommands.SORT_TO, params.toArray());
}
/**
 * Sends {@code CLUSTER MEET} with the host and port of the given node and waits for the
 * result via {@code syncFuture}.
 *
 * @param node node to introduce; must be non-null, have a non-blank host and a port above 0
 */
@Override
public void clusterMeet(RedisClusterNode node) {
    // Argument validation, in the same order as the checks' error messages read.
    Assert.notNull(node, "Cluster node must not be null for CLUSTER MEET command!");
    Assert.hasText(node.getHost(), "Node to meet cluster must have a host!");
    Assert.isTrue(node.getPort() > 0, "Node to meet cluster must have a port greater 0!");

    RFuture<Void> meetResult = executorService.writeAsync(
            (String) null,
            StringCodec.INSTANCE,
            RedisCommands.CLUSTER_MEET,
            node.getHost(),
            node.getPort());
    syncFuture(meetResult);
}
/**
 * Increments the hash field for {@code key} by {@code value} with {@code HINCRBYFLOAT} and,
 * depending on {@code invalidateEntryOnChange} (ARGV[3]), publishes an invalidation message.
 *
 * <p>Script behaviour: for ARGV[3] == '1' the message is published to KEYS[2]; for '2' an
 * entry is additionally added to the sorted set KEYS[3] with the current time as score before
 * publishing. Once the future completes with a non-null result, the local cache entry for
 * the key is updated.
 *
 * @param key map key whose value is incremented
 * @param value increment, sent to Redis as a plain decimal string
 * @return future with the new value, converted according to {@code value}'s class
 */
@Override
protected RFuture<V> addAndGetOperationAsync(K key, Number value) {
    ByteBuf encodedKey = encodeMapKey(key);
    CacheKey cacheKey = toCacheKey(encodedKey);
    ByteBuf invalidationMsg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
    byte[] logEntryId = generateLogEntryId(cacheKey.getKeyHash());

    String script =
            "local result = redis.call('HINCRBYFLOAT', KEYS[1], ARGV[1], ARGV[2]); "
          + "if ARGV[3] == '1' then "
              + "redis.call('publish', KEYS[2], ARGV[4]); "
          + "end;"
          + "if ARGV[3] == '2' then "
              + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
              + "redis.call('publish', KEYS[2], ARGV[4]); "
          + "end;"
          + "return result; ";

    RFuture<V> future = commandExecutor.evalWriteAsync(
            getName(),
            StringCodec.INSTANCE,
            new RedisCommand<Object>("EVAL", new NumberConvertor(value.getClass())),
            script,
            Arrays.<Object>asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
            encodedKey, new BigDecimal(value.toString()).toPlainString(), invalidateEntryOnChange, invalidationMsg, System.currentTimeMillis(), logEntryId);

    future.onComplete((updated, error) -> {
        if (updated == null) {
            return;
        }
        // The cache key is derived again from the original key object at completion time.
        CacheKey completedKey = toCacheKey(key);
        cachePut(completedKey, key, updated);
    });
    return future;
}
/**
 * Creates the delayed queue, derives its helper key names from {@code getName()} and
 * registers a transfer task with the given {@code queueTransferService}.
 *
 * @param queueTransferService service the transfer task is scheduled on
 * @param codec codec passed to the superclass
 * @param commandExecutor executor used for the transfer script and the topic
 * @param name object name passed to the superclass
 */
protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
// Helper key names, each built from a fixed prefix and this object's name.
channelName = prefixName("redisson_delay_queue_channel", getName());
queueName = prefixName("redisson_delay_queue", getName());
timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
// Runs the transfer script. KEYS[1] = getName(), KEYS[2] = timeoutSetName, KEYS[3] = queueName;
// ARGV[1] = current time in milliseconds, ARGV[2] = 100 (maximum entries per run).
@Override
protected RFuture<Long> pushTaskAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
// select up to ARGV[2] entries of KEYS[2] whose score lies between 0 and ARGV[1]
"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredValues > 0 then "
+ "for i, v in ipairs(expiredValues) do "
// each stored entry is unpacked into a random id and the value; only the value is pushed to KEYS[1]
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "redis.call('rpush', KEYS[1], value);"
+ "redis.call('lrem', KEYS[3], 1, v);"
+ "end; "
+ "redis.call('zrem', KEYS[2], unpack(expiredValues));"
+ "end; "
// get startTime from scheduler queue head task
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
+ "if v[1] ~= nil then "
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(getName(), timeoutSetName, queueName),
System.currentTimeMillis(), 100);
}
// Topic on channelName, created with LongCodec.
@Override
protected RTopic getTopic() {
return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
}
};
// The task is scheduled under queueName before the service reference is stored in the field.
queueTransferService.schedule(queueName, task);
this.queueTransferService = queueTransferService;
}
/**
 * Runs {@code KEYS} with the given pattern via {@code readAllAsync} and returns the matching
 * key names as UTF-8 encoded byte buffers.
 *
 * @param node cluster node argument required by the interface
 * @param pattern key pattern
 * @return a {@code Mono} emitting the list of matching keys
 */
@Override
public Mono<List<ByteBuffer>> keys(RedisClusterNode node, ByteBuffer pattern) {
    // NOTE(review): node is not used; the command goes through readAllAsync — confirm
    // whether it is meant to target only the given node.
    Mono<List<String>> keyNames = executorService.reactive(
            () -> (RFuture<List<String>>) (Object) executorService.readAllAsync(
                    StringCodec.INSTANCE, RedisCommands.KEYS, toByteArray(pattern)));

    return keyNames.map(names -> names.stream()
            .map(name -> name.getBytes(CharsetUtil.UTF_8))
            .map(ByteBuffer::wrap)
            .collect(Collectors.toList()));
}
/**
 * Subscribes the listener to {@code channelName} and returns a promise that yields the
 * listener's identity hash code once the subscription completes.
 *
 * @param pubSubListener listener to register
 * @return promise completed with {@code System.identityHashCode(pubSubListener)}, or failed
 *         with the subscription error
 */
private RFuture<Integer> addListenerAsync(RedisPubSubListener<?> pubSubListener) {
    RPromise<Integer> listenerId = new RedissonPromise<Integer>();
    RFuture<PubSubConnectionEntry> subscription = subscribeService.subscribe(codec, channelName, pubSubListener);
    subscription.onComplete((entry, error) -> {
        if (error == null) {
            listenerId.trySuccess(System.identityHashCode(pubSubListener));
        } else {
            listenerId.tryFailure(error);
        }
    });
    return listenerId;
}
/**
 * Validates key and value, starts the put operation and, when a writer is present, wraps the
 * result with a {@code MapWriterTask.Add} task.
 *
 * @param key map key, validated by {@code checkKey}
 * @param value map value, validated by {@code checkValue}
 * @return the put future, either as-is or wrapped by {@code mapWriterFuture}
 */
@Override
public RFuture<V> putAsync(K key, V value) {
    checkKey(key);
    checkValue(value);

    RFuture<V> putResult = putOperationAsync(key, value);
    if (!hasNoWriter()) {
        return mapWriterFuture(putResult, new MapWriterTask.Add(key, value));
    }
    return putResult;
}
/**
 * Issues {@code SUNION} over this set and the given sets.
 *
 * @param names names of the other sets
 * @return future with the command result
 */
@Override
public RFuture<Set<V>> readUnionAsync(String... names) {
    // Argument vector: this set's name first, then the other names in the given order.
    Object[] keyNames = new Object[names.length + 1];
    keyNames[0] = getName();
    System.arraycopy(names, 0, keyNames, 1, names.length);

    // NOTE(review): the command is sent with writeAsync although the method is a read —
    // confirm whether that is intentional.
    return commandExecutor.writeAsync(getName(), codec, RedisCommands.SUNION, keyNames);
}
/**
 * Performs one {@code SSCAN} step on this set unless the entry for {@code key} has expired.
 *
 * <p>ARGV layout: current time, cursor, encoded key, then either {@code pattern, count} or
 * just {@code count}. The script reads the key's score from KEYS[1]; when that score is not
 * greater than the current time it returns cursor 0 with an empty list, otherwise it runs
 * {@code SSCAN} on KEYS[2], with a {@code MATCH} clause only when five arguments were sent.
 *
 * @param client client the script is evaluated on
 * @param startPos scan cursor
 * @param pattern optional match pattern, {@code null} for none
 * @param count value for the {@code COUNT} option
 * @return the scan result obtained by waiting on the script's future with {@code get}
 */
private ListScanResult<Object> scanIterator(RedisClient client, long startPos, String pattern, int count) {
    List<Object> scriptArgs = new ArrayList<>();
    scriptArgs.add(System.currentTimeMillis());
    scriptArgs.add(startPos);
    scriptArgs.add(encodeMapKey(key));
    if (pattern != null) {
        scriptArgs.add(pattern);
    }
    scriptArgs.add(count);

    String script =
            "local expireDate = 92233720368547758; "
          + "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
          + "if expireDateScore ~= false then "
              + "expireDate = tonumber(expireDateScore) "
          + "end; "
          + "if expireDate <= tonumber(ARGV[1]) then "
              + "return {0, {}};"
          + "end;"
          + "local res; "
          + "if (#ARGV == 5) then "
              + "res = redis.call('sscan', KEYS[2], ARGV[2], 'match', ARGV[4], 'count', ARGV[5]); "
          + "else "
              + "res = redis.call('sscan', KEYS[2], ARGV[2], 'count', ARGV[4]); "
          + "end;"
          + "return res;";

    RFuture<ListScanResult<Object>> scanFuture = commandExecutor.evalReadAsync(
            client, getName(), codec, EVAL_SSCAN,
            script,
            Arrays.<Object>asList(timeoutSetName, getName()),
            scriptArgs.toArray());
    return get(scanFuture);
}
/**
 * Stores the key in the hash KEYS[1] under its encoded form with the key hash as value, and
 * adds the encoded value to the set KEYS[2] whose name is derived from the key hash.
 *
 * @param key multimap key
 * @param value value to associate with the key
 * @return future with the boolean-converted result of {@code SADD}
 */
@Override
public RFuture<Boolean> putAsync(K key, V value) {
    ByteBuf encodedKey = encodeMapKey(key);
    String hashOfKey = hash(encodedKey);
    ByteBuf encodedValue = encodeMapValue(value);
    String valuesSetName = getValuesName(hashOfKey);

    String script =
            "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
          + "return redis.call('sadd', KEYS[2], ARGV[3]); ";

    return commandExecutor.evalWriteAsync(
            getName(), codec, RedisCommands.EVAL_BOOLEAN,
            script,
            Arrays.<Object>asList(getName(), valuesSetName),
            encodedKey, hashOfKey, encodedValue);
}
/**
 * Traced variant of {@code rangeAsync}: tags a new span with both indexes and delegates to
 * the wrapped {@code list}.
 *
 * @param fromIndex forwarded unchanged to the delegate
 * @param toIndex forwarded unchanged to the delegate
 * @return the future produced by {@code prepareRFuture} for the delegated call
 */
@Override
public RFuture<List<V>> rangeAsync(int fromIndex, int toIndex) {
    final Span tracingSpan = tracingRedissonHelper.buildSpan("rangeAsync", list);
    tracingSpan.setTag("fromIndex", fromIndex);
    tracingSpan.setTag("toIndex", toIndex);

    return tracingRedissonHelper.prepareRFuture(
        tracingSpan,
        () -> list.rangeAsync(fromIndex, toIndex));
}
/**
 * Checks whether this set contains every element of {@code c}.
 *
 * <p>An empty collection yields an already-succeeded {@code true} future. Otherwise the
 * script adds the encoded elements to a temporary set (KEYS[2]), computes the difference
 * between that set and this one (KEYS[1]), deletes the temporary set and reports whether the
 * difference is empty.
 *
 * @param c elements to look for
 * @return future resolving to {@code true} when no element of {@code c} is missing
 */
@Override
public RFuture<Boolean> containsAllAsync(Collection<?> c) {
    if (!c.isEmpty()) {
        String scratchSetName = suffixName(getName(), "redisson_temp");
        String script =
                "redis.call('sadd', KEYS[2], unpack(ARGV)); "
              + "local size = redis.call('sdiff', KEYS[2], KEYS[1]);"
              + "redis.call('del', KEYS[2]); "
              + "return #size == 0 and 1 or 0; ";
        return commandExecutor.evalWriteAsync(
                getName(), codec, RedisCommands.EVAL_BOOLEAN,
                script,
                Arrays.<Object>asList(getName(), scratchSetName),
                encode(c).toArray());
    }
    return RedissonPromise.newSucceededFuture(true);
}
/**
 * Tries the next client from {@code iterator} in order to refresh the state.
 *
 * <p>When the iterator is exhausted, the last recorded failure (if any) is logged, then
 * {@code performSentinelDNSCheck} and {@code scheduleChangeCheck} are invoked. Otherwise a
 * connection through the next client is requested; on failure the error is stored, the
 * shutdown latch is released and the method calls itself for the following client, on
 * success the connection is handed to {@code updateState}.
 *
 * @param cfg sentinel configuration passed through to the helpers
 * @param iterator remaining clients to try
 * @param lastException holder for the most recent connection failure
 */
private void checkState(SentinelServersConfig cfg, Iterator<RedisClient> iterator, AtomicReference<Throwable> lastException) {
    boolean exhausted = !iterator.hasNext();
    if (exhausted) {
        Throwable recorded = lastException.get();
        if (recorded != null) {
            log.error("Can't update cluster state", recorded);
        }
        performSentinelDNSCheck(null);
        scheduleChangeCheck(cfg, null);
        return;
    }

    // Nothing to do when the latch cannot be acquired.
    boolean acquired = getShutdownLatch().acquire();
    if (!acquired) {
        return;
    }

    RedisClient nextClient = iterator.next();
    RFuture<RedisConnection> pendingConnection = connectToNode(null, null, nextClient, null);
    pendingConnection.onComplete((conn, failure) -> {
        if (failure == null) {
            updateState(cfg, conn, iterator);
        } else {
            lastException.set(failure);
            getShutdownLatch().release();
            checkState(cfg, iterator, lastException);
        }
    });
}
/**
 * Returns the elements between {@code fromIndex} and {@code toIndex} of this list unless the
 * entry for {@code key} has expired.
 *
 * <p>The script reads the key's score from KEYS[1]; when that score is not greater than the
 * current time (ARGV[1]) it returns an empty table, otherwise the result of {@code LRANGE}
 * on KEYS[2] with ARGV[3] and ARGV[4] as bounds.
 *
 * @param fromIndex start argument of {@code LRANGE}
 * @param toIndex stop argument of {@code LRANGE}
 * @return future with the selected elements
 */
@Override
public RFuture<List<V>> rangeAsync(int fromIndex, int toIndex) {
    String script =
            "local expireDate = 92233720368547758; "
          + "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
          + "if expireDateScore ~= false then "
              + "expireDate = tonumber(expireDateScore) "
          + "end; "
          + "if expireDate <= tonumber(ARGV[1]) then "
              + "return {};"
          + "end; "
          + "return redis.call('lrange', KEYS[2], ARGV[3], ARGV[4]);";

    return commandExecutor.evalReadAsync(
            getName(), codec, RedisCommands.EVAL_MAP_VALUE_LIST,
            script,
            Arrays.<Object>asList(timeoutSetName, getName()),
            System.currentTimeMillis(), encodeMapKey(key), fromIndex, toIndex);
}
/**
 * Runs {@code CONFIG GET} with the given pattern against the entry resolved for the node and
 * converts the reply to {@link Properties}.
 *
 * @param node cluster node to query
 * @param pattern configuration parameter pattern
 * @return the converted reply, or {@code null} when the reply is {@code null}
 */
@Override
public Properties getConfig(RedisClusterNode node, String pattern) {
    MasterSlaveEntry target = getEntry(node);
    RFuture<List<String>> reply = executorService.writeAsync(target, StringCodec.INSTANCE, RedisCommands.CONFIG_GET, pattern);
    List<String> flatPairs = syncFuture(reply);
    if (flatPairs == null) {
        return null;
    }
    return Converters.toProperties(flatPairs);
}
/**
 * Traced variant of {@code entryRangeAsync}: tags a new span with the score bounds and their
 * inclusiveness and delegates to the wrapped {@code set}.
 *
 * @param startScore lower score bound
 * @param startScoreInclusive whether {@code startScore} itself is included
 * @param endScore upper score bound
 * @param endScoreInclusive whether {@code endScore} itself is included
 * @return the future produced by {@code prepareRFuture} for the delegated call
 */
@Override
public RFuture<Collection<ScoredEntry<V>>> entryRangeAsync(
    double startScore,
    boolean startScoreInclusive,
    double endScore,
    boolean endScoreInclusive) {
    final Span tracingSpan = tracingRedissonHelper.buildSpan("entryRangeAsync", set);
    tracingSpan.setTag("startScore", startScore);
    tracingSpan.setTag("startScoreInclusive", startScoreInclusive);
    tracingSpan.setTag("endScore", endScore);
    tracingSpan.setTag("endScoreInclusive", endScoreInclusive);

    return tracingRedissonHelper.prepareRFuture(
        tracingSpan,
        () -> set.entryRangeAsync(startScore, startScoreInclusive, endScore, endScoreInclusive));
}
/**
 * Sends {@code FLUSHALL} through {@code writeAllAsync} and exposes the outcome, converted by
 * {@code toStringFuture}, as a {@code Mono}.
 *
 * @return a {@code Mono} backed by the converted future
 */
@Override
public Mono<String> flushAll() {
    return executorService.reactive(() -> {
        RFuture<Void> flushed = executorService.writeAllAsync(FLUSHALL);
        return toStringFuture(flushed);
    });
}
/**
 * Traced variant of {@code readSortAsync}: tags a new span with the order (passed through
 * {@code nullable}), the offset and the count, then delegates to the wrapped {@code set}.
 *
 * @param order sort order, forwarded unchanged to the delegate
 * @param offset forwarded unchanged to the delegate
 * @param count forwarded unchanged to the delegate
 * @return the future produced by {@code prepareRFuture} for the delegated call
 */
@Override
public RFuture<Set<V>> readSortAsync(SortOrder order, int offset, int count) {
    final Span tracingSpan = tracingRedissonHelper.buildSpan("readSortAsync", set);
    tracingSpan.setTag("order", nullable(order));
    tracingSpan.setTag("offset", offset);
    tracingSpan.setTag("count", count);

    return tracingRedissonHelper.prepareRFuture(
        tracingSpan,
        () -> set.readSortAsync(order, offset, count));
}
/**
 * Issues {@code FLUSHALL} via {@code writeAllAsync}; the resulting future is converted with
 * {@code toStringFuture} and wrapped into a {@code Mono}.
 *
 * @return a {@code Mono} backed by the converted future
 */
@Override
public Mono<String> flushAll() {
    return executorService.reactive(() -> {
        RFuture<Void> flushResult = executorService.writeAllAsync(FLUSHALL);
        return toStringFuture(flushResult);
    });
}
/**
 * Traced variant of {@code valueRangeAsync}: tags a new span with both indexes and delegates
 * to the wrapped {@code set}.
 *
 * @param startIndex forwarded unchanged to the delegate
 * @param endIndex forwarded unchanged to the delegate
 * @return the future produced by {@code prepareRFuture} for the delegated call
 */
@Override
public RFuture<Collection<V>> valueRangeAsync(int startIndex, int endIndex) {
    final Span tracingSpan = tracingRedissonHelper.buildSpan("valueRangeAsync", set);
    tracingSpan.setTag("startIndex", startIndex);
    tracingSpan.setTag("endIndex", endIndex);

    return tracingRedissonHelper.prepareRFuture(
        tracingSpan,
        () -> set.valueRangeAsync(startIndex, endIndex));
}
/**
 * Sends {@code CONFIG RESETSTAT} to the entry resolved for the given node and waits for the
 * result via {@code syncFuture}.
 *
 * @param node cluster node whose statistics are reset
 */
@Override
public void resetConfigStats(RedisClusterNode node) {
    MasterSlaveEntry target = getEntry(node);
    RFuture<Void> resetResult = executorService.writeAsync(
            target, StringCodec.INSTANCE, RedisCommands.CONFIG_RESETSTAT);
    syncFuture(resetResult);
}
/**
 * Switches to a new master at the given address.
 *
 * <p>The current {@code masterEntry} is read before {@code setupMasterEntry} is called, and
 * both are then passed to the three-argument {@code changeMaster} overload.
 *
 * @param address socket address of the new master
 * @param uri URI of the new master
 */
public void changeMaster(InetSocketAddress address, RedisURI uri) {
    // Must be captured first: presumably setupMasterEntry replaces masterEntry — confirm.
    ClientConnectionsEntry previousMaster = masterEntry;
    RFuture<RedisClient> masterSetup = setupMasterEntry(address, uri);
    changeMaster(uri, previousMaster, masterSetup);
}
/**
 * Reads the values whose score lies between the two bounds using the
 * {@code ZRANGEBYSCORE_LIST} command.
 *
 * <p>Each bound is turned into its command argument by {@code value(score, inclusive)}.
 *
 * @param startScore lower score bound
 * @param startScoreInclusive whether {@code startScore} itself is included
 * @param endScore upper score bound
 * @param endScoreInclusive whether {@code endScore} itself is included
 * @return future with the matching values
 */
@Override
public RFuture<Collection<V>> valueRangeAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
    String lowerBound = value(startScore, startScoreInclusive);
    String upperBound = value(endScore, endScoreInclusive);
    return commandExecutor.readAsync(
            getName(), codec, RedisCommands.ZRANGEBYSCORE_LIST,
            getName(), lowerBound, upperBound);
}