下面列出了com.google.common.util.concurrent.SettableFuture#addListener ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public ListenableFuture<T> createNewListener()
{
SettableFuture<T> listener = SettableFuture.create();
synchronized (listeners) {
listeners.add(listener);
}
// remove the listener when the future completes
listener.addListener(
() -> {
synchronized (listeners) {
listeners.remove(listener);
}
},
directExecutor());
return listener;
}
public ListenableFuture<PlatformMessage> request(PlatformMessage msg, Predicate<PlatformMessage> matcher, int timeoutSecs) {
if(timeoutSecs < 0) {
timeoutSecs = defaultTimeoutSecs;
}
final Address addr = msg.getDestination();
final SettableFuture<PlatformMessage> future = SettableFuture.create();
future.addListener(() -> { futures.remove(addr); }, workerPool);
Predicate<PlatformMessage> pred = (pm) -> { return Objects.equals(msg.getCorrelationId(), pm.getCorrelationId()) && msg.isError(); };
pred = matcher.or(pred);
Pair<Predicate<PlatformMessage>, SettableFuture<PlatformMessage>> pair = new ImmutablePair<>(matcher, future);
futures.put(addr, pair);
bus.send(msg);
timeoutPool.newTimeout((timer) -> {
if(!future.isDone()) {
future.setException(new TimeoutException("future timed out"));
}
}, timeoutSecs, TimeUnit.SECONDS);
return future;
}
@NonNull
public ListenableFuture<PlatformMessage> request(@NonNull final PlatformMessage msg) {
Preconditions.checkNotNull(msg);
Preconditions.checkNotNull(msg.getCorrelationId(), "requests must have a correlation id");
final SettableFuture<PlatformMessage> future = SettableFuture.create();
SettableFuture<PlatformMessage> existing = requests.putIfAbsent(msg.getCorrelationId(), future);
if(existing != null) {
throw new RuntimeException("request with id " + msg.getCorrelationId() + " already exists");
}
future.addListener(() -> requests.remove(msg.getCorrelationId()), executor);
bus.send(msg);
timeoutTimer.newTimeout((timer) -> {
if(!future.isDone()) {
future.setException(new TimeoutException());
}
}, ttl(msg), TimeUnit.MILLISECONDS);
return future;
}
private void producePerformance(int times, String topic, final CountDownLatch latch) {
Random random = new Random();
for (int i = 0; i < times; i++) {
String uuid = UUID.randomUUID().toString();
String msg = uuid;
boolean priority = random.nextBoolean();
SettableFuture<SendResult> future;
if (priority) {
future = (SettableFuture<SendResult>) Producer.getInstance().message(topic, null, msg + " priority")
.withRefKey(uuid).withPriority().send();
} else {
future = (SettableFuture<SendResult>) Producer.getInstance().message(topic, null, msg + " non-priority")
.withRefKey(uuid).send();
}
future.addListener(new Runnable() {
@Override
public void run() {
latch.countDown();
}
}, MoreExecutors.directExecutor());
}
}
@Override
public void processResponseStream(StreamContext<ProcessEvent> context) {
Consumer<ProcessEvent> watcher = context.getWatcher();
InputStream response = context.getStream();
SettableFuture<Boolean> interrupter = context.getInterrupter();
interrupter.addListener(() -> Thread.currentThread().interrupt(), MoreExecutors.directExecutor());
try (FrameReader frameReader = new FrameReader(response)) {
Frame frame = frameReader.readFrame();
while (frame != null && !interrupter.isDone()) {
try {
ProcessEvent.watchRaw(watcher, frame.getMessage(), false);
} catch (Exception e) {
log.error("Cannot read body", e);
} finally {
frame = frameReader.readFrame();
}
}
} catch (Exception t) {
log.error("Cannot close reader", t);
}
}
/**
* Returns a listener that completes when the minimum number of workers for the cluster has been met.
* Note: caller should not add a listener using the direct executor, as this can delay the
* notifications for other listeners.
*/
public synchronized ListenableFuture<?> waitForMinimumWorkers(int executionMinCount, Duration executionMaxWait)
{
checkArgument(executionMinCount > 0, "executionMinCount should be greater than 0");
requireNonNull(executionMaxWait, "executionMaxWait is null");
if (currentCount >= executionMinCount) {
return immediateFuture(null);
}
SettableFuture<?> future = SettableFuture.create();
MinNodesFuture minNodesFuture = new MinNodesFuture(executionMinCount, future);
futuresQueue.add(minNodesFuture);
// if future does not finish in wait period, complete with an exception
ScheduledFuture<?> timeoutTask = executor.schedule(
() -> {
synchronized (this) {
future.setException(new PrestoException(
GENERIC_INSUFFICIENT_RESOURCES,
format("Insufficient active worker nodes. Waited %s for at least %s workers, but only %s workers are active", executionMaxWait, executionMinCount, currentCount)));
}
},
executionMaxWait.toMillis(),
MILLISECONDS);
// remove future if finished (e.g., canceled, timed out)
future.addListener(() -> {
timeoutTask.cancel(true);
removeFuture(minNodesFuture);
}, executor);
return future;
}
public ListenableFuture<PlatformMessage> request(PlatformMessage msg, int timeoutSecs) {
Preconditions.checkNotNull(msg.getCorrelationId(), "correlationId is required");
final SettableFuture<PlatformMessage> future = SettableFuture.create();
future.addListener(() -> { futures.remove(msg.getCorrelationId()); }, executor);
futures.put(msg.getCorrelationId(), future);
bus.send(msg);
timeoutPool.newTimeout((timer) -> {
if(!future.isDone()) {
future.setException(new TimeoutException("future timed out"));
}
}, timeoutSecs, TimeUnit.SECONDS);
return future;
}
@Test
public void testMysqlBroker() throws Exception {
startBroker();
final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
Producer p = Producer.getInstance();
p.message("order_new", "0", 1233213423L).withRefKey("key").withPriority().send();
long startTime = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
SettableFuture<SendResult> future = (SettableFuture<SendResult>) p.message("order_new", "0", 1233213423L)
.withRefKey("key").withPriority().send();
future.addListener(new Runnable() {
@Override
public void run() {
latch.countDown();
}
}, MoreExecutors.sameThreadExecutor());
}
boolean isDone = latch.await(30, TimeUnit.SECONDS);
assertTrue(isDone);
long endTime = System.currentTimeMillis();
outputResult(endTime - startTime);
}
public synchronized SettableFuture<Boolean> commitAsync(final OffsetCommitCallback callback,
ExecutorService callbackExecutor) {
m_done.set(true);
SettableFuture<Boolean> future = SettableFuture.create();
if (callback != null) {
future.addListener(new Runnable() {
@Override
public void run() {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (Map.Entry<Integer, List<OffsetRecord>> entry : getOffsetRecords().entrySet()) {
TopicPartition tp = new TopicPartition(m_topic, entry.getKey());
List<OffsetRecord> records = entry.getValue();
if (records != null && !records.isEmpty()) {
OffsetAndMetadata offsetMeta = offsets.get(tp);
if (offsetMeta == null) {
offsetMeta = new OffsetAndMetadata();
offsets.put(tp, offsetMeta);
}
for (OffsetRecord rec : entry.getValue()) {
if (rec.isNack()) {
// TODO add nack info to OffsetAndMetadata
} else {
if (rec.isResend()) {
offsetMeta.setResendOffset(rec.getOffset());
} else {
if (rec.isPriority()) {
offsetMeta.setPriorityOffset(rec.getOffset());
} else {
offsetMeta.setNonPriorityOffset(rec.getOffset());
}
}
}
}
}
}
callback.onComplete(offsets, null);
}
}, callbackExecutor);
}
m_future = future;
m_committer.scanAndCommitAsync();
return future;
}