下面列出了 com.amazonaws.services.s3.model.GetObjectRequest#setRange() 的实例代码；也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * S3 block read achieved through the AmazonS3 client. Steps:
 * (1) Create the objectRequest from bucketName and filePath.
 * (2) Set the byte range on the request.
 * (3) Get the object portion through the AmazonS3 client API.
 * (4) Copy the portion's content into {@code buffer}.
 *
 * @param bytesFromCurrentOffset
 *            bytes read till now from current offset
 * @param bytesToFetch
 *            the number of bytes to be fetched
 * @return the number of bytes read, -1 if 0 bytes read
 * @throws IOException if reading the S3 object content fails
 */
@Override
protected int readData(final long bytesFromCurrentOffset, final int bytesToFetch) throws IOException
{
    GetObjectRequest rangeObjectRequest = new GetObjectRequest(s3Params.bucketName, s3Params.filePath);
    // setRange() bounds are inclusive, hence the -1 on the end offset.
    long start = offset + bytesFromCurrentOffset;
    rangeObjectRequest.setRange(start, start + bytesToFetch - 1);
    S3Object objectPortion = s3Params.s3Client.getObject(rangeObjectRequest);
    // try-with-resources guarantees the HTTP stream is released even if the
    // copy throws; the original close() was skipped on the exception path.
    try (S3ObjectInputStream wrappedStream = objectPortion.getObjectContent()) {
        buffer = ByteStreams.toByteArray(wrappedStream);
    }
    int bufferLength = buffer.length;
    return bufferLength <= 0 ? -1 : bufferLength;
}
/**
 * S3 block read achieved through the AmazonS3 client. Steps:
 * (1) Create the objectRequest from bucketName and filePath.
 * (2) Set the byte range on the request.
 * (3) Get the object portion through the AmazonS3 client API.
 * (4) Copy the portion's content into the entity record.
 *
 * @return the block entity
 * @throws IOException if reading the S3 object content fails
 */
@Override
protected Entity readEntity() throws IOException
{
    entity.clear();
    GetObjectRequest rangeObjectRequest = new GetObjectRequest(
        bucketName, filePath);
    // setRange() end bound is inclusive, hence the -1.
    rangeObjectRequest.setRange(offset, blockMetadata.getLength() - 1);
    S3Object objectPortion = s3Client.getObject(rangeObjectRequest);
    // try-with-resources guarantees the HTTP stream is released even if the
    // copy throws; the original close() was skipped on the exception path.
    try (S3ObjectInputStream wrappedStream = objectPortion.getObjectContent()) {
        byte[] record = ByteStreams.toByteArray(wrappedStream);
        entity.setUsedBytes(record.length);
        entity.setRecord(record);
    }
    return entity;
}
/**
 * Opens (or re-opens) the wrapped S3 stream at the given position.
 * Aborts any existing stream first, validates the position against the
 * known content length, then issues a ranged GET from {@code pos} to EOF.
 */
private synchronized void reopen(long pos) throws IOException {
    // Release any previously opened connection before repositioning.
    if (wrappedStream != null) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Aborting old stream to open at pos " + pos);
        }
        wrappedStream.abort();
    }
    // Reject positions outside [0, contentLength - 1].
    if (pos < 0) {
        throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
    }
    boolean pastEof = contentLength > 0 && pos > contentLength - 1;
    if (pastEof) {
        throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
    }
    LOG.debug("Actually opening file " + key + " at pos " + pos);
    GetObjectRequest rangeRequest = new GetObjectRequest(bucket, key);
    rangeRequest.setRange(pos, contentLength - 1);
    wrappedStream = client.getObject(rangeRequest).getObjectContent();
    if (wrappedStream == null) {
        throw new IOException("Null IO stream");
    }
    // Record the new position only after the stream is confirmed open.
    this.pos = pos;
}
// Opens (or re-opens) the wrapped S3 stream at the given position.
// NOTE(review): this snippet is an exact duplicate of another reopen()
// implementation in this file; they come from separate example listings.
private synchronized void reopen(long pos) throws IOException {
// Abort (not close) the old stream: abort drops the HTTP connection
// without draining the remaining ranged body.
if (wrappedStream != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Aborting old stream to open at pos " + pos);
}
wrappedStream.abort();
}
// Validate the requested position: negative seeks are rejected outright.
if (pos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+" " + pos);
}
// Seeking past the last byte is also an error (only checked when the
// content length is known to be positive).
if (contentLength > 0 && pos > contentLength-1) {
throw new EOFException(
FSExceptionMessages.CANNOT_SEEK_PAST_EOF
+ " " + pos);
}
LOG.debug("Actually opening file " + key + " at pos " + pos);
GetObjectRequest request = new GetObjectRequest(bucket, key);
// Ranged GET from pos through the last byte (setRange bounds are inclusive).
request.setRange(pos, contentLength-1);
wrappedStream = client.getObject(request).getObjectContent();
if (wrappedStream == null) {
throw new IOException("Null IO stream");
}
// Record the new position only after the stream is confirmed open.
this.pos = pos;
}
/**
 * Fetch (if needed) and cache the segment data for <indexEntry>.
 *
 * @param url
 *            Used for propagating exception if needed.
 * @param indexEntry
 *            Segment entry we need
 * @return GZipped byte[] of segment data.
 * @throws IOFetchException if the S3 read fails
 */
private byte[] getSegmentData(URL url, SecondaryIndex indexEntry) throws IOFetchException {
    byte[] result = _cache.get(indexEntry.getSegmentId());
    if (result != null) {
        LOGGER.trace("Found segment #{} in our cache for '{}'", indexEntry.getSegmentId(), url);
        return result;
    }
    long length = indexEntry.getSegmentLength();
    result = new byte[(int) length];
    String indexFilename = indexEntry.getIndexFilename();
    String s3Path = S3Utils.makeS3FilePath(_crawlId, indexFilename);
    GetObjectRequest objectRequest = new GetObjectRequest(S3Utils.getBucket(), s3Path);
    long offset = indexEntry.getSegmentOffset();
    // setRange() bounds are inclusive; the previous end of (offset + length)
    // requested one byte too many. End at offset + length - 1 so the ranged
    // GET covers exactly `length` bytes.
    objectRequest.setRange(offset, offset + length - 1);
    S3ObjectInputStream is = null;
    try (S3Object object = _s3Client.getObject(objectRequest)) {
        is = object.getObjectContent();
        long startTime = System.currentTimeMillis();
        // Fills `result` completely (exactly `length` bytes).
        IOUtils.read(is, result);
        if (LOGGER.isTraceEnabled()) {
            long deltaTime = System.currentTimeMillis() - startTime;
            double responseRateExact = (double) length / deltaTime;
            // Response rate is bytes/second, not bytes/millisecond
            int responseRate = (int) Math.round(responseRateExact * 1000.0);
            LOGGER.trace(String.format(
                    "Read %,d byte segment #%d at %,d offset within %s in %,dms (%,d bytes/sec) for '%s'",
                    length, indexEntry.getSegmentId(), offset, indexFilename, deltaTime,
                    responseRate, url));
        }
        _cache.put(indexEntry.getSegmentId(), result);
        return result;
    } catch (IOException e) {
        throw new IOFetchException(url.toString(), e);
    } finally {
        // abort() drops the connection without draining any unread bytes;
        // cheap and safe whether or not the read completed.
        if (is != null) {
            is.abort();
        }
    }
}
/**
 * Reads an object from S3, including its data.
 *
 * <p>
 * Callers should be very careful when using this method; the returned
 * Amazon S3 object contains a direct stream of data from the HTTP connection.
 * The underlying HTTP connection cannot be closed until the user
 * finishes reading the data and closes the stream. Callers should
 * therefore:
 * </p>
 * <ul>
 * <li>Use the data from the input stream in Amazon S3 object as soon as possible,</li>
 * <li>Close the input stream in Amazon S3 object as soon as possible.</li>
 * </ul>
 * <p>
 * If callers do not follow those rules, then the client can run out of
 * resources if allocating too many open, but unused, HTTP connections.
 * </p>
 *
 * @see AmazonS3#getObject(GetObjectRequest)
 *
 * @param uri - target URI
 * @param service - S3 service
 * @param start - byte offset
 *
 * @return S3Object
 *
 * @throws IOException if the URI does not name an object or the S3 call fails
 */
static S3Object getObject(URI uri, AmazonS3 service, long start) throws IOException {
    try {
        uri = uri.normalize();
        String[] path = getPath(uri);
        // Guard before indexing: path[0] is the bucket, path[1] the object key.
        // The original read path[0] before this length check, which could
        // throw ArrayIndexOutOfBoundsException for an empty path.
        if (path.length < 2) {
            String bucketName = (path.length > 0) ? path[0] : null;
            throw new IOException(StringUtils.isEmpty(bucketName) ? "Cannot read from the root directory" : "Cannot read from bucket root directory");
        }
        GetObjectRequest request = new GetObjectRequest(path[0], path[1]);
        if (start > 0) {
            // CLO-9500:
            // TODO replace this with GetObjectRequest.setRange(start) when the library is updated
            request.setRange(start, END_OF_FILE);
        }
        return service.getObject(request);
    } catch (Exception e) {
        // Normalize every failure (including runtime exceptions above) into IOException.
        throw S3Utils.getIOException(e);
    }
}
/**
 * Reads data from S3 starting from startOffset till the endOffset and
 * returns the number of bytes read
 *
 * @param startOffset
 *            offset from where to read
 * @param endOffset
 *            offset till where to read (inclusive — setRange bounds are inclusive)
 * @return number of bytes read
 * @throws IOException if reading the S3 object content fails
 */
protected int readData(long startOffset, long endOffset) throws IOException
{
    GetObjectRequest rangeObjectRequest = new GetObjectRequest(s3Params.bucketName, s3Params.filePath);
    rangeObjectRequest.setRange(startOffset, endOffset);
    S3Object objectPortion = s3Params.s3Client.getObject(rangeObjectRequest);
    // try-with-resources guarantees the HTTP stream is released even if the
    // copy throws; the original close() was skipped on the exception path.
    try (S3ObjectInputStream wrappedStream = objectPortion.getObjectContent()) {
        buffer = ByteStreams.toByteArray(wrappedStream);
    }
    return buffer.length;
}