下面列出了io.netty.channel.epoll.EpollDatagramChannel#org.redisson.config.Config 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonClusterConnection(redisson);
}
@Test
public void testAsync() throws Exception {
RedisProcess runner = new RedisRunner()
.nosave()
.randomDir()
.port(6311)
.run();
URL configUrl = getClass().getResource("redisson-jcache.json");
Config cfg = Config.fromJSON(configUrl);
Configuration<String, String> config = RedissonConfiguration.fromConfig(cfg);
Cache<String, String> cache = Caching.getCachingProvider().getCacheManager()
.createCache("test", config);
CacheAsync<String, String> async = cache.unwrap(CacheAsync.class);
async.putAsync("1", "2").get();
assertThat(async.getAsync("1").get()).isEqualTo("2");
cache.close();
runner.stop();
}
@Override
public <T extends Codec, K extends RObject> T getCodec(RObjectField anno, Class<?> cls, Class<K> rObjectClass, String fieldName, Config config) {
try {
if (!ClassUtils.getDeclaredField(cls, fieldName).isAnnotationPresent(anno.getClass())) {
throw new IllegalArgumentException("Annotation RObjectField does not present on field " + fieldName + " of type [" + cls.getCanonicalName() + "]");
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
if (rObjectClass.isInterface()) {
throw new IllegalArgumentException("Cannot lookup an interface class of RObject [" + rObjectClass.getCanonicalName() + "]. Concrete class only.");
}
Class<?> codecClass;
if (anno.codec() == RObjectField.DEFAULT.class) {
codecClass = config.getCodec().getClass();
} else {
codecClass = anno.codec();
}
return this.<T>getCodec((Class<T>) codecClass);
}
@Test
public void testExpirationWithMaxSize() throws InterruptedException {
Config config = new Config();
config.useSingleServer().setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
config.setMaxCleanUpDelay(2);
config.setMinCleanUpDelay(1);
RedissonClient redisson = Redisson.create(config);
RMapCache<String, String> map = redisson.getMapCache("test", StringCodec.INSTANCE);
assertThat(map.trySetMaxSize(2)).isTrue();
map.put("1", "1", 3, TimeUnit.SECONDS);
map.put("2", "2", 0, TimeUnit.SECONDS, 3, TimeUnit.SECONDS);
map.put("3", "3", 3, TimeUnit.SECONDS);
map.put("4", "4", 0, TimeUnit.SECONDS, 3, TimeUnit.SECONDS);
Thread.sleep(5000);
assertThat(map.size()).isZero();
assertThat(redisson.getKeys().count()).isEqualTo(2);
redisson.shutdown();
}
private Config loadConfig(ClassLoader classLoader, String fileName) {
InputStream is = classLoader.getResourceAsStream(fileName);
if (is != null) {
try {
return Config.fromJSON(is);
} catch (IOException e) {
try {
is = classLoader.getResourceAsStream(fileName);
return Config.fromYAML(is);
} catch (IOException e1) {
throw new CacheException("Can't parse yaml config", e1);
}
}
}
return null;
}
@Activate
public void activate(ComponentContext componentContext) {
config = componentContext.getProperties();
String address = (config.get("address") != null) ? config.get("address").toString() : ADDRESS_DEFAULT;
String mode = (config.get("map") != null) ? config.get("map").toString() : MODE_DEFAULT;
String masterAddress = (config.get("masterAddress") != null) ? config.get("masterAddress").toString() : null;
String masterName = (config.get("masterName") != null) ? config.get("masterName").toString() : null;
int scanInterval = (config.get("scanInterval") != null) ? Integer.parseInt(config.get("scanInterval").toString()) : 2000;
Config redissonConfig = new Config();
if (mode.equalsIgnoreCase("Single")) {
redissonConfig.useSingleServer().setAddress(address);
} else if (mode.equalsIgnoreCase("Master_Slave")) {
redissonConfig.useMasterSlaveServers().setMasterAddress(masterAddress).addSlaveAddress(address);
} else if (mode.equalsIgnoreCase("Sentinel")) {
redissonConfig.useSentinelServers().addSentinelAddress(masterName).addSentinelAddress(address);
} else if (mode.equalsIgnoreCase("Cluster")) {
redissonConfig.useClusterServers().setScanInterval(scanInterval).addNodeAddress(address);
}
redissonClient = Redisson.create(redissonConfig);
}
@BeforeClass
public static void setUp() throws Exception {
String testServerAddress = "redis://127.0.0.1:6371";
redisServer = new RedisServer(6371);
if (redisServer.isActive()) {
redisServer.stop();
}
redisServer.start();
RedisLockConfiguration redisLockConfiguration = new SystemPropertiesRedisLockConfiguration() {
@Override
public String getRedisServerAddress() {
return testServerAddress;
}
};
Config redissonConfig = new Config();
redissonConfig.useSingleServer().setAddress(testServerAddress).setTimeout(10000);
redisLock = new RedisLock((Redisson) Redisson.create(redissonConfig), redisLockConfiguration);
// Create another instance of redisson for tests.
config = new Config();
config.useSingleServer().setAddress(testServerAddress).setTimeout(10000);
redisson = Redisson.create(config);
}
public static Config createConfig() {
// String redisAddress = System.getProperty("redisAddress");
// if (redisAddress == null) {
// redisAddress = "127.0.0.1:6379";
// }
Config config = new Config();
// config.setCodec(new MsgPackJacksonCodec());
// config.useSentinelServers().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26379", "127.0.0.1:26389");
// config.useClusterServers().addNodeAddress("127.0.0.1:7004", "127.0.0.1:7001", "127.0.0.1:7000");
config.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
// .setPassword("mypass1");
// config.useMasterSlaveConnection()
// .setMasterAddress("127.0.0.1:6379")
// .addSlaveAddress("127.0.0.1:6399")
// .addSlaveAddress("127.0.0.1:6389");
return config;
}
@Test
public void testMigrate() throws FailedToStartRedisException, IOException, InterruptedException {
RedisProcess runner = new RedisRunner()
.appendonly(true)
.randomDir()
.randomPort()
.run();
RBucket<String> bucket = redisson.getBucket("test");
bucket.set("someValue");
bucket.migrate(runner.getRedisServerBindAddress(), runner.getRedisServerPort(), 0, 5000);
Config config = new Config();
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
RedissonClient r = Redisson.create(config);
RBucket<String> bucket2 = r.getBucket("test");
assertThat(bucket2.get()).isEqualTo("someValue");
assertThat(bucket.isExists()).isFalse();
runner.stop();
}
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonClusterConnection(redisson);
}
@Test
public void testSyncSlavesWait() {
Config config = createConfig();
config.useSingleServer()
.setConnectionMinimumIdleSize(1)
.setConnectionPoolSize(1);
RedissonClient redisson = Redisson.create(config);
try {
batchOptions
.skipResult()
.syncSlaves(2, 1, TimeUnit.SECONDS);
RBatch batch = redisson.createBatch(batchOptions);
RBucketAsync<Integer> bucket = batch.getBucket("1");
bucket.setAsync(1);
batch.execute();
String[] t = redisson.getKeys().getKeysStreamByPattern("*").toArray(String[]::new);
} catch (Exception ex) {
ex.printStackTrace();
}
}
public void testTaskFinishing() throws Exception {
AtomicInteger counter = new AtomicInteger();
new MockUp<TasksRunnerService>() {
@Mock
private void finish(Invocation invocation, String requestId) {
if (counter.incrementAndGet() > 1) {
invocation.proceed();
}
}
};
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1));
node.shutdown();
node = RedissonNode.create(nodeConfig);
node.start();
RExecutorService executor = redisson.getExecutorService("test2");
RExecutorFuture<?> f = executor.submit(new FailoverTask("finished"));
Thread.sleep(2000);
node.shutdown();
f.get();
assertThat(redisson.<Boolean>getBucket("finished").get()).isTrue();
}
public static Config createConfig() {
// String redisAddress = System.getProperty("redisAddress");
// if (redisAddress == null) {
// redisAddress = "127.0.0.1:6379";
// }
Config config = new Config();
// config.setCodec(new MsgPackJacksonCodec());
// config.useSentinelServers().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26379", "127.0.0.1:26389");
// config.useClusterServers().addNodeAddress("127.0.0.1:7004", "127.0.0.1:7001", "127.0.0.1:7000");
config.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
// .setPassword("mypass1");
// config.useMasterSlaveConnection()
// .setMasterAddress("127.0.0.1:6379")
// .addSlaveAddress("127.0.0.1:6399")
// .addSlaveAddress("127.0.0.1:6389");
return config;
}
/**
* Create a new connection manager for a single server using the supplied address.
*
* @return a new connection manager
*/
public RedissonClient create() {
final String address = null == this.address ? "redis://127.0.0.1:6379" : this.address;
final int timeout = this.timeout < 0 ? 60 * 1000 : this.timeout;
// TODO: support all the other types supported by the RedissonClientFactory.
// TODO: Create a hash of config options so that only one manager is used per unique server. This should
// improve contention.
Config config = new Config();
config.useSingleServer().
setConnectionPoolSize(1).
setConnectionMinimumIdleSize(1).
setAddress(address).
setTimeout(timeout);
return Redisson.create(config);
}
public static void main(String[] args) {
Config config = new Config();
config.useClusterServers()
.addNodeAddress("127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003");
RedissonClient redisson = Redisson.create(config);
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("myExecutor", 1));
RedissonNode node = RedissonNode.create(nodeConfig);
node.start();
RExecutorService e = redisson.getExecutorService("myExecutor");
e.execute(new RunnableTask());
e.submit(new CallableTask());
e.shutdown();
node.shutdown();
}
public static void main(String[] args) {
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
RedissonClient client = Redisson.create(config);
RRateLimiter rateLimiter = client.getRateLimiter("rate_limiter");
rateLimiter.trySetRate(RateType.OVERALL, 1, 5, RateIntervalUnit.SECONDS);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
try {
rateLimiter.acquire();
System.out.println("时间:" + System.currentTimeMillis() + ",线程" + Thread.currentThread().getId()
+ "进入数据区:" + System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
public void testLeak() throws InterruptedException {
Config config = new Config();
config.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
RedissonClient localRedisson = Redisson.create(config);
String key = RandomString.make(120);
for (int i = 0; i < 500; i++) {
RMapCache<String, String> cache = localRedisson.getMapCache("mycache");
RLock keyLock = cache.getLock(key);
keyLock.lockInterruptibly(10, TimeUnit.SECONDS);
try {
cache.get(key);
cache.put(key, RandomString.make(4*1024*1024), 5, TimeUnit.SECONDS);
} finally {
if (keyLock != null) {
keyLock.unlock();
}
}
}
}
@JsonIgnore
public Config configureRedis() throws URISyntaxException {
var redisConfig = new Config();
var redis = new URI(Settings.instance().redisUrl());
if (!redis.getScheme().equals("redis") && !redis.getScheme().equals("rediss")) {
throw new IllegalArgumentException("Invalid scheme for Redis connection URI!");
}
var database = redis.getPath() == null || redis.getPath().isBlank() ? 0
: Integer.parseUnsignedInt(redis.getPath().substring(1));
redisConfig.setTransportMode(Epoll.isAvailable() ? TransportMode.EPOLL : TransportMode.NIO);
redisConfig.setNettyThreads(16);
redisConfig.useSingleServer()
.setAddress(redis.getScheme() + "://"
+ requireNonNullElse(redis.getHost(), "localhost") + ":"
+ requireNonNullElse(redis.getPort(), 6379))
.setDatabase(database)
.setPassword(redis.getUserInfo());
return redisConfig;
}
@BeforeClass
public static void before() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonClusterConnection(redisson);
}
/**
* @author kuangyoubo
* @author fanpan26
* 优先级
* 通过名字注入 > 配置文件 > 参数配置 > 默认
*/
private void initRedis() {
if( redisConfig.useInjectRedissonClient() ) {
logger.info("Get the RedissonClient through injection, Bean name is \"{}\"", redisConfig.getClientBeanName());
try {
redissonClient = applicationContext.getBean(redisConfig.getClientBeanName(), RedissonClient.class);
return;
} catch (BeansException e) {
logger.warn("RedissonClient is not found, Recreate RedissonClient on configuration information.");
}
}
/**
* 优先级
* 配置文件 > 参数配置 > 默认
*/
Config config = getConfigByFile();
if(config == null) {
config = redisConfig.useConfigParameter() ? redisConfig.getClusterOrSentinelConfig() : getSingleServerConfig();
}
redissonClient = Redisson.create(config);
}
@Bean(name = "redissonClient", destroyMethod = "shutdown")
public RedissonClient redisson() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + redisHost + ":" + redisPort)
.setDatabase(0)
.setConnectTimeout(5000)
.setTimeout(3000);
return Redisson.create(config);
}
@Before
@Override
public void before() throws IOException, InterruptedException {
super.before();
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1));
node = RedissonNode.create(nodeConfig);
node.start();
}
@Override
public <T extends Codec> T getCodec(REntity anno, Class<?> cls, Config config) {
if (!ClassUtils.isAnnotationPresent(cls, anno.annotationType())) {
throw new IllegalArgumentException("Annotation REntity does not present on type [" + cls.getCanonicalName() + "]");
}
Class<?> codecClass;
if (anno.codec() == REntity.DEFAULT.class) {
codecClass = config.getCodec().getClass();
} else {
codecClass = anno.codec();
}
return this.getCodec((Class<T>) codecClass);
}
@Test
public void testInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave();
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getLock("myLock");
lock.lock();
assertThat(lock.isLocked()).isTrue();
lock.unlock();
assertThat(lock.isLocked()).isFalse();
redisson.shutdown();
process.shutdown();
}
@Test
public void testTakeReattach() throws Exception {
RedisProcess runner = new RedisRunner()
.nosave()
.randomDir()
.randomPort()
.run();
Config config = new Config();
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RBlockingQueue<Integer> queue1 = getQueue(redisson);
RFuture<Integer> f = queue1.takeAsync();
f.await(1, TimeUnit.SECONDS);
runner.stop();
runner = new RedisRunner()
.port(runner.getRedisServerPort())
.nosave()
.randomDir()
.run();
queue1.put(123);
// check connection rotation
for (int i = 0; i < 10; i++) {
queue1.put(i + 10000);
}
assertThat(queue1.size()).isEqualTo(10);
Integer result = f.get(1, TimeUnit.SECONDS);
assertThat(result).isEqualTo(123);
runner.stop();
redisson.shutdown();
}
@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {
//Sort out the Config Class
BeanDefinitionBuilder configBuilder
= helper.createBeanDefinitionBuilder(element, parserContext,
Config.class);
String configId = helper.getId(null, configBuilder, parserContext);
helper.parseAttributes(element, parserContext, configBuilder);
helper.registerBeanDefinition(configBuilder, configId,
null, parserContext);
//Do the main Redisson bean
BeanDefinitionBuilder builder
= helper.createBeanDefinitionBuilder(element, parserContext,
Redisson.class);
builder.setFactoryMethod("create");
builder.setDestroyMethodName("shutdown");
builder.addConstructorArgReference(configId);
parserContext.getDelegate().parseQualifierElements(element,
builder.getRawBeanDefinition());
String id = helper.getId(element, builder, parserContext);
helper.parseAttributes(element, parserContext, configBuilder);
//Sort out all the nested elements
parseChildElements(element, configId, id, builder, parserContext);
helper.registerBeanDefinition(builder, id,
helper.parseAliase(element), parserContext);
return builder.getBeanDefinition();
}
private RedissonClient createClient(NioEventLoopGroup group, String host) {
Config config1 = new Config();
config1.useSingleServer().setAddress(host);
config1.setEventLoopGroup(group);
RedissonClient client1 = Redisson.create(config1);
client1.getKeys().flushdb();
return client1;
}
@Test
public void testReconnection() throws IOException, InterruptedException, TimeoutException {
RedisProcess runner = new RedisRunner()
.appendonly(true)
.randomDir()
.randomPort()
.run();
Config config = new Config();
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
RedissonClient r = Redisson.create(config);
r.getBucket("myBucket").set(1);
assertThat(r.getBucket("myBucket").get()).isEqualTo(1);
Assert.assertEquals(0, runner.stop());
AtomicBoolean hasError = new AtomicBoolean();
try {
r.getBucket("myBucket").get();
} catch (Exception e) {
// skip error
hasError.set(true);
}
assertThat(hasError.get()).isTrue();
RedisProcess pp = new RedisRunner()
.appendonly(true)
.port(runner.getRedisServerPort())
.dir(runner.getDefaultDir())
.run();
assertThat(r.getBucket("myBucket").get()).isEqualTo(1);
r.shutdown();
Assert.assertEquals(0, pp.stop());
}
/**
* 单机模式自动装配
* @return
*/
@Bean
@ConditionalOnProperty(name = "redisson.address")
public RedissonClient redissonSingle() {
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(redssionProperties.getAddress())
.setTimeout(redssionProperties.getTimeout())
.setConnectionPoolSize(redssionProperties.getConnectionPoolSize())
.setConnectionMinimumIdleSize(redssionProperties.getConnectionMinimumIdleSize());
return Redisson.create(config);
}
/**
* 单机模式
*
* @param redissonProperties redisson配置
* @return client
*/
static RedissonClient redissonSingle(RedissonProperties redissonProperties) {
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(redissonProperties.getAddress()).setTimeout(redissonProperties.getTimeout())
.setConnectionPoolSize(redissonProperties.getConnectionPoolSize())
.setConnectionMinimumIdleSize(redissonProperties.getConnectionMinimumIdleSize());
if (redissonProperties.getPassword() != null && redissonProperties.getPassword().length() > 0) {
serverConfig.setPassword(redissonProperties.getPassword());
}
return Redisson.create(config);
}