下面列出了org.apache.hadoop.fs.FSDataOutputStream#getWrappedStream ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void run() {
System.out.println("Workload starting ");
for (int i = 0; i < numberOfFiles; i++) {
Path filename = new Path(id + "." + i);
try {
System.out.println("Workload processing file " + filename);
FSDataOutputStream stm = createFile(fs, filename, replication);
DFSOutputStream dfstream = (DFSOutputStream)
(stm.getWrappedStream());
dfstream.setArtificialSlowdown(1000);
writeFile(stm, myseed);
stm.close();
checkFile(fs, filename, replication, numBlocks, fileSize, myseed);
} catch (Throwable e) {
System.out.println("Workload exception " + e);
assertTrue(e.toString(), false);
}
// increment the stamp to indicate that another file is done.
synchronized (this) {
stamp++;
}
}
}
@Override
public void run() {
System.out.println("Workload starting ");
for (int i = 0; i < numberOfFiles; i++) {
Path filename = new Path(id + "." + i);
try {
System.out.println("Workload processing file " + filename);
FSDataOutputStream stm = createFile(fs, filename, replication);
DFSOutputStream dfstream = (DFSOutputStream)
(stm.getWrappedStream());
dfstream.setArtificialSlowdown(1000);
writeFile(stm, myseed);
stm.close();
checkFile(fs, filename, replication, numBlocks, fileSize, myseed);
} catch (Throwable e) {
System.out.println("Workload exception " + e);
assertTrue(e.toString(), false);
}
// increment the stamp to indicate that another file is done.
synchronized (this) {
stamp++;
}
}
}
public void run() {
System.out.println("Workload starting ");
for (int i = 0; i < numberOfFiles; i++) {
Path filename = new Path(id + "." + i);
try {
System.out.println("Workload processing file " + filename);
FSDataOutputStream stm = createFile(fs, filename, replication);
DFSOutputStream dfstream = (DFSOutputStream)(stm.getWrappedStream());
dfstream.setArtificialSlowdown(1000);
writeFile(stm, myseed);
stm.close();
checkFile(fs, filename, replication, numBlocks, fileSize, myseed);
} catch (Throwable e) {
System.out.println("Workload exception " + e);
assertTrue(e.toString(), false);
}
// increment the stamp to indicate that another file is done.
synchronized (this) {
stamp++;
}
}
}
public void run() {
System.out.println("Workload starting ");
for (int i = 0; i < numberOfFiles; i++) {
Path filename = new Path(id + "." + i);
try {
System.out.println("Workload processing file " + filename);
FSDataOutputStream stm = createFile(fs, filename, replication);
DFSClient.DFSOutputStream dfstream = (DFSClient.DFSOutputStream)
(stm.getWrappedStream());
dfstream.setArtificialSlowdown(1000);
writeFile(stm, myseed);
stm.close();
checkFile(fs, filename, replication, numBlocks, fileSize, myseed);
} catch (Throwable e) {
System.out.println("Workload exception " + e);
assertTrue(e.toString(), false);
}
// increment the stamp to indicate that another file is done.
synchronized (this) {
stamp++;
}
}
}
private void hsync(FSDataOutputStream fsOut) throws Exception {
// 调用hsync时,必须设置SyncFlag.UPDATE_LENGTH,否则RDD或者MR任务读取不到写入的数据
// 参见:
// https://issues.cloudera.org/browse/DISTRO-696;
// http://www.hypertable.com/documentation/administrator_guide/hdfs_and_durability
// https://blog.csdn.net/leen0304/article/details/77854052?locationNum=10&fps=1
// https://issues.apache.org/jira/browse/HDFS-11915
if (fsOut instanceof HdfsDataOutputStream) {
((HdfsDataOutputStream) fsOut).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
} else if (fsOut.getWrappedStream() instanceof DFSOutputStream) {
((DFSOutputStream) fsOut.getWrappedStream()).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
} else {
fsOut.hsync();
}
}
/**
* Assert that the no. of partitions written matches expectations
* @param action operation (for use in the assertions)
* @param out output stream
* @param expected expected no. of partitions
*/
protected void assertPartitionsWritten(String action, FSDataOutputStream out,
long expected) {
OutputStream nativeStream = out.getWrappedStream();
int written = getPartitionsWritten(out);
if(written !=expected) {
Assert.fail(action + ": " +
TestSwiftFileSystemPartitionedUploads.WRONG_PARTITION_COUNT
+ " + expected: " + expected + " actual: " + written
+ " -- " + nativeStream);
}
}
/**
* Assert that the no. of partitions written matches expectations
* @param action operation (for use in the assertions)
* @param out output stream
* @param expected expected no. of partitions
*/
protected void assertPartitionsWritten(String action, FSDataOutputStream out,
long expected) {
OutputStream nativeStream = out.getWrappedStream();
int written = getPartitionsWritten(out);
if(written !=expected) {
Assert.fail(action + ": " +
TestSwiftFileSystemPartitionedUploads.WRONG_PARTITION_COUNT
+ " + expected: " + expected + " actual: " + written
+ " -- " + nativeStream);
}
}
public static void abortForTest(FSDataOutputStream out) throws IOException {
OutputStream stream = out.getWrappedStream();
if (stream instanceof DFSOutputStream) {
DFSOutputStream dfsOutputStream =
(DFSOutputStream) stream;
dfsOutputStream.abortForTests();
}
//no-op otherwise
}
/**
* Assert that the no. of partitions written matches expectations
* @param action operation (for use in the assertions)
* @param out output stream
* @param expected expected no. of partitions
*/
protected void assertPartitionsWritten(String action, FSDataOutputStream out,
long expected) {
OutputStream nativeStream = out.getWrappedStream();
int written = getPartitionsWritten(out);
if(written !=expected) {
Assert.fail(action + ": " +
TestSwiftFileSystemPartitionedUploads.WRONG_PARTITION_COUNT
+ " + expected: " + expected + " actual: " + written
+ " -- " + nativeStream);
}
}
/**
* Tests that round-trip acks in a datanode write pipeline are correctly
* measured.
*/
@Test
public void testRoundTripAckMetric() throws Exception {
final int datanodeCount = 2;
final int interval = 1;
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
datanodeCount).build();
try {
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// Open a file and get the head of the pipeline
Path testFile = new Path("/testRoundTripAckMetric.txt");
FSDataOutputStream fsout = fs.create(testFile, (short) datanodeCount);
DFSOutputStream dout = (DFSOutputStream) fsout.getWrappedStream();
// Slow down the writes to catch the write pipeline
dout.setChunksPerPacket(5);
dout.setArtificialSlowdown(3000);
fsout.write(new byte[10000]);
DatanodeInfo[] pipeline = null;
int count = 0;
while (pipeline == null && count < 5) {
pipeline = dout.getPipeline();
System.out.println("Waiting for pipeline to be created.");
Thread.sleep(1000);
count++;
}
// Get the head node that should be receiving downstream acks
DatanodeInfo headInfo = pipeline[0];
DataNode headNode = null;
for (DataNode datanode : cluster.getDataNodes()) {
if (datanode.getDatanodeId().equals(headInfo)) {
headNode = datanode;
break;
}
}
assertNotNull("Could not find the head of the datanode write pipeline",
headNode);
// Close the file and wait for the metrics to rollover
Thread.sleep((interval + 1) * 1000);
// Check the ack was received
MetricsRecordBuilder dnMetrics = getMetrics(headNode.getMetrics()
.name());
assertTrue("Expected non-zero number of acks",
getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0);
assertQuantileGauges("PacketAckRoundTripTimeNanos" + interval
+ "s", dnMetrics);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private static SwiftNativeOutputStream getSwiftNativeOutputStream(
FSDataOutputStream outputStream) {
OutputStream wrappedStream = outputStream.getWrappedStream();
return (SwiftNativeOutputStream) wrappedStream;
}
/**
* Tests that round-trip acks in a datanode write pipeline are correctly
* measured.
*/
@Test
public void testRoundTripAckMetric() throws Exception {
final int datanodeCount = 2;
final int interval = 1;
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
datanodeCount).build();
try {
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// Open a file and get the head of the pipeline
Path testFile = new Path("/testRoundTripAckMetric.txt");
FSDataOutputStream fsout = fs.create(testFile, (short) datanodeCount);
DFSOutputStream dout = (DFSOutputStream) fsout.getWrappedStream();
// Slow down the writes to catch the write pipeline
dout.setChunksPerPacket(5);
dout.setArtificialSlowdown(3000);
fsout.write(new byte[10000]);
DatanodeInfo[] pipeline = null;
int count = 0;
while (pipeline == null && count < 5) {
pipeline = dout.getPipeline();
System.out.println("Waiting for pipeline to be created.");
Thread.sleep(1000);
count++;
}
// Get the head node that should be receiving downstream acks
DatanodeInfo headInfo = pipeline[0];
DataNode headNode = null;
for (DataNode datanode : cluster.getDataNodes()) {
if (datanode.getDatanodeId().equals(headInfo)) {
headNode = datanode;
break;
}
}
assertNotNull("Could not find the head of the datanode write pipeline",
headNode);
// Close the file and wait for the metrics to rollover
Thread.sleep((interval + 1) * 1000);
// Check the ack was received
MetricsRecordBuilder dnMetrics = getMetrics(headNode.getMetrics()
.name());
assertTrue("Expected non-zero number of acks",
getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0);
assertQuantileGauges("PacketAckRoundTripTimeNanos" + interval
+ "s", dnMetrics);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private static SwiftNativeOutputStream getSwiftNativeOutputStream(
FSDataOutputStream outputStream) {
OutputStream wrappedStream = outputStream.getWrappedStream();
return (SwiftNativeOutputStream) wrappedStream;
}
private static SwiftNativeOutputStream getSwiftNativeOutputStream(
FSDataOutputStream outputStream) {
OutputStream wrappedStream = outputStream.getWrappedStream();
return (SwiftNativeOutputStream) wrappedStream;
}
/**
* Currently, we need to expose the writer's OutputStream to tests so that they can manipulate the
* default behavior (such as setting the maxRecoveryErrorCount value). This is
* done using reflection on the underlying HDFS OutputStream. NOTE: This could be removed once Hadoop1 support is
* removed.
* @return null if underlying stream is not ready.
*/
@VisibleForTesting
OutputStream getOutputStream() {
FSDataOutputStream fsdos = this.hdfs_out;
return fsdos != null ? fsdos.getWrappedStream() : null;
}