Listed below are code examples of org.apache.hadoop.fs.FSDataInputStream#readLine(); follow the GitHub links to view the full source of each example.
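Before the full examples, here is a minimal sketch of the basic readLine() usage pattern. The path /tmp/example.txt and the plain Configuration are illustrative placeholders, not taken from the examples below. Note that readLine() is inherited from java.io.DataInputStream, where it is deprecated; a BufferedReader wrapper (shown at the end of this page) is the usual replacement.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadLineSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Illustrative path; point this at a real file on your cluster or local FS.
    FSDataInputStream in = fs.open(new Path("/tmp/example.txt"));
    try {
      String line;
      // readLine() returns null once the end of the stream is reached.
      while ((line = in.readLine()) != null) {
        System.out.println(line);
      }
    } finally {
      in.close();
    }
  }
}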
The first example, createCentersSequenceFile, reads a plain-text centroids file line by line with readLine() and writes each parsed centroid into a Hadoop SequenceFile:

public static void createCentersSequenceFile(Configuration conf, FileSystem fs,
    String centroidsPath, String sequenceFilePath) throws Exception {
  Path seqFile = new Path(sequenceFilePath);
  if (fs.exists(seqFile)) {
    fs.delete(seqFile, true);
  }
  FSDataInputStream inputStream = fs.open(new Path(centroidsPath));
  SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, conf, seqFile, Centroid.class, IntWritable.class);
  IntWritable value = new IntWritable(0);
  while (inputStream.available() > 0) {
    // Each line is "<clusterId> <coord_1> ... <coord_dim>", separated by spaces.
    String line = inputStream.readLine();
    StringTokenizer tokenizer = new StringTokenizer(line, " ");
    int dim = tokenizer.countTokens() - 1;
    int clusterId = Integer.valueOf(tokenizer.nextToken());
    double[] coords = new double[dim];
    for (int i = 0; i < dim; i++) {
      coords[i] = Double.valueOf(tokenizer.nextToken());
    }
    Centroid cluster = new Centroid(clusterId, new Point(coords));
    writer.append(cluster, value);
  }
  IOUtils.closeStream(writer);
  inputStream.close();
}
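As a usage note for the example above, the resulting SequenceFile could be read back with SequenceFile.Reader. The helper below is a sketch, not part of the original example; it assumes Centroid is a Writable with a no-argument constructor (the Centroid and Point classes themselves are not shown on this page).

// Hypothetical helper, not from the original project: reads back the centroids
// written by createCentersSequenceFile. Assumes Centroid is a Writable with a
// no-argument constructor.
public static void printCenters(Configuration conf, FileSystem fs, Path seqFile)
    throws Exception {
  SequenceFile.Reader reader = new SequenceFile.Reader(fs, seqFile, conf);
  try {
    Centroid key = new Centroid();
    IntWritable value = new IntWritable();
    while (reader.next(key, value)) {
      System.out.println(key + " -> " + value.get());
    }
  } finally {
    reader.close();
  }
}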
The next two examples are test helpers that verify sorted MapReduce output. Each part file is opened with fs.open(), read line by line with readLine(), and every record is split at the first space into a key and a value that must match and appear in non-decreasing key order.

private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem)
    throws Exception {
  FSDataInputStream dis = null;
  long numValidRecords = 0;
  long numInvalidRecords = 0;
  long numMappersLaunched = NUM_MAPPERS;
  String prevKeyValue = "000000000";
  Path[] fileList =
      FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
          new Utils.OutputFileUtils.OutputFilesFilter()));
  for (Path outFile : fileList) {
    try {
      dis = fileSystem.open(outFile);
      String record;
      while ((record = dis.readLine()) != null) {
        // Split the line into key and value.
        int blankPos = record.indexOf(" ");
        String keyString = record.substring(0, blankPos);
        String valueString = record.substring(blankPos + 1);
        // Check for sorted output and correctness of record.
        if (keyString.compareTo(prevKeyValue) >= 0
            && keyString.equals(valueString)) {
          prevKeyValue = keyString;
          numValidRecords++;
        } else {
          numInvalidRecords++;
        }
      }
    } finally {
      if (dis != null) {
        dis.close();
        dis = null;
      }
    }
  }
  // Make sure we got all input records in the output in sorted order.
  assertEquals((long) (NUM_MAPPERS * NUM_LINES), numValidRecords);
  // Make sure there is no extraneous invalid record.
  assertEquals(0, numInvalidRecords);
}
The second variant takes the expected number of mappers and lines per mapper as parameters instead of relying on the test constants:

private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem,
    int numMappers, int numLines) throws Exception {
  FSDataInputStream dis = null;
  long numValidRecords = 0;
  long numInvalidRecords = 0;
  String prevKeyValue = "000000000";
  Path[] fileList =
      FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
          new Utils.OutputFileUtils.OutputFilesFilter()));
  for (Path outFile : fileList) {
    try {
      dis = fileSystem.open(outFile);
      String record;
      while ((record = dis.readLine()) != null) {
        // Split the line into key and value.
        int blankPos = record.indexOf(" ");
        String keyString = record.substring(0, blankPos);
        String valueString = record.substring(blankPos + 1);
        // Check for sorted output and correctness of record.
        if (keyString.compareTo(prevKeyValue) >= 0
            && keyString.equals(valueString)) {
          prevKeyValue = keyString;
          numValidRecords++;
        } else {
          numInvalidRecords++;
        }
      }
    } finally {
      if (dis != null) {
        dis.close();
        dis = null;
      }
    }
  }
  // Make sure we got all input records in the output in sorted order.
  assertEquals((long) (numMappers * numLines), numValidRecords);
  // Make sure there is no extraneous invalid record.
  assertEquals(0, numInvalidRecords);
}
The final example parses a Hadoop 2 job-history file: readLine() is used to skip the version line and to read the Avro schema before the rest of the stream is handed to an Avro JSON decoder.

/**
 * {@inheritDoc}
 */
@Override
public void parse(byte[] historyFileContents, JobKey jobKey)
    throws ProcessingException {
  this.jobKey = jobKey;
  this.jobKeyBytes = jobKeyConv.toBytes(jobKey);
  this.jobDetails = new JobDetails(jobKey);
  initializeJobDetails();
  setJobId(jobKey.getJobId().getJobIdString());
  try {
    FSDataInputStream in =
        new FSDataInputStream(new ByteArrayWrapper(historyFileContents));
    /** first line is the version, ignore it */
    String versionIgnore = in.readLine();
    /** second line in the file is the schema */
    this.schema = schema.parse(in.readLine());
    /** now figure out the schema */
    understandSchema(schema.toString());
    /** now read the rest of the file */
    this.reader = new GenericDatumReader<GenericRecord>(schema);
    this.decoder = DecoderFactory.get().jsonDecoder(schema, in);
    GenericRecord record = null;
    Hadoop2RecordType recType = null;
    try {
      while ((record = reader.read(null, decoder)) != null) {
        if (record.get(TYPE) != null) {
          recType = EVENT_RECORD_NAMES.get(record.get(TYPE).toString());
        } else {
          throw new ProcessingException("expected one of "
              + Arrays.asList(Hadoop2RecordType.values())
              + " \n but not found, cannot process this record! " + jobKey);
        }
        if (recType == null) {
          throw new ProcessingException("new record type has surfaced: "
              + record.get(TYPE).toString() + " cannot process this record! " + jobKey);
        }
        // GenericRecord's get returns an Object
        Object eDetails = record.get(EVENT);
        // confirm that we got an "event" object
        if (eDetails != null) {
          JSONObject eventDetails = new JSONObject(eDetails.toString());
          processRecords(recType, eventDetails);
        } else {
          throw new ProcessingException("expected event details but not found "
              + record.get(TYPE).toString() + " cannot process this record! " + jobKey);
        }
      }
    } catch (EOFException eof) {
      // not an error, simply end of file
      LOG.info("Done parsing file, reached eof for " + jobKey);
    }
  } catch (IOException ioe) {
    throw new ProcessingException(" Unable to parse history file in function parse, "
        + "cannot process this record!" + jobKey + " error: ", ioe);
  } catch (JSONException jse) {
    throw new ProcessingException(" Unable to parse history file in function parse, "
        + "cannot process this record! " + jobKey + " error: ", jse);
  } catch (IllegalArgumentException iae) {
    throw new ProcessingException(" Unable to parse history file in function parse, "
        + "cannot process this record! " + jobKey + " error: ", iae);
  }
  /*
   * set the job status for this job once the entire file is parsed;
   * this has to be done separately since the JOB_FINISHED event is missing the
   * jobStatus field, whereas the JOB_KILLED and JOB_FAILED events are not, so we
   * need to look through the whole file to confirm the job status and then
   * generate the put
   */
  Put jobStatusPut = getJobStatusPut();
  this.jobPuts.add(jobStatusPut);
  // set the hadoop version for this record
  Put versionPut = getHadoopVersionPut(
      JobHistoryFileParserFactory.getHistoryFileVersion2(), this.jobKeyBytes);
  this.jobPuts.add(versionPut);
  LOG.info("For " + this.jobKey + " #jobPuts " + jobPuts.size() + " #taskPuts: "
      + taskPuts.size());
}
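Finally, since DataInputStream#readLine() is deprecated (it does not properly convert bytes to characters), newer code usually wraps the FSDataInputStream in a BufferedReader instead. This is a general sketch rather than code from any of the examples above:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BufferedReadLineSketch {
  // Reads all lines of the given file using BufferedReader#readLine() instead of
  // the deprecated DataInputStream#readLine().
  public static void printLines(FileSystem fs, Path path) throws IOException {
    FSDataInputStream in = fs.open(path);
    BufferedReader reader =
        new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        System.out.println(line);
      }
    } finally {
      reader.close(); // also closes the underlying FSDataInputStream
    }
  }
}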