下面列出了 io.reactivex.functions.LongConsumer#org.redisson.client.RedisClient 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Copies the peer ("host:port") of the first constructor argument onto the
 * enhanced instance.
 *
 * <p>The peer already stored on the first argument is reused when present.
 * Otherwise it is rebuilt by reading the {@code address} field of the client
 * configuration reflectively. If that lookup fails, a warning is logged and
 * {@code null} is stored.
 *
 * @param objInst      instance that receives the peer as its dynamic field
 * @param allArguments constructor arguments; element 0 is cast to both
 *                     {@code EnhancedInstance} and {@code RedisClient}
 */
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
    Object redisClient = allArguments[0];
    String peer = (String) ((EnhancedInstance) redisClient).getSkyWalkingDynamicField();
    if (peer != null) {
        objInst.setSkyWalkingDynamicField(peer);
        return;
    }
    try {
        /*
         * In some newer redisson versions (3.11.1 for example) the "address"
         * attribute of RedisClientConfig changed from URI to RedisURI. Both
         * expose "host" and "port", so the fields are read reflectively to
         * stay compatible with either type.
         */
        Object address = ClassUtil.getObjectField(((RedisClient) redisClient).getConfig(), "address");
        String host = (String) ClassUtil.getObjectField(address, "host");
        Object port = ClassUtil.getObjectField(address, "port");
        peer = host + ":" + String.valueOf(port);
    } catch (Exception e) {
        logger.warn("RedisConnection create peer error: ", e);
    }
    objInst.setSkyWalkingDynamicField(peer);
}
/**
 * Returns a connection to the first configured sentinel that answers
 * {@code PING} with "pong" (compared case-insensitively).
 *
 * <p>Sentinels are tried in the order returned by the connection manager.
 * A sentinel that cannot be connected to, or whose ping fails, is logged and
 * skipped so the remaining sentinels are still tried.
 *
 * @return a sentinel connection wrapping the first responsive sentinel
 * @throws InvalidDataAccessResourceUsageException if Redisson is not configured
 *         in Sentinel mode, or if no sentinel answered the ping
 */
@Override
public RedisSentinelConnection getSentinelConnection() {
    if (!redisson.getConfig().isSentinelConfig()) {
        throw new InvalidDataAccessResourceUsageException("Redisson is not in Sentinel mode");
    }
    SentinelConnectionManager manager = ((SentinelConnectionManager)((Redisson)redisson).getConnectionManager());
    for (RedisClient client : manager.getSentinels()) {
        org.redisson.client.RedisConnection connection = null;
        try {
            // Connect inside the try block: an unreachable sentinel must not
            // abort the loop, the next sentinel has to be tried instead.
            connection = client.connect();
            String res = connection.sync(RedisCommands.PING);
            if ("pong".equalsIgnoreCase(res)) {
                return new RedissonSentinelConnection(connection);
            }
            // Unexpected reply: this connection is not handed out, so release it.
            connection.closeAsync();
        } catch (Exception e) {
            log.warn("Can't connect to " + client, e);
            if (connection != null) {
                connection.closeAsync();
            }
        }
    }
    throw new InvalidDataAccessResourceUsageException("Sentinels are not found");
}
/**
 * Builds, for every incoming scan command, a lazily evaluated {@code Flux}
 * of tuples backed by repeated {@code ZSCAN} calls on the command's key.
 *
 * <p>The {@code MATCH} argument is only sent when the scan options carry a
 * pattern. {@code COUNT} falls back to 10 when the options carry no count.
 *
 * @param commands publisher of scan commands; key and options must be non-null
 * @return one response per command, pairing the command with its tuple flux
 */
@Override
public Flux<CommandResponse<KeyCommand, Flux<Tuple>>> zScan(Publisher<KeyScanCommand> commands) {
    return execute(commands, command -> {
        Assert.notNull(command.getKey(), "Key must not be null!");
        Assert.notNull(command.getOptions(), "ScanOptions must not be null!");

        byte[] keyBuf = toByteArray(command.getKey());
        Flux<Tuple> flux = Flux.create(new SetReactiveIterator<Tuple>() {
            @Override
            protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
                Long batchSize = Optional.ofNullable(command.getOptions().getCount()).orElse(10L);
                if (command.getOptions().getPattern() != null) {
                    return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, ZSCAN,
                            keyBuf, nextIterPos, "MATCH", command.getOptions().getPattern(),
                            "COUNT", batchSize);
                }
                return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, ZSCAN,
                        keyBuf, nextIterPos, "COUNT", batchSize);
            }
        });

        return Mono.just(new CommandResponse<>(command, flux));
    });
}
/**
 * Builds, for every incoming scan command, a lazily evaluated {@code Flux}
 * of set members backed by repeated {@code SSCAN} calls on the command's key.
 *
 * <p>The {@code MATCH} argument is only sent when the scan options carry a
 * pattern. {@code COUNT} falls back to 10 when the options carry no count.
 * Each scanned {@code byte[]} is wrapped in a {@code ByteBuffer}.
 *
 * @param commands publisher of scan commands; key and options must be non-null
 * @return one response per command, pairing the command with its member flux
 */
@Override
public Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> sScan(Publisher<KeyScanCommand> commands) {
    return execute(commands, command -> {
        Assert.notNull(command.getKey(), "Key must not be null!");
        Assert.notNull(command.getOptions(), "ScanOptions must not be null!");

        byte[] keyBuf = toByteArray(command.getKey());
        Flux<byte[]> flux = Flux.create(new SetReactiveIterator<byte[]>() {
            @Override
            protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
                Long batchSize = Optional.ofNullable(command.getOptions().getCount()).orElse(10L);
                if (command.getOptions().getPattern() != null) {
                    return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.SSCAN,
                            keyBuf, nextIterPos, "MATCH", command.getOptions().getPattern(),
                            "COUNT", batchSize);
                }
                return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.SSCAN,
                        keyBuf, nextIterPos, "COUNT", batchSize);
            }
        });

        Flux<ByteBuffer> wrapped = flux.map(ByteBuffer::wrap);
        return Mono.just(new CommandResponse<>(command, wrapped));
    });
}
/**
 * Returns a connection to the first configured sentinel that answers
 * {@code PING} with "pong" (compared case-insensitively).
 *
 * <p>Sentinels are tried in the order returned by the connection manager.
 * A sentinel that cannot be connected to, or whose ping fails, is logged and
 * skipped so the remaining sentinels are still tried.
 *
 * @return a sentinel connection wrapping the first responsive sentinel
 * @throws InvalidDataAccessResourceUsageException if Redisson is not configured
 *         in Sentinel mode, or if no sentinel answered the ping
 */
@Override
public RedisSentinelConnection getSentinelConnection() {
    if (!redisson.getConfig().isSentinelConfig()) {
        throw new InvalidDataAccessResourceUsageException("Redisson is not in Sentinel mode");
    }
    SentinelConnectionManager manager = ((SentinelConnectionManager)((Redisson)redisson).getConnectionManager());
    for (RedisClient client : manager.getSentinels()) {
        org.redisson.client.RedisConnection connection = null;
        try {
            // Connect inside the try block: an unreachable sentinel must not
            // abort the loop, the next sentinel has to be tried instead.
            connection = client.connect();
            String res = connection.sync(RedisCommands.PING);
            if ("pong".equalsIgnoreCase(res)) {
                return new RedissonSentinelConnection(connection);
            }
            // Unexpected reply: this connection is not handed out, so release it.
            connection.closeAsync();
        } catch (Exception e) {
            log.warn("Can't connect to " + client, e);
            if (connection != null) {
                connection.closeAsync();
            }
        }
    }
    throw new InvalidDataAccessResourceUsageException("Sentinels are not found");
}
/**
 * Builds, for every incoming scan command, a lazily evaluated {@code Flux}
 * of hash entries backed by repeated {@code HSCAN} calls on the command's key.
 *
 * <p>The {@code MATCH} argument is only sent when the scan options carry a
 * pattern. {@code COUNT} falls back to 10 when the options carry no count.
 * Scanned keys and values are cast to {@code byte[]} and wrapped in
 * {@code ByteBuffer}s.
 *
 * @param commands publisher of scan commands; key and options must be non-null
 * @return one response per command, pairing the command with its entry flux
 */
@Override
public Flux<CommandResponse<KeyCommand, Flux<Entry<ByteBuffer, ByteBuffer>>>> hScan(
        Publisher<KeyScanCommand> commands) {
    return execute(commands, command -> {
        Assert.notNull(command.getKey(), "Key must not be null!");
        Assert.notNull(command.getOptions(), "ScanOptions must not be null!");

        byte[] keyBuf = toByteArray(command.getKey());
        Flux<Entry<Object, Object>> flux = Flux.create(new MapReactiveIterator<Object, Object, Entry<Object, Object>>(null, null, 0) {
            @Override
            public RFuture<MapScanResult<Object, Object>> scanIterator(RedisClient client, long nextIterPos) {
                Long batchSize = Optional.ofNullable(command.getOptions().getCount()).orElse(10L);
                if (command.getOptions().getPattern() != null) {
                    return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HSCAN,
                            keyBuf, nextIterPos, "MATCH", command.getOptions().getPattern(),
                            "COUNT", batchSize);
                }
                return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HSCAN,
                        keyBuf, nextIterPos, "COUNT", batchSize);
            }
        });

        // A singleton map is used only as a convenient source of an Entry instance.
        Flux<Entry<ByteBuffer, ByteBuffer>> f = flux.map(scanned -> {
            ByteBuffer field = ByteBuffer.wrap((byte[]) scanned.getKey());
            ByteBuffer value = ByteBuffer.wrap((byte[]) scanned.getValue());
            return Collections.singletonMap(field, value).entrySet().iterator().next();
        });
        return Mono.just(new CommandResponse<>(command, f));
    });
}
/**
 * Builds, for every incoming scan command, a lazily evaluated {@code Flux}
 * of set members backed by repeated {@code SSCAN} calls on the command's key.
 *
 * <p>The {@code MATCH} argument is only sent when the scan options carry a
 * pattern. {@code COUNT} falls back to 10 when the options carry no count.
 * Each scanned {@code byte[]} is wrapped in a {@code ByteBuffer}.
 *
 * @param commands publisher of scan commands; key and options must be non-null
 * @return one response per command, pairing the command with its member flux
 */
@Override
public Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> sScan(Publisher<KeyScanCommand> commands) {
    return execute(commands, command -> {
        Assert.notNull(command.getKey(), "Key must not be null!");
        Assert.notNull(command.getOptions(), "ScanOptions must not be null!");

        byte[] keyBuf = toByteArray(command.getKey());
        Flux<byte[]> flux = Flux.create(new SetReactiveIterator<byte[]>() {
            @Override
            protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
                Long batchSize = Optional.ofNullable(command.getOptions().getCount()).orElse(10L);
                if (command.getOptions().getPattern() != null) {
                    return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.SSCAN,
                            keyBuf, nextIterPos, "MATCH", command.getOptions().getPattern(),
                            "COUNT", batchSize);
                }
                return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.SSCAN,
                        keyBuf, nextIterPos, "COUNT", batchSize);
            }
        });

        Flux<ByteBuffer> wrapped = flux.map(ByteBuffer::wrap);
        return Mono.just(new CommandResponse<>(command, wrapped));
    });
}
/**
 * Returns a connection to the first configured sentinel that answers
 * {@code PING} with "pong" (compared case-insensitively).
 *
 * <p>Sentinels are tried in the order returned by the connection manager.
 * A sentinel that cannot be connected to, or whose ping fails, is logged and
 * skipped so the remaining sentinels are still tried.
 *
 * @return a sentinel connection wrapping the first responsive sentinel
 * @throws InvalidDataAccessResourceUsageException if Redisson is not configured
 *         in Sentinel mode, or if no sentinel answered the ping
 */
@Override
public RedisSentinelConnection getSentinelConnection() {
    if (!redisson.getConfig().isSentinelConfig()) {
        throw new InvalidDataAccessResourceUsageException("Redisson is not in Sentinel mode");
    }
    SentinelConnectionManager manager = ((SentinelConnectionManager)((Redisson)redisson).getConnectionManager());
    for (RedisClient client : manager.getSentinels()) {
        org.redisson.client.RedisConnection connection = null;
        try {
            // Connect inside the try block: an unreachable sentinel must not
            // abort the loop, the next sentinel has to be tried instead.
            connection = client.connect();
            String res = connection.sync(RedisCommands.PING);
            if ("pong".equalsIgnoreCase(res)) {
                return new RedissonSentinelConnection(connection);
            }
            // Unexpected reply: this connection is not handed out, so release it.
            connection.closeAsync();
        } catch (Exception e) {
            log.warn("Can't connect to " + client, e);
            if (connection != null) {
                connection.closeAsync();
            }
        }
    }
    throw new InvalidDataAccessResourceUsageException("Sentinels are not found");
}
/**
 * Creates an iterator that scans this object in batches and supports
 * element removal through {@code RedissonSetCache.remove}.
 *
 * @param pattern pattern forwarded unchanged to {@code scanIterator}
 * @param count   count forwarded unchanged to {@code scanIterator}
 * @return iterator over the scanned values
 */
@Override
public Iterator<V> iterator(final String pattern, final int count) {
    Iterator<V> scanning = new RedissonBaseIterator<V>() {
        @Override
        protected void remove(Object value) {
            // Removal is delegated to the enclosing set cache.
            RedissonSetCache.this.remove((V) value);
        }

        @Override
        protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
            return scanIterator(getName(), client, nextIterPos, pattern, count);
        }
    };
    return scanning;
}
/**
 * Builds, for every incoming scan command, a lazily evaluated {@code Flux}
 * of hash entries backed by repeated {@code HSCAN} calls on the command's key.
 *
 * <p>The {@code MATCH} argument is only sent when the scan options carry a
 * pattern. {@code COUNT} falls back to 10 when the options carry no count.
 * Scanned keys and values are cast to {@code byte[]} and wrapped in
 * {@code ByteBuffer}s.
 *
 * @param commands publisher of scan commands; key and options must be non-null
 * @return one response per command, pairing the command with its entry flux
 */
@Override
public Flux<CommandResponse<KeyCommand, Flux<Entry<ByteBuffer, ByteBuffer>>>> hScan(
        Publisher<KeyScanCommand> commands) {
    return execute(commands, command -> {
        Assert.notNull(command.getKey(), "Key must not be null!");
        Assert.notNull(command.getOptions(), "ScanOptions must not be null!");

        byte[] keyBuf = toByteArray(command.getKey());
        Flux<Entry<Object, Object>> flux = Flux.create(new MapReactiveIterator<Object, Object, Entry<Object, Object>>(null, null, 0) {
            @Override
            public RFuture<MapScanResult<Object, Object>> scanIterator(RedisClient client, long nextIterPos) {
                Long batchSize = Optional.ofNullable(command.getOptions().getCount()).orElse(10L);
                if (command.getOptions().getPattern() != null) {
                    return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HSCAN,
                            keyBuf, nextIterPos, "MATCH", command.getOptions().getPattern(),
                            "COUNT", batchSize);
                }
                return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HSCAN,
                        keyBuf, nextIterPos, "COUNT", batchSize);
            }
        });

        // A singleton map is used only as a convenient source of an Entry instance.
        Flux<Entry<ByteBuffer, ByteBuffer>> f = flux.map(scanned -> {
            ByteBuffer field = ByteBuffer.wrap((byte[]) scanned.getKey());
            ByteBuffer value = ByteBuffer.wrap((byte[]) scanned.getValue());
            return Collections.singletonMap(field, value).entrySet().iterator().next();
        });
        return Mono.just(new CommandResponse<>(command, f));
    });
}
/**
 * Returns a connection to the first configured sentinel that answers
 * {@code PING} with "pong" (compared case-insensitively).
 *
 * <p>Sentinels are tried in the order returned by the connection manager.
 * A sentinel that cannot be connected to, or whose ping fails, is logged and
 * skipped so the remaining sentinels are still tried.
 *
 * @return a sentinel connection wrapping the first responsive sentinel
 * @throws InvalidDataAccessResourceUsageException if Redisson is not configured
 *         in Sentinel mode, or if no sentinel answered the ping
 */
@Override
public RedisSentinelConnection getSentinelConnection() {
    if (!redisson.getConfig().isSentinelConfig()) {
        throw new InvalidDataAccessResourceUsageException("Redisson is not in Sentinel mode");
    }
    SentinelConnectionManager manager = ((SentinelConnectionManager)((Redisson)redisson).getConnectionManager());
    for (RedisClient client : manager.getSentinels()) {
        org.redisson.client.RedisConnection connection = null;
        try {
            // Connect inside the try block: an unreachable sentinel must not
            // abort the loop, the next sentinel has to be tried instead.
            connection = client.connect();
            String res = connection.sync(RedisCommands.PING);
            if ("pong".equalsIgnoreCase(res)) {
                return new RedissonSentinelConnection(connection);
            }
            // Unexpected reply: this connection is not handed out, so release it.
            connection.closeAsync();
        } catch (Exception e) {
            log.warn("Can't connect to " + client, e);
            if (connection != null) {
                connection.closeAsync();
            }
        }
    }
    throw new InvalidDataAccessResourceUsageException("Sentinels are not found");
}
/**
 * Creates a read-only iterator that scans this object in batches.
 *
 * @param pattern pattern forwarded unchanged to {@code scanIterator}
 * @param count   count forwarded unchanged to {@code scanIterator}
 * @return iterator over the scanned values; its removal hook always throws
 *         {@code UnsupportedOperationException}
 */
@Override
public Iterator<V> iterator(String pattern, int count) {
    Iterator<V> scanning = new RedissonBaseIterator<V>() {
        @Override
        protected void remove(Object value) {
            // Removal through this iterator is not supported.
            throw new UnsupportedOperationException();
        }

        @Override
        protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
            return scanIterator(getName(), client, nextIterPos, pattern, count);
        }
    };
    return scanning;
}
/**
 * Returns a connection to the first configured sentinel that answers
 * {@code PING} with "pong" (compared case-insensitively).
 *
 * <p>Sentinels are tried in the order returned by the connection manager.
 * A sentinel that cannot be connected to, or whose ping fails, is logged and
 * skipped so the remaining sentinels are still tried.
 *
 * @return a sentinel connection wrapping the first responsive sentinel
 * @throws InvalidDataAccessResourceUsageException if Redisson is not configured
 *         in Sentinel mode, or if no sentinel answered the ping
 */
@Override
public RedisSentinelConnection getSentinelConnection() {
    if (!redisson.getConfig().isSentinelConfig()) {
        throw new InvalidDataAccessResourceUsageException("Redisson is not in Sentinel mode");
    }
    SentinelConnectionManager manager = ((SentinelConnectionManager)((Redisson)redisson).getConnectionManager());
    for (RedisClient client : manager.getSentinels()) {
        org.redisson.client.RedisConnection connection = null;
        try {
            // Connect inside the try block: an unreachable sentinel must not
            // abort the loop, the next sentinel has to be tried instead.
            connection = client.connect();
            String res = connection.sync(RedisCommands.PING);
            if ("pong".equalsIgnoreCase(res)) {
                return new RedissonSentinelConnection(connection);
            }
            // Unexpected reply: this connection is not handed out, so release it.
            connection.closeAsync();
        } catch (Exception e) {
            log.warn("Can't connect to " + client, e);
            if (connection != null) {
                connection.closeAsync();
            }
        }
    }
    throw new InvalidDataAccessResourceUsageException("Sentinels are not found");
}
/**
 * Creates an iterator that scans this object in batches and supports
 * element removal through {@code RedissonScoredSortedSet.remove}.
 *
 * @param pattern pattern forwarded unchanged to {@code scanIterator}
 * @param count   count forwarded unchanged to {@code scanIterator}
 * @return iterator over the scanned values
 */
@Override
public Iterator<V> iterator(final String pattern, final int count) {
    Iterator<V> scanning = new RedissonBaseIterator<V>() {
        @Override
        protected void remove(Object value) {
            // Removal is delegated to the enclosing sorted set.
            RedissonScoredSortedSet.this.remove((V) value);
        }

        @Override
        protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
            return scanIterator(client, nextIterPos, pattern, count);
        }
    };
    return scanning;
}
/**
 * Creates the connection bookkeeping entry for one Redis client and registers
 * its pools with the connection manager's connection watcher.
 *
 * <p>The subscribe pool is registered only when {@code subscribePoolMaxSize}
 * is positive. The regular pool is always registered. Each registration
 * supplies a callback that removes a connection from the matching "free"
 * collection and then from the matching "all" collection.
 *
 * @param client               client this entry belongs to
 * @param poolMinSize          minimum size passed to the watcher for regular connections
 * @param poolMaxSize          maximum size; also the permit count of the regular semaphore
 * @param subscribePoolMinSize minimum size passed to the watcher for subscribe connections
 * @param subscribePoolMaxSize maximum size; also the permit count of the subscribe semaphore
 * @param connectionManager    manager providing the connection watcher
 * @param nodeType             node type stored on this entry
 */
public ClientConnectionsEntry(RedisClient client, int poolMinSize, int poolMaxSize, int subscribePoolMinSize, int subscribePoolMaxSize,
        ConnectionManager connectionManager, NodeType nodeType) {
    this.client = client;
    this.nodeType = nodeType;
    this.connectionManager = connectionManager;
    this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize);
    this.freeSubscribeConnectionsCounter = new AsyncSemaphore(subscribePoolMaxSize);

    // Subscribe pool first (when configured), then the regular pool.
    if (subscribePoolMaxSize > 0) {
        connectionManager.getConnectionWatcher().add(subscribePoolMinSize, subscribePoolMaxSize,
                freeSubscribeConnections, freeSubscribeConnectionsCounter, watched -> {
                    freeSubscribeConnections.remove(watched);
                    return allSubscribeConnections.remove(watched);
                });
    }

    connectionManager.getConnectionWatcher().add(poolMinSize, poolMaxSize,
            freeConnections, freeConnectionsCounter, watched -> {
                freeConnections.remove(watched);
                return allConnections.remove(watched);
            });
}
/**
 * Test fixture: creates the {@code redisClient} used by the tests, pointed
 * at the default Redis server bind address and port.
 *
 * <p>The default Redis server instance is started first when
 * {@code RedissonRuntimeEnvironment.isTravis} is set.
 *
 * @throws IOException          declared for the server start-up call
 * @throws InterruptedException declared for the server start-up call
 */
@Before
public void before() throws IOException, InterruptedException {
    if (RedissonRuntimeEnvironment.isTravis) {
        RedisRunner.startDefaultRedisServerInstance();
    }

    RedisClientConfig clientConfig = new RedisClientConfig();
    clientConfig.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
    redisClient = RedisClient.create(clientConfig);
}
/**
 * Stops the Redis process held by this instance and returns its exit code.
 *
 * <p>When the runner is configured with "nosave" and without a random
 * directory, a {@code SHUTDOWN NOSAVE} command is sent first and awaited for
 * up to 3 seconds. The process is then destroyed, and destroyed forcibly if
 * it has not terminated within 5 seconds.
 *
 * <p>If the calling thread is interrupted while waiting, shutdown still
 * proceeds and the interrupt status is restored before this method returns.
 *
 * @return the process exit code; an exit code of 1 is reported as 0 when
 *         {@code RedissonRuntimeEnvironment.isWindows} is set
 */
public int stop() {
    boolean interrupted = false;
    try {
        if (runner.isNosave() && !runner.isRandomDir()) {
            RedisClient c = createDefaultRedisClientInstance();
            RedisConnection connection = c.connect();
            try {
                connection.async(new RedisStrictCommand<Void>("SHUTDOWN", "NOSAVE", new VoidReplayConvertor()))
                        .await(3, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Shutdown via command was cut short; the process is killed below.
                interrupted = true;
            }
            c.shutdown();
            connection.closeAsync().syncUninterruptibly();
        }
        Process p = redisProcess;
        p.destroy();
        boolean normalTermination = false;
        try {
            normalTermination = p.waitFor(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Hurry up by force-killing below.
            interrupted = true;
        }
        if (!normalTermination) {
            p = p.destroyForcibly();
            try {
                // destroyForcibly() may return before the process is gone;
                // exitValue() would then throw IllegalThreadStateException.
                p.waitFor(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        cleanup();
        int exitCode = p.exitValue();
        return exitCode == 1 && RedissonRuntimeEnvironment.isWindows ? 0 : exitCode;
    } finally {
        // Do not hide the interruption from callers.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Creates a new client configured with the runner's initial bind address
 * and port.
 *
 * @return a newly created {@code RedisClient}
 * @throws IllegalStateException if the Redis process is not alive
 */
public RedisClient createRedisClientInstance() {
    if (!redisProcess.isAlive()) {
        throw new IllegalStateException("Redis server instance is not running.");
    }

    RedisClientConfig clientConfig = new RedisClientConfig();
    clientConfig.setAddress(runner.getInitialBindAddr(), runner.getPort());
    return RedisClient.create(clientConfig);
}
/**
 * Creates and starts the DNS monitor for the given master and the configured
 * slave addresses.
 *
 * <p>Nothing happens when the configured DNS monitoring interval is
 * {@code -1}.
 *
 * @param masterHost client of the master node handed to the monitor
 */
protected void startDNSMonitoring(RedisClient masterHost) {
    if (config.getDnsMonitoringInterval() == -1) {
        return;
    }

    Set<RedisURI> slaveAddresses = config.getSlaveAddresses().stream()
            .map(slaveAddress -> new RedisURI(slaveAddress))
            .collect(Collectors.toSet());
    dnsMonitor = new DNSMonitor(this, masterHost,
            slaveAddresses, config.getDnsMonitoringInterval(), resolverGroup);
    dnsMonitor.start();
}
/**
 * Creates a new client configured with the runner's initial bind address
 * and port.
 *
 * @return a newly created {@code RedisClient}
 * @throws IllegalStateException if the Redis process is not alive
 */
public RedisClient createRedisClientInstance() {
    if (!redisProcess.isAlive()) {
        throw new IllegalStateException("Redis server instance is not running.");
    }

    RedisClientConfig clientConfig = new RedisClientConfig();
    clientConfig.setAddress(runner.getInitialBindAddr(), runner.getPort());
    return RedisClient.create(clientConfig);
}
/**
 * Resolves the client's address, creates the master connections entry and
 * adds it to the write pool (and to the pub/sub pool when the subscription
 * mode is {@code MASTER}).
 *
 * <p>The returned future is completed through a {@code CountableListener}
 * that is attached to every pool registration made here. If the returned
 * future fails, the client is shut down asynchronously.
 *
 * @param client client of the master node
 * @return future that is failed with the resolution error when address
 *         resolution fails
 */
private RFuture<RedisClient> setupMasterEntry(RedisClient client) {
    RPromise<RedisClient> result = new RedissonPromise<RedisClient>();
    // Release the client whenever the overall setup ends in failure.
    result.onComplete((ignored, failure) -> {
        if (failure != null) {
            client.shutdownAsync();
        }
    });

    client.resolveAddr().onComplete((resolved, failure) -> {
        if (failure != null) {
            result.tryFailure(failure);
            return;
        }

        masterEntry = new ClientConnectionsEntry(
                client,
                config.getMasterConnectionMinimumIdleSize(),
                config.getMasterConnectionPoolSize(),
                config.getSubscriptionConnectionMinimumIdleSize(),
                config.getSubscriptionConnectionPoolSize(),
                connectionManager,
                NodeType.MASTER);

        // One completion for the write pool, plus one for the pub/sub pool if used.
        boolean subscribeOnMaster = config.getSubscriptionMode() == SubscriptionMode.MASTER;
        int counter = subscribeOnMaster ? 2 : 1;
        CountableListener<RedisClient> listener = new CountableListener<>(result, client, counter);

        writeConnectionPool.add(masterEntry).onComplete(listener);
        if (subscribeOnMaster) {
            pubSubConnectionPool.add(masterEntry).onComplete(listener);
        }
    });
    return result;
}
/**
 * Switches the master of the entry that owns the given slot and, on success,
 * re-keys that entry in {@code client2entry} under its new client.
 *
 * @param slot    slot used to look up the entry
 * @param address address of the new master
 * @return the future returned by the entry's own {@code changeMaster} call
 */
protected final RFuture<RedisClient> changeMaster(int slot, RedisURI address) {
    final MasterSlaveEntry entry = getEntry(slot);
    final RedisClient previousMaster = entry.getClient();

    RFuture<RedisClient> switched = entry.changeMaster(address);
    switched.onComplete((newMaster, failure) -> {
        if (failure != null) {
            // Keep the existing mapping when the switch failed.
            return;
        }
        client2entry.remove(previousMaster);
        client2entry.put(entry.getClient(), entry);
    });
    return switched;
}
/**
 * Finds the entry associated with the given client.
 *
 * <p>A direct lookup in {@code client2entry} is tried first. Failing that,
 * the first entry reporting the client as one of its slaves is returned.
 *
 * @param redisClient client to look up
 * @return the matching entry, or {@code null} if none matches
 */
@Override
public MasterSlaveEntry getEntry(RedisClient redisClient) {
    MasterSlaveEntry direct = client2entry.get(redisClient);
    if (direct == null) {
        return client2entry.values().stream()
                .filter(candidate -> candidate.hasSlave(redisClient))
                .findFirst()
                .orElse(null);
    }
    return direct;
}
/**
 * Stops the Redis process held by this instance and returns its exit code.
 *
 * <p>When the runner is configured with "nosave" and without a random
 * directory, a {@code SHUTDOWN NOSAVE} command is sent first and awaited for
 * up to 3 seconds. The process is then destroyed, and destroyed forcibly if
 * it has not terminated within 5 seconds.
 *
 * <p>If the calling thread is interrupted while waiting, shutdown still
 * proceeds and the interrupt status is restored before this method returns.
 *
 * @return the process exit code; an exit code of 1 is reported as 0 when
 *         {@code RedissonRuntimeEnvironment.isWindows} is set
 */
public int stop() {
    boolean interrupted = false;
    try {
        if (runner.isNosave() && !runner.isRandomDir()) {
            RedisClient c = createDefaultRedisClientInstance();
            RedisConnection connection = c.connect();
            try {
                connection.async(new RedisStrictCommand<Void>("SHUTDOWN", "NOSAVE", new VoidReplayConvertor()))
                        .await(3, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Shutdown via command was cut short; the process is killed below.
                interrupted = true;
            }
            c.shutdown();
            connection.closeAsync().syncUninterruptibly();
        }
        Process p = redisProcess;
        p.destroy();
        boolean normalTermination = false;
        try {
            normalTermination = p.waitFor(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Hurry up by force-killing below.
            interrupted = true;
        }
        if (!normalTermination) {
            p = p.destroyForcibly();
            try {
                // destroyForcibly() may return before the process is gone;
                // exitValue() would then throw IllegalThreadStateException.
                p.waitFor(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        cleanup();
        int exitCode = p.exitValue();
        return exitCode == 1 && RedissonRuntimeEnvironment.isWindows ? 0 : exitCode;
    } finally {
        // Do not hide the interruption from callers.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
public DNSMonitor(ConnectionManager connectionManager, RedisClient masterHost, Collection<RedisURI> slaveHosts, long dnsMonitoringInterval, AddressResolverGroup<InetSocketAddress> resolverGroup) {
this.resolver = resolverGroup.getResolver(connectionManager.getGroup().next());
masterHost.resolveAddr().syncUninterruptibly();
masters.put(masterHost.getConfig().getAddress(), masterHost.getAddr());
for (RedisURI host : slaveHosts) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(host.getHost(), host.getPort()));
resolveFuture.syncUninterruptibly();
slaves.put(host, resolveFuture.getNow());
}
this.connectionManager = connectionManager;
this.dnsMonitoringInterval = dnsMonitoringInterval;
}
/**
 * Creates a publisher that emits the elements of the wrapped instance by
 * scanning it asynchronously, with no pattern and a count of 10.
 *
 * @return publisher produced by the underlying {@code SetRxIterator}
 */
public Publisher<V> iterator() {
    SetRxIterator<V> scanner = new SetRxIterator<V>() {
        @Override
        protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
            ScanIterator source = (ScanIterator) instance;
            return source.scanIteratorAsync(instance.getName(), client, nextIterPos, null, 10);
        }
    };
    return scanner.create();
}
/**
 * Stops the Redis process held by this instance and returns its exit code.
 *
 * <p>When the runner is configured with "nosave" and without a random
 * directory, a {@code SHUTDOWN NOSAVE} command is sent first and awaited for
 * up to 3 seconds. The process is then destroyed, and destroyed forcibly if
 * it has not terminated within 5 seconds.
 *
 * <p>If the calling thread is interrupted while waiting, shutdown still
 * proceeds and the interrupt status is restored before this method returns.
 *
 * @return the process exit code; an exit code of 1 is reported as 0 when
 *         {@code RedissonRuntimeEnvironment.isWindows} is set
 */
public int stop() {
    boolean interrupted = false;
    try {
        if (runner.isNosave() && !runner.isRandomDir()) {
            RedisClient c = createDefaultRedisClientInstance();
            RedisConnection connection = c.connect();
            try {
                connection.async(new RedisStrictCommand<Void>("SHUTDOWN", "NOSAVE", new VoidReplayConvertor()))
                        .await(3, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Shutdown via command was cut short; the process is killed below.
                interrupted = true;
            }
            c.shutdown();
            connection.closeAsync().syncUninterruptibly();
        }
        Process p = redisProcess;
        p.destroy();
        boolean normalTermination = false;
        try {
            normalTermination = p.waitFor(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Hurry up by force-killing below.
            interrupted = true;
        }
        if (!normalTermination) {
            p = p.destroyForcibly();
            try {
                // destroyForcibly() may return before the process is gone;
                // exitValue() would then throw IllegalThreadStateException.
                p.waitFor(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        cleanup();
        int exitCode = p.exitValue();
        return exitCode == 1 && RedissonRuntimeEnvironment.isWindows ? 0 : exitCode;
    } finally {
        // Do not hide the interruption from callers.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Creates a {@code Flowable} that emits the elements of the wrapped sorted
 * set by scanning it asynchronously.
 *
 * @param pattern pattern forwarded unchanged to {@code scanIteratorAsync}
 * @param count   count forwarded unchanged to {@code scanIteratorAsync}
 * @return flowable produced by the underlying {@code SetRxIterator}
 */
private Flowable<V> scanIteratorReactive(String pattern, int count) {
    SetRxIterator<V> scanner = new SetRxIterator<V>() {
        @Override
        protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
            RedissonScoredSortedSet<V> source = (RedissonScoredSortedSet<V>) instance;
            return source.scanIteratorAsync(client, nextIterPos, pattern, count);
        }
    };
    return scanner.create();
}
/**
 * Creates a new client configured with the runner's initial bind address
 * and port.
 *
 * @return a newly created {@code RedisClient}
 * @throws IllegalStateException if the Redis process is not alive
 */
public RedisClient createRedisClientInstance() {
    if (!redisProcess.isAlive()) {
        throw new IllegalStateException("Redis server instance is not running.");
    }

    RedisClientConfig clientConfig = new RedisClientConfig();
    clientConfig.setAddress(runner.getInitialBindAddr(), runner.getPort());
    return RedisClient.create(clientConfig);
}
/**
 * Creates a new client configured with the runner's initial bind address
 * and port.
 *
 * @return a newly created {@code RedisClient}
 * @throws IllegalStateException if the Redis process is not alive
 */
public RedisClient createRedisClientInstance() {
    if (!redisProcess.isAlive()) {
        throw new IllegalStateException("Redis server instance is not running.");
    }

    RedisClientConfig clientConfig = new RedisClientConfig();
    clientConfig.setAddress(runner.getInitialBindAddr(), runner.getPort());
    return RedisClient.create(clientConfig);
}