下面列出了io.reactivex.functions.LongConsumer#org.redisson.api.listener.MessageListener 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Subscribes {@code listener} to the Redisson topic named after {@code type} and records the
 * resulting registration id under that topic name.
 *
 * <p>Messages whose node id equals this store's {@code nodeId} are not forwarded.
 *
 * @param type     event type; its {@code toString()} is used as the topic name
 * @param listener receiver of the messages coming from other nodes
 * @param clazz    message class (not consulted by this method)
 * @param <T>      concrete message type expected by {@code listener}
 */
public <T extends DispatchEventMessage> void subscribe(PubSubEventType type, final PubSubListener<T> listener, Class<T> clazz) {
    final String topicName = type.toString();
    RTopic eventTopic = redissonSub.getTopic(topicName);
    int registrationId = eventTopic.addListener(DispatchEventMessage.class, new MessageListener<DispatchEventMessage>() {
        @Override
        public void onMessage(CharSequence channel, DispatchEventMessage msg) {
            // Skip messages that carry this node's own id.
            if (nodeId.equals(msg.getNodeId())) {
                return;
            }
            listener.onMessage((T) msg);
        }
    });

    Queue<Integer> registrations = map.get(topicName);
    if (registrations == null) {
        Queue<Integer> fresh = new ConcurrentLinkedQueue<Integer>();
        Queue<Integer> existing = map.putIfAbsent(topicName, fresh);
        // Another caller may have installed a queue first; reuse theirs in that case.
        registrations = (existing != null) ? existing : fresh;
    }
    registrations.add(registrationId);
}
/**
 * Demo entry point: subscribes to topic {@code "topic2"}, publishes one message and waits
 * until the listener has been invoked.
 *
 * <p>The client is shut down in a {@code finally} block so that it is released even when
 * subscribing, publishing or waiting fails.
 *
 * @param args ignored
 * @throws InterruptedException if the thread is interrupted while waiting for the message
 */
public static void main(String[] args) throws InterruptedException {
    // connects to 127.0.0.1:6379 by default
    RedissonClient redisson = Redisson.create();
    try {
        CountDownLatch latch = new CountDownLatch(1);
        RTopic topic = redisson.getTopic("topic2");
        topic.addListener(String.class, new MessageListener<String>() {
            @Override
            public void onMessage(CharSequence channel, String msg) {
                latch.countDown();
            }
        });
        topic.publish("msg");
        latch.await();
    } finally {
        redisson.shutdown();
    }
}
/**
 * Blocks until a {@code TERMINATED_STATE} message arrives on the termination topic, the
 * timeout elapses, or the calling thread is interrupted.
 *
 * <p>The temporary topic listener is removed in a {@code finally} block, so it is not left
 * registered when the wait is interrupted or the termination re-check fails.
 *
 * @param timeout maximum time to wait
 * @param unit    unit of {@code timeout}
 * @return {@code true} if termination was observed, {@code false} if the timeout elapsed first
 * @throws InterruptedException if interrupted while waiting
 */
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    if (isTerminated()) {
        return true;
    }
    CountDownLatch latch = new CountDownLatch(1);
    MessageListener<Long> listener = new MessageListener<Long>() {
        @Override
        public void onMessage(CharSequence channel, Long msg) {
            // Null guard avoids an unboxing NPE on an unexpected empty payload.
            if (msg != null && msg == TERMINATED_STATE) {
                latch.countDown();
            }
        }
    };
    int listenerId = terminationTopic.addListener(Long.class, listener);
    try {
        // Re-check after subscribing: termination may have been published in between.
        if (isTerminated()) {
            return true;
        }
        return latch.await(timeout, unit);
    } finally {
        terminationTopic.removeListener(listenerId);
    }
}
/**
 * Removes {@code listener} from this topic's channel and unsubscribes from the channel
 * when no listeners remain.
 *
 * <p>The channel semaphore is acquired first. It is released here unless the unsubscribe
 * path is taken, in which case it is handed over to {@code subscribeService.unsubscribe}.
 *
 * @param listener listener instance to remove
 */
@Override
public void removeListener(MessageListener<?> listener) {
    AsyncSemaphore channelLock = subscribeService.getSemaphore(channelName);
    acquire(channelLock);

    PubSubConnectionEntry pubSubEntry = subscribeService.getPubSubEntry(channelName);
    if (pubSubEntry == null) {
        // Nothing is subscribed on this channel.
        channelLock.release();
        return;
    }

    pubSubEntry.removeListener(channelName, listener);
    if (pubSubEntry.hasListeners(channelName)) {
        channelLock.release();
        return;
    }
    subscribeService.unsubscribe(channelName, channelLock);
}
/**
 * Asynchronous variant of listener removal: once the channel semaphore has been acquired,
 * removes {@code listener} and unsubscribes from the channel if it was the last one.
 *
 * @param listener listener instance to remove
 * @return promise completed with {@code null} when there is nothing to unsubscribe, or
 *         with the outcome of the unsubscribe call otherwise
 */
@Override
public RFuture<Void> removeListenerAsync(MessageListener<?> listener) {
    RPromise<Void> result = new RedissonPromise<Void>();
    AsyncSemaphore channelLock = subscribeService.getSemaphore(channelName);
    channelLock.acquire(() -> {
        PubSubConnectionEntry pubSubEntry = subscribeService.getPubSubEntry(channelName);
        if (pubSubEntry != null) {
            pubSubEntry.removeListener(channelName, listener);
            if (!pubSubEntry.hasListeners(channelName)) {
                // Last listener gone: unsubscribe and pass its outcome on to the promise.
                subscribeService.unsubscribe(channelName, channelLock)
                        .onComplete(new TransferListener<Void>(result));
                return;
            }
        }
        // Either no entry exists or other listeners remain on the channel.
        channelLock.release();
        result.trySuccess(null);
    });
    return result;
}
/**
 * Registers a listener, removes it by instance through a second topic handle and then
 * publishes a message; the removed listener is expected not to be invoked.
 *
 * <p>The client is shut down in a {@code finally} block so a failing step does not leave
 * it running.
 */
@Test
public void testRemoveByInstance() throws InterruptedException {
    RedissonClient redisson = BaseTest.createInstance();
    try {
        RTopic topic1 = redisson.getTopic("topic1");
        MessageListener<Message> listener = new MessageListener<Message>() {
            @Override
            public void onMessage(CharSequence channel, Message msg) {
                Assert.fail();
            }
        };
        topic1.addListener(Message.class, listener);

        // Obtain a fresh handle for the same topic before removing.
        topic1 = redisson.getTopic("topic1");
        topic1.removeListener(listener);
        topic1.publish(new Message("123"));
    } finally {
        redisson.shutdown();
    }
}
/**
 * Registers a listener on a reactive topic, removes it by id and publishes a message; the
 * removed listener is expected not to be invoked.
 *
 * <p>Every reactive call is blocked on: a {@code Mono} performs no work until it is
 * subscribed, so without {@code block()} the removal and publish would never run.
 */
@Test
public void testRemoveListenerById() throws InterruptedException {
    RTopicReactive topic1 = redisson.getTopic("topic1");
    MessageListener<Message> listener = new MessageListener<Message>() {
        @Override
        public void onMessage(CharSequence channel, Message msg) {
            Assert.fail();
        }
    };
    Mono<Integer> res = topic1.addListener(Message.class, listener);
    Integer listenerId = res.block();

    topic1 = redisson.getTopic("topic1");
    topic1.removeListener(listenerId).block();
    topic1.publish(new Message("123")).block();
}
/**
 * Registers a listener on a reactive topic, removes it by instance and publishes a message;
 * the removed listener is expected not to be invoked.
 *
 * <p>Every reactive call is blocked on: a {@code Mono} performs no work until it is
 * subscribed, so without {@code block()} the removal and publish would never run.
 */
@Test
public void testRemoveListenerByInstance() throws InterruptedException {
    RTopicReactive topic1 = redisson.getTopic("topic1");
    MessageListener<Message> listener = new MessageListener<Message>() {
        @Override
        public void onMessage(CharSequence channel, Message msg) {
            Assert.fail();
        }
    };
    topic1.addListener(Message.class, listener).block();

    topic1 = redisson.getTopic("topic1");
    topic1.removeListener(listener).block();
    topic1.publish(new Message("123")).block();
}
/**
 * Registers a listener on an Rx topic, removes it by id and publishes a message; the
 * removed listener is expected not to be invoked.
 *
 * <p>Every Rx call is blocked on: the returned {@code Single}/{@code Completable} does no
 * work until it is subscribed, so without blocking the removal and publish would never run.
 */
@Test
public void testRemoveListenerById() throws InterruptedException {
    RTopicRx topic1 = redisson.getTopic("topic1");
    MessageListener<Message> listener = new MessageListener<Message>() {
        @Override
        public void onMessage(CharSequence channel, Message msg) {
            Assert.fail();
        }
    };
    Single<Integer> res = topic1.addListener(Message.class, listener);
    Integer listenerId = res.blockingGet();

    topic1 = redisson.getTopic("topic1");
    topic1.removeListener(listenerId).blockingAwait();
    topic1.publish(new Message("123")).blockingGet();
}
/**
 * Registers a listener on an Rx topic, removes it by instance and publishes a message; the
 * removed listener is expected not to be invoked.
 *
 * <p>Every Rx call is blocked on: the returned {@code Single}/{@code Completable} does no
 * work until it is subscribed, so without blocking neither the registration, the removal
 * nor the publish would run.
 */
@Test
public void testRemoveListenerByInstance() throws InterruptedException {
    RTopicRx topic1 = redisson.getTopic("topic1");
    MessageListener<Message> listener = new MessageListener<Message>() {
        @Override
        public void onMessage(CharSequence channel, Message msg) {
            Assert.fail();
        }
    };
    topic1.addListener(Message.class, listener).blockingGet();

    topic1 = redisson.getTopic("topic1");
    topic1.removeListener(listener).blockingAwait();
    topic1.publish(new Message("123")).blockingGet();
}
/**
 * On application-ready, registers a listener on the configuration-refresh topic; each
 * received message triggers {@code contextRefresher.refresh()} followed by an info log line.
 *
 * @param event the application-ready event (not inspected)
 */
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
    MessageListener<String> refreshOnMessage = (channel, msg) -> {
        contextRefresher.refresh();
        log.info("业务模块配置刷新完毕");
    };
    redissonClient.getTopic(NamingConstant.TOPIC_APP_CONFIG_REFRESH)
            .addListener(String.class, refreshOnMessage);
}
/**
 * Creates an Observable over this channel's topic.
 *
 * <p>Each subscription registers its own topic listener. Only non-null messages that
 * contain both an {@code "id"} and a {@code "payload"} key are emitted. The listener is
 * removed when the subscription is cancelled; the removal call is subscribed explicitly
 * because the reactive topic API does nothing until subscribed.
 *
 * <p>If registering the listener fails, the error is logged and passed to the observer
 * instead of being raised as an unhandled error.
 *
 * @return Observable which returns all incoming messages on this channel.
 */
public Observable<Message> observe() {
    return Observable.create(emitter -> {
        MessageListener<JsonObject> listener = (t, message) -> {
            if (message != null && message.containsKey("id") && message.containsKey("payload")) {
                emitter.onNext(new Message(message));
            }
        };
        topic.addListener(JsonObject.class, listener)
                .doOnError(err -> logger.error("Cannot handle message", err))
                .subscribe(
                        id -> { },
                        err -> {
                            // Do not signal an observer that has already gone away.
                            if (!emitter.isDisposed()) {
                                emitter.onError(err);
                            }
                        });
        emitter.setCancellable(() -> topic.removeListener(listener).subscribe());
    });
}
/**
 * One-time setup of the cache-change topic listener, guarded by the {@code inited} flag and
 * a lock on {@code GuavaRedisCache.class}.
 *
 * <p>The listener ignores messages with a blank client id or with this process's own client
 * id, looks up the local cache by name and then either removes the changed key
 * (PUT/UPDATE/REMOVE) or clears the whole local cache (CLEAR).
 *
 * @param redisson client used to obtain the topic
 */
private static void init(RedissonClient redisson) {
    if (inited) {
        return;
    }
    synchronized (GuavaRedisCache.class) {
        if (inited) {
            return;
        }
        MessageListener<CacheChangedVo> changeListener = (channel, cacheChangedVo) -> {
            String senderId = cacheChangedVo.getClientId();
            if (StrUtil.isBlank(senderId)) {
                log.error("clientid is null");
                return;
            }
            if (Objects.equals(CacheChangedVo.CLIENTID, senderId)) {
                // Message published by this very process.
                log.debug("自己发布的消息,{}", senderId);
                return;
            }

            String cacheName = cacheChangedVo.getCacheName();
            GuavaRedisCache target = GuavaRedisCache.getCache(cacheName);
            if (target == null) {
                log.info("不能根据cacheName[{}]找到GuavaRedisCache对象", cacheName);
                return;
            }

            CacheChangeType changeType = cacheChangedVo.getType();
            if (changeType == CacheChangeType.CLEAR) {
                target.guavaCache.clear();
            } else if (changeType == CacheChangeType.PUT
                    || changeType == CacheChangeType.UPDATE
                    || changeType == CacheChangeType.REMOVE) {
                target.guavaCache.remove(cacheChangedVo.getKey());
            }
        };
        topic = redisson.getTopic(CACHE_CHANGE_TOPIC);
        topic.addListener(CacheChangedVo.class, changeListener);
        inited = true;
    }
}
/**
 * One-time setup of the cache-change topic listener, guarded by the {@code inited} flag and
 * a lock on {@code CaffeineRedisCache.class}.
 *
 * <p>The listener ignores messages with a blank client id or with this process's own client
 * id, looks up the local cache by name and then either removes the changed key
 * (PUT/UPDATE/REMOVE) or clears the whole local cache (CLEAR).
 *
 * @param redisson client used to obtain the topic
 */
private static void init(RedissonClient redisson) {
    if (inited) {
        return;
    }
    synchronized (CaffeineRedisCache.class) {
        if (inited) {
            return;
        }
        MessageListener<CacheChangedVo> changeListener = (channel, cacheChangedVo) -> {
            String senderId = cacheChangedVo.getClientId();
            if (StrUtil.isBlank(senderId)) {
                log.error("clientid is null");
                return;
            }
            if (Objects.equals(CacheChangedVo.CLIENTID, senderId)) {
                // Message published by this very process.
                log.debug("自己发布的消息,{}", senderId);
                return;
            }

            String cacheName = cacheChangedVo.getCacheName();
            CaffeineRedisCache target = CaffeineRedisCache.getCache(cacheName);
            if (target == null) {
                log.info("不能根据cacheName[{}]找到CaffeineRedisCache对象", cacheName);
                return;
            }

            CacheChangeType changeType = cacheChangedVo.getType();
            if (changeType == CacheChangeType.CLEAR) {
                target.localCache.clear();
            } else if (changeType == CacheChangeType.PUT
                    || changeType == CacheChangeType.UPDATE
                    || changeType == CacheChangeType.REMOVE) {
                target.localCache.remove(cacheChangedVo.getKey());
            }
        };
        topic = redisson.getTopic(CACHE_CHANGE_TOPIC);
        topic.addListener(CacheChangedVo.class, changeListener);
        inited = true;
    }
}
/**
 * Exposes the topic's messages as a {@code Flux}.
 *
 * <p>Every downstream request of {@code n} items registers a new topic listener that
 * forwards messages to the sink; after {@code n} messages that listener removes itself and
 * completes the sink. When registration succeeds, a dispose hook that removes the listener
 * by id is installed; when it fails, the error is signalled to the sink.
 *
 * @param type message class to listen for
 * @param <M>  message type
 * @return flux of received messages
 */
public <M> Flux<M> getMessages(Class<M> type) {
    return Flux.<M>create(sink -> {
        sink.onRequest(requested -> {
            AtomicLong remaining = new AtomicLong(requested);
            MessageListener<M> forwarder = new MessageListener<M>() {
                @Override
                public void onMessage(CharSequence channel, M msg) {
                    sink.next(msg);
                    if (remaining.decrementAndGet() != 0) {
                        return;
                    }
                    // Requested amount delivered: detach this listener and finish.
                    topic.removeListenerAsync(this);
                    sink.complete();
                }
            };
            RFuture<Integer> registration = topic.addListenerAsync(type, forwarder);
            registration.onComplete((id, failure) -> {
                if (failure == null) {
                    sink.onDispose(() -> {
                        topic.removeListenerAsync(id);
                    });
                } else {
                    sink.error(failure);
                }
            });
        });
    });
}
/**
 * Creates the adder and subscribes it to its companion topic.
 *
 * <p>The topic and semaphore names are derived from this object's name with the suffixes
 * {@code "topic"} and {@code "semaphore"}. The registered listener reacts to two messages:
 * on {@code SUM_MSG} it calls {@code addAndGetAsync()} and, on success, releases the
 * semaphore; on {@code CLEAR_MSG} it calls {@code doReset()} and releases the semaphore.
 *
 * @param connectionManager command executor passed to the superclass
 * @param name              object name
 * @param redisson          client used to obtain the topic and semaphore
 */
public RedissonBaseAdder(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) {
    super(connectionManager, name);

    topic = redisson.getTopic(suffixName(getName(), "topic"), LongCodec.INSTANCE);
    semaphore = redisson.getSemaphore(suffixName(getName(), "semaphore"));

    listenerId = topic.addListener(Long.class, new MessageListener<Long>() {
        @Override
        public void onMessage(CharSequence channel, Long msg) {
            if (msg == SUM_MSG) {
                addAndGetAsync().onComplete((sum, sumFailure) -> {
                    if (sumFailure != null) {
                        log.error("Can't increase sum", sumFailure);
                        return;
                    }
                    releasePermit();
                });
            }

            if (msg == CLEAR_MSG) {
                doReset();
                releasePermit();
            }
        }

        // Releases one semaphore permit and logs a failed release.
        private void releasePermit() {
            semaphore.releaseAsync().onComplete((ignored, releaseFailure) -> {
                if (releaseFailure != null) {
                    log.error("Can't release semaphore", releaseFailure);
                }
            });
        }
    });
}
/**
 * Exposes the topic's messages as a {@code Flowable}.
 *
 * <p>Every downstream request of {@code n} items registers a new topic listener that
 * forwards messages to the processor; after {@code n} messages that listener removes itself
 * and completes the processor.
 *
 * <p>Cancellation handling: {@code doOnCancel} returns a new {@code Flowable} rather than
 * modifying the receiver, so the hook has to be part of the returned chain. The ids of
 * successfully registered listeners are collected and removed when the downstream cancels.
 * A registration that completes after cancellation is removed immediately.
 *
 * @param type message class to listen for
 * @param <M>  message type
 * @return flowable of received messages
 */
public <M> Flowable<M> getMessages(Class<M> type) {
    ReplayProcessor<M> p = ReplayProcessor.create();
    // Fully qualified names keep this method independent of the file's import list.
    java.util.Queue<Integer> listenerIds = new java.util.concurrent.ConcurrentLinkedQueue<Integer>();
    java.util.concurrent.atomic.AtomicBoolean cancelled = new java.util.concurrent.atomic.AtomicBoolean();
    // Removes every registered listener; polling makes repeated or concurrent runs safe.
    Runnable removeRegistered = () -> {
        Integer registeredId;
        while ((registeredId = listenerIds.poll()) != null) {
            topic.removeListenerAsync(registeredId);
        }
    };
    return p.doOnRequest(new LongConsumer() {
        @Override
        public void accept(long n) throws Exception {
            AtomicLong counter = new AtomicLong(n);
            RFuture<Integer> t = topic.addListenerAsync(type, new MessageListener<M>() {
                @Override
                public void onMessage(CharSequence channel, M msg) {
                    p.onNext(msg);
                    if (counter.decrementAndGet() == 0) {
                        topic.removeListenerAsync(this);
                        p.onComplete();
                    }
                }
            });
            t.onComplete((id, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }
                listenerIds.add(id);
                // Cancellation may have raced with the registration; clean up right away.
                if (cancelled.get()) {
                    removeRegistered.run();
                }
            });
        }
    }).doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            cancelled.set(true);
            removeRegistered.run();
        }
    });
}
/**
 * Subscribes a printing listener to topic {@code "simple"} and then publishes
 * {@code "test0"} .. {@code "test99"}, sleeping one second before each publish.
 */
public void testPubSub() throws InterruptedException, ExecutionException {
    RTopic topic = redisson.getTopic("simple");
    MessageListener<String> printer = (channel, msg) -> System.out.println("msg: " + msg);
    topic.addListener(String.class, printer);

    int i = 0;
    while (i < 100) {
        Thread.sleep(1000);
        topic.publish("test" + i);
        i++;
    }
}
/**
 * Registers task workers for this executor service.
 *
 * <p>Steps performed, in order:
 * <ol>
 * <li>rejects a worker count of zero;</li>
 * <li>schedules a {@code QueueTransferTask} under this object's name whose
 * {@code pushTaskAsync()} runs the Lua script below;</li>
 * <li>configures a {@code TasksRunnerService} and registers it with {@code remoteService}
 * using the executor from {@code options} if present, otherwise the connection manager's;</li>
 * <li>adds a listener on {@code workersTopic} that, per received id, adds the configured
 * worker count to an atomic long and releases a semaphore, both named with that id.</li>
 * </ol>
 *
 * @param options worker settings (count, executor, bean factory, listeners, task timeout)
 * @throws IllegalArgumentException if {@code options.getWorkers()} is zero
 */
@Override
public void registerWorkers(WorkerOptions options) {
if (options.getWorkers() == 0) {
throw new IllegalArgumentException("workers amount can't be zero");
}
QueueTransferTask task = new QueueTransferTask(connectionManager) {
@Override
protected RTopic getTopic() {
return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, schedulerChannelName);
}
@Override
protected RFuture<Long> pushTaskAsync() {
// Script keys: KEYS[1] = requestQueueName, KEYS[2] = schedulerQueueName,
// KEYS[3] = schedulerChannelName, KEYS[4] = tasksRetryIntervalName.
// Script args: ARGV[1] = current time in millis, ARGV[2] = 50 (limit per run).
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
// read up to ARGV[2] entries of the sorted set whose score lies within [0, ARGV[1]]
"local expiredTaskIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "local retryInterval = redis.call('get', KEYS[4]);"
+ "if #expiredTaskIds > 0 then "
+ "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));"
// with a retry interval set, each entry is re-added to the sorted set under an
// 'ff'-prefixed name with score ARGV[1] + retryInterval
+ "if retryInterval ~= false then "
+ "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);"
+ "for i = 1, #expiredTaskIds, 1 do "
+ "local name = expiredTaskIds[i];"
+ "local scheduledName = expiredTaskIds[i];"
+ "if string.sub(scheduledName, 1, 2) ~= 'ff' then "
+ "scheduledName = 'ff' .. scheduledName; "
+ "else "
+ "name = string.sub(name, 3, string.len(name)); "
+ "end;"
+ "redis.call('zadd', KEYS[2], startTime, scheduledName);"
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
// if new task added to queue head then publish its startTime
// to all scheduler workers
+ "if v[1] == expiredTaskIds[i] then "
+ "redis.call('publish', KEYS[3], startTime); "
+ "end;"
// append the un-prefixed name to the list KEYS[1] only if it is not already there:
// linsert duplicates an existing element, and lrem then drops the duplicate again
+ "if redis.call('linsert', KEYS[1], 'before', name, name) < 1 then "
+ "redis.call('rpush', KEYS[1], name); "
+ "else "
+ "redis.call('lrem', KEYS[1], -1, name); "
+ "end; "
+ "end; "
// without a retry interval, all entries are pushed to the list KEYS[1] at once
+ "else "
+ "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));"
+ "end; "
+ "end; "
// get startTime from scheduler queue head task
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
+ "if v[1] ~= nil then "
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(requestQueueName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName),
System.currentTimeMillis(), 50);
}
};
queueTransferService.schedule(getName(), task);
// Configure the service that is registered with remoteService below.
TasksRunnerService service =
new TasksRunnerService(commandExecutor, redisson, codec, requestQueueName, responses);
service.setStatusName(statusName);
service.setTasksCounterName(tasksCounterName);
service.setTasksName(tasksName);
service.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
service.setSchedulerChannelName(schedulerChannelName);
service.setSchedulerQueueName(schedulerQueueName);
service.setTasksExpirationTimeName(tasksExpirationTimeName);
service.setTasksRetryIntervalName(tasksRetryIntervalName);
service.setBeanFactory(options.getBeanFactory());
// Default to the connection manager's executor unless the options supply one.
ExecutorService es = commandExecutor.getConnectionManager().getExecutor();
if (options.getExecutorService() != null) {
es = options.getExecutorService();
}
remoteService.setListeners(options.getListeners());
remoteService.setTaskTimeout(options.getTaskTimeout());
remoteService.register(RemoteExecutorService.class, service, options.getWorkers(), es);
// For every id received on workersTopic: add this registration's worker count to the
// id-specific counter and release the id-specific semaphore.
workersGroupListenerId = workersTopic.addListener(String.class, new MessageListener<String>() {
@Override
public void onMessage(CharSequence channel, String id) {
redisson.getAtomicLong(workersCounterName + ":" + id).getAndAdd(options.getWorkers());
redisson.getSemaphore(workersSemaphoreName + ":" + id).release();
}
});
}
/**
 * Creates a wrapper that stores the message type, the delegate listener and a name.
 *
 * @param type     class of the messages handled
 * @param listener delegate message listener
 * @param name     name stored alongside the listener
 */
public PubSubMessageListener(Class<V> type, MessageListener<V> listener, String name) {
    // The superclass no-arg constructor is invoked implicitly.
    this.name = name;
    this.listener = listener;
    this.type = type;
}
/**
 * @return the wrapped message listener
 */
public MessageListener<V> getListener() {
    return this.listener;
}
/**
 * Wraps {@code listener} in a {@code PubSubMessageListener} for {@code type} and registers it.
 *
 * @param type     class of the messages to receive
 * @param listener listener to register
 * @param <M>      message type
 * @return the id returned by the underlying registration
 */
@Override
public <M> int addListener(Class<M> type, MessageListener<? extends M> listener) {
    MessageListener<M> typedListener = (MessageListener<M>) listener;
    PubSubMessageListener<M> wrapper = new PubSubMessageListener<M>(type, typedListener, name);
    return addListener(wrapper);
}
/**
 * Wraps {@code listener} in a {@code PubSubMessageListener} for {@code type} and registers
 * it through the asynchronous overload.
 *
 * @param type     class of the messages to receive
 * @param listener listener to register
 * @param <M>      message type
 * @return the future returned by the underlying asynchronous registration
 */
@Override
public <M> RFuture<Integer> addListenerAsync(Class<M> type, MessageListener<M> listener) {
    PubSubMessageListener<M> wrapper = new PubSubMessageListener<M>(type, listener, name);
    RFuture<Integer> registration = addListenerAsync(wrapper);
    return registration;
}
/**
 * Opens subscriptions on more than 1000 distinct topics against a server limited to
 * 10 subscription connections with 5 subscriptions each. After the first failure, the
 * oldest topic's listeners are removed before each further attempt.
 *
 * <p>The client and the Redis process are released in {@code finally} blocks. An empty
 * queue is tolerated when freeing a slot, so a failed attempt cannot turn into an endless
 * {@code NullPointerException} loop.
 */
@Test
public void testSubscribeLimit() throws Exception {
    RedisProcess runner = new RedisRunner()
            .port(RedisRunner.findFreePort())
            .nosave()
            .randomDir()
            .run();
    try {
        int connection = 10;
        int subscription = 5;
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://localhost:" + runner.getRedisServerPort())
                .setSubscriptionConnectionPoolSize(connection)
                .setSubscriptionsPerConnection(subscription);
        RedissonClient redissonClient = Redisson.create(config);
        try {
            final Queue<RTopic> queue = new LinkedList<>();
            int i = 0;
            boolean timeout = false;
            while (true) {
                try {
                    if (timeout) {
                        System.out.println("destroy");
                        RTopic oldest = queue.poll();
                        if (oldest != null) {
                            oldest.removeAllListeners();
                        }
                    }
                    RTopic topic = redissonClient.getTopic(++i + "");
                    topic.addListener(Object.class, new MessageListener<Object>() {
                        @Override
                        public void onMessage(CharSequence channel, Object msg) {
                            // Intentionally empty: only the subscription itself matters here.
                        }
                    });
                    queue.offer(topic);
                    if (i > 1000) {
                        break;
                    }
                    System.out.println(i + " - " + queue.size());
                } catch (Exception e) {
                    // From now on free one subscription before every new attempt.
                    timeout = true;
                    e.printStackTrace();
                }
            }
        } finally {
            redissonClient.shutdown();
        }
    } finally {
        runner.stop();
    }
}
/**
 * Subscribes to this topic; the result is delivered through the returned future.
 * {@code MessageListener.onMessage} is called when any message
 * is published on this topic.
 *
 * @param <M> type of message
 * @param type class of the messages to receive
 * @param listener listener invoked for each message
 * @return future holding the locally unique listener id
 * @see org.redisson.api.listener.MessageListener
 */
<M> RFuture<Integer> addListenerAsync(Class<M> type, MessageListener<M> listener);
/**
 * Removes the listener by its instance.
 *
 * @param listener listener instance to remove
 * @return future that carries no value ({@code Void})
 */
RFuture<Void> removeListenerAsync(MessageListener<?> listener);
/**
 * Subscribes to this topic.
 * {@code MessageListener.onMessage} is called when any message
 * is published on this topic.
 *
 * @param <M> type of message
 * @param type class of the messages to receive
 * @param listener listener invoked for each message
 * @return locally unique listener id
 * @see org.redisson.api.listener.MessageListener
 */
<M> int addListener(Class<M> type, MessageListener<? extends M> listener);
/**
 * Removes the listener by its instance.
 *
 * @param listener listener instance to remove
 */
void removeListener(MessageListener<?> listener);
/**
 * Subscribes to this topic; the result is delivered through the returned {@code Single}.
 * {@code MessageListener.onMessage} is called when any message
 * is published on this topic.
 *
 * @param <M> type of message
 * @param type class of the messages to receive
 * @param listener listener invoked for each message
 * @return {@code Single} emitting the locally unique listener id
 * @see org.redisson.api.listener.MessageListener
 */
<M> Single<Integer> addListener(Class<M> type, MessageListener<M> listener);
/**
 * Removes the given listener instance from this topic.
 *
 * @param listener message listener to remove
 * @return {@code Completable} for the removal; it emits no value
 */
Completable removeListener(MessageListener<?> listener);