下面列出了 com.google.common.util.concurrent.FluentFuture 类的 API 用法示例代码,也可以点击链接到 GitHub 查看源代码。
/**
 * Closes this BMP monitoring station service instance.
 *
 * <p>When a channel is present it is closed and a listener is attached which verifies that
 * the close succeeded and then closes the session manager. Afterwards the monitor node is
 * deleted from the operational datastore.
 *
 * @return the future of the commit that deletes the monitor from the operational datastore
 */
@Override
public synchronized FluentFuture<? extends CommitInfo> closeServiceInstance() {
    LOG.info("BMP Monitor Singleton Service {} instance closed, Monitor Id {}",
            getIdentifier().getName(), this.monitorId.getValue());

    if (this.channel != null) {
        final ChannelFutureListener onChannelClosed = closeResult -> {
            // The session manager is closed only if the precondition check passes.
            Preconditions.checkArgument(closeResult.isSuccess(),
                    "Channel failed to close: %s", closeResult.cause());
            BmpMonitoringStationImpl.this.sessionManager.close();
        };
        this.channel.close().addListener(onChannelClosed);
    }

    final DOMDataTreeWriteTransaction deleteTx = this.domDataBroker.newWriteOnlyTransaction();
    deleteTx.delete(LogicalDatastoreType.OPERATIONAL, this.yangMonitorId);
    LOG.info("BMP monitoring station {} closed.", this.monitorId.getValue());
    return deleteTx.commit();
}
/**
 * Deletes this topology from the operational datastore and then calls
 * {@code clearTopology()}.
 *
 * <p>The delete is issued on a write-only transaction allocated from the transaction chain,
 * which must not be {@code null}. The commit outcome is only logged.
 *
 * @return the future of the delete commit
 */
private synchronized FluentFuture<? extends CommitInfo> destroyOperationalTopology() {
    requireNonNull(this.chain, "A valid transaction chain must be provided.");

    final WriteTransaction deleteTx = this.chain.newWriteOnlyTransaction();
    deleteTx.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
    final FluentFuture<? extends CommitInfo> commitFuture = deleteTx.commit();

    final FutureCallback<CommitInfo> logOutcome = new FutureCallback<CommitInfo>() {
        @Override
        public void onSuccess(final CommitInfo result) {
            LOG.trace("Operational topology removed {}", AbstractTopologyBuilder.this.topology);
        }

        @Override
        public void onFailure(final Throwable throwable) {
            LOG.error("Unable to reset operational topology {} (transaction {})",
                    AbstractTopologyBuilder.this.topology, deleteTx.getIdentifier(), throwable);
        }
    };
    commitFuture.addCallback(logOutcome, MoreExecutors.directExecutor());

    clearTopology();
    return commitFuture;
}
/**
 * Deactivates this peer and releases the resources it holds.
 *
 * <p>Closes the registration, releases the Adj-RIB-In writer chain, closes the effective
 * RIB-In writer, removes the peer via {@code removePeer}, closes the DOM chain and finally
 * closes the tracker registration. Each optional collaborator is skipped when {@code null}.
 *
 * @return the future returned by {@code removePeer(this.peerPath)}
 */
@Override
public synchronized FluentFuture<? extends CommitInfo> close() {
    setActive(false);

    if (this.registration != null) {
        this.registration.close();
        this.registration = null;
    }
    if (this.adjRibInWriter != null) {
        this.adjRibInWriter.releaseChain();
    }
    if (this.effectiveRibInWriter != null) {
        this.effectiveRibInWriter.close();
    }

    final FluentFuture<? extends CommitInfo> peerRemoval = removePeer(this.peerPath);
    closeDomChain();

    if (this.trackerRegistration != null) {
        this.trackerRegistration.close();
        this.trackerRegistration = null;
    }
    return peerRemoval;
}
/**
 * Removes the routes described by an unreachable NLRI from the table matching its AFI/SAFI.
 *
 * <p>If no table context is registered for the AFI/SAFI the NLRI is ignored. Otherwise the
 * removal is delegated to the table context on a new write-only transaction, the commit
 * future is remembered in {@code submitted} and its outcome is logged.
 *
 * @param nlri the unreachable NLRI whose routes are to be removed
 */
void removeRoutes(final MpUnreachNlri nlri) {
    final TablesKey tableKey = new TablesKey(nlri.getAfi(), nlri.getSafi());
    final TableContext tableContext = this.tables.get(tableKey);
    if (tableContext == null) {
        LOG.debug("No table for {}, not accepting NLRI {}", tableKey, nlri);
        return;
    }

    LOG.trace("Removing routes {}", nlri);
    final DOMDataTreeWriteTransaction writeTx = this.chain.getDomChain().newWriteOnlyTransaction();
    tableContext.removeRoutes(writeTx, nlri);

    final FluentFuture<? extends CommitInfo> commitFuture = writeTx.commit();
    this.submitted = commitFuture;

    final FutureCallback<CommitInfo> logOutcome = new FutureCallback<CommitInfo>() {
        @Override
        public void onSuccess(final CommitInfo result) {
            LOG.trace("Removing routes {}, succeed", nlri);
        }

        @Override
        public void onFailure(final Throwable throwable) {
            LOG.error("Removing routes failed", throwable);
        }
    };
    commitFuture.addCallback(logOutcome, MoreExecutors.directExecutor());
}
/**
 * Releases the session with the peer.
 *
 * <p>Marks the session as down and closes all Adj-RIB-Out listeners. During a graceful
 * restart the stale routes of the graceful tables are stored and only the remaining tables
 * are cleared (starting the restart timer when the peer itself is restarting); otherwise the
 * connection is terminated. The binding chain is released and the session closed in both
 * cases.
 *
 * @return the future of the table-clearing commit or of the connection termination
 */
@Override
public synchronized FluentFuture<? extends CommitInfo> releaseConnection() {
    LOG.info("Closing session with peer");
    this.sessionUp = false;
    this.adjRibOutListenerSet.values().forEach(AdjRibOutListener::close);
    this.adjRibOutListenerSet.clear();

    final FluentFuture<? extends CommitInfo> result;
    if (isRestartingGracefully()) {
        final Set<TablesKey> gracefulTables = getGracefulTables();
        this.ribWriter.storeStaleRoutes(gracefulTables);
        result = this.ribWriter.clearTables(Sets.difference(this.tables, gracefulTables));
        if (isPeerRestarting()) {
            this.peerRestartStopwatch = Stopwatch.createStarted();
            handleRestartTimer();
        }
    } else {
        result = terminateConnection();
    }

    releaseBindingChain();
    closeSession();
    return result;
}
/**
 * Tears down all per-connection state of this peer.
 *
 * <p>Closes the tracker and RPC registrations, releases the RIB writer chain, closes the
 * effective RIB-In writer, empties the table collections, removes the peer via
 * {@code removePeer} and resets the internal state.
 *
 * @return the future returned by {@code removePeer(this.peerPath)}
 */
private synchronized FluentFuture<? extends CommitInfo> terminateConnection() {
    if (this.trackerRegistration != null) {
        this.trackerRegistration.close();
        this.trackerRegistration = null;
    }
    if (this.rpcRegistration != null) {
        this.rpcRegistration.close();
    }

    this.ribWriter.releaseChain();
    if (this.effRibInWriter != null) {
        this.effRibInWriter.close();
    }

    this.tables = ImmutableSet.of();
    this.addPathTableMaps = Collections.emptyMap();

    final FluentFuture<? extends CommitInfo> peerRemoval = removePeer(this.peerPath);
    resetState();
    return peerRemoval;
}
/**
 * Deletes the peer stored under {@code peerPath} from the operational datastore.
 *
 * @param peerPath path of the peer to delete; when {@code null} nothing is deleted
 * @return the future of the delete commit, or an already-completed empty future when
 *         {@code peerPath} is {@code null}
 */
final synchronized FluentFuture<? extends CommitInfo> removePeer(final @Nullable YangInstanceIdentifier peerPath) {
    if (peerPath == null) {
        return CommitInfo.emptyFluentFuture();
    }

    LOG.info("Closed per Peer {} removed", peerPath);
    final DOMDataTreeWriteTransaction deleteTx = this.domChain.newWriteOnlyTransaction();
    deleteTx.delete(LogicalDatastoreType.OPERATIONAL, peerPath);
    final FluentFuture<? extends CommitInfo> commitFuture = deleteTx.commit();

    final FutureCallback<CommitInfo> logOutcome = new FutureCallback<CommitInfo>() {
        @Override
        public void onSuccess(final CommitInfo result) {
            LOG.debug("Peer {} removed", peerPath);
        }

        @Override
        public void onFailure(final Throwable throwable) {
            LOG.error("Failed to remove Peer {}", peerPath, throwable);
        }
    };
    commitFuture.addCallback(logOutcome, MoreExecutors.directExecutor());
    return commitFuture;
}
/**
 * Deletes the graph topology from both the operational and the configuration datastore and
 * clears every connected graph held in memory.
 *
 * <p>The deletes are issued on a write-only transaction allocated from the transaction
 * chain, which must not be {@code null}. The commit outcome is only logged.
 *
 * @return the future of the delete commit
 */
private synchronized FluentFuture<? extends CommitInfo> destroyOperationalGraphModel() {
    requireNonNull(this.chain, "A valid transaction chain must be provided.");

    final WriteTransaction deleteTx = this.chain.newWriteOnlyTransaction();
    deleteTx.delete(LogicalDatastoreType.OPERATIONAL, this.graphTopologyIdentifier);
    deleteTx.delete(LogicalDatastoreType.CONFIGURATION, this.graphTopologyIdentifier);
    final FluentFuture<? extends CommitInfo> commitFuture = deleteTx.commit();

    final FutureCallback<CommitInfo> logOutcome = new FutureCallback<CommitInfo>() {
        @Override
        public void onSuccess(final CommitInfo result) {
            LOG.trace("Operational GraphModel removed {}", ConnectedGraphServer.this.graphTopologyIdentifier);
        }

        @Override
        public void onFailure(final Throwable throwable) {
            LOG.error("Unable to reset operational GraphModel {} (transaction {})",
                    ConnectedGraphServer.this.graphTopologyIdentifier, deleteTx.getIdentifier(), throwable);
        }
    };
    commitFuture.addCallback(logOutcome, MoreExecutors.directExecutor());

    // Drop the in-memory content of every connected graph.
    for (final ConnectedGraph connectedGraph : graphs.values()) {
        ((ConnectedGraphImpl) connectedGraph).clear();
    }
    return commitFuture;
}
/**
 * Downloads the blob identified by the action key's digest into memory and parses it as an
 * {@link ActionResult}.
 *
 * @param actionDigest key whose digest identifies the blob to download
 * @param downloadFunction function that writes the blob for a digest into the given stream
 * @return a future holding the parsed result; it holds {@code null} when the download fails
 *     with {@link CacheNotFoundException} and fails when the bytes cannot be parsed
 */
@SuppressWarnings("ProtoParseWithRegistry")
public static ListenableFuture<ActionResult> downloadAsActionResult(
    ActionKey actionDigest,
    BiFunction<Digest, OutputStream, ListenableFuture<Void>> downloadFunction) {
  ByteArrayOutputStream buffer = new ByteArrayOutputStream(/* size= */ 1024);
  ListenableFuture<Void> pendingDownload =
      downloadFunction.apply(actionDigest.getDigest(), buffer);
  return FluentFuture.from(pendingDownload)
      .transformAsync(
          (ignored) -> {
            try {
              byte[] serialized = buffer.toByteArray();
              return Futures.immediateFuture(ActionResult.parseFrom(serialized));
            } catch (InvalidProtocolBufferException parseFailure) {
              return Futures.immediateFailedFuture(parseFailure);
            }
          },
          MoreExecutors.directExecutor())
      .catching(
          CacheNotFoundException.class, (notFound) -> null, MoreExecutors.directExecutor());
}
/**
 * Starts a non-blocking read on a fresh read-only transaction and registers a callback that
 * closes the transaction once the read completes, whether it succeeds or fails.
 *
 * @param store
 *            {@link LogicalDatastoreType} to read
 * @param path
 *            {@link InstanceIdentifier} for path to read
 * @param <D>
 *            The data object type
 * @return The {@link FluentFuture} of the read, to which further callbacks can be attached
 */
public <D extends DataObject> FluentFuture<Optional<D>> read(
        final LogicalDatastoreType store,
        final InstanceIdentifier<D> path) {
    final ReadTransaction readTx = databroker.newReadOnlyTransaction();
    final FluentFuture<Optional<D>> readFuture = readTx.read(store, path);
    readFuture.addCallback(new FutureCallback<Optional<D>>() {
        @Override
        public void onSuccess(final Optional<D> result) {
            readTx.close();
        }

        @Override
        public void onFailure(final Throwable ex) {
            readTx.close();
        }
    }, MoreExecutors.directExecutor());
    return readFuture;
}
/**
 * Checks whether data exists at {@code path} in {@code store}, retrying failed reads.
 *
 * <p>Up to {@code MDSAL_MAX_READ_TRIALS + 1} attempts are made, sleeping
 * {@code MDSAL_READ_SLEEP_INTERVAL_MS} milliseconds after each failed attempt. Every attempt
 * uses its own read-only transaction, which is closed as soon as the read has been issued.
 *
 * <p>If the calling thread is interrupted while waiting for the read or while sleeping, the
 * interrupt status is restored and {@code false} is returned without further retries.
 *
 * @param store the datastore to query
 * @param path the path to check
 * @return {@code true} if the read reports the data exists; {@code false} if it does not
 *         exist, if all attempts failed, or if the thread was interrupted
 */
public boolean exists(
        final LogicalDatastoreType store, final InstanceIdentifier<? extends DataObject> path) {
    int trialNo = 0;
    do {
        // One transaction per attempt, so a closed transaction is never reused and none is
        // left open when the loop ends.
        final ReadTransaction transaction = databroker.newReadOnlyTransaction();
        final FluentFuture<Boolean> result;
        try {
            result = transaction.exists(store, path);
        } finally {
            transaction.close();
        }

        try {
            return result.get().booleanValue();
        } catch (InterruptedException e) {
            // Retrying is pointless once interrupted: preserve the status for the caller.
            Thread.currentThread().interrupt();
            logReadFailureError(path, " mdsal Read interrupted");
            return false;
        } catch (ExecutionException e) {
            if (trialNo == 0) {
                logReadFailureError(path, " mdsal Read failed exception retrying the read after sleep");
            }
            try {
                Thread.sleep(MDSAL_READ_SLEEP_INTERVAL_MS);
            } catch (InterruptedException e1) {
                Thread.currentThread().interrupt();
                logReadFailureError(path, " Sleep interrupted");
                return false;
            }
        }
    } while (trialNo++ < MDSAL_MAX_READ_TRIALS);
    logReadFailureError(path, " All read trials exceeded");
    return false;
}
/**
 * Looks up the connection instance for the node at {@code nodePath}.
 *
 * <p>The local {@code nodeIdVsConnectionInstance} map is consulted first. On a miss the node
 * is read from the operational datastore and resolved through the {@code Node}-based
 * overload of this method.
 *
 * @param nodePath instance identifier of the node
 * @return the connection instance, or {@code null} when the node is not present in the
 *         operational datastore or the read fails or is interrupted
 */
public OvsdbConnectionInstance getConnectionInstance(final InstanceIdentifier<Node> nodePath) {
    // Single lookup: avoids returning null if the entry is removed between two get() calls.
    final OvsdbConnectionInstance cached = nodeIdVsConnectionInstance.get(nodePath);
    if (cached != null) {
        return cached;
    }

    final FluentFuture<Optional<Node>> nodeFuture;
    final ReadTransaction transaction = db.newReadOnlyTransaction();
    try {
        nodeFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, nodePath);
    } finally {
        // Close even if issuing the read throws, so the transaction is never left open.
        transaction.close();
    }

    try {
        final Optional<Node> optional = nodeFuture.get();
        if (optional.isPresent()) {
            return this.getConnectionInstance(optional.get());
        }
        LOG.debug("Node was not found on the path in the operational DS: {}", nodePath);
        return null;
    } catch (InterruptedException e) {
        // Preserve the interrupt status for the caller.
        Thread.currentThread().interrupt();
        LOG.warn("Failed to get Ovsdb Node {}",nodePath, e);
        return null;
    } catch (ExecutionException e) {
        LOG.warn("Failed to get Ovsdb Node {}",nodePath, e);
        return null;
    }
}
/**
 * Creates the OVSDB topology container in the given datastore if it does not exist yet.
 *
 * <p>A read-write transaction checks for the topology; it is committed after writing the
 * topology when absent, and cancelled when the topology already exists or when the
 * existence check fails. Failures are logged, not propagated.
 *
 * @param type the datastore in which the topology is initialized
 */
private void initializeOvsdbTopology(final LogicalDatastoreType type) {
    InstanceIdentifier<Topology> path = InstanceIdentifier
            .create(NetworkTopology.class)
            .child(Topology.class, new TopologyKey(SouthboundConstants.OVSDB_TOPOLOGY_ID));
    ReadWriteTransaction transaction = db.newReadWriteTransaction();
    FluentFuture<Boolean> ovsdbTp = transaction.exists(type, path);
    try {
        if (!ovsdbTp.get().booleanValue()) {
            TopologyBuilder tpb = new TopologyBuilder();
            tpb.setTopologyId(SouthboundConstants.OVSDB_TOPOLOGY_ID);
            transaction.mergeParentStructurePut(type, path, tpb.build());
            transaction.commit();
        } else {
            transaction.cancel();
        }
    } catch (InterruptedException e) {
        // Only the existence check can raise this, so the transaction is still open here:
        // cancel it rather than leaving it open, and preserve the interrupt status.
        transaction.cancel();
        Thread.currentThread().interrupt();
        LOG.error("Error initializing ovsdb topology", e);
    } catch (ExecutionException e) {
        // Same as above: the transaction has been neither committed nor cancelled yet.
        transaction.cancel();
        LOG.error("Error initializing ovsdb topology", e);
    }
}
/**
 * Prepares the fixture: builds the node identifier, restricts reconciliation to the
 * {@code BR_INT} bridge, stubs the data broker so every read returns the bridge node, and
 * creates the reconciliation task under test.
 */
@Before
public void setUp() throws Exception {
    iid = SouthboundMapper.createInstanceIdentifier(
            new NodeKey(new NodeId(new Uri(NODE_ID))).getNodeId());
    SouthboundProvider.setBridgesReconciliationInclusionList(Arrays.asList(BR_INT));

    final Node bridgeNode = createBridgeNode(NODE_ID + "/bridge/" + BR_INT);
    final FluentFuture<Optional<Node>> bridgeReadFuture =
            FluentFutures.immediateFluentFuture(Optional.of(bridgeNode));

    when(reconciliationManager.getDb()).thenReturn(db);
    final ReadTransaction readTx = mock(ReadTransaction.class);
    Mockito.when(db.newReadOnlyTransaction()).thenReturn(readTx);
    Mockito.when(readTx.read(any(LogicalDatastoreType.class),any(InstanceIdentifier.class)))
            .thenReturn(bridgeReadFuture);

    when(topology.getNode()).thenReturn(Map.of(bridgeNode.key(), bridgeNode));

    configurationReconciliationTask =
            new BridgeConfigReconciliationTask(reconciliationManager, ovsdbConnectionManager, iid,
                    ovsdbConnectionInstance, mock(InstanceIdentifierCodec.class));
}
/**
 * Exercises {@code ovsdbConnManager.connected(externalClient)} with its collaborating
 * methods suppressed or stubbed. The test contains no explicit assertions; it passes when
 * the call completes without throwing.
 */
@Test
public void testConnected() throws Exception {
OvsdbConnectionInstance client = mock(OvsdbConnectionInstance.class);
// Suppress the real implementation, then stub the call to hand back the mocked client.
suppress(MemberMatcher.method(OvsdbConnectionManager.class, "connectedButCallBacksNotRegistered",
OvsdbClient.class));
when(ovsdbConnManager.connectedButCallBacksNotRegistered(any(OvsdbClient.class))).thenReturn(client);
doNothing().when(client).registerCallbacks(any());
//TODO: Write unit tests for EntityOwnershipService
when(client.getInstanceIdentifier()).thenReturn(mock(InstanceIdentifier.class));
// Inject the test's entityConnectionMap into the manager's field of the same name.
field(OvsdbConnectionManager.class, "entityConnectionMap").set(ovsdbConnManager, entityConnectionMap);
suppress(MemberMatcher.method(OvsdbConnectionManager.class, "getEntityFromConnectionInstance",
OvsdbConnectionInstance.class));
//TODO: Write unit tests for entity ownership service related code.
suppress(MemberMatcher.method(OvsdbConnectionManager.class, "registerEntityForOwnership",
OvsdbConnectionInstance.class));
// Any datastore read issued through the broker returns a mocked future.
ReadTransaction tx = mock(ReadTransaction.class);
when(db.newReadOnlyTransaction()).thenReturn(tx);
when(tx.read(any(LogicalDatastoreType.class), any(InstanceIdentifier.class)))
.thenReturn(mock(FluentFuture.class));
when(client.getInstanceIdentifier()).thenReturn(mock(InstanceIdentifier.class));
// NOTE(review): externalClient is not declared in this method — presumably a test fixture field.
ovsdbConnManager.connected(externalClient);
}
/**
 * Creates the HWVTEP topology container in the given datastore if it does not exist yet.
 *
 * <p>A read-write transaction checks for the topology; it is committed after writing the
 * topology when absent, and cancelled when the topology already exists or when the
 * existence check fails. Failures are logged, not propagated.
 *
 * @param type the datastore in which the topology is initialized
 */
private void initializeHwvtepTopology(final LogicalDatastoreType type) {
    InstanceIdentifier<Topology> path = InstanceIdentifier
            .create(NetworkTopology.class)
            .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID));
    ReadWriteTransaction transaction = dataBroker.newReadWriteTransaction();
    FluentFuture<Boolean> hwvtepTp = transaction.exists(type, path);
    try {
        if (!hwvtepTp.get().booleanValue()) {
            TopologyBuilder tpb = new TopologyBuilder();
            tpb.setTopologyId(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID);
            transaction.mergeParentStructurePut(type, path, tpb.build());
            transaction.commit();
        } else {
            transaction.cancel();
        }
    } catch (InterruptedException e) {
        // Only the existence check can raise this, so the transaction is still open here:
        // cancel it rather than leaving it open, and preserve the interrupt status.
        transaction.cancel();
        Thread.currentThread().interrupt();
        LOG.error("Error initializing hwvtep topology", e);
    } catch (ExecutionException e) {
        // Same as above: the transaction has been neither committed nor cancelled yet.
        transaction.cancel();
        LOG.error("Error initializing hwvtep topology", e);
    }
}
/**
 * Runs a command on a new read-write transaction from the chain and commits it.
 *
 * <p>The transaction is remembered as the one in flight and recorded as pending before the
 * command executes. The commit future is handed to the command, and a successful commit
 * causes the transaction to be forgotten again.
 *
 * @param command the command to execute
 */
private synchronized void executeCommand(final TransactionCommand command) {
    final ReadWriteTransaction rwTx = chain.newReadWriteTransaction();
    transactionInFlight = rwTx;
    recordPendingTransaction(command, rwTx);
    command.execute(rwTx);

    final FluentFuture<?> commitFuture = rwTx.commit();
    command.setTransactionResultFuture(commitFuture);

    final FutureCallback<Object> forgetOnSuccess = new FutureCallback<Object>() {
        @Override
        public void onSuccess(final Object result) {
            forgetSuccessfulTransaction(rwTx);
        }

        @Override
        public void onFailure(final Throwable throwable) {
            // NOOP - handled by failure of transaction chain
        }
    };
    commitFuture.addCallback(forgetOnSuccess, MoreExecutors.directExecutor());
}
/**
 * Closes the {@code reg}, {@code tp} and {@code ttp} members of this service instance, in
 * that order.
 *
 * @return an already-completed empty commit future
 */
@Override
public synchronized FluentFuture<? extends CommitInfo> closeServiceInstance() {
    final String serviceName = getIdentifier().getName();
    LOG.info("Close Service Instance PCEP Tunnel Topology Provider Singleton Service {}",
            serviceName);

    this.reg.close();
    this.tp.close();
    this.ttp.close();

    return CommitInfo.emptyFluentFuture();
}
/**
 * Triggers re-synchronization of a single LSP.
 *
 * <p>The LSP named in {@code input} is read from the operational data first; the read result
 * is then transformed asynchronously by a {@code ResyncLspFunction}.
 *
 * @param input arguments naming the LSP to re-synchronize
 * @return the future operation result; an unsent {@code LSP_INTERNAL_ERROR} result when the
 *         operational read could not be started
 */
private ListenableFuture<OperationResult> triggerLspSyncronization(final TriggerSyncArgs input) {
    LOG.trace("Trigger Lsp Resynchronization {}", input);

    // Make sure the LSP exists
    final InstanceIdentifier<ReportedLsp> lspId = lspIdentifier(input.getName());
    final FluentFuture<Optional<ReportedLsp>> lspRead = readOperationalData(lspId);
    if (lspRead != null) {
        return Futures.transformAsync(lspRead, new ResyncLspFunction(input),
                MoreExecutors.directExecutor());
    }
    return OperationResults.createUnsent(PCEPErrors.LSP_INTERNAL_ERROR).future();
}
/**
 * Closes the channel, the {@code network} and {@code element} members when set, and then
 * the manager's service instance.
 *
 * <p>The channel close is only verified by a listener; its future is not part of the
 * returned future.
 *
 * @return the future returned by the manager's {@code closeServiceInstance()}
 */
public FluentFuture<? extends CommitInfo> closeServiceInstance() {
    //FIXME return also channelClose once ListenableFuture implements wildcard
    final ChannelFutureListener verifyClosed = closeResult ->
            checkArgument(closeResult.isSuccess(), "Channel failed to close: %s", closeResult.cause());
    this.channel.close().addListener(verifyClosed);

    if (this.network != null) {
        this.network.close();
        this.network = null;
    }
    if (this.element != null) {
        this.element.close();
        this.element = null;
    }

    return this.manager.closeServiceInstance();
}
/**
 * Closes the PCEP topology provider if this service is currently instantiated.
 *
 * @return the provider's close future, or an already-completed empty future when the
 *         service was not instantiated
 */
@Override
public synchronized FluentFuture<? extends CommitInfo> closeServiceInstance() {
    LOG.info("Close PCEP Topology Provider Singleton Service {}", getIdentifier().getName());

    if (!this.serviceInstantiated) {
        return CommitInfo.emptyFluentFuture();
    }
    this.serviceInstantiated = false;
    return this.pcepTopoProvider.closeServiceInstance();
}
/**
 * Closes the session manager: closes and forgets all session listeners and node states, then
 * deletes the topology from the operational datastore.
 *
 * <p>A repeated call is detected through the {@code isClosed} flag and only logs an error.
 *
 * @return the future of the topology delete commit, or an already-completed empty future
 *         when the manager was closed before
 */
synchronized FluentFuture<? extends CommitInfo> closeServiceInstance() {
    if (this.isClosed.getAndSet(true)) {
        LOG.error("Session Manager has already been closed.");
        return CommitInfo.emptyFluentFuture();
    }

    this.nodes.values().forEach(listener -> listener.close());
    this.nodes.clear();
    this.state.values().forEach(nodeState -> nodeState.close());
    this.state.clear();

    final WriteTransaction deleteTx = this.dependenciesProvider.getDataBroker().newWriteOnlyTransaction();
    deleteTx.delete(LogicalDatastoreType.OPERATIONAL, this.topology);
    final FluentFuture<? extends CommitInfo> commitFuture = deleteTx.commit();

    final FutureCallback<CommitInfo> logOutcome = new FutureCallback<CommitInfo>() {
        @Override
        public void onSuccess(final CommitInfo result) {
            LOG.debug("Topology {} removed", ServerSessionManager.this.topology);
        }

        @Override
        public void onFailure(final Throwable throwable) {
            LOG.warn("Failed to remove Topology {}", ServerSessionManager.this.topology, throwable);
        }
    };
    commitFuture.addCallback(logOutcome, MoreExecutors.directExecutor());
    return commitFuture;
}
/**
 * Shuts this builder down: unregisters the data change listener, destroys the operational
 * topology and destroys the transaction chain.
 *
 * <p>Only the first call has an effect; later calls are detected through the {@code closed}
 * flag.
 *
 * @return the future of the operational topology removal, or an already-completed empty
 *         future when the builder was closed before
 */
public final synchronized FluentFuture<? extends CommitInfo> close() {
    if (this.closed.getAndSet(true)) {
        LOG.trace("Transaction chain was already closed.");
        return CommitInfo.emptyFluentFuture();
    }

    LOG.info("Shutting down builder for {}", getInstanceIdentifier());
    unregisterDataChangeListener();

    final FluentFuture<? extends CommitInfo> topologyRemoval = destroyOperationalTopology();
    destroyTransactionChain();
    return topologyRemoval;
}
/**
 * Relays the outcome of the given transaction future to {@code ft}: on success {@code ft} is
 * set to {@code null}, on failure it is set to the failure cause.
 *
 * @param future the transaction result future to observe
 */
@Override
public void setTransactionResultFuture(final FluentFuture future) {
    final FutureCallback<Object> relayOutcome = new FutureCallback<>() {
        @Override
        public void onSuccess(final Object notUsed) {
            ft.set(null);
        }

        @Override
        public void onFailure(final Throwable throwable) {
            ft.setException(throwable);
        }
    };
    future.addCallback(relayOutcome, MoreExecutors.directExecutor());
}
/**
 * Deletes all routes registered as stale for a table from the operational datastore.
 *
 * <p>Nothing happens when the table has no context or no stale routes are registered for
 * it. Once the delete commit succeeds, the table's entry is removed from the stale routes
 * registry.
 *
 * @param tableKey key of the table whose stale routes are removed
 */
void removeStaleRoutes(final TablesKey tableKey) {
    final TableContext tableContext = this.tables.get(tableKey);
    if (tableContext == null) {
        LOG.debug("No table for {}, not removing any stale routes", tableKey);
        return;
    }

    final Collection<NodeIdentifierWithPredicates> routeKeys = this.staleRoutesRegistry.get(tableKey);
    if (routeKeys == null || routeKeys.isEmpty()) {
        LOG.debug("No stale routes present in table {}", tableKey);
        return;
    }

    LOG.trace("Removing routes {}", routeKeys);
    final DOMDataTreeWriteTransaction deleteTx = this.chain.getDomChain().newWriteOnlyTransaction();
    for (final NodeIdentifierWithPredicates routeKey : routeKeys) {
        deleteTx.delete(LogicalDatastoreType.OPERATIONAL, tableContext.routePath(routeKey));
    }

    final FluentFuture<? extends CommitInfo> commitFuture = deleteTx.commit();
    this.submitted = commitFuture;

    final FutureCallback<CommitInfo> onCommitted = new FutureCallback<CommitInfo>() {
        @Override
        public void onSuccess(final CommitInfo result) {
            LOG.trace("Removing routes {}, succeed", routeKeys);
            synchronized (AdjRibInWriter.this.staleRoutesRegistry) {
                staleRoutesRegistry.remove(tableKey);
            }
        }

        @Override
        public void onFailure(final Throwable throwable) {
            LOG.warn("Removing routes {}, failed", routeKeys, throwable);
        }
    };
    commitFuture.addCallback(onCommitted, MoreExecutors.directExecutor());
}
/**
 * Deletes the given tables from the operational datastore in a single transaction.
 *
 * <p>For each table the parent of its routes path is deleted. A key without a registered
 * table context is skipped, as in {@code removeRoutes} and {@code removeStaleRoutes};
 * dereferencing a missing context would otherwise throw after the transaction was opened
 * and before it was committed.
 *
 * @param tablesToClear keys of the tables to delete; may be {@code null} or empty
 * @return the future of the delete commit, or an already-completed empty future when there
 *         is nothing to clear
 */
FluentFuture<? extends CommitInfo> clearTables(final Set<TablesKey> tablesToClear) {
    if (tablesToClear == null || tablesToClear.isEmpty()) {
        return CommitInfo.emptyFluentFuture();
    }

    final DOMDataTreeWriteTransaction wtx = this.chain.getDomChain().newWriteOnlyTransaction();
    for (final TablesKey tableKey : tablesToClear) {
        final TableContext ctx = this.tables.get(tableKey);
        if (ctx == null) {
            LOG.debug("No table for {}, not clearing it", tableKey);
            continue;
        }
        wtx.delete(LogicalDatastoreType.OPERATIONAL, ctx.routesPath().getParent());
    }
    return wtx.commit();
}
/**
 * Closes this RIB service instance.
 *
 * <p>Deactivates the RIB, closes and forgets all Loc-RIB writers, deletes the RIB from the
 * operational datastore and closes the DOM transaction chain. The outcome of the delete is
 * logged.
 *
 * @return the future of the RIB delete commit, or an already-completed empty future when
 *         the service is not instantiated
 */
public synchronized FluentFuture<? extends CommitInfo> closeServiceInstance() {
    if (!this.isServiceInstantiated) {
        LOG.trace("RIB {} already closed", this.ribId.getValue());
        return CommitInfo.emptyFluentFuture();
    }

    LOG.info("Close RIB {}", this.ribId.getValue());
    this.isServiceInstantiated = false;
    setActive(false);

    this.txChainToLocRibWriter.values().forEach(LocRibWriter::close);
    this.txChainToLocRibWriter.clear();

    final DOMDataTreeWriteTransaction deleteTx = this.domChain.newWriteOnlyTransaction();
    deleteTx.delete(LogicalDatastoreType.OPERATIONAL, getYangRibId());
    final FluentFuture<? extends CommitInfo> cleanFuture = deleteTx.commit();

    final FutureCallback<CommitInfo> logOutcome = new FutureCallback<CommitInfo>() {
        @Override
        public void onSuccess(final CommitInfo result) {
            LOG.info("RIB cleaned {}", RIBImpl.this.ribId.getValue());
        }

        @Override
        public void onFailure(final Throwable throwable) {
            LOG.error("Failed to clean RIB {}",
                    RIBImpl.this.ribId.getValue(), throwable);
        }
    };
    cleanFuture.addCallback(logOutcome, MoreExecutors.directExecutor());

    this.domChain.close();
    return cleanFuture;
}
/**
 * Releases the connection, closes the DOM chain and deactivates this peer.
 *
 * @return the future returned by {@code releaseConnection()}
 */
@Override
public synchronized FluentFuture<? extends CommitInfo> close() {
    final FluentFuture<? extends CommitInfo> released = releaseConnection();

    closeDomChain();
    setActive(false);

    return released;
}
/**
 * Applies a batch of Adj-RIB-Out changes for this peer in a single write transaction.
 *
 * <p>Stale routes are deleted and new routes installed on a write-only transaction from the
 * binding chain. The commit future is remembered in {@code submitted} and its outcome is
 * logged. When the binding chain is {@code null} the changes are skipped.
 *
 * @param entryDep dependencies container providing the RIB support
 * @param staleRoutes routes to delete
 * @param newRoutes routes to install
 */
@Override
public final synchronized <C extends Routes & DataObject & ChoiceIn<Tables>, S extends ChildOf<? super C>,
        R extends Route & ChildOf<? super S> & Identifiable<I>,
        I extends Identifier<R>> void refreshRibOut(final RouteEntryDependenciesContainer entryDep,
        final List<StaleBestPathRoute<C, S, R, I>> staleRoutes, final List<AdvertizedRoute<C, S, R, I>> newRoutes) {
    if (this.bindingChain == null) {
        LOG.debug("Session closed, skip changes to peer AdjRibsOut {}", getPeerId());
        return;
    }

    final WriteTransaction updateTx = this.bindingChain.newWriteOnlyTransaction();
    final RIBSupport<C, S, R, I> support = entryDep.getRIBSupport();
    deleteRouteRibOut(support, staleRoutes, updateTx);
    installRouteRibOut(entryDep, newRoutes, updateTx);

    final FluentFuture<? extends CommitInfo> commitFuture = updateTx.commit();
    this.submitted = commitFuture;

    final FutureCallback<CommitInfo> logOutcome = new FutureCallback<CommitInfo>() {
        @Override
        public void onSuccess(final CommitInfo result) {
            LOG.trace("Successful update commit");
        }

        @Override
        public void onFailure(final Throwable trw) {
            LOG.error("Failed update commit", trw);
        }
    };
    commitFuture.addCallback(logOutcome, MoreExecutors.directExecutor());
}
/**
 * Closes the application peer singleton service, if one is set.
 *
 * @return the singleton service's close future, or an already-completed empty future when
 *         no singleton service is set
 */
@Override
public synchronized FluentFuture<? extends CommitInfo> closeServiceInstance() {
    if (this.bgpAppPeerSingletonService == null) {
        return CommitInfo.emptyFluentFuture();
    }
    return this.bgpAppPeerSingletonService.closeServiceInstance();
}