下面列出了org.apache.hadoop.fs.s3.S3Exception#org.apache.hadoop.fs.FSExceptionMessages 实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Record the position the next read should start from. The seek is lazy:
 * this method only updates {@code nextReadPos}; nothing is read here.
 * When {@code contentLength} is zero or negative the call is a no-op.
 *
 * @param targetPos absolute offset to read from next
 * @throws EOFException if {@code targetPos} is negative
 * @throws IOException if {@code checkNotClosed()} rejects the call
 */
@Override
public synchronized void seek(long targetPos) throws IOException {
  checkNotClosed();
  if (targetPos < 0) {
    // negative offsets are never accepted
    throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + targetPos);
  }
  if (contentLength > 0) {
    // lazy seek: remember the offset, act on it later
    nextReadPos = targetPos;
  }
}
/**
 * Guard that fails once the output stream has been closed. Non blocking:
 * it only reads the latest value of the volatile {@link #closed} field.
 * @throws IOException if the stream is closed; the message includes the
 * key name reported by {@code blockOutputStreamEntryPool}.
 */
private void checkNotClosed() throws IOException {
  if (!closed) {
    return;
  }
  // only look up the key name when we actually have to report a failure
  final String message = ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
      + blockOutputStreamEntryPool.getKeyName();
  throw new IOException(message);
}
/**
 * Guard that fails once the input stream has been closed. Non blocking:
 * it only reads the latest value of the volatile {@link #closed} field.
 * @throws IOException if the stream is closed; the message includes
 * {@code key}.
 */
private void checkOpen() throws IOException {
  if (!closed) {
    return;
  }
  final String message =
      ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key;
  throw new IOException(message);
}
/**
 * Validate the request, then delegate the seek to the superclass.
 *
 * @param pos absolute offset to move to
 * @throws EOFException if {@code pos} is negative
 * @throws IOException if {@code in} is null (reported as a
 * {@code SwiftConnectionClosedException}) or the delegated seek fails
 */
@Override
public void seek(long pos) throws IOException {
  final boolean negativeOffset = pos < 0;
  if (negativeOffset) {
    throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
  }
  final boolean streamMissing = in == null;
  if (streamMissing) {
    throw new SwiftConnectionClosedException(FSExceptionMessages.STREAM_IS_CLOSED);
  }
  super.seek(pos);
}
/**
 * Move the stream to {@code newpos}. When the stream is already at that
 * offset nothing happens; otherwise the work is handed to
 * {@code reopen(long)}.
 *
 * @param newpos absolute offset to move to
 * @throws EOFException if {@code newpos} is negative
 * @throws IOException if {@code reopen(long)} fails
 */
@Override
public synchronized void seek(long newpos) throws IOException {
  if (newpos < 0) {
    throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
  }
  if (pos == newpos) {
    // already positioned there: nothing to do
    return;
  }
  reopen(newpos);
}
/**
 * Replace the wrapped stream with a new one that starts at {@code pos}.
 * The request range passed to {@code setRange} is
 * {@code (pos, contentLength - 1)}.
 *
 * The offset is validated before the existing stream is aborted, so a
 * rejected offset leaves the current stream and position untouched.
 *
 * @param pos absolute offset the new stream should start at
 * @throws EOFException if {@code pos} is negative, or if
 * {@code contentLength} is positive and {@code pos} is beyond the last byte
 * @throws IOException if the client returns no object content
 */
private synchronized void reopen(long pos) throws IOException {
  // Validate first: a bad offset must not cost us the live stream.
  if (pos < 0) {
    throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
        + " " + pos);
  }
  if (contentLength > 0 && pos > contentLength - 1) {
    throw new EOFException(
        FSExceptionMessages.CANNOT_SEEK_PAST_EOF
        + " " + pos);
  }

  // The request is known to be valid; only now give up the old stream.
  if (wrappedStream != null) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Aborting old stream to open at pos " + pos);
    }
    wrappedStream.abort();
  }

  if (LOG.isDebugEnabled()) {
    // guarded so the message is not built when debug logging is off
    LOG.debug("Actually opening file " + key + " at pos " + pos);
  }
  GetObjectRequest request = new GetObjectRequest(bucket, key);
  request.setRange(pos, contentLength - 1);
  wrappedStream = client.getObject(request).getObjectContent();
  if (wrappedStream == null) {
    throw new IOException("Null IO stream");
  }

  // record the new position only once the new stream is in place
  this.pos = pos;
}
/**
 * Validate the request, then delegate the seek to the superclass.
 *
 * @param pos absolute offset to move to
 * @throws EOFException if {@code pos} is negative
 * @throws IOException if {@code in} is null (reported as a
 * {@code SwiftConnectionClosedException}) or the delegated seek fails
 */
@Override
public void seek(long pos) throws IOException {
  final boolean negativeOffset = pos < 0;
  if (negativeOffset) {
    throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
  }
  final boolean streamMissing = in == null;
  if (streamMissing) {
    throw new SwiftConnectionClosedException(FSExceptionMessages.STREAM_IS_CLOSED);
  }
  super.seek(pos);
}
/**
 * Move the stream to {@code newpos}. When the stream is already at that
 * offset nothing happens; otherwise the work is handed to
 * {@code reopen(long)}.
 *
 * @param newpos absolute offset to move to
 * @throws EOFException if {@code newpos} is negative
 * @throws IOException if {@code reopen(long)} fails
 */
@Override
public synchronized void seek(long newpos) throws IOException {
  if (newpos < 0) {
    throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
  }
  if (pos == newpos) {
    // already positioned there: nothing to do
    return;
  }
  reopen(newpos);
}
/**
 * Replace the wrapped stream with a new one that starts at {@code pos}.
 * The request range passed to {@code setRange} is
 * {@code (pos, contentLength - 1)}.
 *
 * The offset is validated before the existing stream is aborted, so a
 * rejected offset leaves the current stream and position untouched.
 *
 * @param pos absolute offset the new stream should start at
 * @throws EOFException if {@code pos} is negative, or if
 * {@code contentLength} is positive and {@code pos} is beyond the last byte
 * @throws IOException if the client returns no object content
 */
private synchronized void reopen(long pos) throws IOException {
  // Validate first: a bad offset must not cost us the live stream.
  if (pos < 0) {
    throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
        + " " + pos);
  }
  if (contentLength > 0 && pos > contentLength - 1) {
    throw new EOFException(
        FSExceptionMessages.CANNOT_SEEK_PAST_EOF
        + " " + pos);
  }

  // The request is known to be valid; only now give up the old stream.
  if (wrappedStream != null) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Aborting old stream to open at pos " + pos);
    }
    wrappedStream.abort();
  }

  if (LOG.isDebugEnabled()) {
    // guarded so the message is not built when debug logging is off
    LOG.debug("Actually opening file " + key + " at pos " + pos);
  }
  GetObjectRequest request = new GetObjectRequest(bucket, key);
  request.setRange(pos, contentLength - 1);
  wrappedStream = client.getObject(request).getObjectContent();
  if (wrappedStream == null) {
    throw new IOException("Null IO stream");
  }

  // record the new position only once the new stream is in place
  this.pos = pos;
}
/**
 * Move the stream to {@code targetPos}.
 *
 * A seek to the current position returns immediately. A forward seek for
 * which {@code rangeOffset + offset < bufferSize} first tries to skip
 * ahead with {@code chompBytes}; if that lands on the target the method
 * returns. In every other case (backwards seek, seek beyond the buffer
 * size, or a skip that did not reach the target) the stream is closed via
 * {@code innerClose} and refilled with {@code fillBuffer(targetPos)}.
 *
 * @param targetPos target position
 * @throws EOFException if {@code targetPos} is negative
 * @throws IOException on any other problem
 */
@Override
public synchronized void seek(long targetPos) throws IOException {
  if (targetPos < 0) {
    throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
  }

  // distance to travel relative to the current position
  final long delta = targetPos - pos;
  if (LOG.isDebugEnabled()) {
    LOG.debug("Seek to " + targetPos + "; current pos =" + pos
        + "; offset=" + delta);
  }
  if (delta == 0) {
    LOG.debug("seek is no-op");
    return;
  }

  final boolean forwards = delta > 0;
  if (forwards && rangeOffset + delta < bufferSize) {
    // near-local data: scan forwards in the current stream rather than
    // closing it and opening a new HTTP connection
    SwiftUtils.debug(LOG,
        "seek is within current stream"
            + "; pos= %d ; targetPos=%d; "
            + "offset= %d ; bufferOffset=%d",
        pos, targetPos, delta, rangeOffset);
    try {
      LOG.debug("chomping ");
      chompBytes(delta);
    } catch (IOException e) {
      // deliberately not rethrown: the full seek below is the recovery
      // path (or will fail with a more meaningful error)
      LOG.debug("while chomping ", e);
    }
    // pos is re-read here; the skip counts only if it reached the target
    if (targetPos == pos) {
      LOG.trace("chomping successful");
      return;
    }
    LOG.trace("chomping failed");
  } else if (forwards) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Seek is beyond buffer size of " + bufferSize);
    }
  } else {
    LOG.debug("seek is backwards");
  }

  // fall back to a real seek: drop the stream and refill at the target
  innerClose("seeking to " + targetPos);
  fillBuffer(targetPos);
}
/**
 * Fail if this stream has been closed.
 * @throws IOException when {@code closed} is set
 */
private void checkNotClosed() throws IOException {
  if (!closed) {
    return;
  }
  throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
/**
 * Move the stream to {@code targetPos}.
 *
 * A seek to the current position returns immediately. A forward seek for
 * which {@code rangeOffset + offset < bufferSize} first tries to skip
 * ahead with {@code chompBytes}; if that lands on the target the method
 * returns. In every other case (backwards seek, seek beyond the buffer
 * size, or a skip that did not reach the target) the stream is closed via
 * {@code innerClose} and refilled with {@code fillBuffer(targetPos)}.
 *
 * @param targetPos target position
 * @throws EOFException if {@code targetPos} is negative
 * @throws IOException on any other problem
 */
@Override
public synchronized void seek(long targetPos) throws IOException {
  if (targetPos < 0) {
    throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
  }

  // distance to travel relative to the current position
  final long delta = targetPos - pos;
  if (LOG.isDebugEnabled()) {
    LOG.debug("Seek to " + targetPos + "; current pos =" + pos
        + "; offset=" + delta);
  }
  if (delta == 0) {
    LOG.debug("seek is no-op");
    return;
  }

  final boolean forwards = delta > 0;
  if (forwards && rangeOffset + delta < bufferSize) {
    // near-local data: scan forwards in the current stream rather than
    // closing it and opening a new HTTP connection
    SwiftUtils.debug(LOG,
        "seek is within current stream"
            + "; pos= %d ; targetPos=%d; "
            + "offset= %d ; bufferOffset=%d",
        pos, targetPos, delta, rangeOffset);
    try {
      LOG.debug("chomping ");
      chompBytes(delta);
    } catch (IOException e) {
      // deliberately not rethrown: the full seek below is the recovery
      // path (or will fail with a more meaningful error)
      LOG.debug("while chomping ", e);
    }
    // pos is re-read here; the skip counts only if it reached the target
    if (targetPos == pos) {
      LOG.trace("chomping successful");
      return;
    }
    LOG.trace("chomping failed");
  } else if (forwards) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Seek is beyond buffer size of " + bufferSize);
    }
  } else {
    LOG.debug("seek is backwards");
  }

  // fall back to a real seek: drop the stream and refill at the target
  innerClose("seeking to " + targetPos);
  fillBuffer(targetPos);
}
/**
 * Fail if this stream has been closed.
 * @throws IOException when {@code closed} is set
 */
private void checkNotClosed() throws IOException {
  if (!closed) {
    return;
  }
  throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
/**
 * Guard that fails once the input stream has been closed. Non blocking:
 * it only reads the latest value of the volatile {@link #closed} field.
 * @throws IOException if the stream is closed; the message is prefixed
 * with {@code uri}
 */
private void checkNotClosed() throws IOException {
  if (!closed) {
    return;
  }
  final String message = uri + ": " + FSExceptionMessages.STREAM_IS_CLOSED;
  throw new IOException(message);
}