下面列出了 com.google.common.io.Closeables#close() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Renders the CQL script: loads the template resource from the classpath and substitutes each
 * {@code ${key}} placeholder with the value bound to that key.
 *
 * @return the template text with the bindings applied
 * @throws IllegalStateException if the template resource cannot be found on the classpath
 * @throws RuntimeException wrapping any {@link IOException} raised while reading the template
 */
public String toCqlScript() {
    final InputStream templateStream =
            CreateKeyspacesCommand.class.getResourceAsStream(_templateResourceName);
    if (templateStream == null) {
        throw new IllegalStateException("couldn't find " + _templateResourceName + " in classpath");
    }
    try {
        String script;
        try {
            script = CharStreams.toString(new InputStreamReader(templateStream, "UTF-8"));
        } finally {
            // The stream is read-only, so an IOException on close is swallowed.
            Closeables.close(templateStream, true);
        }
        // Apply each binding in turn.
        for (Map.Entry<String, String> binding : _bindings.entrySet()) {
            String placeholder = "${" + binding.getKey() + "}";
            script = script.replace(placeholder, binding.getValue());
        }
        return script;
    } catch (IOException ioe) {
        throw new RuntimeException(ioe);
    }
}
/**
 * Eagerly creates the initial iterator so that failures surface at construction time. Exceptions
 * such as TableNotStashedException are therefore thrown immediately rather than being deferred
 * until the first iteration.
 */
public StashRowIterable() {
    _initialIterator = createStashRowIterator();
    try {
        // Calling hasNext() forces the first record to be evaluated.
        _initialIterator.hasNext();
        _openIterators.add(_initialIterator);
    } catch (Exception failure) {
        try {
            // Release the iterator before reporting the failure.
            Closeables.close(_initialIterator, true);
        } catch (IOException ignored) {
            // Already caught and logged
        }
        throw Throwables.propagate(failure);
    }
}
/**
 * Serializes every span returned by {@code spans()}, followed by the
 * {@code RES_STRING_POOL_SPAN_END} marker.
 *
 * @param options flags forwarded to each span's {@code toByteArray}
 * @return the encoded spans followed by the end marker
 * @throws IllegalStateException if a span does not encode to exactly
 *     {@code StringPoolSpan.SPAN_LENGTH} bytes
 * @throws IOException if writing to the in-memory stream fails
 */
@Override
public byte[] toByteArray(int options) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    LittleEndianDataOutputStream out = new LittleEndianDataOutputStream(buffer);
    try {
        for (StringPoolSpan span : spans()) {
            byte[] spanBytes = span.toByteArray(options);
            if (spanBytes.length != StringPoolSpan.SPAN_LENGTH) {
                throw new IllegalStateException("Encountered a span of invalid length.");
            }
            out.write(spanBytes);
        }
        out.writeInt(RES_STRING_POOL_SPAN_END);
    } finally {
        Closeables.close(out, true);
    }
    return buffer.toByteArray();
}
/**
 * Evaluates an XPath expression against the contents of the given file and returns the result
 * as a string.
 *
 * @param file the file whose contents are used as the XPath input source
 * @param xPath the XPath expression to evaluate
 * @return the string value produced by the evaluation
 * @throws StreamException declared by the signature; presumably raised when the file contents
 *     cannot be obtained (confirm against {@code IAbstractFile.getContents})
 * @throws XPathExpressionException if the expression cannot be evaluated
 */
private static String getStringValue(@NonNull IAbstractFile file, @NonNull String xPath)
        throws StreamException, XPathExpressionException {
    XPath evaluator = AndroidXPathFactory.newXPath();
    InputStream contents = null;
    try {
        contents = file.getContents();
        InputSource source = new InputSource(contents);
        return evaluator.evaluate(xPath, source);
    } finally {
        try {
            // A null stream (getContents failed) is accepted by Closeables.close.
            Closeables.close(contents, true /* swallowIOException */);
        } catch (IOException ignored) {
            // cannot happen
        }
    }
}
/**
 * Builds a {@code StreamSupplier} that copies the blob's bytes to a caller-supplied output
 * stream.
 *
 * @param request the request used to fetch the blob again when the response stream is gone
 * @param response the response whose input stream is copied when it is still available
 * @return a supplier that writes the blob content and then closes the input stream
 */
private StreamSupplier streamSupplier(final BlobRequest request, final BlobResponse response) {
    return new StreamSupplier() {
        @Override
        public void writeTo(OutputStream out) throws IOException {
            InputStream existing = response.getInputStream();
            // A null stream means the original was already consumed, so a new stream is
            // re-opened from the server.
            InputStream source = (existing != null) ? existing : get(request).getInputStream();
            try {
                ByteStreams.copy(source, out);
            } finally {
                Closeables.close(source, true);
            }
        }
    };
}
/**
 * Produces the byte-array form of this chunk: the header followed by the payload. Subclasses
 * normally do not need to override this unless the header changes based on the contents or
 * size of the payload.
 *
 * @param options flags forwarded to {@code writePayload}
 * @return the header bytes followed by the payload bytes
 * @throws IOException if writing the payload fails
 */
@Override
public final byte[] toByteArray(int options) throws IOException {
    ByteBuffer header = ByteBuffer.allocate(getHeaderSize()).order(ByteOrder.LITTLE_ENDIAN);
    // The chunk size is not known yet; 0 is a placeholder that is patched below.
    writeHeader(header, 0);

    ByteArrayOutputStream body = new ByteArrayOutputStream();
    LittleEndianDataOutputStream bodyOut = new LittleEndianDataOutputStream(body);
    try {
        writePayload(bodyOut, header, options);
    } finally {
        Closeables.close(bodyOut, true);
    }

    byte[] bodyBytes = body.toByteArray();
    int totalSize = getHeaderSize() + bodyBytes.length;
    header.putInt(CHUNK_SIZE_OFFSET, totalSize);

    // Concatenate header and payload into a single array.
    return ByteBuffer.allocate(totalSize)
            .order(ByteOrder.LITTLE_ENDIAN)
            .put(header.array())
            .put(bodyBytes)
            .array();
}
/**
 * Generates an RSA key pair with a certificate for the given distinguished name, adds it to
 * the key store under {@code dn.alias}, and writes the key store to {@code storeFile}.
 *
 * @param ks the key store receiving the new entry
 * @param storeFile the file the key store is written to
 * @param storePassword the password passed to {@code KeyStore.store}
 * @param dn supplies the alias, key password, validity and subject for the new entry
 * @return {@code true} on success; {@code false} if key generation or storing failed with one
 *     of the caught exception types
 */
public static boolean addNewKey(KeyStore ks, File storeFile, char[] storePassword, DN dn) {
    try {
        Pair<PrivateKey, X509Certificate> keyAndCert =
                generateKeyAndCertificate("RSA", "SHA1withRSA", dn.validityYears, encodeDN(dn));
        PrivateKey privateKey = keyAndCert.getFirst();
        Certificate[] chain = new Certificate[]{keyAndCert.getSecond()};
        ks.setKeyEntry(dn.alias, privateKey, dn.password, chain);

        FileOutputStream storeOut = new FileOutputStream(storeFile);
        boolean succeeded = false;
        try {
            ks.store(storeOut, storePassword);
            succeeded = true;
        } finally {
            // Swallow a close failure only when store() already failed.
            Closeables.close(storeOut, !succeeded);
        }
        return true;
    } catch (KeyStoreException | IOException | NoSuchAlgorithmException | CertificateException | OperatorCreationException e) {
        return false;
    }
}
/**
 * Reads all {@code PendingUpload} objects serialized in the given file.
 *
 * @param fs the file system used to open the file
 * @param pendingCommitsFile the file containing the serialized pending uploads
 * @return the pending uploads, in the order they were read
 * @throws IOException if the file cannot be opened or read, or if closing the stream fails
 *     after a successful read
 */
static List<PendingUpload> readPendingCommits(FileSystem fs,
    Path pendingCommitsFile)
    throws IOException {
  List<PendingUpload> commits = Lists.newArrayList();
  // Open the raw stream separately: the ObjectInputStream constructor reads the stream header
  // and can throw, in which case the raw stream would otherwise never be closed.
  final java.io.InputStream rawStream = fs.open(pendingCommitsFile);
  final ObjectInputStream in;
  try {
    in = new ObjectInputStream(rawStream);
  } catch (IOException | RuntimeException e) {
    Closeables.close(rawStream, true);
    throw e;
  }
  boolean threw = true;
  try {
    for (PendingUpload commit : new ObjectIterator<PendingUpload>(in)) {
      commits.add(commit);
    }
    threw = false;
  } finally {
    // Swallow a close failure only when the read itself already failed.
    Closeables.close(in, threw);
  }
  return commits;
}
/**
 * Times the execution of 1000 SET commands inside a single Redis transaction and prints the
 * elapsed time. Original author's note: 0.304 seconds (presumably the measured run time).
 *
 * <p>NOTE(review): the server address and password are hard-coded below; consider moving them
 * to configuration.
 */
public void testTrans() {
    Jedis jedis = new Jedis("120.25.241.144", 6379);
    try {
        jedis.auth("b840fc02d52404542994");
        long start = System.currentTimeMillis();
        Transaction tx = jedis.multi();
        for (int i = 0; i < 1000; i++) {
            tx.set("n" + i, "n" + i);
            System.out.println(i);
        }
        tx.exec();
        long end = System.currentTimeMillis();
        System.out.println("共花费:" + (end - start) / 1000.0 + "秒");
    } finally {
        // Release the connection even when authentication or the transaction fails.
        jedis.disconnect();
        try {
            Closeables.close(jedis, true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Resets metrics reporting: closes the JMX and Graphite reporters (when present) and clears
 * the initialization state. Does nothing if reporting has not been started.
 */
public static synchronized void stop() {
    if (!isStart()) {
        return;
    }
    try {
        // Closeables.close does nothing when handed null, so no explicit null checks are needed.
        Closeables.close(METRICS_REPORTER_INSTANCE._jmxReporter, true);
        Closeables.close(METRICS_REPORTER_INSTANCE._graphiteReporter, true);
    } catch (Exception closeFailure) {
        LOGGER.error("Error while closing Jmx and Graphite reporters.", closeFailure);
    }
    DID_INIT = false;
    METRICS_REPORTER_INSTANCE = null;
}
/**
 * Persists a snapshot by encoding it to a temporary file and then renaming that file to its
 * final name.
 *
 * @param snapshot the snapshot to write; its timestamp is used in both file names
 * @throws IOException if encoding fails, closing the file fails after a successful encode, or
 *     the temporary file cannot be renamed
 */
@Override
public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
    // Stage the snapshot in a temporary file first.
    File tempFile = new File(snapshotDir, TMP_SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
    LOG.debug("Writing snapshot to temporary file {}", tempFile);
    OutputStream tempOut = Files.newOutputStreamSupplier(tempFile).getOutput();
    boolean encoded = false;
    try {
        codecProvider.encode(tempOut, snapshot);
        encoded = true;
    } finally {
        // Swallow a close failure only when encoding already failed.
        Closeables.close(tempOut, !encoded);
    }

    // Move the staged file to its final name.
    File targetFile = new File(snapshotDir, SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
    boolean renamed = tempFile.renameTo(targetFile);
    if (!renamed) {
        throw new IOException("Failed renaming temporary snapshot file " + tempFile.getName() + " to " +
                targetFile.getName());
    }
    LOG.debug("Completed snapshot to file {}", targetFile);
}
/**
 * Marks this object as closed, shuts down the poll and offset-fetcher threads, waits until
 * both have terminated, and then closes the deserializers and consumers.
 *
 * @throws IOException declared by the signature; the Closeables.close calls below are asked to
 *     swallow IOExceptions
 * @throws RuntimeException if the wait for thread termination is interrupted (the interrupt
 *     flag is restored first)
 */
@Override
public void close() throws IOException {
// Set the closed flag before requesting shutdown of the two background threads.
closed.set(true);
consumerPollThread.shutdown();
offsetFetcherThread.shutdown();
boolean isShutdown = false;
// Wait for threads to shutdown. Trying this as a loop to handle a tiny race where poll thread
// might block to enqueue right after availableRecordsQueue.poll() below.
while (!isShutdown) {
// Wake up each consumer that exists on every pass of the loop.
if (consumer != null) {
consumer.wakeup();
}
if (offsetConsumer != null) {
offsetConsumer.wakeup();
}
availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread.
try {
// Wait up to 10 seconds for each thread; && short-circuits, so the second wait only
// happens once the first thread has terminated.
isShutdown =
consumerPollThread.awaitTermination(10, TimeUnit.SECONDS)
&& offsetFetcherThread.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Restore the interrupt flag before propagating.
// NOTE(review): this exit path skips the Closeables.close calls below.
Thread.currentThread().interrupt();
throw new RuntimeException(e); // not expected
}
if (!isShutdown) {
LOG.warn("An internal thread is taking a long time to shutdown. will retry.");
}
}
// Both threads have terminated; close the deserializers and consumers, swallowing IOExceptions.
Closeables.close(keyDeserializerInstance, true);
Closeables.close(valueDeserializerInstance, true);
Closeables.close(offsetConsumer, true);
Closeables.close(consumer, true);
}
/**
 * Writes the stylesheet markup to {@code mWriter}. When {@code USE_HOLO_STYLE} is set, a link
 * to the Roboto web font is written first. The CSS resource is then either inlined inside a
 * {@code <style>} element (simple format) or referenced through a {@code <link>} element
 * pointing at the value returned by {@code addLocalResources}.
 *
 * @throws IOException if reading the CSS resource or writing to the writer fails
 */
protected void writeStyleSheet() throws IOException {
    if (USE_HOLO_STYLE) {
        mWriter.write(
                "<link rel=\"stylesheet\" type=\"text/css\" " + //$NON-NLS-1$
                "href=\"http://fonts.googleapis.com/css?family=Roboto\" />\n" );//$NON-NLS-1$
    }
    URL cssUrl = HtmlReporter.class.getResource(CSS);
    if (mSimpleFormat) {
        // Inline the CSS
        mWriter.write("<style>\n"); //$NON-NLS-1$
        InputStream input = cssUrl.openStream();
        byte[] bytes;
        try {
            bytes = ByteStreams.toByteArray(input);
        } finally {
            // Close in a finally block so the stream is released even when the read fails.
            try {
                Closeables.close(input, true /* swallowIOException */);
            } catch (IOException e) {
                // cannot happen
            }
        }
        String css = new String(bytes, Charsets.UTF_8);
        mWriter.write(css);
        mWriter.write("</style>\n"); //$NON-NLS-1$
    } else {
        String ref = addLocalResources(cssUrl);
        if (ref != null) {
            mWriter.write(
                    "<link rel=\"stylesheet\" type=\"text/css\" href=\"" //$NON-NLS-1$
                    + ref + "\" />\n"); //$NON-NLS-1$
        }
    }
}
/**
 * Closes every table block held in {@code _blocks}, asking Closeables to swallow any
 * IOException raised by an individual block.
 *
 * @throws IOException declared by the signature and by {@code Closeables.close}
 */
@Override
public void close() throws IOException {
    for (TableBlock tableBlock : _blocks) {
        Closeables.close(tableBlock, true);
    }
}
/**
 * Closes the iterator for the current split, if there is one, and clears the reference.
 */
private void closeCurrentSplit() {
    if (_currentIterator == null) {
        return;
    }
    try {
        Closeables.close(_currentIterator, true);
    } catch (IOException ignored) {
        // Already logged and caught
    }
    _currentIterator = null;
}
/**
 * Creates the record reader for the split stored at the given path. An empty split yields the
 * shared empty-split reader; otherwise the returned reader reads its rows from the stash split
 * and releases its S3 client reference when closed.
 *
 * @param config the job configuration (not read by this method)
 * @param path the path of the split file
 * @param splitSize the split size passed to the reader
 * @return a reader over the rows of the split
 * @throws IOException declared by the signature
 */
@Override
public BaseRecordReader getBaseRecordReader(Configuration config, Path path, int splitSize)
        throws IOException {
    final String splitName = getSplitName(_rootPath, path);
    if (isEmptySplit(splitName)) {
        return getEmptySplitRecordReader();
    }

    final String referenceReason = "record reader for split " + path;
    final StashSplit stashSplit = fromSplitFile(splitName);

    // Take a reference on the s3 client so it stays open at least until the returned reader
    // is closed.
    addS3ClientReference(referenceReason);

    return new BaseRecordReader(splitSize) {
        private StashRowIterator _splitIterator;

        @Override
        protected Iterator<Map<String, Object>> getRowIterator() throws IOException {
            _splitIterator = _stashReader.getSplit(stashSplit);
            return _splitIterator;
        }

        @Override
        protected void closeOnce() throws IOException {
            try {
                // IOExceptions from close are propagated (swallowIOException is false).
                Closeables.close(_splitIterator, false);
            } finally {
                maybeCloseS3Client(referenceReason);
            }
        }
    };
}
/**
 * Writes the payload: the offsets buffer first, followed by the encoded entries.
 *
 * @param output the destination for the payload bytes
 * @param header the chunk header (not read by this method)
 * @param options flags forwarded to {@code writeEntries}
 * @throws IOException if writing the entries or the output fails
 */
@Override
protected void writePayload(DataOutput output, ByteBuffer header, int options)
        throws IOException {
    ByteBuffer offsetTable = ByteBuffer.allocate(getOffsetSize()).order(ByteOrder.LITTLE_ENDIAN);
    ByteArrayOutputStream entryBytes = new ByteArrayOutputStream();
    LittleEndianDataOutputStream entryOut = new LittleEndianDataOutputStream(entryBytes);
    try {
        // writeEntries receives both the entry stream and the offsets buffer.
        writeEntries(entryOut, offsetTable, options);
    } finally {
        Closeables.close(entryOut, true);
    }
    output.write(offsetTable.array());
    output.write(entryBytes.toByteArray());
}
/**
 * Closes this object when it is finalized, asking Closeables to swallow any IOException.
 *
 * @throws Throwable if {@code super.finalize()} fails
 */
@Override
protected void finalize()
        throws Throwable {
    try {
        super.finalize();
    } finally {
        // Run the close even when super.finalize() throws, so this object is still released.
        Closeables.close(this, true);
    }
}
/**
 * Entry point for the stress test. Loads the configuration, builds authenticated data store
 * and databus clients discovered through ZooKeeper, ensures the test table and subscription
 * exist, then starts the writer, reader and report threads.
 *
 * @param args {@code args[0]} is the path to the config.yaml file, {@code args[1]} the number
 *     of writer threads and {@code args[2]} the number of reader threads
 * @throws IllegalArgumentException if fewer than three arguments are supplied
 * @throws Exception if configuration loading or client setup fails
 */
public static void main(String... args) throws Exception {
    if (args.length < 3) {
        throw new IllegalArgumentException(
                "Usage: <config.yaml> <numWriterThreads> <numReaderThreads>");
    }
    final String DROPWIZARD_PROPERTY_PREFIX = "dw";

    // Load the config.yaml file specified as the first argument.
    ConfigurationFactory<EmoConfiguration> configFactory = new ConfigurationFactory<>(
            EmoConfiguration.class, Validation.buildDefaultValidatorFactory().getValidator(), Jackson.newObjectMapper(), DROPWIZARD_PROPERTY_PREFIX);
    EmoConfiguration configuration = configFactory.build(new File(args[0]));
    int numWriterThreads = Integer.parseInt(args[1]);
    int numReaderThreads = Integer.parseInt(args[2]);
    String apiKey = configuration.getAuthorizationConfiguration().getAdminApiKey();

    MetricRegistry metricRegistry = new MetricRegistry();
    new LoggingFactory().configure(metricRegistry, "stress");

    CuratorFramework curator = configuration.getZooKeeperConfiguration().newCurator();
    curator.start();

    // Data store client, authenticated with the admin API key.
    DataStoreClientFactory dataStoreFactory = DataStoreClientFactory.forClusterAndHttpConfiguration(
            configuration.getCluster(), configuration.getHttpClientConfiguration(), metricRegistry);
    AuthDataStore authDataStore = ServicePoolBuilder.create(AuthDataStore.class)
            .withServiceFactory(dataStoreFactory)
            .withHostDiscovery(new ZooKeeperHostDiscovery(curator, dataStoreFactory.getServiceName(), metricRegistry))
            .withMetricRegistry(metricRegistry)
            .withCachingPolicy(ServiceCachingPolicyBuilder.getMultiThreadedClientPolicy())
            .buildProxy(new ExponentialBackoffRetry(5, 50, 1000, TimeUnit.MILLISECONDS));
    DataStore dataStore = DataStoreAuthenticator.proxied(authDataStore).usingCredentials(apiKey);

    // Databus client, built the same way.
    DatabusClientFactory databusFactory = DatabusClientFactory.forClusterAndHttpConfiguration(
            configuration.getCluster(), configuration.getHttpClientConfiguration(), metricRegistry);
    AuthDatabus authDatabus = ServicePoolBuilder.create(AuthDatabus.class)
            .withServiceFactory(databusFactory)
            .withHostDiscovery(new ZooKeeperHostDiscovery(curator, databusFactory.getServiceName(), metricRegistry))
            .withMetricRegistry(metricRegistry)
            .withCachingPolicy(ServiceCachingPolicyBuilder.getMultiThreadedClientPolicy())
            .buildProxy(new ExponentialBackoffRetry(5, 50, 1000, TimeUnit.MILLISECONDS));
    Databus databus = DatabusAuthenticator.proxied(authDatabus).usingCredentials(apiKey);

    final SorStressTest stressTest = new SorStressTest(dataStore, databus);

    // Create the test table on first use and subscribe to all events.
    if (!dataStore.getTableExists(TABLE)) {
        TableOptions options = new TableOptionsBuilder().setPlacement("ugc_global:ugc").build();
        dataStore.createTable(TABLE, options, ImmutableMap.of("table", TABLE), new AuditBuilder().setLocalHost().build());
    }
    databus.subscribe(SUBSCRIPTION, Conditions.alwaysTrue(), Duration.ofDays(7), Duration.ofDays(1));

    ThreadFactory writerFactory = new ThreadFactoryBuilder().setNameFormat("SoR Writer-%d").build();
    for (int i = 0; i < numWriterThreads; i++) {
        writerFactory.newThread(stressTest::writeDeltas).start();
    }

    ThreadFactory readerFactory = new ThreadFactoryBuilder().setNameFormat("Databus Reader-%d").build();
    for (int i = 0; i < numReaderThreads; i++) {
        readerFactory.newThread(stressTest::readDatabus).start();
    }

    // Report once per second, starting after one second.
    ThreadFactory reportFactory = new ThreadFactoryBuilder().setNameFormat("Report-%d").build();
    Executors.newScheduledThreadPool(1, reportFactory)
            .scheduleAtFixedRate(stressTest::report, 1, 1, TimeUnit.SECONDS);

    // NOTE(review): the data store proxy and curator are closed right after the worker threads
    // are started, while those threads may still be using them - confirm this is intended.
    ServicePoolProxies.close(dataStore);
    Closeables.close(curator, true);
}
/**
 * Closes the given {@code Closeable} without propagating an {@code IOException}.
 * <p>
 * Guava removed {@code Closeables.closeQuietly()} in v16.0. Calling
 * {@code Closeables.close(closeable, true)} swallows the {@code IOException} as well, but
 * {@code close()} is still declared as {@code throws IOException} whereas
 * {@code closeQuietly()} was not, so it is not a drop-in replacement. This method wraps that
 * call so callers need no {@code try}/{@code catch} of their own.
 * </p>
 * <p>
 * Guava is on the whole very backwards compatible; providing this method here lets Curator
 * keep supporting newer Guava versions without bumping its own dependency version.
 * </p>
 * <p>
 * See <a href="https://issues.apache.org/jira/browse/CURATOR-85">https://issues.apache.org/jira/browse/CURATOR-85</a>
 * </p>
 *
 * @param closeable the object to close
 */
public static void closeQuietly(Closeable closeable)
{
    try
    {
        // Guava is told to swallow the IOException itself.
        Closeables.close(closeable, true);
    }
    catch ( IOException unexpected )
    {
        // Guava was asked to swallow the IOException, so reaching this point is
        // not expected. Log it rather than lose it.
        log.error("IOException should not have been thrown.", unexpected);
    }
}