下面列出了com.amazonaws.services.s3.model.SSEAlgorithm#com.amazonaws.services.s3.model.GetObjectRequest 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Loads the inventory load-error file of a datasource from S3 and caches its entries,
 * grouped by their {@code "type"} value, in {@code errorInfo}.
 *
 * <p>Does nothing when {@code errorInfo} is already populated. When the file cannot be read
 * or parsed, a warning entry is appended to {@code errorList} and {@code errorInfo} becomes
 * an empty map.
 *
 * @param datasource the datasource whose {@code <datasource>-loaderror.data} object is read
 * @param errorList the list that receives a warning entry when reading the file fails
 */
private void fetchErrorInfo(String datasource, List<Map<String,String>> errorList){
    if(errorInfo==null){
        ObjectMapper objectMapper = new ObjectMapper();
        List<Map<String, String>> inventoryErrors = new ArrayList<>();
        AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new CredentialProvider().getCredentials(s3Account,s3Role))).withRegion(s3Region).build();
        // Both the S3 object and the reader are managed resources so the underlying HTTP
        // connection is released on every path. The JSON payload is decoded explicitly as
        // UTF-8 instead of relying on the platform default charset.
        try (S3Object inventoryErrorData = s3Client.getObject(new GetObjectRequest(bucketName,dataPath+"/"+datasource+"-loaderror.data"));
                BufferedReader reader = new BufferedReader(new InputStreamReader(
                        inventoryErrorData.getObjectContent(), java.nio.charset.StandardCharsets.UTF_8))) {
            inventoryErrors = objectMapper.readValue(reader, new TypeReference<List<Map<String, String>>>() {});
        } catch (IOException e) {
            LOGGER.error("Exception in collecting inventory error data",e);
            Map<String,String> errorMap = new HashMap<>();
            errorMap.put(ERROR, "Exception in collecting inventory error data");
            errorMap.put(ERROR_TYPE, WARN);
            errorMap.put(EXCEPTION, e.getMessage());
            errorList.add(errorMap);
        }
        // A sequential stream yields the same grouping without fork/join overhead.
        errorInfo = inventoryErrors.stream().collect(Collectors.groupingBy(obj -> obj.get("type")));
    }
}
/**
 * Downloads the referenced remote object into {@code localPath}, creating the parent
 * directories of the target file beforehand.
 *
 * @param localPath       file the object content is written to
 * @param objectReference reference to the remote object to download
 * @throws Exception the failure reported by the transfer, if any; a 404 from S3 is
 *                   additionally logged before being rethrown
 */
@Override
public void downloadFile(final Path localPath, final RemoteObjectReference objectReference) throws Exception {
    final GetObjectRequest getObjectRequest = new GetObjectRequest(request.storageLocation.bucket, objectReference.canonicalPath);

    Files.createDirectories(localPath.getParent());

    // waitForException() blocks until the transfer ends and returns null on success.
    final AmazonClientException failure = transferManager
            .download(getObjectRequest, localPath.toFile(), new DownloadProgressListener(objectReference))
            .waitForException();

    if (failure == null) {
        return;
    }

    final boolean objectMissing = failure instanceof AmazonS3Exception
            && ((AmazonS3Exception) failure).getStatusCode() == 404;
    if (objectMissing) {
        logger.error("Remote object reference {} does not exist.", objectReference);
    }

    throw failure;
}
/**
 * Loads the inventory load-error file of the configured {@code dataSource} from S3 and
 * caches its entries, grouped by their {@code "type"} value, in {@code errorInfo}.
 *
 * <p>Does nothing when {@code errorInfo} is already populated. When the file cannot be read
 * or parsed, a warning entry is appended to {@code errorList} and {@code errorInfo} becomes
 * an empty map.
 *
 * @param errorList the list that receives a warning entry when reading the file fails
 */
private void fetchErrorInfo(List<Map<String,String>> errorList){
    if(errorInfo==null){
        ObjectMapper objectMapper = new ObjectMapper();
        List<Map<String, String>> inventoryErrors = new ArrayList<>();
        AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new CredentialProvider().getCredentials(s3Account,s3Role))).withRegion(s3Region).build();
        // Both the S3 object and the reader are managed resources so the underlying HTTP
        // connection is released on every path. The JSON payload is decoded explicitly as
        // UTF-8 instead of relying on the platform default charset.
        try (S3Object inventoryErrorData = s3Client.getObject(new GetObjectRequest(bucketName,dataPath+"/"+dataSource+"-loaderror.data"));
                BufferedReader reader = new BufferedReader(new InputStreamReader(
                        inventoryErrorData.getObjectContent(), java.nio.charset.StandardCharsets.UTF_8))) {
            inventoryErrors = objectMapper.readValue(reader, new TypeReference<List<Map<String, String>>>() {});
        } catch (IOException e) {
            LOGGER.error("Exception in collecting inventory error data",e);
            Map<String,String> errorMap = new HashMap<>();
            errorMap.put(ERROR, "Exception in collecting inventory error data");
            errorMap.put(ERROR_TYPE, WARN);
            errorMap.put(EXCEPTION, e.getMessage());
            errorList.add(errorMap);
        }
        // A sequential stream yields the same grouping without fork/join overhead.
        errorInfo = inventoryErrors.stream().collect(Collectors.groupingBy(obj -> obj.get("type")));
    }
}
/**
 * Fetches the gzipped inventory report from S3, verifies its MD5 checksum against the one
 * held by the locator, and returns the decompressed report as text.
 *
 * @return the decompressed inventory report
 * @throws IOException when the object cannot be read from S3, or when the checksum of the
 *                     downloaded report does not match the checksum given by the locator
 */
public String getInventoryReportToString() throws IOException {
    final String reportKey = locator.getKey();
    final String sourceBucket = inventoryManifest.getSourceBucket();
    final GetObjectRequest reportRequest = new GetObjectRequest(sourceBucket, reportKey);

    try (S3Object report = s3Client.getObject(reportRequest)) {
        // Buffer the compressed bytes so they can be both hashed and decompressed.
        final byte[] compressed = IOUtils.toByteArray(report.getObjectContent());

        final String actual = DigestUtils.md5Hex(compressed);
        final String expected = locator.getMD5checksum();
        if (actual.equals(expected)) {
            return IOUtils.toString(new GZIPInputStream(new ByteArrayInputStream(compressed)));
        }
        throw new ChecksumMismatchException (expected, actual);
    }
}
/**
 * Downloads {@code manifest.json} and {@code manifest.checksum}, verifies that the MD5 of
 * the JSON matches the published checksum, and maps the JSON onto an
 * {@link InventoryManifest}.
 *
 * @return the parsed manifest, holding the elements of the manifest.json file
 * @throws Exception when either object cannot be read, when the checksums differ
 *                   ({@code ChecksumMismatchException}), or when the JSON cannot be mapped
 */
public InventoryManifest getInventoryManifest() throws Exception {
    // Get manifest.json and transfer it to String. try-with-resources guarantees the
    // S3 object (and its HTTP connection) is released even if reading fails.
    final String jsonFile;
    try (S3Object jsonObject = s3Client.getObject(new GetObjectRequest(bucketName, bucketKeyJson))) {
        jsonFile = inputStreamToString(jsonObject.getObjectContent());
    }

    // Get manifest.checksum and transfer it to String with no whitespace
    final String expectedChecksum;
    try (S3Object checksumObject = s3Client.getObject(new GetObjectRequest(bucketName, bucketKeyChecksum))) {
        expectedChecksum = inputStreamToString(checksumObject.getObjectContent())
                .replaceAll("\\s", "");
    }

    // Compare manifest.json and manifest.checksum's MD5 value
    final String actualChecksum = DigestUtils.md5Hex(jsonFile);
    if (!actualChecksum.equals(expectedChecksum)) {
        throw new ChecksumMismatchException (expectedChecksum, actualChecksum);
    }

    return mapper.readValue(jsonFile, InventoryManifest.class);
}
/**
 * Verifies that a report whose checksum matches is returned decompressed, and that it was
 * requested with the expected bucket and key.
 */
@Test
public void getInventReportSuccess() throws Exception {
    // Arrange: locator/manifest fixtures and a mocked S3 object serving the report bytes.
    testLocator.setMD5checksum(testMD5);
    testManifest.setFileSchema("storageClass, size");
    reportRetriever = new InventoryReportRetriever(mockS3Client, testLocator, testManifest);

    final String reportText = "testString";
    final byte[] reportBytes = inventReportBytes(reportText);
    final S3ObjectInputStream reportStream =
            new S3ObjectInputStream(new ByteArrayInputStream(reportBytes), null);
    when(mockS3Object.getObjectContent()).thenReturn(reportStream);
    when(mockS3Client.getObject(getObjectRequestCaptor.capture())).thenReturn(mockS3Object);

    // Act
    final String actualText = reportRetriever.getInventoryReportToString();

    // Assert
    assertThat(actualText, is(reportText));
    final GetObjectRequest sentRequest = getObjectRequestCaptor.getValue();
    assertThat(sentRequest.getBucketName(), is("testBucket"));
    assertThat(sentRequest.getKey(), is("testInventReportKey"));
}
/**
 * Runs every mapping of the given configuration: loads its source files, loads the SQL
 * file from S3 into {@code df}, writes the result to a destination named after the SQL
 * file (directory and {@code .sql} removed), and publishes a success notification.
 *
 * <p>An {@link IOException} ends the loop and is reported as a failure notification for the
 * destination that was being processed.
 *
 * @param configuration the configuration providing the mappings and the SQL bucket
 */
public void load(Configuration configuration)
{
    setConfiguration(configuration);
    String destination = "";
    try {
        for (Mapping mapping : configuration.getMappings()) {
            // Derive the destination from the SQL file's base name.
            final String sqlFile = mapping.getSqlFile();
            final int lastSlash = sqlFile.lastIndexOf("/");
            final String baseName = lastSlash >= 0 ? sqlFile.substring(lastSlash + 1) : sqlFile;
            destination = baseName.replace(".sql", "");

            loadSourceFiles(mapping);

            final GetObjectRequest sqlRequest =
                    new GetObjectRequest(configuration.getSqlBucket(), mapping.getSqlFile());
            df = loadDf(client.getObject(sqlRequest).getObjectContent());

            if (mapping.getOverflowColumns().equals(new ArrayList<String>())) {
                write(destination);
            } else {
                write(destination, mapping.getOverflowColumns());
            }
            sns.publishSuccess("SparkBatchLoaderTableCompletion", destination);
        }
    } catch (IOException ex) {
        sns.publishFailure("SparkBatchLoaderTableFailure", destination, ex.toString());
    }
}
/**
 * Stubs the mocked S3 client so that a request for {@code fileName} in bucket
 * {@code "bucket1"} returns an object with the given content.
 *
 * @param fileName        key of the stubbed object
 * @param version         value for the {@code x-amz-version-id} metadata header, or
 *                        {@code null} to leave the object metadata untouched
 * @param propertyContent content served by the stubbed object
 * @throws UnsupportedEncodingException if the content cannot be encoded into a stream
 */
private void setupS3(String fileName, String version, String propertyContent)
        throws UnsupportedEncodingException {
    final GetObjectRequest expectedRequest = new GetObjectRequest(new S3ObjectId("bucket1", fileName));

    final S3Object stubbedObject = new S3Object();
    stubbedObject.setObjectContent(new StringInputStream(propertyContent));

    if (version != null) {
        final ObjectMetadata versionedMetadata = new ObjectMetadata();
        versionedMetadata.setHeader("x-amz-version-id", version);
        stubbedObject.setObjectMetadata(versionedMetadata);
    }

    final GetObjectRequestMatcher requestMatcher = new GetObjectRequestMatcher(expectedRequest);
    when(s3Client.getObject(argThat(requestMatcher))).thenReturn(stubbedObject);
}
/**
 * Checks that fetching a file from a valid S3 path issues a request with the expected
 * bucket and key, and records the download timer once with the success tags.
 *
 * @throws GenieException If there is any problem
 */
@Test
public void testGetFileMethodValidS3Path() throws GenieException {
    final ObjectMetadata stubbedMetadata = Mockito.mock(ObjectMetadata.class);
    Mockito.when(this.s3Client.getObject(Mockito.any(GetObjectRequest.class), Mockito.any(File.class)))
        .thenReturn(stubbedMetadata);
    final ArgumentCaptor<GetObjectRequest> requestCaptor = ArgumentCaptor.forClass(GetObjectRequest.class);

    s3FileTransfer.getFile(S3_PATH, LOCAL_PATH);

    // The request sent to S3 must address the expected bucket and key.
    Mockito.verify(this.s3Client).getObject(requestCaptor.capture(), Mockito.any());
    final GetObjectRequest sentRequest = requestCaptor.getValue();
    Assert.assertEquals(S3_BUCKET, sentRequest.getBucketName());
    Assert.assertEquals(S3_KEY, sentRequest.getKey());

    // Exactly one timing sample, tagged as a success.
    Mockito
        .verify(this.downloadTimer, Mockito.times(1))
        .record(Mockito.anyLong(), Mockito.eq(TimeUnit.NANOSECONDS));
    Mockito
        .verify(this.registry, Mockito.times(1))
        .timer(Mockito.eq(S3FileTransferImpl.DOWNLOAD_TIMER_NAME), this.tagsCaptor.capture());
    Assert.assertEquals(SUCCESS_TAGS, this.tagsCaptor.getValue());
}
/**
 * Opens a stream on an object of the bucket named by {@code cdnConfig.getName()}.
 *
 * <p>Only the last {@code '/'}-separated segment of {@code filePath} is used as the object
 * key.
 *
 * @param filePath path whose last segment names the object to download
 * @return the content stream of the object; the caller is responsible for closing it
 * @throws BusinessException with the {@code INVALID_PATH} code when {@code filePath} is
 *                           {@code null} or has no segment other than separators, and with
 *                           the {@code INVALID_S3_BUCKET_CREDENTIALS} code when S3 rejects
 *                           the request
 */
@Override
public InputStream downloadFile(String filePath) throws BusinessException {
    if (filePath == null) {
        throw new BusinessException(Validations.INVALID_PATH.getCode());
    }

    client.getClient(credentials.getCredentials());

    String objectKey = filePath;
    if (filePath.contains("/")) {
        final String[] segments = filePath.split("/");
        // A path made only of separators (e.g. "/") splits into an empty array; report it
        // as an invalid path instead of failing with ArrayIndexOutOfBoundsException.
        if (segments.length == 0) {
            throw new BusinessException(Validations.INVALID_PATH.getCode());
        }
        objectKey = segments[segments.length - 1];
    }

    try {
        S3Object object = s3Client.getObject(
                new GetObjectRequest(cdnConfig.getName(), objectKey));
        return object.getObjectContent();
    } catch (AmazonServiceException e) {
        throw new BusinessException(Validations.INVALID_S3_BUCKET_CREDENTIALS.getCode());
    }
}
/**
 * Opens the content stream of the S3 object addressed by {@code location}.
 *
 * @param location the location to open
 * @return the content stream, or an empty {@link Optional} when the location cannot be
 *         parsed into S3 details or when opening the object fails (the failure is logged)
 */
@Override
public Optional<InputStream> open(String location) {
    log.debug("Open S3 file: " + location);

    final Optional<S3Details> parsed = S3Details.from(location);
    if (!parsed.isPresent()) {
        return Optional.empty();
    }

    try {
        final S3Details target = parsed.get();
        final GetObjectRequest fetch = new GetObjectRequest(target.getBucket(), target.getKey());
        return Optional.of(amazonS3.getObject(fetch).getObjectContent());
    } catch (Exception e) {
        log.error("Cannot open {}", location, e);
        return Optional.empty();
    }
}
/**
 * Downloads an object and prints its content type to standard output.
 *
 * <p>Downloading an object yields all of its metadata plus a stream from which to read the
 * contents. Read the stream as quickly as possible: the data is streamed directly from
 * Amazon S3 and the network connection stays open until all data has been read or the
 * input stream is closed.
 *
 * <p>{@code GetObjectRequest} also supports several other options, including conditional
 * downloading based on modification times or ETags, and downloading only a range of an
 * object.
 *
 * @param bucketName bucket holding the object
 * @param key        key of the object
 * @return the downloaded object, with its content stream still open
 * @throws AmazonServiceException if S3 rejects the request
 */
public S3Object download(String bucketName, String key) throws AmazonServiceException {
    System.out.println("Downloading an object");

    final GetObjectRequest downloadRequest = new GetObjectRequest(bucketName, key);
    final S3Object fetched = s3Client.getObject(downloadRequest);

    final String contentType = fetched.getObjectMetadata().getContentType();
    System.out.println("Content-Type: " + contentType);

    return fetched;
}
/**
 * Reads one block from S3 through the AmazonS3 client:
 * (1) Create the object request from bucketName and filePath.
 * (2) Set the byte range, from {@code offset} to {@code blockMetadata.getLength() - 1}, on
 *     that request.
 * (3) Get the object portion through the AmazonS3 client API.
 * (4) Read the whole content of that portion into the entity.
 *
 * @return the block entity, holding the bytes read and their count
 * @throws IOException if reading or closing the object content fails
 */
@Override
protected Entity readEntity() throws IOException
{
    entity.clear();
    GetObjectRequest rangeObjectRequest = new GetObjectRequest(
        bucketName, filePath);
    rangeObjectRequest.setRange(offset, blockMetadata.getLength() - 1);
    // try-with-resources releases the object and its stream even when the read fails;
    // previously the stream was only closed on the success path.
    try (S3Object objectPortion = s3Client.getObject(rangeObjectRequest);
            S3ObjectInputStream wrappedStream = objectPortion.getObjectContent()) {
        byte[] record = ByteStreams.toByteArray(wrappedStream);
        entity.setUsedBytes(record.length);
        entity.setRecord(record);
    }
    return entity;
}
/**
 * Retrieves a large object from S3 and deserializes it.
 *
 * <p>The bucket name is lower-cased before the lookup. The object content is read as UTF-8
 * text; only {@code ReCiterFeature.class} is currently deserialized.
 *
 * @param bucketName  name of the bucket (lower-cased before use)
 * @param keyName     key of the object
 * @param objectClass type to deserialize into
 * @return the deserialized {@code ReCiterFeature} when {@code objectClass} is
 *         {@code ReCiterFeature.class}; {@code null} for any other class or when the object
 *         cannot be fetched, read or parsed (the failure is logged)
 */
public <T> Object retrieveLargeItem(String bucketName, String keyName, Class<T> objectClass) {
    // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
    final String bucket = bucketName.toLowerCase(java.util.Locale.ROOT);
    // The S3 object is a managed resource so its connection is always released.
    try (S3Object s3Object = s3.getObject(new GetObjectRequest(bucket, keyName))) {
        String objectContent = IOUtils.toString(s3Object.getObjectContent(), StandardCharsets.UTF_8);
        if (objectClass == ReCiterFeature.class) {
            return OBJECT_MAPPER.readValue(objectContent, ReCiterFeature.class);
        }
    } catch (IOException | AmazonServiceException e) {
        // Pass the exception itself so the stack trace is not lost.
        log.error(e.getMessage(), e);
    }
    return null;
}
/**
 * Fetches an object from S3, optionally supplying a customer-provided encryption key.
 *
 * @param s3Client       client used to issue the request
 * @param bucket         bucket holding the object
 * @param objectKey      key of the object
 * @param useSSE         whether to attach the customer key to the request
 * @param customerKey    customer key; only resolved when {@code useSSE} is true
 * @param customerKeyMd5 MD5 of the customer key; only resolved when {@code useSSE} is true
 * @return the requested object
 * @throws StageException declared for resolving the credential values
 */
static S3Object getObject(
    AmazonS3 s3Client,
    String bucket,
    String objectKey,
    boolean useSSE,
    CredentialValue customerKey,
    CredentialValue customerKeyMd5
) throws StageException {
    final GetObjectRequest objectRequest = new GetObjectRequest(bucket, objectKey);
    if (!useSSE) {
        return s3Client.getObject(objectRequest);
    }

    final SSECustomerKey encryptionKey = new SSECustomerKey(customerKey.get());
    encryptionKey.setMd5(customerKeyMd5.get());
    objectRequest.setSSECustomerKey(encryptionKey);
    return s3Client.getObject(objectRequest);
}
/**
 * Uploads a single file through the task, downloads it again from the bucket and checks
 * that the downloaded content equals the original file.
 */
@Test
public void testExecuteSingleFile() throws FileNotFoundException,
        IOException {
    // Configure the upload task with a file set containing only testFile1.
    UploadFileSetToS3Task uploadTask = new UploadFileSetToS3Task();
    uploadTask.setProject(new Project());
    FileSet singleFile = new FileSet();
    singleFile.setDir(testFile1.getParentFile());
    singleFile.setFile(testFile1);
    uploadTask.addFileset(singleFile);
    uploadTask.setBucketName(BUCKET_NAME);
    uploadTask.setKeyPrefix(KEY_PREFIX);

    uploadTask.execute();

    // Fetch the uploaded object into a temporary file and compare contents.
    resFile1 = File.createTempFile(RES_FILE_1, TESTFILE_SUFFIX);
    GetObjectRequest uploadedObject = new GetObjectRequest(BUCKET_NAME, KEY_PREFIX + fileName1);
    client.getObject(uploadedObject, resFile1);
    assertTrue(FileUtils.contentEquals(testFile1, resFile1));
}
/**
 * Checks the getFile method when the S3 client fails to fetch the object: a
 * {@code GenieServerException} is expected, the request must still carry the expected
 * bucket and key, and the download timer must be recorded once with failure tags.
 *
 * @throws GenieException If there is any problem
 */
@Test(expected = GenieServerException.class)
public void testGetFileMethodFailureToFetch() throws GenieException {
    Mockito.when(this.s3Client.getObject(Mockito.any(GetObjectRequest.class), Mockito.any(File.class)))
        .thenThrow(new AmazonS3Exception("something"));
    final ArgumentCaptor<GetObjectRequest> requestCaptor = ArgumentCaptor.forClass(GetObjectRequest.class);

    try {
        this.s3FileTransfer.getFile(S3_PATH, LOCAL_PATH);
    } finally {
        // Runs while the expected exception propagates, so the interactions are still checked.
        Mockito.verify(this.s3Client).getObject(requestCaptor.capture(), Mockito.any());
        final GetObjectRequest sentRequest = requestCaptor.getValue();
        Assert.assertEquals(S3_BUCKET, sentRequest.getBucketName());
        Assert.assertEquals(S3_KEY, sentRequest.getKey());

        Mockito
            .verify(this.downloadTimer, Mockito.times(1))
            .record(Mockito.anyLong(), Mockito.eq(TimeUnit.NANOSECONDS));
        Mockito
            .verify(this.registry, Mockito.times(1))
            .timer(Mockito.eq(S3FileTransferImpl.DOWNLOAD_TIMER_NAME), this.tagsCaptor.capture());

        Assert.assertEquals(
            MetricsUtils.newFailureTagsSetForException(new GenieServerException("blah")),
            this.tagsCaptor.getValue()
        );
    }
}
/**
 * Fetches the S3 object addressed by {@code s3uri} unless a connection was already made.
 *
 * <p>The key is URL-decoded as UTF-8 first; if that encoding is reported as unsupported,
 * the raw key is used instead.
 *
 * @throws IOException wrapping any service or client failure reported by the AWS SDK
 */
private synchronized void connectToS3() throws IOException {
    if (connected) {
        return;
    }

    try {
        String decodedKey;
        try {
            decodedKey = java.net.URLDecoder.decode(s3uri.getKey(), "UTF-8");
        } catch (final UnsupportedEncodingException e) {
            LOG.warn("failed to decode key, using raw key instead", e);
            // TODO: Better error handling with badly encoded URLs?
            decodedKey = s3uri.getKey();
        }

        final GetObjectRequest objectRequest = new GetObjectRequest(s3uri.getBucket(), decodedKey);
        s3object = s3Client.getObject(objectRequest);
        connected = true;
    } catch (final AmazonServiceException ase) {
        throw new IOException("Amazon S3 service failure for error type " + ase.getErrorType(), ase);
    } catch (final AmazonClientException ace) {
        throw new IOException("Amazon S3 client failure", ace);
    }
}
/**
 * Scans the object summaries for table definitions that are new or were modified at or
 * after the last check, reads each of them from S3 and stores the decoded description in
 * {@code descriptors}. Keys ending in {@code "/"} are ignored. Finally {@code lastCheck} is
 * set to the time at which this scan started.
 */
private void updateTablesFromS3()
{
    long now = System.currentTimeMillis();

    AmazonS3Client s3client = clientManager.getS3Client();

    for (S3ObjectSummary summary : getObjectSummaries()) {
        final String key = summary.getKey();

        // Skip definitions that are already known and have not changed since the last check.
        final boolean knownAndUnchanged = descriptors.containsKey(key)
                && summary.getLastModified().getTime() < lastCheck;
        if (knownAndUnchanged) {
            continue;
        }
        // Skip directory-style keys.
        if (key.endsWith("/")) {
            continue;
        }

        // New or updated file, so we must read from AWS
        log.info("Getting : %s - %s", summary.getBucketName(), key);
        S3Object object = s3client.getObject(new GetObjectRequest(summary.getBucketName(), key));

        try (BufferedReader reader = new BufferedReader(new InputStreamReader(object.getObjectContent(), UTF_8))) {
            KinesisStreamDescription table = streamDescriptionCodec.fromJson(CharStreams.toString(reader));
            descriptors.put(key, table);
            log.info("Put table description into the map from %s", key);
        }
        catch (IOException iox) {
            log.error("Problem reading input stream from object.", iox);
            throwIfUnchecked(iox);
            throw new RuntimeException(iox);
        }
    }

    log.info("Completed updating table definitions from S3.");
    lastCheck = now;
}
/**
 * Stubbed {@code getObject}: fails with the configured HTTP status code unless that code
 * is {@code HTTP_OK}, in which case {@code null} is returned.
 *
 * @param getObjectRequest ignored
 * @return always {@code null} when no failure is configured
 */
@Override
public S3Object getObject(GetObjectRequest getObjectRequest)
{
    if (getObjectHttpCode == HTTP_OK) {
        return null;
    }

    AmazonS3Exception failure = new AmazonS3Exception("Failing getObject call with " + getObjectHttpCode);
    failure.setStatusCode(getObjectHttpCode);
    throw failure;
}
/**
 * Loads a file or an S3 object, depending on the command tokens.
 *
 * <p>{@code parts[1]} selects the source. For {@code "file"}, {@code parts[2]} is passed to
 * {@code readData}. For {@code "S3"} (case-insensitive), {@code parts[2]} is the object
 * type, {@code parts[3]} the symbol name (prefixed with {@code "$"} when missing) and
 * {@code parts[4]} the S3 key. For any other value, {@code parts[2]} is used as the S3 key,
 * with no object type or symbol name.
 *
 * @param parts String[]. An array of tokens.
 * @return String. The message returned from the load command.
 * @throws Exception on I/O errors.
 */
String load(String[] parts) throws Exception {
    final String type = parts[1];   // file or S3

    if (type.equals("file")) {
        final String fileName = parts[2];
        return Configuration.getInstance().readData(fileName);
    }

    String otype = null;
    String symbolName = null;
    final String name;
    if (type.equalsIgnoreCase("S3")) {
        otype = parts[2];           // bloom, cache, cuckoo.
        name = parts[4];
        final String symbol = parts[3];     // name of the object
        symbolName = symbol.startsWith("$") ? symbol : "$" + symbol;
    } else {
        name = parts[2];
    }

    S3Object object = Configuration.s3.getObject(new GetObjectRequest(Configuration.s3_bucket, name));
    long size = Configuration.s3.getObjectMetadata(Configuration.s3_bucket, name).getContentLength();

    return Configuration.getInstance().readData(otype, symbolName, object, size);
}
/**
 * Downloads the referenced remote object and returns its content as a string.
 *
 * @param localPath       not used by this implementation
 * @param objectReference reference to the remote object to read
 * @return the object content, decoded as UTF-8
 * @throws Exception if the object cannot be fetched or read
 */
@Override
public String downloadFileToString(final Path localPath, final RemoteObjectReference objectReference) throws Exception {
    final GetObjectRequest getObjectRequest = new GetObjectRequest(request.storageLocation.bucket, objectReference.canonicalPath);
    // Decode explicitly as UTF-8 so the result does not depend on the JVM's default charset.
    try (final InputStream is = transferManager.getAmazonS3Client().getObject(getObjectRequest).getObjectContent();
         final InputStreamReader isr = new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8)) {
        return CharStreams.toString(isr);
    }
}
/**
 * Resolves a report key to a readable path.
 *
 * <p>The path inside the zip file system registered for the key's year is preferred when
 * that entry exists. Otherwise the file in the local cache is returned, after downloading
 * it from S3 if it is not there yet.
 *
 * @param key report key; characters 21 to 24 must hold the four-digit year
 * @return path to the report, either inside a zip file system or in the cache directory
 * @throws IOException if a directory cannot be created or the download fails
 */
private Path toPath(String key) throws IOException {
    // Expecting key format: "modulescanner-report-2018_08_18_00_58_06.csv"
    var year = Integer.valueOf(key.substring(21, 25));

    var archive = zips.get(year);
    if (archive != null) {
        var zipped = archive.getPath(key);
        if (Files.exists(zipped)) {
            LOG.log(DEBUG, "Extracting {0} from {1}...", key, archive);
            return zipped;
        }
    }

    var cached = cache.resolve(key);
    if (!Files.notExists(cached)) {
        return cached;
    }

    Files.createDirectories(cached.getParent());
    LOG.log(INFO, "Downloading {0} from remote {1}...", key, bucketName);
    Files.createDirectories(cache);
    try (var remote = s3.getObject(new GetObjectRequest(bucketName, key))) {
        var size = remote.getObjectMetadata().getContentLength();
        LOG.log(INFO, "Loading {0} bytes to {1}...", size, cached);
        try (var content = remote.getObjectContent().getDelegateStream()) {
            Files.copy(content, cached);
        }
    }
    LOG.log(DEBUG, "Loaded {0} bytes to {1}", Files.size(cached), cached);
    return cached;
}
/**
 * Aborts the current stream, if any, and opens a new ranged stream starting at {@code pos}
 * and running to the last byte of the object.
 *
 * @param pos position to open the stream at
 * @throws EOFException if {@code pos} is negative, or lies past the last byte of an object
 *                      whose content length is positive
 * @throws IOException  if the client returns no content stream
 */
private synchronized void reopen(long pos) throws IOException {
    // The old stream is discarded first, before the requested position is validated.
    if (wrappedStream != null) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Aborting old stream to open at pos " + pos);
        }
        wrappedStream.abort();
    }

    if (pos < 0) {
        throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
            +" " + pos);
    }

    final boolean pastEnd = contentLength > 0 && pos > contentLength-1;
    if (pastEnd) {
        throw new EOFException(
            FSExceptionMessages.CANNOT_SEEK_PAST_EOF
            + " " + pos);
    }

    LOG.debug("Actually opening file " + key + " at pos " + pos);

    GetObjectRequest rangedRequest = new GetObjectRequest(bucket, key);
    rangedRequest.setRange(pos, contentLength-1);
    wrappedStream = client.getObject(rangedRequest).getObjectContent();

    if (wrappedStream == null) {
        throw new IOException("Null IO stream");
    }

    this.pos = pos;
}
/**
 * Reads the SQL script {@code <prefix>/<table>.sql} from S3, substitutes the
 * {@code ${redshift_arn}} and {@code ${bucket}} placeholders with the environment variables
 * of the same name, and executes the result on {@code connection}.
 *
 * <p>The created statement is stored in the {@code statement} field and is not closed here.
 *
 * @throws IOException          if the script cannot be read from S3
 * @throws SQLException         if the statement cannot be created or executed
 * @throws NullPointerException if the {@code redshift_arn} or {@code bucket} environment
 *                              variable is not set
 */
public void readFiles() throws IOException, SQLException
{
    final GetObjectRequest scriptRequest =
            new GetObjectRequest(bucket, System.getenv("prefix") +"/" + table + ".sql");

    // Read the script inside try-with-resources so the S3 connection is always released.
    String query;
    try (S3Object s3object = client.getObject(scriptRequest)) {
        query = IOUtils.toString(s3object.getObjectContent(), "UTF-8");
    }

    query = query.replace("${redshift_arn}", System.getenv("redshift_arn"));
    query = query.replace("${bucket}", System.getenv("bucket"));

    // Create the statement only once the script is available, so a failed read does not
    // leave a fresh, unused statement behind.
    statement = connection.createStatement();
    statement.execute(query);
}
/**
 * Prepares the seed URL source: checks that it runs with a parallelism of exactly one and
 * that a terminator is set, opens the terminator, resets the URL index and, when the seed
 * URLs come from an S3 file, opens that file's content stream.
 *
 * @param parameters configuration forwarded to the superclass
 * @throws IllegalStateException if the parallelism is not 1 or no terminator was set
 * @throws Exception             if the superclass or the terminator fails to open
 */
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    final StreamingRuntimeContext runtime = (StreamingRuntimeContext) getRuntimeContext();
    if (runtime.getNumberOfParallelSubtasks() != 1) {
        throw new IllegalStateException("SeedUrlSource only supports a parallelism of 1");
    }

    if (_terminator == null) {
        throw new IllegalStateException("Crawl terminator must be set for the seed URL source");
    }

    LOGGER.info("Opening seed URL source");

    // Open the terminator, so that it knows when we really started running.
    _terminator.open();

    _urlIndex = 0;

    if (!useS3File()) {
        return;
    }

    final AmazonS3 seedClient = S3Utils.makeS3Client();
    final GetObjectRequest seedRequest = new GetObjectRequest(_seedUrlsS3Bucket, _seedUrlsS3Path);
    _s3FileStream = seedClient.getObject(seedRequest).getObjectContent();
}
/**
 * Replaces the wrapped stream with a new ranged read of the object that begins at
 * {@code pos} and ends at the object's last byte. Any previously wrapped stream is aborted
 * first.
 *
 * @param pos position to open the stream at
 * @throws EOFException if {@code pos} is negative, or lies past the last byte of an object
 *                      whose content length is positive
 * @throws IOException  if the client returns no content stream
 */
private synchronized void reopen(long pos) throws IOException {
    if (wrappedStream != null) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Aborting old stream to open at pos " + pos);
        }
        wrappedStream.abort();
    }

    // Validate the target position only after the old stream has been dropped.
    if (pos < 0) {
        final String negativeSeek = FSExceptionMessages.NEGATIVE_SEEK
            +" " + pos;
        throw new EOFException(negativeSeek);
    }
    if (contentLength > 0 && pos > contentLength-1) {
        final String pastEof = FSExceptionMessages.CANNOT_SEEK_PAST_EOF
            + " " + pos;
        throw new EOFException(pastEof);
    }

    LOG.debug("Actually opening file " + key + " at pos " + pos);

    final GetObjectRequest tailRequest = new GetObjectRequest(bucket, key);
    tailRequest.setRange(pos, contentLength-1);

    wrappedStream = client.getObject(tailRequest).getObjectContent();
    if (wrappedStream == null) {
        throw new IOException("Null IO stream");
    }

    this.pos = pos;
}
/**
 * Opens the object, or the requested range of it, immediately.
 *
 * @param s3     client used to fetch the object
 * @param bucket bucket holding the object
 * @param key    key of the object
 * @param range  range of bytes to read, or {@code null} for the whole object; open bounds
 *               are converted to the inclusive offsets sent to S3, a missing lower bound
 *               means offset 0 and a missing upper bound means {@code Long.MAX_VALUE}
 */
public RestartingS3InputStream(AmazonS3 s3, String bucket, String key, @Nullable Range<Long> range) {
    _s3 = s3;
    _bucket = bucket;
    _key = key;

    // Get the object synchronously so any immediate S3 errors, such as file not found, are thrown inline.
    final S3Object opened;
    if (range != null) {
        final long first = range.hasLowerBound()
                ? range.lowerEndpoint() + (range.lowerBoundType() == BoundType.CLOSED ? 0 : 1)
                : 0;
        final long last = range.hasUpperBound()
                ? range.upperEndpoint() - (range.upperBoundType() == BoundType.CLOSED ? 0 : 1)
                : Long.MAX_VALUE;

        opened = _s3.getObject(new GetObjectRequest(_bucket, _key).withRange(first, last));

        _pos = first;
        // The S3 metadata's content length is the length of the data actually being returned by S3.
        // Since we effectively skipped the first "first" bytes we need to add them back to the total length
        // of data being read to make future calculations using _pos and _length consistent.
        _length = first + opened.getObjectMetadata().getContentLength();
    } else {
        opened = _s3.getObject(new GetObjectRequest(_bucket, _key));
        _pos = 0;
        _length = opened.getObjectMetadata().getContentLength();
    }

    _in = opened.getObjectContent();
}
/**
 * Re-opens the input stream, starting at the first unread byte.
 *
 * <p>A failed attempt is retried up to three times when the SDK marks the failure as
 * retryable, sleeping {@code 200 ms * attempt} before each retry.
 *
 * @throws IOException declared for callers; a failure to close the old stream is ignored
 */
private void reopenS3InputStream()
        throws IOException {
    // First attempt to close the existing input stream
    try {
        closeS3InputStream();
    } catch (IOException ignore) {
        // Ignore this exception; we're re-opening because there was in issue with the existing stream
        // in the first place.
    }

    InputStream remainingIn = null;
    int attempt = 0;
    while (remainingIn == null) {
        try {
            S3Object s3Object = _s3.getObject(
                    new GetObjectRequest(_bucket, _key)
                            .withRange(_pos, _length - 1));  // Range is inclusive, hence length-1

            remainingIn = s3Object.getObjectContent();
        } catch (AmazonClientException e) {
            // Allow up to 3 retries
            attempt += 1;
            if (!e.isRetryable() || attempt == 4) {
                throw e;
            }
            // Back-off on each retry
            try {
                Thread.sleep(200 * attempt);
            } catch (InterruptedException interrupt) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                throw Throwables.propagate(interrupt);
            }
        }
    }

    _in = remainingIn;
}
/**
 * Creates a matcher that accepts a {@code GetObjectRequest} addressing the given bucket and
 * key.
 *
 * @param bucket expected bucket name
 * @param key    expected object key
 * @return a matcher that is true only for requests with exactly that bucket and key
 */
private Matcher<GetObjectRequest> getsObject(final String bucket, final String key) {
    return new BaseMatcher<GetObjectRequest>() {
        @Override
        public boolean matches(Object o) {
            // A matcher must answer false, not throw, for null or foreign arguments.
            if (!(o instanceof GetObjectRequest)) {
                return false;
            }
            GetObjectRequest request = (GetObjectRequest) o;
            // Null-safe comparison: a request without bucket or key simply does not match.
            return java.util.Objects.equals(request.getBucketName(), bucket)
                    && java.util.Objects.equals(request.getKey(), key);
        }

        @Override
        public void describeTo(Description description) {
            description.appendText("gets object s3://").appendText(bucket).appendText("/").appendText(key);
        }
    };
}