下面列出了怎么用com.google.common.io.Closer的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Parses a {@link org.apache.gobblin.metrics.MetricReport} from a byte array Avro serialization.
* @param reuse MetricReport to reuse.
* @param bytes Input bytes.
* @param schemaId Expected schemaId.
* @return MetricReport.
* @throws java.io.IOException
*/
public synchronized static MetricReport deserializeReportFromAvroSerialization(MetricReport reuse, byte[] bytes,
@Nullable String schemaId)
throws IOException {
if (!READER.isPresent()) {
READER = Optional.of(new SpecificDatumReader<>(MetricReport.class));
}
Closer closer = Closer.create();
try {
DataInputStream inputStream = closer.register(new DataInputStream(new ByteArrayInputStream(bytes)));
if (schemaId != null) {
readAndVerifySchemaId(inputStream, schemaId);
} else {
readAndVerifySchemaVersion(inputStream);
}
// Decode the rest
Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
return READER.get().read(reuse, decoder);
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
@Override
public void close()
{
try (Closer closer = Closer.create()) {
if (hashAggregationBuilder != null) {
closer.register(hashAggregationBuilder::close);
}
merger.ifPresent(closer::register);
spiller.ifPresent(closer::register);
mergeHashSort.ifPresent(closer::register);
closer.register(() -> localUserMemoryContext.setBytes(0));
closer.register(() -> localRevocableMemoryContext.setBytes(0));
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close()
{
if (state == State.CLOSED) {
return;
}
// close() can be called in any state, due for example to query failure, and must clean resource up unconditionally
lookupSourceSupplier = null;
state = State.CLOSED;
finishMemoryRevoke = finishMemoryRevoke.map(ifPresent -> () -> {});
try (Closer closer = Closer.create()) {
closer.register(index::clear);
spiller.ifPresent(closer::register);
closer.register(() -> localUserMemoryContext.setBytes(0));
closer.register(() -> localRevocableMemoryContext.setBytes(0));
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close()
throws IOException
{
try (Closer closer = Closer.create()) {
closer.register(() -> {
if (isDirectory(baseDataDir) && !preserveData) {
deleteRecursively(baseDataDir, ALLOW_INSECURE);
}
});
closer.register(() -> {
if (lifeCycleManager != null) {
lifeCycleManager.stop();
}
});
}
}
/**
* Get the latest avro schema for a directory
* @param directory the input dir that contains avro files
* @param fs the {@link FileSystem} for the given directory.
* @param latest true to return latest schema, false to return oldest schema
* @return the latest/oldest schema in the directory
* @throws IOException
*/
public static Schema getDirectorySchema(Path directory, FileSystem fs, boolean latest) throws IOException {
Schema schema = null;
try (Closer closer = Closer.create()) {
List<FileStatus> files = getDirectorySchemaHelper(directory, fs);
if (files == null || files.size() == 0) {
LOG.warn("There is no previous avro file in the directory: " + directory);
} else {
FileStatus file = latest ? files.get(0) : files.get(files.size() - 1);
LOG.debug("Path to get the avro schema: " + file);
FsInput fi = new FsInput(file.getPath(), fs.getConf());
GenericDatumReader<GenericRecord> genReader = new GenericDatumReader<>();
schema = closer.register(new DataFileReader<>(fi, genReader)).getSchema();
}
} catch (IOException ioe) {
throw new IOException("Cannot get the schema for directory " + directory, ioe);
}
return schema;
}
@SuppressWarnings("unchecked")
private Optional<CommitSequence.Builder> generateCommitSequenceBuilder(JobState.DatasetState datasetState,
Collection<TaskState> taskStates) {
try (Closer closer = Closer.create()) {
Class<? extends CommitSequencePublisher> dataPublisherClass = (Class<? extends CommitSequencePublisher>) Class
.forName(datasetState
.getProp(ConfigurationKeys.DATA_PUBLISHER_TYPE, ConfigurationKeys.DEFAULT_DATA_PUBLISHER_TYPE));
CommitSequencePublisher publisher = (CommitSequencePublisher) closer
.register(DataPublisher.getInstance(dataPublisherClass, this.jobContext.getJobState()));
publisher.publish(taskStates);
return publisher.getCommitSequenceBuilder();
} catch (Throwable t) {
log.error("Failed to generate commit sequence", t);
setTaskFailureException(datasetState.getTaskStates(), t);
throw Throwables.propagate(t);
}
}
private void publishTaskData()
throws IOException {
Closer closer = Closer.create();
try {
Class<? extends DataPublisher> dataPublisherClass = getTaskPublisherClass();
SingleTaskDataPublisher publisher =
closer.register(SingleTaskDataPublisher.getInstance(dataPublisherClass, this.taskState));
LOG.info("Publishing data from task " + this.taskId);
publisher.publish(this.taskState);
} catch (ClassCastException e) {
LOG.error(String.format("To publish data in task, the publisher class must extend %s",
SingleTaskDataPublisher.class.getSimpleName()), e);
this.taskState.setTaskFailureException(e);
throw closer.rethrow(e);
} catch (Throwable t) {
this.taskState.setTaskFailureException(t);
throw closer.rethrow(t);
} finally {
closer.close();
}
}
@AfterClass
public void cleanup()
{
if (servers != null) {
try (Closer closer = Closer.create()) {
for (DriftServer server : servers) {
closer.register(() -> server.shutdown());
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
servers = null;
}
}
@PreDestroy
public void stopRubix()
throws IOException
{
try (Closer closer = Closer.create()) {
closer.register(() -> {
if (bookKeeperServer != null) {
// This might throw NPE if Thrift server hasn't started yet (it's initialized
// asynchronously from BookKeeperServer thread).
// TODO: improve stopping of BookKeeperServer server in Rubix
bookKeeperServer.stopServer();
bookKeeperServer = null;
}
});
closer.register(LocalDataTransferServer::stopServer);
}
}
/**
* Update the log4j configuration.
*
* @param targetClass the target class used to get the original log4j configuration file as a resource
* @param log4jPath the custom log4j configuration properties file path
* @param log4jFileName the custom log4j configuration properties file name
* @throws IOException if there's something wrong with updating the log4j configuration
*/
public static void updateLog4jConfiguration(Class<?> targetClass, String log4jPath, String log4jFileName)
throws IOException {
Closer closer = Closer.create();
try {
InputStream fileInputStream = closer.register(new FileInputStream(log4jPath));
InputStream inputStream = closer.register(targetClass.getResourceAsStream("/" + log4jFileName));
Properties customProperties = new Properties();
customProperties.load(fileInputStream);
Properties originalProperties = new Properties();
originalProperties.load(inputStream);
for (Entry<Object, Object> entry : customProperties.entrySet()) {
originalProperties.setProperty(entry.getKey().toString(), entry.getValue().toString());
}
LogManager.resetConfiguration();
PropertyConfigurator.configure(originalProperties);
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
/**
* Load stopwords from the specified file located in the classpath.
* <p>
* If a directory name is specified, e.g: <code>tmp/stopwords.txt</code> that path will be used when searching for the resource. Otherwise, the package
* contianing the DefaultTokenSearch class may be used.
* <p>
* The current thread's context classloader will be used to load the specified filename as a resource.
*
* @param filename
* the filename containing the stoplist to load, located using the rules described above.
* @return a lucene {@code CharArraySet} containing the stopwords. This is configured to be case insensitive.
* @throws IOException
* if there is a problem finding or loading the specified stop word file..
*/
public static CharArraySet loadStopWords(String filename) throws IOException {
Closer closer = Closer.create();
try {
CharArraySet stopSet = new CharArraySet(16, true /* ignore case */);
String pkg = Factory.class.getPackage().getName().replace('.', '/');
String resource = filename.indexOf("/") > -1 ? filename : (pkg + "/" + filename);
InputStream resourceStream = Thread.currentThread().getContextClassLoader().getResourceAsStream(resource);
logger.info("Loading stopwords file " + filename + " from resource " + resource);
if (resourceStream == null) {
throw new FileNotFoundException("Unable to load stopword file as resource " + filename);
}
Reader reader = IOUtils.getDecodingReader(resourceStream, StandardCharsets.UTF_8);
closer.register(reader);
CharArraySet set = WordlistLoader.getWordSet(reader, "#", stopSet);
logger.info("Loaded " + set.size() + " stopwords from " + filename + " (" + resource + ")");
return set;
} finally {
closer.close();
}
}
private ImmutableList<AbstractProfileFileElement> loadProfileFiles(Multimap<Path, String> pathsToAttempt)
throws IOException {
ImmutableList.Builder<AbstractProfileFileElement> result = ImmutableList.builder();
try (Closer closer = Closer.create()) {
for (Map.Entry<Path, Collection<String>> entry : pathsToAttempt.asMap().entrySet()) {
Path base = entry.getKey();
if (Files.isRegularFile(base)) {
FileSystem sourceFs = FileSystems.newFileSystem(base, getClass().getClassLoader());
closer.register(sourceFs);
base = getOnlyElement(sourceFs.getRootDirectories());
}
for (String path : entry.getValue()) {
AbstractProfileFileElement element = loadProfileFile(base, path);
if (element != null) {
result.add(element);
}
}
}
}
return result.build();
}
public Schema load() throws IOException {
if (sources.isEmpty()) {
throw new IllegalStateException("No sources added.");
}
try (Closer closer = Closer.create()) {
// Map the physical path to the file system root. For regular directories the key and the
// value are equal. For ZIP files the key is the path to the .zip, and the value is the root
// of the file system within it.
Map<Path, Path> directories = new LinkedHashMap<>();
for (Path source : sources) {
if (Files.isRegularFile(source)) {
FileSystem sourceFs = FileSystems.newFileSystem(source, getClass().getClassLoader());
closer.register(sourceFs);
directories.put(source, getOnlyElement(sourceFs.getRootDirectories()));
} else {
directories.put(source, source);
}
}
return loadFromDirectories(directories);
}
}
@Override
@NonNull
public Properties loadProperties(@NonNull File file) {
Properties props = new Properties();
Closer closer = Closer.create();
try {
FileInputStream fis = closer.register(new FileInputStream(file));
props.load(fis);
} catch (IOException ignore) {
} finally {
try {
closer.close();
} catch (IOException e) {
}
}
return props;
}
private static ImmutableMap<String, LazyClassEntry> buildClassIndex(
ImmutableSet<Path> jars, Closer closer, Predicate<Path> isDirect) throws IOException {
HashMap<String, LazyClassEntry> result = new HashMap<>();
for (Path jarPath : jars) {
boolean jarIsDirect = isDirect.test(jarPath);
try {
ZipFile zipFile = closer.register(new ZipFile(jarPath.toFile()));
zipFile
.stream()
.forEach(
entry -> {
String name = entry.getName();
if (!name.endsWith(".class")) {
return; // Not a class file.
}
String internalName = name.substring(0, name.lastIndexOf('.'));
result.computeIfAbsent(
internalName,
key -> new LazyClassEntry(key, zipFile, jarPath, jarIsDirect));
});
} catch (Throwable e) {
throw new RuntimeException("Error in reading zip file " + jarPath, e);
}
}
return ImmutableMap.copyOf(result);
}
/**
* Retrives a {@link Token} from a given sequence file for a specified user. The sequence file should contain a list
* of key, value pairs where each key corresponds to a user and each value corresponds to a {@link Token} for that
* user.
*
* @param userNameKey The name of the user to retrieve a {@link Token} for
* @param tokenFilePath The path to the sequence file containing the {@link Token}s
*
* @return A {@link Token} for the given user name
*/
public static Optional<Token<?>> getTokenFromSeqFile(String userNameKey, Path tokenFilePath) throws IOException {
log.info("Reading tokens from sequence file " + tokenFilePath);
try (Closer closer = Closer.create()) {
FileSystem localFs = FileSystem.getLocal(new Configuration());
@SuppressWarnings("deprecation")
SequenceFile.Reader tokenReader =
closer.register(new SequenceFile.Reader(localFs, tokenFilePath, localFs.getConf()));
Text key = new Text();
Token<?> value = new Token<>();
while (tokenReader.next(key, value)) {
log.debug("Found token for user: " + key);
if (key.toString().equals(userNameKey)) {
return Optional.<Token<?>> of(value);
}
}
}
log.warn("Did not find any tokens for user " + userNameKey);
return Optional.absent();
}
public static List<String> loadFile(String fileName) throws IOException {
ArrayList<String> strings = Lists.newArrayList();
Closer closer = Closer.create();
try {
InputStream inputStream = ConfigUtils.class.getResourceAsStream(fileName);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
closer.register(bufferedReader);
String line;
while ((line = bufferedReader.readLine()) != null) {
if (Strings.isNullOrEmpty(line) || line.startsWith("#")) {
continue;
}
strings.add(line);
}
} catch (IOException e) {
logger.error("loadFile {} error. error is {}.", fileName, e);
throw e;
} finally {
closer.close();
}
return strings;
}
private static DockerRunner createDockerRunner(
String id,
Environment environment,
StateManager stateManager,
Stats stats,
Debug debug,
Set<String> secretWhitelist,
Time time) {
final Config config = environment.config();
final Closer closer = environment.closer();
final String styxEnvironment = config.getString(STYX_ENVIRONMENT);
final NamespacedKubernetesClient kubernetes = closer.register(getKubernetesClient(config, id));
final ServiceAccountKeyManager serviceAccountKeyManager = createServiceAccountKeyManager();
var fabric8Client = TracingProxy.instrument(Fabric8KubernetesClient.class,
MeteredFabric8KubernetesClientProxy.instrument(
Fabric8KubernetesClient.of(kubernetes), stats, time));
return closer.register(DockerRunner.kubernetes(id, fabric8Client, stateManager, stats,
serviceAccountKeyManager, debug, styxEnvironment, secretWhitelist));
}
/**
* Read data from the original input stream and pipe it to the compressing stream until fully read.
*/
private void streamAndCompressInput() {
try {
byte[] newline = "\n".getBytes(Charsets.UTF_8);
while (!_closed && fetchNextRow()) {
_rawOut.write(_buffer.array(), 0, _buffer.limit());
_rawOut.write(newline);
}
_rawOut.close();
} catch (Exception e) {
try {
Closer closer = Closer.create();
closer.register(_rawOut);
closer.register(_gzipIn);
closer.close();
} catch (IOException ignore) {
// Ignore exceptions closing, don't mask the original exception.
}
if (!_closed) {
_inputException = e instanceof IOException ? (IOException ) e : new IOException(e);
}
}
}
public void closeTest() throws IOException, FileBasedHelperException {
State state = new State();
setUp();
GoogleDriveFsHelper fsHelper = new GoogleDriveFsHelper(state, client, Closer.create());
Get getResult = mock(Get.class);
InputStream is = mock(InputStream.class);
when(client.files()).thenReturn(files);
when(files.get(anyString())).thenReturn(getResult);
when(getResult.executeMediaAsInputStream()).thenReturn(is);
fsHelper.getFileStream("test");
fsHelper.close();
verify(is, times(1)).close();
}
@Test
public void testSendShutdownRequest() throws Exception {
Logger log = LoggerFactory.getLogger("testSendShutdownRequest");
Closer closer = Closer.create();
try {
CuratorFramework curatorFramework = TestHelper.createZkClient(this.testingZKServer, closer);
final GetInstanceMessageNumFunc getMessageNumFunc =
new GetInstanceMessageNumFunc(GobblinClusterManagerTest.class.getSimpleName(),
curatorFramework);
AssertWithBackoff assertWithBackoff =
AssertWithBackoff.create().logger(log).timeoutMs(30000);
this.gobblinClusterManager.sendShutdownRequest();
Assert.assertEquals(curatorFramework.checkExists().forPath(String
.format("/%s/INSTANCES/%s/MESSAGES", GobblinClusterManagerTest.class.getSimpleName(),
TestHelper.TEST_HELIX_INSTANCE_NAME)).getVersion(), 0);
assertWithBackoff.assertEquals(getMessageNumFunc, 1, "1 message queued");
// Give Helix sometime to handle the message
assertWithBackoff.assertEquals(getMessageNumFunc, 0, "all messages processed");
} finally {
closer.close();
}
}
private void pauseTransactionAndProcessMemberRepositories(final StorageTx tx,
final List<Repository> members,
final Closer closer,
final ArrayList<JsonParser> parsers)
throws IOException
{
UnitOfWork groupWork = UnitOfWork.pause();
try {
tx.commit();
for (Repository repository : Lists.reverse(members)) {
processMember(closer, parsers, repository);
}
}
finally {
UnitOfWork.resume(groupWork);
tx.begin();
}
}
private ByteBuffer getSecurityTokens() throws IOException {
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
Closer closer = Closer.create();
try {
DataOutputBuffer dataOutputBuffer = closer.register(new DataOutputBuffer());
credentials.writeTokenStorageToStream(dataOutputBuffer);
// Remove the AM->RM token so that containers cannot access it
Iterator<Token<?>> tokenIterator = credentials.getAllTokens().iterator();
while (tokenIterator.hasNext()) {
Token<?> token = tokenIterator.next();
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
tokenIterator.remove();
}
}
return ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength());
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
private static void zipFiles(List<File> files, String basepath, OutputStream os) throws IOException {
Closer closer = Closer.create();
try {
ZipOutputStream zos = new ZipOutputStream(os);
closer.register(zos);
for (File file : files) {
if (file.isFile()) {
FileInputStream inputStream = new FileInputStream(file);
closer.register(inputStream);
writeZipEntry(zipEntry(basepath, file), inputStream, zos);
} else {
ZipEntry ze = zipEntry(basepath, file);
logger.trace("Adding directory zip entry: " + ze.toString());
zos.putNextEntry(ze);
}
}
} catch (IOException ioe) {
throw closer.rethrow(ioe);
} finally {
closer.close();
}
}
public static void unzip(InputStream is, File outFile) throws IOException {
Closer closer = Closer.create();
try {
ZipInputStream zis = new ZipInputStream(is);
closer.register(zis);
ZipEntry zipEntry = zis.getNextEntry();
while (zipEntry != null) {
File entryFile = new File(outFile, zipEntry.getName());
File entryContainer = entryFile.getParentFile();
if (!entryContainer.exists()) {
entryContainer.mkdirs();
}
if (!entryFile.isDirectory()) {
FileOutputStream outputStream = new FileOutputStream(entryFile);
closer.register(outputStream);
Zipper.ZIP.unzipEntry(zis, outputStream);
}
zipEntry = zis.getNextEntry();
}
} catch (IOException ioe) {
throw closer.rethrow(ioe);
} finally {
closer.close();
}
}
@Override
public Closeable acquirePermits(long permits) throws InterruptedException {
Closer closer = Closer.create();
for (Limiter limiter : this.underlyingLimiters) {
Closeable permit = limiter.acquirePermits(permits);
if (permit == null) {
try {
closer.close();
} catch (IOException ioe) {
throw new RuntimeException("Could not return intermediate permits.");
}
return null;
}
closer.register(permit);
}
return closer;
}
/**
* Automatically detects charset of the input stream, reads it, decodes it, and returns the
* resulting string with a newline appended if the original stream is non-empty. Does not close
* the provided input stream.
*
* @throws IOException if there is an error
*/
@SuppressWarnings("PMD.CloseResource") // PMD does not understand Closer.
static @Nonnull String decodeStreamAndAppendNewline(@Nonnull InputStream inputStream)
throws IOException {
byte[] rawBytes = IOUtils.toByteArray(inputStream);
Charset cs = Charset.forName(new CharsetDetector().setText(rawBytes).detect().getName());
try (Closer closer = Closer.create()) {
InputStream inputByteStream =
closer.register(bomInputStream(new ByteArrayInputStream(rawBytes)));
InputStream finalInputStream =
closer.register(
rawBytes.length > 0
? new SequenceInputStream(
inputByteStream,
closer.register(bomInputStream(new ByteArrayInputStream("\n".getBytes(cs)))))
: inputByteStream);
return new String(IOUtils.toByteArray(finalInputStream), cs);
}
}
@Test
public void testTarGzCopy() throws Exception {
Closer closer = Closer.create();
try {
JobLauncher jobLauncher = closer.register(JobLauncherFactory.newJobLauncher(gobblinProps, jobProps));
jobLauncher.launchJob(null);
String file1Path =
gobblinProps.getProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR) + "/LogData/sub1/sub2/text1.txt";
String file2Path =
gobblinProps.getProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR) + "/LogData/sub1/sub2/text2.txt";
FileSystem fs = FileSystem.getLocal(new Configuration());
Assert.assertEquals(IOUtils.toString(closer.register(fs.open(new Path(file1Path)))), "text1");
Assert.assertEquals(IOUtils.toString(closer.register(fs.open(new Path(file2Path)))), "text2");
} finally {
closer.close();
}
}
public static String download(String parentPath, String fileName, InputStream in) {
Closer closer = Closer.create();
try {
File imageDir = new File(parentPath);
if(!imageDir.exists()) {
imageDir.mkdirs();
}
File imageFile = new File(imageDir, fileName);
Files.write(ByteStreams.toByteArray(in), imageFile);
return imageFile.getAbsolutePath();
} catch(Exception ex) {
ex.printStackTrace();
return null;
} finally {
try {
closer.close();
} catch (IOException e) {
closer = null;
}
}
}
public void close()
{
try (Closer closer = Closer.create()) {
closer.register(userAggregateMemoryContext::close);
closer.register(revocableAggregateMemoryContext::close);
closer.register(systemAggregateMemoryContext::close);
closer.register(userLocalMemoryContext::close);
closer.register(revocableLocalMemoryContext::close);
closer.register(systemLocalMemoryContext::close);
}
catch (IOException e) {
throw new RuntimeException("Exception closing memory tracking context", e);
}
}