io.reactivex.functions.LongConsumer#org.redisson.api.listener.MessageListener源码实例Demo

下面列出了io.reactivex.functions.LongConsumer#org.redisson.api.listener.MessageListener 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

public <T extends DispatchEventMessage> void subscribe(PubSubEventType type, final PubSubListener<T> listener, Class<T> clazz) {
    String name = type.toString();
    RTopic topic = redissonSub.getTopic(name);
    int regId = topic.addListener(DispatchEventMessage.class, new MessageListener<DispatchEventMessage>() {
        @Override
        public void onMessage(CharSequence channel, DispatchEventMessage msg) {
            if (!nodeId.equals(msg.getNodeId())) {
                listener.onMessage((T)msg);
            }
        }
    });

    Queue<Integer> list = map.get(name);
    if (list == null) {
        list = new ConcurrentLinkedQueue<Integer>();
        Queue<Integer> oldList = map.putIfAbsent(name, list);
        if (oldList != null) {
            list = oldList;
        }
    }
    list.add(regId);
}
 
源代码2 项目: redisson-examples   文件: TopicExamples.java
public static void main(String[] args) throws InterruptedException {
    // connects to 127.0.0.1:6379 by default
    RedissonClient redisson = Redisson.create();

    CountDownLatch latch = new CountDownLatch(1);
    
    RTopic topic = redisson.getTopic("topic2");
    topic.addListener(String.class, new MessageListener<String>() {
        @Override
        public void onMessage(CharSequence channel, String msg) {
            latch.countDown();
        }
    });
    
    topic.publish("msg");
    latch.await();
    
    redisson.shutdown();
}
 
源代码3 项目: redisson   文件: RedissonExecutorService.java
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    if (isTerminated()) {
        return true;
    }
    
    CountDownLatch latch = new CountDownLatch(1);
    MessageListener<Long> listener = new MessageListener<Long>() {
        @Override
        public void onMessage(CharSequence channel, Long msg) {
            if (msg == TERMINATED_STATE) {
                latch.countDown();
            }
        }
    };
    int listenerId = terminationTopic.addListener(Long.class, listener);

    if (isTerminated()) {
        terminationTopic.removeListener(listenerId);
        return true;
    }
    
    boolean res = latch.await(timeout, unit);
    terminationTopic.removeListener(listenerId);
    return res;
}
 
源代码4 项目: redisson   文件: RedissonTopic.java
@Override
public void removeListener(MessageListener<?> listener) {
    AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
    acquire(semaphore);
    
    PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
    if (entry == null) {
        semaphore.release();
        return;
    }

    entry.removeListener(channelName, listener);
    if (!entry.hasListeners(channelName)) {
        subscribeService.unsubscribe(channelName, semaphore);
    } else {
        semaphore.release();
    }

}
 
源代码5 项目: redisson   文件: RedissonTopic.java
@Override
public RFuture<Void> removeListenerAsync(MessageListener<?> listener) {
    RPromise<Void> promise = new RedissonPromise<Void>();
    AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
    semaphore.acquire(() -> {
        PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
        if (entry == null) {
            semaphore.release();
            promise.trySuccess(null);
            return;
        }
        
        entry.removeListener(channelName, listener);
        if (!entry.hasListeners(channelName)) {
            subscribeService.unsubscribe(channelName, semaphore)
                .onComplete(new TransferListener<Void>(promise));
        } else {
            semaphore.release();
            promise.trySuccess(null);
        }
    });
    return promise;
}
 
源代码6 项目: redisson   文件: RedissonTopicTest.java
@Test
public void testRemoveByInstance() throws InterruptedException {
    RedissonClient redisson = BaseTest.createInstance();
    RTopic topic1 = redisson.getTopic("topic1");
    MessageListener listener = new MessageListener() {
        @Override
        public void onMessage(CharSequence channel, Object msg) {
            Assert.fail();
        }
    };
    
    topic1.addListener(Message.class, listener);

    topic1 = redisson.getTopic("topic1");
    topic1.removeListener(listener);
    topic1.publish(new Message("123"));

    redisson.shutdown();
}
 
源代码7 项目: redisson   文件: RedissonTopicReactiveTest.java
@Test
public void testRemoveListenerById() throws InterruptedException {
    RTopicReactive topic1 = redisson.getTopic("topic1");
    MessageListener listener = new MessageListener() {
        @Override
        public void onMessage(CharSequence channel, Object msg) {
            Assert.fail();
        }
    };
    
    Mono<Integer> res = topic1.addListener(Message.class, listener);
    Integer listenerId = res.block();

    topic1 = redisson.getTopic("topic1");
    topic1.removeListener(listenerId);
    topic1.publish(new Message("123"));
}
 
源代码8 项目: redisson   文件: RedissonTopicReactiveTest.java
@Test
public void testRemoveListenerByInstance() throws InterruptedException {
    RTopicReactive topic1 = redisson.getTopic("topic1");
    MessageListener listener = new MessageListener() {
        @Override
        public void onMessage(CharSequence channel, Object msg) {
            Assert.fail();
        }
    };
    
    topic1.addListener(Message.class, listener).block();

    topic1 = redisson.getTopic("topic1");
    topic1.removeListener(listener);
    topic1.publish(new Message("123"));
}
 
源代码9 项目: redisson   文件: RedissonTopicRxTest.java
@Test
public void testRemoveListenerById() throws InterruptedException {
    RTopicRx topic1 = redisson.getTopic("topic1");
    MessageListener listener = new MessageListener() {
        @Override
        public void onMessage(CharSequence channel, Object msg) {
            Assert.fail();
        }
    };
    
    Single<Integer> res = topic1.addListener(Message.class, listener);
    Integer listenerId = res.blockingGet();

    topic1 = redisson.getTopic("topic1");
    topic1.removeListener(listenerId);
    topic1.publish(new Message("123"));
}
 
源代码10 项目: redisson   文件: RedissonTopicRxTest.java
@Test
public void testRemoveListenerByInstance() throws InterruptedException {
    RTopicRx topic1 = redisson.getTopic("topic1");
    MessageListener listener = new MessageListener() {
        @Override
        public void onMessage(CharSequence channel, Object msg) {
            Assert.fail();
        }
    };
    
    topic1.addListener(Message.class, listener);

    topic1 = redisson.getTopic("topic1");
    topic1.removeListener(listener);
    topic1.publish(new Message("123"));
}
 
源代码11 项目: mPaaS   文件: AppConfigService.java
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
    redissonClient.getTopic(NamingConstant.TOPIC_APP_CONFIG_REFRESH).addListener(String.class,
            new MessageListener<String>() {
                @Override
                public void onMessage(CharSequence channel, String msg) {
                    contextRefresher.refresh();
                    log.info("业务模块配置刷新完毕");
                }
            });
}
 
源代码12 项目: kyoko   文件: RPCChannel.java
/**
 * @return Observable which returns all incoming messages on this channel.
 */
public Observable<Message> observe() {
    return Observable.create(emitter -> {
        MessageListener<JsonObject> listener = (t, message) -> {
            if (message != null && message.containsKey("id") && message.containsKey("payload")) {
                emitter.onNext(new Message(message));
            }
        };

        topic.addListener(JsonObject.class, listener)
                .doOnError(err -> logger.error("Cannot handle message", err))
                .subscribe();
        emitter.setCancellable(() -> topic.removeListener(listener));
    });
}
 
源代码13 项目: t-io   文件: GuavaRedisCache.java
private static void init(RedissonClient redisson) {
	if (!inited) {
		synchronized (GuavaRedisCache.class) {
			if (!inited) {
				topic = redisson.getTopic(CACHE_CHANGE_TOPIC);
				topic.addListener(CacheChangedVo.class, new MessageListener<CacheChangedVo>() {
					@Override
					public void onMessage(CharSequence channel, CacheChangedVo cacheChangedVo) {
						String clientid = cacheChangedVo.getClientId();
						if (StrUtil.isBlank(clientid)) {
							log.error("clientid is null");
							return;
						}
						if (Objects.equals(CacheChangedVo.CLIENTID, clientid)) {
							log.debug("自己发布的消息,{}", clientid);
							return;
						}

						String cacheName = cacheChangedVo.getCacheName();
						GuavaRedisCache guavaRedisCache = GuavaRedisCache.getCache(cacheName);
						if (guavaRedisCache == null) {
							log.info("不能根据cacheName[{}]找到GuavaRedisCache对象", cacheName);
							return;
						}

						CacheChangeType type = cacheChangedVo.getType();
						if (type == CacheChangeType.PUT || type == CacheChangeType.UPDATE || type == CacheChangeType.REMOVE) {
							String key = cacheChangedVo.getKey();
							guavaRedisCache.guavaCache.remove(key);
						} else if (type == CacheChangeType.CLEAR) {
							guavaRedisCache.guavaCache.clear();
						}
					}
				});
				inited = true;
			}
		}
	}
}
 
源代码14 项目: t-io   文件: CaffeineRedisCache.java
private static void init(RedissonClient redisson) {
	if (!inited) {
		synchronized (CaffeineRedisCache.class) {
			if (!inited) {
				topic = redisson.getTopic(CACHE_CHANGE_TOPIC);
				topic.addListener(CacheChangedVo.class, new MessageListener<CacheChangedVo>() {
					@Override
					public void onMessage(CharSequence channel, CacheChangedVo cacheChangedVo) {
						String clientid = cacheChangedVo.getClientId();
						if (StrUtil.isBlank(clientid)) {
							log.error("clientid is null");
							return;
						}
						if (Objects.equals(CacheChangedVo.CLIENTID, clientid)) {
							log.debug("自己发布的消息,{}", clientid);
							return;
						}

						String cacheName = cacheChangedVo.getCacheName();
						CaffeineRedisCache caffeineRedisCache = CaffeineRedisCache.getCache(cacheName);
						if (caffeineRedisCache == null) {
							log.info("不能根据cacheName[{}]找到CaffeineRedisCache对象", cacheName);
							return;
						}

						CacheChangeType type = cacheChangedVo.getType();
						if (type == CacheChangeType.PUT || type == CacheChangeType.UPDATE || type == CacheChangeType.REMOVE) {
							String key = cacheChangedVo.getKey();
							caffeineRedisCache.localCache.remove(key);
						} else if (type == CacheChangeType.CLEAR) {
							caffeineRedisCache.localCache.clear();
						}
					}
				});
				inited = true;
			}
		}
	}
}
 
源代码15 项目: redisson   文件: RedissonTopicReactive.java
public <M> Flux<M> getMessages(Class<M> type) {
    return Flux.<M>create(emitter -> {
        emitter.onRequest(n -> {
            AtomicLong counter = new AtomicLong(n);
            RFuture<Integer> t = topic.addListenerAsync(type, new MessageListener<M>() {
                @Override
                public void onMessage(CharSequence channel, M msg) {
                    emitter.next(msg);
                    if (counter.decrementAndGet() == 0) {
                        topic.removeListenerAsync(this);
                        emitter.complete();
                    }
                }
            });
            t.onComplete((id, e) -> {
                if (e != null) {
                    emitter.error(e);
                    return;
                }
                
                emitter.onDispose(() -> {
                    topic.removeListenerAsync(id);
                });
            });
        });
    });
}
 
源代码16 项目: redisson   文件: RedissonBaseAdder.java
public RedissonBaseAdder(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) {
    super(connectionManager, name);
    
    topic = redisson.getTopic(suffixName(getName(), "topic"), LongCodec.INSTANCE);
    semaphore = redisson.getSemaphore(suffixName(getName(), "semaphore"));
    listenerId = topic.addListener(Long.class, new MessageListener<Long>() {
        
        @Override
        public void onMessage(CharSequence channel, Long msg) {
            if (msg == SUM_MSG) {
                RFuture<T> addAndGetFuture = addAndGetAsync();
                addAndGetFuture.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't increase sum", e);
                        return;
                    }
                    
                    semaphore.releaseAsync().onComplete((r, ex) -> {
                        if (ex != null) {
                            log.error("Can't release semaphore", ex);
                            return;
                        }
                    });
                });
            }
            
            if (msg == CLEAR_MSG) {
                doReset();
                semaphore.releaseAsync().onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't release semaphore", e);
                    }
                });
            }
        }

    });
    
}
 
源代码17 项目: redisson   文件: RedissonTopicRx.java
public <M> Flowable<M> getMessages(Class<M> type) {
    ReplayProcessor<M> p = ReplayProcessor.create();
    return p.doOnRequest(new LongConsumer() {
        @Override
        public void accept(long n) throws Exception {
            AtomicLong counter = new AtomicLong(n);
            RFuture<Integer> t = topic.addListenerAsync(type, new MessageListener<M>() {
                @Override
                public void onMessage(CharSequence channel, M msg) {
                    p.onNext(msg);
                    if (counter.decrementAndGet() == 0) {
                        topic.removeListenerAsync(this);
                        p.onComplete();
                    }
                }
            });
            t.onComplete((id, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }
                
                p.doOnCancel(new Action() {
                    @Override
                    public void run() throws Exception {
                        topic.removeListenerAsync(id);
                    }
                });
            });
        }
    });
}
 
源代码18 项目: redisson   文件: TimeoutTest.java
public void testPubSub() throws InterruptedException, ExecutionException {
    RTopic topic = redisson.getTopic("simple");
    topic.addListener(String.class, new MessageListener<String>() {
        @Override
        public void onMessage(CharSequence channel, String msg) {
            System.out.println("msg: " + msg);
        }
    });
    for (int i = 0; i < 100; i++) {
        Thread.sleep(1000);
        topic.publish("test" + i);
    }
}
 
源代码19 项目: redisson   文件: RedissonExecutorService.java
@Override
public void registerWorkers(WorkerOptions options) {
    if (options.getWorkers() == 0) {
        throw new IllegalArgumentException("workers amount can't be zero");
    }
    
    QueueTransferTask task = new QueueTransferTask(connectionManager) {
        @Override
        protected RTopic getTopic() {
            return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, schedulerChannelName);
        }

        @Override
        protected RFuture<Long> pushTaskAsync() {
            return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                    "local expiredTaskIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                  + "local retryInterval = redis.call('get', KEYS[4]);"
                  + "if #expiredTaskIds > 0 then "
                      + "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));"
                      + "if retryInterval ~= false then "
                          + "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);"
                      
                          + "for i = 1, #expiredTaskIds, 1 do "
                              + "local name = expiredTaskIds[i];"
                              + "local scheduledName = expiredTaskIds[i];"
                              + "if string.sub(scheduledName, 1, 2) ~= 'ff' then "
                                  + "scheduledName = 'ff' .. scheduledName; "
                              + "else "
                                  + "name = string.sub(name, 3, string.len(name)); "
                              + "end;"
                                  
                              + "redis.call('zadd', KEYS[2], startTime, scheduledName);"
                              + "local v = redis.call('zrange', KEYS[2], 0, 0); "
                              // if new task added to queue head then publish its startTime 
                              // to all scheduler workers 
                              + "if v[1] == expiredTaskIds[i] then "
                                  + "redis.call('publish', KEYS[3], startTime); "
                              + "end;"
                                
                            + "if redis.call('linsert', KEYS[1], 'before', name, name) < 1 then "
                                + "redis.call('rpush', KEYS[1], name); "
                            + "else "
                                + "redis.call('lrem', KEYS[1], -1, name); "
                            + "end; "
                          + "end; "
                      + "else "
                          + "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));"
                      + "end; "
                  + "end; "
                    // get startTime from scheduler queue head task
                  + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                  + "if v[1] ~= nil then "
                     + "return v[2]; "
                  + "end "
                  + "return nil;",
                  Arrays.<Object>asList(requestQueueName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName), 
                  System.currentTimeMillis(), 50);
        }
    };
    queueTransferService.schedule(getName(), task);
    
    TasksRunnerService service = 
            new TasksRunnerService(commandExecutor, redisson, codec, requestQueueName, responses);
    service.setStatusName(statusName);
    service.setTasksCounterName(tasksCounterName);
    service.setTasksName(tasksName);
    service.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
    service.setSchedulerChannelName(schedulerChannelName);
    service.setSchedulerQueueName(schedulerQueueName);
    service.setTasksExpirationTimeName(tasksExpirationTimeName);
    service.setTasksRetryIntervalName(tasksRetryIntervalName);
    service.setBeanFactory(options.getBeanFactory());
    
    ExecutorService es = commandExecutor.getConnectionManager().getExecutor();
    if (options.getExecutorService() != null) {
        es = options.getExecutorService();
    }

    remoteService.setListeners(options.getListeners());
    remoteService.setTaskTimeout(options.getTaskTimeout());
    remoteService.register(RemoteExecutorService.class, service, options.getWorkers(), es);
    workersGroupListenerId = workersTopic.addListener(String.class, new MessageListener<String>() {
        @Override
        public void onMessage(CharSequence channel, String id) {
            redisson.getAtomicLong(workersCounterName + ":" + id).getAndAdd(options.getWorkers());
            redisson.getSemaphore(workersSemaphoreName + ":" + id).release();
        }
    });
}
 
源代码20 项目: redisson   文件: PubSubMessageListener.java
public PubSubMessageListener(Class<V> type, MessageListener<V> listener, String name) {
    super();
    this.type = type;
    this.listener = listener;
    this.name = name;
}
 
源代码21 项目: redisson   文件: PubSubMessageListener.java
public MessageListener<V> getListener() {
    return listener;
}
 
源代码22 项目: redisson   文件: RedissonTopic.java
@Override
public <M> int addListener(Class<M> type, MessageListener<? extends M> listener) {
    PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(type, (MessageListener<M>) listener, name);
    return addListener(pubSubListener);
}
 
源代码23 项目: redisson   文件: RedissonTopic.java
@Override
public <M> RFuture<Integer> addListenerAsync(Class<M> type, MessageListener<M> listener) {
    PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(type, listener, name);
    return addListenerAsync(pubSubListener);
}
 
源代码24 项目: redisson   文件: RedissonTopicTest.java
@Test
public void testSubscribeLimit() throws Exception {
    RedisProcess runner = new RedisRunner()
            .port(RedisRunner.findFreePort())
            .nosave()
            .randomDir()
            .run();
    
    int connection = 10;
    int subscription = 5;
    Config config = new Config();
    config.useSingleServer()
            .setAddress("redis://localhost:" + runner.getRedisServerPort())
            .setSubscriptionConnectionPoolSize(connection)
            .setSubscriptionsPerConnection(subscription);
    RedissonClient redissonClient = Redisson.create(config);
    final Queue<RTopic> queue = new LinkedList<>();
    int i = 0;
    boolean timeout = false;
    while (true) {
       try{
          if (timeout) {
              System.out.println("destroy");
              queue.poll().removeAllListeners();
          }
          RTopic topic = redissonClient.getTopic(++i + "");
            topic.addListener(Object.class, new MessageListener<Object>() {
                @Override
                public void onMessage(CharSequence channel, Object msg) {
                    // TODO Auto-generated method stub

                }
            });
          queue.offer(topic);
          if (i > 1000) {
              break;
          }
          System.out.println(i + " - " + queue.size());
       }catch(Exception e){
            timeout = true;
            e.printStackTrace();
       }
    }
    
    redissonClient.shutdown();
    runner.stop();
}
 
源代码25 项目: redisson   文件: RTopicAsync.java
/**
 * Subscribes to this topic.
 * <code>MessageListener.onMessage</code> is called when any message
 * is published on this topic.
 *
 * @param <M> type of message
 * @param type - type of message
 * @param listener for messages
 * @return locally unique listener id
 * @see org.redisson.api.listener.MessageListener
 */
<M> RFuture<Integer> addListenerAsync(Class<M> type, MessageListener<M> listener);
 
源代码26 项目: redisson   文件: RTopicAsync.java
/**
 * Removes the listener by its instance
 *
 * @param listener - listener instance
 * @return void
 */
RFuture<Void> removeListenerAsync(MessageListener<?> listener);
 
源代码27 项目: redisson   文件: RTopic.java
/**
 * Subscribes to this topic.
 * <code>MessageListener.onMessage</code> is called when any message
 * is published on this topic.
 *
 * @param <M> - type of message
 * @param type - type of message
 * @param listener for messages
 * @return locally unique listener id
 * @see org.redisson.api.listener.MessageListener
 */
<M> int addListener(Class<M> type, MessageListener<? extends M> listener);
 
源代码28 项目: redisson   文件: RTopic.java
/**
 * Removes the listener by its instance
 *
 * @param listener - listener instance
 */
void removeListener(MessageListener<?> listener);
 
源代码29 项目: redisson   文件: RTopicRx.java
/**
 * Subscribes to this topic.
 * <code>MessageListener.onMessage</code> is called when any message
 * is published on this topic.
 *
 * @param <M> - type of message
 * @param type - type of message
 * @param listener for messages
 * @return locally unique listener id
 * @see org.redisson.api.listener.MessageListener
 */
<M> Single<Integer> addListener(Class<M> type, MessageListener<M> listener);
 
源代码30 项目: redisson   文件: RTopicRx.java
/**
 * Removes the listener by <code>instance</code> for listening this topic
 *
 * @param listener - message listener
 * @return void
 */
Completable removeListener(MessageListener<?> listener);