下面列出了怎么用org.springframework.data.redis.connection.ReactiveSubscription的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testSubscribe() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
ReactiveRedisConnection connection = factory.getReactiveConnection();
Mono<ReactiveSubscription> s = connection.pubSubCommands().createSubscription();
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
ReactiveSubscription ss = s.block();
ss.subscribe(ByteBuffer.wrap("test".getBytes())).block();
ss.receive().doOnEach(message -> {
msg.set(message.get().getMessage().array());
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
@Test
public void testUnSubscribe() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
ReactiveRedisConnection connection = factory.getReactiveConnection();
Mono<ReactiveSubscription> s = connection.pubSubCommands().createSubscription();
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
ReactiveSubscription ss = s.block();
ss.subscribe(ByteBuffer.wrap("test".getBytes())).block();
ss.receive().doOnEach(message -> {
msg.set(message.get().getMessage().array());
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
@Test
public void testSubscribe() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
ReactiveRedisConnection connection = factory.getReactiveConnection();
Mono<ReactiveSubscription> s = connection.pubSubCommands().createSubscription();
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
ReactiveSubscription ss = s.block();
ss.subscribe(ByteBuffer.wrap("test".getBytes())).block();
ss.receive().doOnEach(message -> {
msg.set(message.get().getMessage().array());
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
@Test
public void testUnSubscribe() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
ReactiveRedisConnection connection = factory.getReactiveConnection();
Mono<ReactiveSubscription> s = connection.pubSubCommands().createSubscription();
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
ReactiveSubscription ss = s.block();
ss.subscribe(ByteBuffer.wrap("test".getBytes())).block();
ss.receive().doOnEach(message -> {
msg.set(message.get().getMessage().array());
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
@Test
public void testSubscribe() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
ReactiveRedisConnection connection = factory.getReactiveConnection();
Mono<ReactiveSubscription> s = connection.pubSubCommands().createSubscription();
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
ReactiveSubscription ss = s.block();
ss.subscribe(ByteBuffer.wrap("test".getBytes())).block();
ss.receive().doOnEach(message -> {
msg.set(message.get().getMessage().array());
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
@Test
public void testUnSubscribe() {
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
ReactiveRedisConnection connection = factory.getReactiveConnection();
Mono<ReactiveSubscription> s = connection.pubSubCommands().createSubscription();
AtomicReference<byte[]> msg = new AtomicReference<byte[]>();
ReactiveSubscription ss = s.block();
ss.subscribe(ByteBuffer.wrap("test".getBytes())).block();
ss.receive().doOnEach(message -> {
msg.set(message.get().getMessage().array());
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
public Mono<Void> subscribeMessageChannelAndPublishOnWebSocket() {
return reactiveStringRedisTemplate.listenTo(new PatternTopic(MESSAGE_TOPIC))
.map(ReactiveSubscription.Message::getMessage)
.flatMap(message -> ObjectStringConverter.stringToObject(message, ChatMessage.class))
.filter(chatMessage -> !chatMessage.getMessage().isEmpty())
.flatMap(chatWebSocketHandler::sendMessage)
.then();
}
@Override
public Mono<ReactiveSubscription> createSubscription() {
return Mono.just(new RedissonReactiveSubscription(executorService.getConnectionManager()));
}
@Override
public Mono<ReactiveSubscription> createSubscription() {
return Mono.just(new RedissonReactiveSubscription(executorService.getConnectionManager()));
}
@Override
public Mono<ReactiveSubscription> createSubscription() {
return Mono.just(new RedissonReactiveSubscription(executorService.getConnectionManager()));
}