com.google.common.base.Optional#or() Source Code Examples

Listed below are example usages of com.google.common.base.Optional#or(), collected from various open-source projects; each example notes the project it was taken from so the full source can be found on GitHub.
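
Before the project examples, here is a minimal, self-contained sketch of the or(...) overloads that the snippets below exercise. The class and variable names are ours, chosen purely for illustration:

import com.google.common.base.Optional;
import com.google.common.base.Supplier;

public class OptionalOrDemo {
    public static void main(String[] args) {
        Optional<String> present = Optional.of("value");
        Optional<String> absent = Optional.absent();

        // or(T): return the contained value, or the given (non-null) default.
        System.out.println(present.or("default"));               // value
        System.out.println(absent.or("default"));                // default

        // or(Optional): return this Optional if present, else the second choice.
        // The result is still an Optional, so absence of both remains detectable.
        System.out.println(absent.or(Optional.of("fallback")));  // Optional.of(fallback)

        // or(Supplier): compute the default lazily, only when the value is absent.
        System.out.println(absent.or(new Supplier<String>() {
            @Override
            public String get() {
                return "computed";
            }
        }));                                                     // computed

        // orNull(): unwrap to the contained value, or null when absent.
        System.out.println(absent.orNull());                     // null
    }
}

Note that or(T) rejects a null default (use orNull() instead) and, like any method argument, evaluates its default eagerly; several of the examples below lean on each of these behaviors.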

Example 1   Project: rya   File: PcjIntegrationTestingUtil.java
/**
 * Creates a new PCJ Table in Accumulo and populates it by scanning an
 * instance of Rya for historic matches.
 * <p>
 * If any portion of this operation fails along the way, the partially
 * created PCJ table will be left in Accumulo.
 *
 * @param ryaConn - Connects to the Rya that will be scanned. (not null)
 * @param accumuloConn - Connects to the accumulo that hosts the PCJ results. (not null)
 * @param pcjTableName - The name of the PCJ table that will be created. (not null)
 * @param sparql - The SPARQL query whose results will be loaded into the table. (not null)
 * @param resultVariables - The variables that are included in the query's resulting binding sets. (not null)
 * @param pcjVarOrderFactory - An optional factory that indicates the various variable orders
 *   the results will be stored in. If one is not provided, then {@link ShiftVarOrderFactory}
 *   is used by default. (not null)
 * @throws PcjException The PCJ table could not be created or the values from
 *   Rya could not be loaded into it.
 */
public static void createAndPopulatePcj(
        final RepositoryConnection ryaConn,
        final Connector accumuloConn,
        final String pcjTableName,
        final String sparql,
        final String[] resultVariables,
        final Optional<PcjVarOrderFactory> pcjVarOrderFactory) throws PcjException {
    checkNotNull(ryaConn);
    checkNotNull(accumuloConn);
    checkNotNull(pcjTableName);
    checkNotNull(sparql);
    checkNotNull(resultVariables);
    checkNotNull(pcjVarOrderFactory);

    final PcjTables pcj = new PcjTables();
    // Create the PCJ's variable orders.
    final PcjVarOrderFactory varOrderFactory = pcjVarOrderFactory.or(new ShiftVarOrderFactory());
    final Set<VariableOrder> varOrders = varOrderFactory.makeVarOrders(new VariableOrder(resultVariables));

    // Create the PCJ table in Accumulo.
    pcj.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);

    // Load historic matches from Rya into the PCJ table.
    populatePcj(accumuloConn, pcjTableName, ryaConn);
}
 
Example 2   Project: incubator-gobblin   File: ConcurrentBoundedPriorityIterable.java
/**
 * Log statistics about this {@link ConcurrentBoundedPriorityIterable}.
 */
public synchronized void logStatistics(Optional<Logger> logger) {
  Logger actualLogger = logger.or(log);
  StringBuilder messageBuilder = new StringBuilder("Statistics for ").
      append(ConcurrentBoundedPriorityIterable.class.getSimpleName()).append(": {");
  messageBuilder.append(this.resourcePool).append(", ");
  messageBuilder.append("totalResourcesUsed: ")
      .append(this.resourcePool.stringifyRequirement(this.currentRequirement)).append(", ");
  messageBuilder.append("maxRequirementPerDimension: ")
      .append(this.resourcePool.stringifyRequirement(this.maxResourceRequirement)).append(", ");
  messageBuilder.append("requestsOffered: ").append(this.requestsOffered).append(", ");
  messageBuilder.append("requestsAccepted: ")
      .append(this.requestsOffered - this.requestsEvicted - this.requestsRefused).append(", ");
  messageBuilder.append("requestsRefused: ").append(this.requestsRefused).append(", ");
  messageBuilder.append("requestsEvicted: ").append(this.requestsEvicted);
  messageBuilder.append("}");
  actualLogger.info(messageBuilder.toString());
}
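
An Optional<Logger> parameter resolved with logger.or(defaultLog) is a recurring pattern in these examples (see also Examples 6, 9, and 15): callers may inject their own logger, and the class's own static logger is used otherwise.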
 
Example 3   Project: arcusplatform   File: ClusterService.java
@Inject
public ClusterService(
      ClusterServiceDao clusterServiceDao,
      Optional<Set<ClusterServiceListener>> listeners,
      @Named(NAME_EXECUTOR) ScheduledExecutorService executor,
      ClusterConfig config
) {
   this.clusterServiceDao = clusterServiceDao;
   this.listeners = listeners.or(ImmutableSet.of());
   this.executor = executor;
   this.heartbeatIntervalMs = config.getHeartbeatInterval(TimeUnit.MILLISECONDS);
   this.retryDelayMs = config.getRegistrationRetryDelay(TimeUnit.MILLISECONDS);
   this.retries = config.getRegistrationRetries();
   this.exitOnDeregistered = config.isExitOnDeregistered();
}
 
Example 4   Project: PortEx   File: SectionLoader.java
/**
 * Assembles the loadInfo object for the dataDirKey.
 * 
 * @param dataDirKey
 *            data directory key
 * @return loading information
 */
private Optional<LoadInfo> maybeGetLoadInfo(DataDirectoryKey dataDirKey) {
    Optional<DataDirEntry> dirEntry = optHeader
            .maybeGetDataDirEntry(dataDirKey);
    if (dirEntry.isPresent()) {
        long virtualAddress = dirEntry.get().getVirtualAddress();
        Optional<Long> maybeOffset = maybeGetFileOffsetFor(dataDirKey);
        if (!maybeOffset.isPresent()) {
            logger.info("unable to get file offset for " + dataDirKey);
        }
        long offset = maybeOffset.or(-1L);
        return Optional.of(new LoadInfo(offset, virtualAddress,
                getMemoryMappedPE(), data, this));
    }
    return Optional.absent();
}
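
Note the sentinel in this example: when the file offset cannot be resolved, maybeOffset.or(-1L) substitutes -1 so the LoadInfo can still be constructed after the miss is logged.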
 
Example 5   Project: incubator-gobblin   File: JobCatalogBase.java
public JobCatalogBase(Optional<Logger> log, Optional<MetricContext> parentMetricContext,
    boolean instrumentationEnabled, Optional<Config> sysConfig) {
  this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
  this.listeners = new JobCatalogListenersList(log);
  if (instrumentationEnabled) {
    MetricContext realParentCtx =
        parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));
    this.metricContext = realParentCtx.childBuilder(JobCatalog.class.getSimpleName()).build();
    this.metrics = createStandardMetrics(sysConfig);
    this.addListener(this.metrics);
  }
  else {
    this.metricContext = null;
    this.metrics = null;
  }
}
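
One subtlety here: or(T) evaluates its argument before the call, so Instrumented.getMetricContext(...) constructs a throwaway metric context even when parentMetricContext is present. For an expensive default like this, the or(Supplier) overload shown in the intro sketch would defer that construction until the Optional is actually absent.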
 
Example 6   Project: incubator-gobblin   File: DefaultGobblinInstanceDriverImpl.java
public DefaultGobblinInstanceDriverImpl(String instanceName,
    Configurable sysConfig, JobCatalog jobCatalog,
    JobSpecScheduler jobScheduler,
    JobExecutionLauncher jobLauncher,
    Optional<MetricContext> baseMetricContext,
    Optional<Logger> log,
    SharedResourcesBroker<GobblinScopeTypes> instanceBroker) {
  Preconditions.checkNotNull(jobCatalog);
  Preconditions.checkNotNull(jobScheduler);
  Preconditions.checkNotNull(jobLauncher);
  Preconditions.checkNotNull(sysConfig);

  _instanceName = instanceName;
  _log = log.or(LoggerFactory.getLogger(getClass()));
  _metricCtx = baseMetricContext.or(constructMetricContext(sysConfig, _log));
  _instrumentationEnabled = null != _metricCtx && GobblinMetrics.isEnabled(sysConfig.getConfig());
  _jobCatalog = jobCatalog;
  _jobScheduler = jobScheduler;
  _jobLauncher = jobLauncher;
  _sysConfig = sysConfig;
  _instanceCfg = ConfigAccessor.createFromGlobalConfig(_sysConfig.getConfig());
  _callbacksDispatcher = new JobLifecycleListenersList(_jobCatalog, _jobScheduler, _log);
  _instanceBroker = instanceBroker;

  _metrics = new StandardMetrics(this);
}
 
Example 7   Project: Baragon   File: UpstreamInfo.java
@JsonCreator
public UpstreamInfo(@JsonProperty("upstream") String upstream,
                    @JsonProperty("requestId") Optional<String> requestId,
                    @JsonProperty("rackId") Optional<String> rackId,
                    @JsonProperty("originalPath") Optional<String> originalPath,
                    @JsonProperty("group") Optional<String> group,
                    @JsonProperty("resolvedUpstream") Optional<String> resolvedUpstream) {
  this.upstream = upstream;
  this.requestId = requestId.or("");
  this.rackId = rackId.or("");
  this.originalPath = originalPath;
  this.group = group.or(DEFAULT_GROUP);
  this.resolvedUpstream = resolvedUpstream.or("");
}
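
This constructor shows a common deserialization pattern: every absent Optional<String> is normalized to a concrete default (the empty string or DEFAULT_GROUP), so the rest of the class never needs to reason about absence.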
 
Example 8   Project: incubator-gobblin   File: FlowCatalog.java
public FlowCatalog(Config config, Optional<Logger> log, Optional<MetricContext> parentMetricContext,
    boolean instrumentationEnabled) {
  this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
  this.listeners = new SpecCatalogListenersList(log);
  if (instrumentationEnabled) {
    MetricContext realParentCtx =
        parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));
    this.metricContext = realParentCtx.childBuilder(FlowCatalog.class.getSimpleName()).build();
    this.metrics = new MutableStandardMetrics(this, Optional.of(config));
    this.addListener(this.metrics);
  } else {
    this.metricContext = null;
    this.metrics = null;
  }

  this.aliasResolver = new ClassAliasResolver<>(SpecStore.class);
  try {
    Config newConfig = config;
    if (config.hasPath(FLOWSPEC_STORE_DIR_KEY)) {
      newConfig = config.withValue(FSSpecStore.SPECSTORE_FS_DIR_KEY,
          config.getValue(FLOWSPEC_STORE_DIR_KEY));
    }
    String specStoreClassName = ConfigUtils.getString(config, FLOWSPEC_STORE_CLASS_KEY, DEFAULT_FLOWSPEC_STORE_CLASS);
    this.log.info(String.format("Using class name/alias [%s] for specstore", specStoreClassName));
    String specSerDeClassName = ConfigUtils.getString(config, FLOWSPEC_SERDE_CLASS_KEY, DEFAULT_FLOWSPEC_SERDE_CLASS);
    this.log.info(String.format("Using class name/alias [%s] for spec serde", specSerDeClassName));

    SpecSerDe specSerDe = (SpecSerDe) ConstructorUtils.invokeConstructor(Class.forName(
        new ClassAliasResolver<>(SpecSerDe.class).resolve(specSerDeClassName)));
    this.specStore = (SpecStore) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(
        specStoreClassName)), newConfig, specSerDe);
  } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException
      | ClassNotFoundException e) {
    throw new RuntimeException(e);
  }
}
 
Example 9   Project: incubator-gobblin   File: StandardGobblinInstanceLauncher.java
protected StandardGobblinInstanceLauncher(String name,
    Configurable instanceConf,
    StandardGobblinInstanceDriver.Builder driverBuilder,
    Optional<MetricContext> metricContext,
    Optional<Logger> log,
    SharedResourcesBroker<GobblinScopeTypes> instanceBroker) {
  _log = log.or(LoggerFactory.getLogger(getClass()));
  _name = name;
  _instanceConf = instanceConf;
  _driver = driverBuilder.withInstanceEnvironment(this).build();
  _instrumentationEnabled = metricContext.isPresent();
  _metricContext = metricContext.orNull();
  _instanceBroker = instanceBroker;
}
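
Here orNull() complements or(...): instead of substituting a default, it unwraps the Optional into a plain nullable field, while the presence check itself is recorded separately in _instrumentationEnabled.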
 
Example 10   Project: heat   File: RestAssuredRequestMaker.java
private String getRequestDetails(Method httpMethod, String url) {
    Optional<String> requestDetails = Optional.absent();
    try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
        requestDetails = Optional.fromNullable(RequestPrinter.print((FilterableRequestSpecification) requestSpecification, httpMethod.name(), url, LogDetail.ALL,
                new PrintStream(os), true));
    } catch (IOException e) {
        logUtils.error("Unable to log 'Request Details': an error occurred while retrieving the information");
    }
    return requestDetails.or("");
}
 
Example 11   Project: Baragon   File: BaragonAgentEc2Metadata.java
public static BaragonAgentEc2Metadata fromEnvironment(Optional<String> privateipOverride, boolean skipPrivateIp) {
  return new BaragonAgentEc2Metadata(
    findInstanceId(),
    findAvailabilityZone(),
    findSubnet(),
    findVpc(),
    skipPrivateIp ? Optional.absent() : privateipOverride.or(findPrivateIp()));
}
 
Example 12   Project: emodb   File: TemporaryFileScanWriter.java
protected TemporaryFileScanWriter(String type, int taskId, URI baseUri, Compression compression,
                                  MetricRegistry metricRegistry, Optional<Integer> maxOpenShards,
                                  ObjectMapper objectMapper) {
    super(type, taskId, baseUri, compression, metricRegistry);
    checkNotNull(maxOpenShards, "maxOpenShards");

    _maxOpenShards = maxOpenShards.or(DEFAULT_MAX_OPEN_SHARDS);
    checkArgument(_maxOpenShards > 0, "maxOpenShards <= 0");

    _openTransfers = metricRegistry.counter(MetricRegistry.name("bv.emodb.scan", "ScanUploader", "open-transfers"));
    _blockedNewShards = metricRegistry.counter(MetricRegistry.name("bv.emodb.scan", "ScanUploader", "blocked-new-shards"));
    _mapper = checkNotNull(objectMapper, "objectMapper");
}
 
Example 13   Project: emodb   File: ScanUploadModule.java
@Inject
public QueueScanWorkflowProvider(@Global CuratorFramework curator, @ServerCluster String cluster,
                                 Client client, @Named("ScannerAPIKey") String apiKey,
                                 @Named("pendingScanRangeQueueName") Optional<String> pendingScanRangeQueueName,
                                 @Named("completeScanRangeQueueName") Optional<String> completeScanRangeQueueName,
                                 Environment environment, MetricRegistry metricRegistry) {
    _curator = curator;
    _cluster = cluster;
    _client = client;
    _apiKey = apiKey;
    _environment = environment;
    _metricRegistry = metricRegistry;
    _pendingScanRangeQueueName = pendingScanRangeQueueName.or("emodb-pending-scan-ranges");
    _completeScanRangeQueueName = completeScanRangeQueueName.or("emodb-complete-scan-ranges");
}
 
Example 14   Project: emodb   File: ScanUploadModule.java
@Inject
public SQSScanWorkflowProvider(@ServerCluster String cluster, AmazonSQS amazonSQS,
                               @Named("pendingScanRangeQueueName") Optional<String> pendingScanRangeQueueName,
                               @Named("completeScanRangeQueueName") Optional<String> completeScanRangeQueueName) {
    _amazonSQS = amazonSQS;
    _pendingScanRangeQueueName = pendingScanRangeQueueName.or(String.format("emodb-pending-scan-ranges-%s", cluster));
    _completeScanRangeQueueName = completeScanRangeQueueName.or(String.format("emodb-complete-scan-ranges-%s", cluster));
}
 
Example 15   Project: incubator-gobblin   File: TrackerBasedWatermarkManager.java
public TrackerBasedWatermarkManager(WatermarkStorage storage, FineGrainedWatermarkTracker watermarkTracker,
    long commitIntervalMillis, Optional<Logger> logger) {
  Preconditions.checkArgument(storage != null, "WatermarkStorage cannot be null");
  Preconditions.checkArgument(watermarkTracker != null, "WatermarkTracker cannot be null");
  _watermarkTracker = watermarkTracker;
  _watermarkStorage = storage;
  _commitIntervalMillis = commitIntervalMillis;
  _logger = logger.or(LoggerFactory.getLogger(TrackerBasedWatermarkManager.class));
  _watermarkCommitThreadPool = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(logger,
      Optional.of("WatermarkManager-%d")));
  _retrievalStatus = new RetrievalStatus();
  _commitStatus = new CommitStatus();
}
 
Example 16   Project: cava
private Signature.KeyPair getLocalKeys() throws Exception {
  Optional<String> ssbDir = Optional.fromNullable(System.getenv().get("ssb_dir"));
  Optional<String> homePath =
      Optional.fromNullable(System.getProperty("user.home")).transform(home -> home + "/.ssb");

  Optional<String> path = ssbDir.or(homePath);

  if (!path.isPresent()) {
    throw new Exception("Cannot find ssb directory config value");
  }

  String secretPath = path.get() + "/secret";
  File file = new File(secretPath);

  if (!file.exists()) {
    throw new Exception("Secret file does not exist");
  }

  Scanner s = new Scanner(file, UTF_8.name());
  s.useDelimiter("\n");

  ArrayList<String> list = new ArrayList<String>();
  while (s.hasNext()) {
    String next = s.next();

    // Filter out the comment lines
    if (!next.startsWith("#")) {
      list.add(next);
    }
  }

  String secretJSON = String.join("", list);

  ObjectMapper mapper = new ObjectMapper();

  HashMap<String, String> values = mapper.readValue(secretJSON, new TypeReference<Map<String, String>>() {});
  String pubKey = values.get("public").replace(".ed25519", "");
  String privateKey = values.get("private").replace(".ed25519", "");

  Bytes pubKeyBytes = Base64.decode(pubKey);
  Bytes privKeyBytes = Base64.decode(privateKey);

  Signature.PublicKey pub = Signature.PublicKey.fromBytes(pubKeyBytes);
  Signature.SecretKey secretKey = Signature.SecretKey.fromBytes(privKeyBytes);

  return new Signature.KeyPair(pub, secretKey);
}
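
The ssbDir.or(homePath) call above uses the or(Optional) overload: it prefers the ssb_dir environment variable and falls back to a path derived from user.home, but the result remains an Optional, so the case where both are missing can still be detected and reported.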
 
Example 17   Project: cava   File: PatchworkIntegrationTest.java
@Disabled
@Test
void runWithPatchWork(@VertxInstance Vertx vertx) throws Exception {
  String host = "localhost";
  int port = 8008;
  LoggerProvider loggerProvider = SimpleLogger.withLogLevel(Level.DEBUG).toPrintWriter(
      new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.out, UTF_8))));
  LoglLogDelegateFactory.setProvider(loggerProvider);

  Optional<String> ssbDir = Optional.fromNullable(System.getenv().get("ssb_dir"));
  Optional<String> homePath =
      Optional.fromNullable(System.getProperty("user.home")).transform(home -> home + "/.ssb");

  Optional<String> path = ssbDir.or(homePath);

  if (!path.isPresent()) {
    throw new Exception("Cannot find ssb directory config value");
  }

  String secretPath = path.get() + "/secret";
  File file = new File(secretPath);

  if (!file.exists()) {
    throw new Exception("Secret file does not exist");
  }

  Scanner s = new Scanner(file, UTF_8.name());
  s.useDelimiter("\n");

  ArrayList<String> list = new ArrayList<String>();
  while (s.hasNext()) {
    String next = s.next();

    // Filter out the comment lines
    if (!next.startsWith("#")) {
      list.add(next);
    }
  }

  String secretJSON = String.join("", list);

  ObjectMapper mapper = new ObjectMapper();

  HashMap<String, String> values = mapper.readValue(secretJSON, new TypeReference<Map<String, String>>() {});
  String pubKey = values.get("public").replace(".ed25519", "");
  String privateKey = values.get("private").replace(".ed25519", "");

  Bytes pubKeyBytes = Base64.decode(pubKey);
  Bytes privKeyBytes = Base64.decode(privateKey);

  Signature.PublicKey pub = Signature.PublicKey.fromBytes(pubKeyBytes);
  Signature.SecretKey secretKey = Signature.SecretKey.fromBytes(privKeyBytes);

  Signature.KeyPair keyPair = new Signature.KeyPair(pub, secretKey);
  String networkKeyBase64 = "1KHLiKZvAvjbY1ziZEHMXawbCEIM6qwjCDm3VYRan/s=";

  String serverPublicKey = pubKey; // TODO use your own identity public key here.
  Signature.PublicKey publicKey = Signature.PublicKey.fromBytes(Base64.decode(serverPublicKey));

  Bytes32 networkKeyBytes32 = Bytes32.wrap(Base64.decode(networkKeyBase64));

  SecureScuttlebuttVertxClient secureScuttlebuttVertxClient =
      new SecureScuttlebuttVertxClient(loggerProvider, vertx, keyPair, networkKeyBytes32);

  AsyncResult<ClientHandler> onConnect =
      secureScuttlebuttVertxClient.connectTo(port, host, publicKey, MyClientHandler::new);

  MyClientHandler clientHandler = (MyClientHandler) onConnect.get();
  assertTrue(onConnect.isDone());
  assertFalse(onConnect.isCompletedExceptionally());
  Thread.sleep(1000);
  assertNotNull(clientHandler);
  // An RPC command that just tells us our public key (like ssb-server whoami on the command line).
  String rpcRequestBody = "{\"name\": [\"whoami\"],\"type\": \"async\",\"args\":[]}";
  Bytes rpcRequest = RPCCodec.encodeRequest(rpcRequestBody, RPCFlag.BodyType.JSON);

  System.out.println("Attempting RPC request...");
  clientHandler.sendMessage(rpcRequest);
  for (int i = 0; i < 10; i++) {
    clientHandler.sendMessage(RPCCodec.encodeRequest(rpcRequestBody, RPCFlag.BodyType.JSON));
  }

  Thread.sleep(10000);

  secureScuttlebuttVertxClient.stop().join();
}
 
Example 18   Project: nifi   File: DataDogReportingTask.java
private String buildMetricName(Optional<String> processorName, String metricName) {
    return metricsPrefix + "." + processorName.or("flow") + "." + metricName;
}
 
Example 19   Project: Baragon   File: FilesystemConfigHelper.java
public void apply(ServiceContext context, Optional<BaragonService> maybeOldService, boolean revertOnFailure, boolean noReload, boolean noValidate, boolean delayReload, Optional<Integer> batchItemNumber) throws InvalidConfigException, LbAdapterExecuteException, IOException, MissingTemplateException, InterruptedException, LockTimeoutException {
  final BaragonService service = context.getService();
  final BaragonService oldService = maybeOldService.or(service);

  LOG.info("Going to apply {}: {}", service.getServiceId(), Joiner.on(", ").join(context.getUpstreams()));
  final boolean oldServiceExists = configsExist(oldService);
  final boolean previousConfigsExist = configsExist(service);

  Collection<BaragonConfigFile> newConfigs = configGenerator.generateConfigsForProject(context);


  if (!agentLock.tryLock(agentLockTimeoutMs, TimeUnit.MILLISECONDS)) {
    LockTimeoutException lte = new LockTimeoutException("Timed out waiting to acquire lock", agentLock);
    LOG.warn("Failed to acquire lock for service config apply ({})", service.getServiceId(), lte);
    throw lte;
  }

  LOG.debug("({}) Acquired agent lock, applying configs", service.getServiceId());

  try {
    if (configsMatch(newConfigs, readConfigs(oldService))) {
      LOG.info("({}) Configs are unchanged, skipping apply", service.getServiceId());
      if (!noReload && !delayReload && batchItemNumber.isPresent() && batchItemNumber.get() > 1) {
        LOG.debug("({}) Item is the last in a batch, reloading configs", service.getServiceId());
        adapter.reloadConfigs();
      }
      return;
    }

    // Backup configs
    LOG.debug("({}) Backing up configs", service.getServiceId());
    if (revertOnFailure) {
      backupConfigs(service);
      if (oldServiceExists) {
        backupConfigs(oldService);
      }
    }

    // Write & check the configs
    if (context.isPresent()) {
      LOG.debug("({}) Writing new configs", service.getServiceId());
      writeConfigs(newConfigs);
      //If the new service id for this base path is different, remove the configs for the old service id
      if (oldServiceExists && !oldService.getServiceId().equals(service.getServiceId())) {
        LOG.debug("({}) Removing old configs from renamed service", service.getServiceId());
        remove(oldService);
      }
    } else {
      LOG.debug("({}) Removing configs from deleted service", service.getServiceId());
      remove(service);
    }

    if (!noValidate) {
      LOG.debug("({}) Checking configs", service.getServiceId());
      adapter.checkConfigs();
    } else {
      LOG.debug("({}) Not validating configs due to 'noValidate' specified in request", service.getServiceId());
    }
    if (!noReload && !delayReload) {
      LOG.debug("({}) Reloading configs", service.getServiceId());
      adapter.reloadConfigs();
    } else {
      LOG.debug("({}) Not reloading configs: {}", service.getServiceId(), noReload ? "'noReload' specified in request" : "Will reload at end of request batch");
    }
  } catch (Exception e) {
    LOG.error("Caught exception while writing configs for {}, reverting to backups!", service.getServiceId(), e);
    saveAsFailed(service);
    // Restore configs
    if (revertOnFailure) {
      if (oldServiceExists && !oldService.equals(service)) {
        restoreConfigs(oldService);
      }
      if (previousConfigsExist) {
        restoreConfigs(service);
      } else {
        remove(service);
      }
    }

    throw new RuntimeException(e);
  } finally {
    agentLock.unlock();
  }

  removeBackupConfigs(oldService);
  LOG.info(String.format("Apply finished for %s", service.getServiceId()));
}
 
Example 20   Project: incubator-gobblin
/**
 * Create a {@link PathAlterationObserver} for the given root directory (or reuse the
 * supplied one), attach the given listener to it, and register the observer with this
 * {@link PathAlterationObserverScheduler}.
 * @param listener a {@link org.apache.gobblin.util.filesystem.PathAlterationListener}
 * @param observerOptional an optional pre-constructed observer; tests may supply their
 *                         own, while in general usage this method creates one for rootDirPath
 * @param rootDirPath the root directory to observe
 * @throws IOException
 */
public void addPathAlterationObserver(PathAlterationListener listener,
    Optional<PathAlterationObserver> observerOptional, Path rootDirPath)
    throws IOException {
  PathAlterationObserver observer = observerOptional.or(new PathAlterationObserver(rootDirPath));
  observer.addListener(listener);
  addObserver(observer);
}