下面列出了com.google.common.io.Closer#register ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
@NonNull
public Properties loadProperties(@NonNull File file) {
Properties props = new Properties();
Closer closer = Closer.create();
try {
FileInputStream fis = closer.register(new FileInputStream(file));
props.load(fis);
} catch (IOException ignore) {
} finally {
try {
closer.close();
} catch (IOException e) {
}
}
return props;
}
@Override
@NonNull
public Properties loadProperties(@NonNull File file) {
Properties props = new Properties();
Closer closer = Closer.create();
try {
FileInputStream fis = closer.register(new FileInputStream(file));
props.load(fis);
} catch (IOException ignore) {
} finally {
try {
closer.close();
} catch (IOException e) {
}
}
return props;
}
/**
* Update the log4j configuration.
*
* @param targetClass the target class used to get the original log4j configuration file as a resource
* @param log4jPath the custom log4j configuration properties file path
* @param log4jFileName the custom log4j configuration properties file name
* @throws IOException if there's something wrong with updating the log4j configuration
*/
public static void updateLog4jConfiguration(Class<?> targetClass, String log4jPath, String log4jFileName)
throws IOException {
Closer closer = Closer.create();
try {
InputStream fileInputStream = closer.register(new FileInputStream(log4jPath));
InputStream inputStream = closer.register(targetClass.getResourceAsStream("/" + log4jFileName));
Properties customProperties = new Properties();
customProperties.load(fileInputStream);
Properties originalProperties = new Properties();
originalProperties.load(inputStream);
for (Entry<Object, Object> entry : customProperties.entrySet()) {
originalProperties.setProperty(entry.getKey().toString(), entry.getValue().toString());
}
LogManager.resetConfiguration();
PropertyConfigurator.configure(originalProperties);
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
private static void zipFiles(List<File> files, String basepath, OutputStream os) throws IOException {
Closer closer = Closer.create();
try {
ZipOutputStream zos = new ZipOutputStream(os);
closer.register(zos);
for (File file : files) {
if (file.isFile()) {
FileInputStream inputStream = new FileInputStream(file);
closer.register(inputStream);
writeZipEntry(zipEntry(basepath, file), inputStream, zos);
} else {
ZipEntry ze = zipEntry(basepath, file);
logger.trace("Adding directory zip entry: " + ze.toString());
zos.putNextEntry(ze);
}
}
} catch (IOException ioe) {
throw closer.rethrow(ioe);
} finally {
closer.close();
}
}
private static ImmutableMap<String, LazyClassEntry> buildClassIndex(
ImmutableSet<Path> jars, Closer closer, Predicate<Path> isDirect) throws IOException {
HashMap<String, LazyClassEntry> result = new HashMap<>();
for (Path jarPath : jars) {
boolean jarIsDirect = isDirect.test(jarPath);
try {
ZipFile zipFile = closer.register(new ZipFile(jarPath.toFile()));
zipFile
.stream()
.forEach(
entry -> {
String name = entry.getName();
if (!name.endsWith(".class")) {
return; // Not a class file.
}
String internalName = name.substring(0, name.lastIndexOf('.'));
result.computeIfAbsent(
internalName,
key -> new LazyClassEntry(key, zipFile, jarPath, jarIsDirect));
});
} catch (Throwable e) {
throw new RuntimeException("Error in reading zip file " + jarPath, e);
}
}
return ImmutableMap.copyOf(result);
}
@Test
public void testTarGzCopy() throws Exception {
Closer closer = Closer.create();
try {
JobLauncher jobLauncher = closer.register(JobLauncherFactory.newJobLauncher(gobblinProps, jobProps));
jobLauncher.launchJob(null);
String file1Path =
gobblinProps.getProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR) + "/LogData/sub1/sub2/text1.txt";
String file2Path =
gobblinProps.getProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR) + "/LogData/sub1/sub2/text2.txt";
FileSystem fs = FileSystem.getLocal(new Configuration());
Assert.assertEquals(IOUtils.toString(closer.register(fs.open(new Path(file1Path)))), "text1");
Assert.assertEquals(IOUtils.toString(closer.register(fs.open(new Path(file2Path)))), "text2");
} finally {
closer.close();
}
}
@Test(dependsOnMethods = {"testSetAndGet"})
public void testSerDe()
throws IOException {
Closer closer = Closer.create();
try {
ByteArrayOutputStream baos = closer.register(new ByteArrayOutputStream());
DataOutputStream dos = closer.register(new DataOutputStream(baos));
this.jobState.write(dos);
ByteArrayInputStream bais = closer.register((new ByteArrayInputStream(baos.toByteArray())));
DataInputStream dis = closer.register((new DataInputStream(bais)));
JobState newJobState = new JobState();
newJobState.readFields(dis);
doAsserts(newJobState, true, false);
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
/**
* Read data from the original input stream and pipe it to the compressing stream until fully read.
*/
private void streamAndCompressInput() {
try {
byte[] newline = "\n".getBytes(Charsets.UTF_8);
while (!_closed && fetchNextRow()) {
_rawOut.write(_buffer.array(), 0, _buffer.limit());
_rawOut.write(newline);
}
_rawOut.close();
} catch (Exception e) {
try {
Closer closer = Closer.create();
closer.register(_rawOut);
closer.register(_gzipIn);
closer.close();
} catch (IOException ignore) {
// Ignore exceptions closing, don't mask the original exception.
}
if (!_closed) {
_inputException = e instanceof IOException ? (IOException ) e : new IOException(e);
}
}
}
public static void zip(FileObject root, List<FileObject> files, OutputStream out) throws IOException {
String basePath = root.getName().getPath();
Closer closer = Closer.create();
try {
ZipOutputStream zos = new ZipOutputStream(out);
closer.register(zos);
for (FileObject fileToCopy : files) {
ZipEntry zipEntry = zipEntry(basePath, fileToCopy);
zos.putNextEntry(zipEntry);
copyFileContents(fileToCopy, zos);
zos.flush();
zos.closeEntry();
}
} catch (IOException e) {
throw closer.rethrow(e);
} finally {
closer.close();
}
}
@Test
public void testServer() throws StreamException, IOException {
Closer closer = Closer.create();
try {
File tmpFile = new File("./target/tmp/StreamServerTest");
tmpFile.mkdirs();
IndexServer indexServer = new TestIndexServer();
StreamProcessor streamProcessor = new StreamProcessor(indexServer, tmpFile);
int timeout = 3000000;
String classLoaderId = UUID.randomUUID().toString();
StreamServer server = closer.register(new StreamServer(0, 100, streamProcessor));
server.start();
int port = server.getPort();
StreamClient client = closer.register(new StreamClient("localhost", port, timeout));
assertFalse(client.isClassLoaderAvailable(classLoaderId));
client.loadJars(classLoaderId, getTestJar());
String table = "test";
String shard = "shard";
String user = "test";
Map<String, String> userAttributes = new HashMap<String, String>();
StreamSplit split = new StreamSplit(table, shard, classLoaderId, user, userAttributes);
Iterable<String> it = client.executeStream(split, new StreamFunction<String>() {
@Override
public void call(IndexContext indexContext, StreamWriter<String> writer) throws Exception {
writer.write("test");
}
});
Iterator<String> iterator = it.iterator();
assertTrue(iterator.hasNext());
assertEquals("test", iterator.next());
assertFalse(iterator.hasNext());
} finally {
closer.close();
}
}
private static @Nullable Manifest getManifest(@Nullable URL url) throws IOException {
if (url == null) {
return null;
}
// Closer is used to simulate Java 7 try-with-resources
Closer closer = Closer.create();
try {
InputStream manifestIn = closer.register(url.openStream());
return new Manifest(manifestIn);
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
@Test(dependsOnMethods = {"testSetAndGet"})
public void testSerDe()
throws IOException {
Closer closer = Closer.create();
try {
ByteArrayOutputStream baos = closer.register(new ByteArrayOutputStream());
DataOutputStream dos = closer.register(new DataOutputStream(baos));
this.taskState.write(dos);
ByteArrayInputStream bais = closer.register((new ByteArrayInputStream(baos.toByteArray())));
DataInputStream dis = closer.register((new DataInputStream(bais)));
TaskState newTaskState = new TaskState();
newTaskState.readFields(dis);
Assert.assertEquals(newTaskState.getJobId(), "Job-1");
Assert.assertEquals(newTaskState.getTaskId(), "Task-1");
Assert.assertEquals(this.taskState.getHighWaterMark(), 2000);
Assert.assertEquals(newTaskState.getStartTime(), this.startTime);
Assert.assertEquals(newTaskState.getEndTime(), this.startTime + 1000);
Assert.assertEquals(newTaskState.getTaskDuration(), 1000);
Assert.assertEquals(newTaskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED);
Assert.assertEquals(newTaskState.getProp("foo"), "bar");
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
@Override
public void saveProperties(
@NonNull File file,
@NonNull Properties props,
@NonNull String comments) throws IOException {
Closer closer = Closer.create();
try {
OutputStream fos = closer.register(newFileOutputStream(file));
props.store(fos, comments);
} catch (Throwable e) {
throw closer.rethrow(e);
} finally {
closer.close();
}
}
@Override
public void close() throws IOException {
final Closer closer = Closer.create();
closer.register(reader);
closer.register(recordFile);
closer.close();
}
@GuardedBy("this")
private void loadClassNamesFromJarFile(File jarFile, Location location,
Multimap<String, Location> newClassNameLocations) throws IOException {
Closer closer = Closer.create();
try {
InputStream in = closer.register(new FileInputStream(jarFile));
JarInputStream jarIn = closer.register(new JarInputStream(in));
loadClassNamesFromManifestClassPath(jarIn, jarFile, newClassNameLocations);
loadClassNamesFromJarInputStream(jarIn, "", location, newClassNameLocations);
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
@Override
public void run(String[] args) throws Exception {
if (args.length < 1 || args.length > 2) {
printUsage();
}
Closer closer = Closer.create();
try {
CompositeConfiguration config = new CompositeConfiguration();
config.addConfiguration(new SystemConfiguration());
if (args.length == 2) {
config.addConfiguration(new PropertiesConfiguration(args[1]));
}
Properties properties = getProperties(config);
DatabaseJobHistoryStoreSchemaManager schemaManager =
closer.register(DatabaseJobHistoryStoreSchemaManager.builder(properties).build());
if (String.CASE_INSENSITIVE_ORDER.compare("migrate", args[0]) == 0) {
schemaManager.migrate();
} else if (String.CASE_INSENSITIVE_ORDER.compare("info", args[0]) == 0) {
schemaManager.info();
} else {
printUsage();
}
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
private static void writeConfigSyncedFile(File file, String agentId) throws IOException {
Closer closer = Closer.create();
try {
PrintWriter out = closer.register(new PrintWriter(file, UTF_8.name()));
out.println("# this file is created after the agent has pushed its local configuration"
+ " to the central collector");
out.println("#");
out.println("# when this file is present (and the agent.id below matches the running"
+ " agent's agent.id), the agent");
out.println("# will overwrite its local configuration with the agent configuration it"
+ " retrieves from the central");
out.println("# collector on JVM startup");
out.println("#");
out.println("# when this file is not present (or the agent.id below does not match the"
+ " running agent's agent.id),");
out.println("# the agent will push its local configuration to the central collector on"
+ " JVM startup (overwriting");
out.println("# any existing remote configuration), after which the agent will"
+ " (re-)create this file using the");
out.println("# running agent's agent.id");
out.println("");
out.println("agent.id=" + agentId);
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
public void runTest(Properties jobProps) throws Exception {
String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
String jobId = JobLauncherUtils.newJobId(jobName);
jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId);
JobContext jobContext = null;
Closer closer = Closer.create();
try {
JobLauncher jobLauncher = closer.register(JobLauncherFactory.newJobLauncher(this.launcherProps, jobProps));
jobLauncher.launchJob(null);
jobContext = ((AbstractJobLauncher) jobLauncher).getJobContext();
} finally {
closer.close();
}
Assert.assertTrue(jobContext.getJobMetricsOptional().isPresent());
String jobMetricContextTags = jobContext.getJobMetricsOptional().get().getMetricContext().getTags().toString();
Assert.assertTrue(jobMetricContextTags.contains(ClusterNameTags.CLUSTER_IDENTIFIER_TAG_NAME),
ClusterNameTags.CLUSTER_IDENTIFIER_TAG_NAME + " tag missing in job metric context tags.");
List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
DatasetState datasetState = datasetStateList.get(0);
Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
Assert.assertEquals(datasetState.getCompletedTasks(), 4);
Assert.assertEquals(datasetState.getJobFailures(), 0);
for (TaskState taskState : datasetState.getTaskStates()) {
Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED);
Assert.assertEquals(taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN),
TestExtractor.TOTAL_RECORDS);
// if the addition of the task timestamp is configured then validate that the file name has the expected format
if (Boolean.valueOf(taskState.getProp(ConfigurationKeys.WRITER_ADD_TASK_TIMESTAMP, "false"))) {
String pattern = ".*part.task_.*_(\\d+)_\\d+_(\\d+)_\\d+.avro";
String value = taskState.getProp(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS);
Pattern r = Pattern.compile(pattern);
Matcher m = r.matcher(taskState.getProp(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS));
long timeBuffer = 5 * 60 * 1000;
if (!m.matches()) {
Assert.fail("no matches for " + value);
}
Long currentTime = System.currentTimeMillis();
Assert.assertTrue(Long.valueOf(m.group(1)) > currentTime - timeBuffer);
Assert.assertTrue(Long.valueOf(m.group(1)) < currentTime);
// the task time should be after the job time
Assert.assertTrue(Long.valueOf(m.group(1)) < Long.valueOf(m.group(2)));
Assert.assertTrue(Long.valueOf(m.group(2)) > currentTime - timeBuffer);
Assert.assertTrue(Long.valueOf(m.group(2)) < currentTime);
}
}
}
private void addFolder(@NonNull File folder,
@NonNull String path) throws IOException, ZipAbortException {
logger.verbose("addFolder(%1$s, %2$s)", folder, path);
File[] files = folder.listFiles();
if (files != null) {
for (File file : files) {
if (file.isFile()) {
String entryPath = path + file.getName();
if (filter == null || filter.checkEntry(entryPath)) {
logger.verbose("addFolder(%1$s, %2$s): entry %3$s",
folder,
path,
entryPath);
duplicates.put(path + file.getName(), folder.getAbsolutePath());
if (duplicates.get(path + file.getName()).size() > 1) {
logger.info("[Duplicated]" +
path +
file.getName() +
":" +
folder.getAbsolutePath() +
":" +
duplicates.get(path + file.getName()));
continue;
}
// new entry
jarOutputStream.putNextEntry(new JarEntry(entryPath));
// put the file content
Closer localCloser = Closer.create();
try {
FileInputStream fis = localCloser.register(new FileInputStream(file));
int count;
while ((count = fis.read(buffer)) != -1) {
jarOutputStream.write(buffer, 0, count);
}
} finally {
localCloser.close();
}
// close the entry
jarOutputStream.closeEntry();
}
} else if (file.isDirectory()) {
addFolder(file, path + file.getName() + "/");
}
}
}
}
/**
* Create and write an index index based on the input Cassandra Index.db file. Read the Index.db and generate chunks
* (splits) based on the configured chunk size.
*
* @param fileSystem Hadoop file system.
* @param sstablePath SSTable Index.db.
* @throws IOException
*/
public static void writeIndex(final FileSystem fileSystem, final Path sstablePath) throws IOException {
final Configuration configuration = fileSystem.getConf();
final long splitSize = configuration.getLong(HadoopSSTableConstants.HADOOP_SSTABLE_SPLIT_MB,
HadoopSSTableConstants.DEFAULT_SPLIT_MB) * 1024 * 1024;
final Closer closer = Closer.create();
final Path outputPath = sstablePath.suffix(SSTABLE_INDEX_SUFFIX);
final Path inProgressOutputPath = sstablePath.suffix(SSTABLE_INDEX_IN_PROGRESS_SUFFIX);
boolean success = false;
try {
final FSDataOutputStream os = closer.register(fileSystem.create(inProgressOutputPath));
final TLongArrayList splitOffsets = new TLongArrayList();
long currentStart = 0;
long currentEnd = 0;
final IndexOffsetScanner index = closer.register(new IndexOffsetScanner(sstablePath, fileSystem));
while (index.hasNext()) {
// NOTE: This does not give an exact size of this split in bytes but a rough estimate.
// This should be good enough since it's only used for sorting splits by size in hadoop land.
while (currentEnd - currentStart < splitSize && index.hasNext()) {
currentEnd = index.next();
splitOffsets.add(currentEnd);
}
// Record the split
final long[] offsets = splitOffsets.toArray();
os.writeLong(offsets[0]); // Start
os.writeLong(offsets[offsets.length - 1]); // End
// Clear the offsets
splitOffsets.clear();
if (index.hasNext()) {
currentStart = index.next();
currentEnd = currentStart;
splitOffsets.add(currentStart);
}
}
success = true;
} finally {
closer.close();
if (!success) {
fileSystem.delete(inProgressOutputPath, false);
} else {
fileSystem.rename(inProgressOutputPath, outputPath);
}
}
}