下面列出了 org.springframework.data.redis.core.ReactiveStringRedisTemplate 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Subscribes a {@code ReactiveStringRedisTemplate} to the channels "test" and "test2",
 * publishes a single message on "test", and waits up to one second for the shared
 * counter to equal 1.
 */
@Test
public void testTemplate() {
    RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
    ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory);
    AtomicLong received = new AtomicLong();

    // Register the same counting listener on each topic, in the order "test" then "test2".
    for (String topicName : new String[] {"test", "test2"}) {
        template.listenTo(ChannelTopic.of(topicName))
                .flatMap(message -> {
                    received.incrementAndGet();
                    return Mono.empty();
                })
                .subscribe();
    }

    ReactiveRedisConnection connection = factory.getReactiveConnection();
    ByteBuffer channel = ByteBuffer.wrap("test".getBytes());
    ByteBuffer payload = ByteBuffer.wrap("msg".getBytes());
    // block() waits for the publish command itself to complete before polling starts.
    connection.pubSubCommands().publish(channel, payload).block();

    // The predicate requires a count of exactly 1 (presumably: only the "test" listener fired).
    Awaitility.await()
            .atMost(Duration.ONE_SECOND)
            .until(() -> received.get() == 1);
}
/**
 * Listens on channel "test", stores the bytes of each received message and then issues
 * a delete of key "myobj" from inside the listener pipeline. Publishes "msg" on "test"
 * and waits up to one second until the stored bytes equal those of "msg".
 */
@Test
public void testTemplate() {
    RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
    ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory);
    AtomicReference<byte[]> lastPayload = new AtomicReference<>();

    template.listenTo(ChannelTopic.of("test"))
            .flatMap(message -> {
                byte[] body = message.getMessage().getBytes();
                lastPayload.set(body);
                // Run a regular command from within the subscription callback.
                return template.delete("myobj");
            })
            .subscribe();

    ReactiveRedisConnection connection = factory.getReactiveConnection();
    ByteBuffer channel = ByteBuffer.wrap("test".getBytes());
    ByteBuffer payload = ByteBuffer.wrap("msg".getBytes());
    connection.pubSubCommands().publish(channel, payload).block();

    byte[] expected = "msg".getBytes();
    Awaitility.await()
            .atMost(Duration.ONE_SECOND)
            .until(() -> Arrays.equals(expected, lastPayload.get()));
}
/**
 * Registers counting listeners on the channels "test" and "test2" through a
 * {@code ReactiveStringRedisTemplate}, publishes one message on "test", and waits up to
 * one second for the shared counter to equal 1.
 */
@Test
public void testTemplate() {
    RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
    ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory);
    AtomicLong hits = new AtomicLong();

    // Same listener body for both topics; subscription order is "test" first, then "test2".
    String[] topicNames = {"test", "test2"};
    for (String topicName : topicNames) {
        template.listenTo(ChannelTopic.of(topicName))
                .flatMap(message -> {
                    hits.incrementAndGet();
                    return Mono.empty();
                })
                .subscribe();
    }

    ReactiveRedisConnection connection = factory.getReactiveConnection();
    ByteBuffer targetChannel = ByteBuffer.wrap("test".getBytes());
    ByteBuffer body = ByteBuffer.wrap("msg".getBytes());
    connection.pubSubCommands().publish(targetChannel, body).block();

    // Passes only once the counter reads exactly 1.
    Awaitility.await()
            .atMost(Duration.ONE_SECOND)
            .until(() -> hits.get() == 1);
}
/**
 * Registers the beans "reactiveRedisTemplate" and "reactiveStringRedisTemplate" on the
 * given context. Both are produced by a locally created
 * {@code RedisReactiveAutoConfiguration}; the {@code ReactiveRedisConnectionFactory} is
 * looked up from the context inside each supplier rather than at registration time.
 *
 * @param context the application context that receives the bean registrations
 */
@Override
public void initialize(GenericApplicationContext context) {
    RedisReactiveAutoConfiguration autoConfiguration = new RedisReactiveAutoConfiguration();

    context.registerBean(
            "reactiveRedisTemplate",
            ReactiveRedisTemplate.class,
            () -> {
                ReactiveRedisConnectionFactory connectionFactory =
                        context.getBean(ReactiveRedisConnectionFactory.class);
                return autoConfiguration.reactiveRedisTemplate(connectionFactory, context);
            },
            (beanDefinition) -> {
                // Record the generic signature <Object, Object> as the bean's target type.
                RootBeanDefinition rootDefinition = (RootBeanDefinition) beanDefinition;
                rootDefinition.setTargetType(ResolvableType.forClassWithGenerics(
                        ReactiveRedisTemplate.class, Object.class, Object.class));
            });

    context.registerBean(
            "reactiveStringRedisTemplate",
            ReactiveStringRedisTemplate.class,
            () -> {
                ReactiveRedisConnectionFactory connectionFactory =
                        context.getBean(ReactiveRedisConnectionFactory.class);
                return autoConfiguration.reactiveStringRedisTemplate(connectionFactory);
            });
}
/**
 * Builds the {@code RedisRateLimiter} bean from the injected template, the script bean
 * qualified by {@code RedisRateLimiter.REDIS_SCRIPT_NAME}, and the configuration service.
 *
 * @param redisTemplate reactive string template handed to the limiter
 * @param redisScript script bean selected by the qualifier
 * @param configurationService configuration service handed to the limiter
 * @return a new limiter wired with the three collaborators
 */
@Bean
@ConditionalOnMissingBean
public RedisRateLimiter redisRateLimiter(ReactiveStringRedisTemplate redisTemplate,
        @Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
        ConfigurationService configurationService) {
    RedisRateLimiter limiter =
            new RedisRateLimiter(redisTemplate, redisScript, configurationService);
    return limiter;
}
/**
 * Creates a rate limiter from an explicitly supplied Redis template and script.
 *
 * @param redisTemplate reactive string template stored on this instance
 * @param script Redis script stored on this instance
 * @param configurationService passed to the superclass constructor together with
 *        {@code Config.class} and {@code CONFIGURATION_PROPERTY_NAME}
 */
public RedisRateLimiter(ReactiveStringRedisTemplate redisTemplate,
        RedisScript<List<Long>> script, ConfigurationService configurationService) {
    super(Config.class, CONFIGURATION_PROPERTY_NAME, configurationService);
    this.script = script;
    this.redisTemplate = redisTemplate;
    // Both collaborators are now set, so flip the initialized flag from false to true.
    this.initialized.compareAndSet(false, true);
}
/**
 * Fills in collaborators from the application context. The body runs only for the call
 * that flips the {@code initialized} flag from false to true; any other call returns
 * without touching the context.
 *
 * @param context the ApplicationContext used to look up the template, script and
 *        configuration service beans
 * @throws BeansException if thrown by application context methods
 */
@Override
@SuppressWarnings("unchecked")
public void setApplicationContext(ApplicationContext context) throws BeansException {
    if (!initialized.compareAndSet(false, true)) {
        return;
    }

    // Keep a template that is already set; otherwise fetch one by type.
    if (redisTemplate == null) {
        redisTemplate = context.getBean(ReactiveStringRedisTemplate.class);
    }

    // The script is always taken from the context by its well-known bean name.
    script = context.getBean(REDIS_SCRIPT_NAME, RedisScript.class);

    // The configuration service is applied only if at least one such bean is registered.
    String[] configurationServiceNames = context.getBeanNamesForType(ConfigurationService.class);
    if (configurationServiceNames.length != 0) {
        setConfigurationService(context.getBean(ConfigurationService.class));
    }
}
/**
 * Stubs the mocked application context and creates the limiter under test with the
 * default replenish rate and burst capacity.
 */
@Before
public void setUp() {
    // Bean-name lookup for ConfigurationService answers with the predefined name array.
    when(applicationContext.getBeanNamesForType(ConfigurationService.class))
            .thenReturn(CONFIGURATION_SERVICE_BEANS);
    // Type lookup for the template answers with the test's redisTemplate.
    when(applicationContext.getBean(ReactiveStringRedisTemplate.class))
            .thenReturn(redisTemplate);

    redisRateLimiter = new RedisRateLimiter(
            DEFAULT_REPLENISH_RATE,
            DEFAULT_BURST_CAPACITY);
}
/**
 * Exposes a {@code ReactiveStringRedisTemplate} bean built on the injected connection factory.
 *
 * @param reactiveRedisConnectionFactory connection factory passed to the template constructor
 * @return a new template instance
 */
@Bean
ReactiveStringRedisTemplate template(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
    ReactiveStringRedisTemplate stringTemplate =
            new ReactiveStringRedisTemplate(reactiveRedisConnectionFactory);
    return stringTemplate;
}
/**
 * Declares the {@code ReactiveStringRedisTemplate} bean.
 *
 * @param reactiveRedisConnectionFactory connection factory the new template is constructed with
 * @return the newly constructed template
 */
@Bean
ReactiveStringRedisTemplate template(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
    final ReactiveStringRedisTemplate created =
            new ReactiveStringRedisTemplate(reactiveRedisConnectionFactory);
    return created;
}
/**
 * Stores the four collaborators on the new publisher; no other work is done here.
 *
 * @param reactiveStringRedisTemplate reactive string template kept on this instance
 * @param chatMessageCounter Redis-backed integer counter kept on this instance
 * @param activeUserCounter Redis-backed long counter kept on this instance
 * @param objectMapper object mapper kept on this instance
 */
public RedisChatMessagePublisher(
        ReactiveStringRedisTemplate reactiveStringRedisTemplate,
        RedisAtomicInteger chatMessageCounter,
        RedisAtomicLong activeUserCounter,
        ObjectMapper objectMapper) {
    this.objectMapper = objectMapper;
    this.activeUserCounter = activeUserCounter;
    this.chatMessageCounter = chatMessageCounter;
    this.reactiveStringRedisTemplate = reactiveStringRedisTemplate;
}
/**
 * Stores the Redis template and the WebSocket handler on the new listener.
 *
 * @param reactiveStringRedisTemplate reactive string template kept on this instance
 * @param chatWebSocketHandler WebSocket handler kept on this instance
 */
public RedisChatMessageListener(
        ReactiveStringRedisTemplate reactiveStringRedisTemplate,
        ChatWebSocketHandler chatWebSocketHandler) {
    this.chatWebSocketHandler = chatWebSocketHandler;
    this.reactiveStringRedisTemplate = reactiveStringRedisTemplate;
}
/**
 * Creates a repository that keeps the given reactive string template in its {@code redis} field.
 *
 * @param redis reactive string template stored on this instance
 */
public RedisRepository(
        ReactiveStringRedisTemplate redis) {
    this.redis = redis;
}