下面列出了怎么用org.apache.hadoop.fs.FSInputStream的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Try to open a file for reading several times.
*
* If we fail because lease recovery hasn't completed, retry the open.
*/
private static FSInputStream dfsOpenFileWithRetries(DistributedFileSystem dfs,
String pathName) throws IOException {
IOException exc = null;
for (int tries = 0; tries < 10; tries++) {
try {
return dfs.dfs.open(pathName);
} catch (IOException e) {
exc = e;
}
if (!exc.getMessage().contains("Cannot obtain " +
"block length for LocatedBlock")) {
throw exc;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
}
throw exc;
}
@Test
public void testCopyStreamTargetExists() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(path))).thenReturn(fileStat);
target.refreshStatus(); // so it's updated as existing
cmd.setOverwrite(true);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
when(mockFs.delete(eq(path), eq(false))).thenReturn(true);
when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true);
FSInputStream in = mock(FSInputStream.class);
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
tryCopyStream(in, true);
verify(mockFs).delete(eq(path), anyBoolean());
verify(mockFs).rename(eq(tmpPath), eq(path));
verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).close();
}
@Test
public void testInterruptedCopyBytes() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
FSInputStream in = mock(FSInputStream.class);
// make IOUtils.copyBytes fail
when(in.read(any(byte[].class), anyInt(), anyInt())).thenThrow(
new InterruptedIOException());
tryCopyStream(in, false);
verify(mockFs).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).rename(any(Path.class), any(Path.class));
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs, never()).close();
}
@Test
public void testInterruptedRename() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
when(mockFs.rename(eq(tmpPath), eq(path))).thenThrow(
new InterruptedIOException());
FSInputStream in = mock(FSInputStream.class);
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
tryCopyStream(in, false);
verify(mockFs).delete(eq(tmpPath), anyBoolean());
verify(mockFs).rename(eq(tmpPath), eq(path));
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs, never()).close();
}
/**
* Try to open a file for reading several times.
*
* If we fail because lease recovery hasn't completed, retry the open.
*/
private static FSInputStream dfsOpenFileWithRetries(DistributedFileSystem dfs,
String pathName) throws IOException {
IOException exc = null;
for (int tries = 0; tries < 10; tries++) {
try {
return dfs.dfs.open(pathName);
} catch (IOException e) {
exc = e;
}
if (!exc.getMessage().contains("Cannot obtain " +
"block length for LocatedBlock")) {
throw exc;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
}
throw exc;
}
@Test
public void testCopyStreamTargetExists() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(path))).thenReturn(fileStat);
target.refreshStatus(); // so it's updated as existing
cmd.setOverwrite(true);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
when(mockFs.delete(eq(path), eq(false))).thenReturn(true);
when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true);
FSInputStream in = mock(FSInputStream.class);
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
tryCopyStream(in, true);
verify(mockFs).delete(eq(path), anyBoolean());
verify(mockFs).rename(eq(tmpPath), eq(path));
verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).close();
}
@Test
public void testInterruptedCopyBytes() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
FSInputStream in = mock(FSInputStream.class);
// make IOUtils.copyBytes fail
when(in.read(any(byte[].class), anyInt(), anyInt())).thenThrow(
new InterruptedIOException());
tryCopyStream(in, false);
verify(mockFs).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).rename(any(Path.class), any(Path.class));
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs, never()).close();
}
@Test
public void testInterruptedRename() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
when(mockFs.rename(eq(tmpPath), eq(path))).thenThrow(
new InterruptedIOException());
FSInputStream in = mock(FSInputStream.class);
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
tryCopyStream(in, false);
verify(mockFs).delete(eq(tmpPath), anyBoolean());
verify(mockFs).rename(eq(tmpPath), eq(path));
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs, never()).close();
}
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException {
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
this.inputStream = new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize));
} else {
// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
// need to wrap in another BufferedFSInputStream the make bufferSize work?
this.inputStream = fsDataInputStream;
}
this.logFile = logFile;
this.readerSchema = readerSchema;
this.readBlockLazily = readBlockLazily;
this.reverseReader = reverseReader;
if (this.reverseReader) {
this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen();
}
addShutDownHook();
}
private void verifyDir(DistributedFileSystem dfs, Path dir,
CRC32 overallChecksum) throws IOException {
FileStatus[] fileArr = dfs.listStatus(dir);
TreeMap<Path, Boolean> fileMap = new TreeMap<Path, Boolean>();
for(FileStatus file : fileArr) {
fileMap.put(file.getPath(), Boolean.valueOf(file.isDirectory()));
}
for(Iterator<Path> it = fileMap.keySet().iterator(); it.hasNext();) {
Path path = it.next();
boolean isDir = fileMap.get(path);
String pathName = path.toUri().getPath();
overallChecksum.update(pathName.getBytes());
if ( isDir ) {
verifyDir(dfs, path, overallChecksum);
} else {
// this is not a directory. Checksum the file data.
CRC32 fileCRC = new CRC32();
FSInputStream in = dfsOpenFileWithRetries(dfs, pathName);
byte[] buf = new byte[4096];
int nRead = 0;
while ( (nRead = in.read(buf, 0, buf.length)) > 0 ) {
fileCRC.update(buf, 0, nRead);
}
verifyChecksum(pathName, fileCRC.getValue());
}
}
}
@Test
public void testCopyStreamTarget() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true);
FSInputStream in = mock(FSInputStream.class);
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
tryCopyStream(in, true);
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs).rename(eq(tmpPath), eq(path));
verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).close();
}
private void verifyDir(DistributedFileSystem dfs, Path dir,
CRC32 overallChecksum) throws IOException {
FileStatus[] fileArr = dfs.listStatus(dir);
TreeMap<Path, Boolean> fileMap = new TreeMap<Path, Boolean>();
for(FileStatus file : fileArr) {
fileMap.put(file.getPath(), Boolean.valueOf(file.isDirectory()));
}
for(Iterator<Path> it = fileMap.keySet().iterator(); it.hasNext();) {
Path path = it.next();
boolean isDir = fileMap.get(path);
String pathName = path.toUri().getPath();
overallChecksum.update(pathName.getBytes());
if ( isDir ) {
verifyDir(dfs, path, overallChecksum);
} else {
// this is not a directory. Checksum the file data.
CRC32 fileCRC = new CRC32();
FSInputStream in = dfsOpenFileWithRetries(dfs, pathName);
byte[] buf = new byte[4096];
int nRead = 0;
while ( (nRead = in.read(buf, 0, buf.length)) > 0 ) {
fileCRC.update(buf, 0, nRead);
}
verifyChecksum(pathName, fileCRC.getValue());
}
}
}
@Test
public void testCopyStreamTarget() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true);
FSInputStream in = mock(FSInputStream.class);
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
tryCopyStream(in, true);
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs).rename(eq(tmpPath), eq(path));
verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).close();
}
@Override
public FSDataInputStream open(Path f, int buffersize) throws IOException {
HttpURLConnection connection = null;
connection = openConnection("/data" + f.toUri().getPath(), "ugi=" + ugi);
connection.setRequestMethod("GET");
connection.connect();
final InputStream in = connection.getInputStream();
return new FSDataInputStream(new FSInputStream() {
public int read() throws IOException {
return in.read();
}
public int read(byte[] b, int off, int len) throws IOException {
return in.read(b, off, len);
}
public void close() throws IOException {
in.close();
}
public void seek(long pos) throws IOException {
throw new IOException("Can't seek!");
}
public long getPos() throws IOException {
throw new IOException("Position unknown!");
}
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
});
}
public StrictBufferedFSInputStream(FSInputStream in,
int size) {
super(in, size);
}
public StrictBufferedFSInputStream(FSInputStream in,
int size) {
super(in, size);
}
public StrictBufferedFSInputStream(FSInputStream in,
int size) {
super(in, size);
}