下面列出了如何使用 org.apache.hadoop.util.LineReader 这个 API 类的实例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that readLine returns multi-byte UTF-8 text unchanged and that the
 * character U+200A is not treated as a line terminator.
 *
 * @throws Exception if a stream cannot be read or closed
 */
public void testUTF8() throws Exception {
  Text line = new Text();

  // Each stream gets its own try/finally: reusing one variable for both
  // readers would leave the first one unclosed once it is reassigned.
  LineReader in = makeStream("abcd\u20acbdcd\u20ac");
  try {
    in.readLine(line);
    assertEquals("readLine changed utf8 characters",
        "abcd\u20acbdcd\u20ac", line.toString());
  } finally {
    in.close();
  }

  in = makeStream("abc\u200axyz");
  try {
    in.readLine(line);
    assertEquals("split on fake newline", "abc\u200axyz", line.toString());
  } finally {
    in.close();
  }
}
/**
 * Reads an input of six lines terminated by a mix of LF, CR and CRLF and
 * checks the length of every returned line, then checks that a further read
 * reports end of file by returning 0.
 *
 * @throws Exception if the stream cannot be read or closed
 */
public void testNewLines() throws Exception {
  LineReader in = null;
  try {
    in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
    Text out = new Text();
    in.readLine(out);
    assertEquals("line1 length", 1, out.getLength());
    in.readLine(out);
    assertEquals("line2 length", 2, out.getLength());
    in.readLine(out);
    assertEquals("line3 length", 0, out.getLength());
    in.readLine(out);
    assertEquals("line4 length", 3, out.getLength());
    in.readLine(out);
    assertEquals("line5 length", 4, out.getLength());
    in.readLine(out);
    // Sixth line ("eeeee"); the label previously repeated "line5", which made
    // a failure here indistinguishable from one on the real fifth line.
    assertEquals("line6 length", 5, out.getLength());
    assertEquals("end of file", 0, in.readLine(out));
  } finally {
    if (in != null) {
      in.close();
    }
  }
}
/**
 * Test readLine for correct interpretation of maxLineLength
 * (returned string should be clipped at maxLineLength, and the
 * remaining bytes on the same line should be thrown out).
 * Also check that the sum of the returned values matches the
 * byte length of the input.
 * Varies buffer size to stress test.
 *
 * @throws Exception if a stream cannot be read or closed
 */
@Test (timeout=5000)
public void testMaxLineLength() throws Exception {
  final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
  final int STRLENBYTES = STR.getBytes().length;
  Text out = new Text();
  for (int bufsz = 1; bufsz < STRLENBYTES+1; ++bufsz) {
    LineReader in = makeStream(STR, bufsz);
    try {
      int c = 0;
      c += in.readLine(out, 1);
      assertEquals("line1 length, bufsz: "+bufsz, 1, out.getLength());
      c += in.readLine(out, 1); // "bb" clipped to one byte
      assertEquals("line2 length, bufsz: "+bufsz, 1, out.getLength());
      c += in.readLine(out, 1);
      assertEquals("line3 length, bufsz: "+bufsz, 0, out.getLength());
      c += in.readLine(out, 3);
      assertEquals("line4 length, bufsz: "+bufsz, 3, out.getLength());
      c += in.readLine(out, 10);
      assertEquals("line5 length, bufsz: "+bufsz, 4, out.getLength());
      c += in.readLine(out, 8);
      // Sixth line; label corrected from a duplicated "line5".
      assertEquals("line6 length, bufsz: "+bufsz, 5, out.getLength());
      assertEquals("end of file, bufsz: " +bufsz, 0, in.readLine(out));
      // Expected value goes first so a failure message reads the right way round.
      assertEquals("total bytes, bufsz: "+bufsz, STRLENBYTES, c);
    } finally {
      // One reader is created per buffer size; close each of them.
      in.close();
    }
  }
}
/**
 * Reads an input of six lines terminated by a mix of LF, CR and CRLF and
 * checks the length of every returned line, then checks that a further read
 * reports end of file by returning 0.
 *
 * @throws Exception if the stream cannot be read or closed
 */
@Test
public void testNewLines() throws Exception {
  LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
  try {
    Text out = new Text();
    in.readLine(out);
    assertEquals("line1 length", 1, out.getLength());
    in.readLine(out);
    assertEquals("line2 length", 2, out.getLength());
    in.readLine(out);
    assertEquals("line3 length", 0, out.getLength());
    in.readLine(out);
    assertEquals("line4 length", 3, out.getLength());
    in.readLine(out);
    assertEquals("line5 length", 4, out.getLength());
    in.readLine(out);
    // Sixth line; label corrected from a duplicated "line5".
    assertEquals("line6 length", 5, out.getLength());
    assertEquals("end of file", 0, in.readLine(out));
  } finally {
    // Close the reader even when an assertion fails.
    in.close();
  }
}
/**
 * Opens the file at position {@code index} of a combined split and positions
 * a line reader at the first complete line inside that file's byte range.
 *
 * @param split   the combined split holding the paths, offsets and lengths
 * @param context task context; only its configuration is used here
 * @param index   which file of the split this reader is responsible for
 * @throws IOException if the file cannot be opened, seeked or read
 */
public CombineFileLineRecordReader(CombineFileSplit split,
    TaskAttemptContext context, Integer index) throws IOException {
  this.path = split.getPath(index);
  fs = this.path.getFileSystem(context.getConfiguration());
  this.startOffset = split.getOffset(index);
  this.end = startOffset + split.getLength(index);

  // open the file
  fileIn = fs.open(path);
  boolean initialized = false;
  try {
    final boolean skipFirstLine = startOffset != 0;
    if (skipFirstLine) {
      // Back up one byte before discarding a line: if the range begins right
      // after a line terminator, only that terminator is consumed and the
      // first full line of the range is kept.
      --startOffset;
      fileIn.seek(startOffset);
    }
    reader = new LineReader(fileIn);
    if (skipFirstLine) { // skip first line and re-establish "startOffset".
      // maxLineLength 0: nothing is stored, the bytes are only counted.
      startOffset += reader.readLine(new Text(), 0,
          (int) Math.min((long) Integer.MAX_VALUE, end - startOffset));
    }
    this.pos = startOffset;
    initialized = true;
  } finally {
    if (!initialized) {
      // Construction failed after the stream was opened; do not leak it.
      fileIn.close();
    }
  }
}
/**
 * Checks that readLine returns multi-byte UTF-8 text unchanged and that the
 * character U+200A is not treated as a line terminator.
 *
 * @throws Exception if a stream cannot be read or closed
 */
public void testUTF8() throws Exception {
  Text line = new Text();

  // Each stream gets its own try/finally: reusing one variable for both
  // readers would leave the first one unclosed once it is reassigned.
  LineReader in = makeStream("abcd\u20acbdcd\u20ac");
  try {
    in.readLine(line);
    assertEquals("readLine changed utf8 characters",
        "abcd\u20acbdcd\u20ac", line.toString());
  } finally {
    in.close();
  }

  in = makeStream("abc\u200axyz");
  try {
    in.readLine(line);
    assertEquals("split on fake newline", "abc\u200axyz", line.toString());
  } finally {
    in.close();
  }
}
/**
 * Reads an input of six lines terminated by a mix of LF, CR and CRLF and
 * checks the length of every returned line, then checks that a further read
 * reports end of file by returning 0.
 *
 * @throws Exception if the stream cannot be read or closed
 */
public void testNewLines() throws Exception {
  LineReader in = null;
  try {
    in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
    Text out = new Text();
    in.readLine(out);
    assertEquals("line1 length", 1, out.getLength());
    in.readLine(out);
    assertEquals("line2 length", 2, out.getLength());
    in.readLine(out);
    assertEquals("line3 length", 0, out.getLength());
    in.readLine(out);
    assertEquals("line4 length", 3, out.getLength());
    in.readLine(out);
    assertEquals("line5 length", 4, out.getLength());
    in.readLine(out);
    // Sixth line ("eeeee"); the label previously repeated "line5", which made
    // a failure here indistinguishable from one on the real fifth line.
    assertEquals("line6 length", 5, out.getLength());
    assertEquals("end of file", 0, in.readLine(out));
  } finally {
    if (in != null) {
      in.close();
    }
  }
}
/**
 * Test readLine for correct interpretation of maxLineLength
 * (returned string should be clipped at maxLineLength, and the
 * remaining bytes on the same line should be thrown out).
 * Also check that the sum of the returned values matches the
 * byte length of the input.
 * Varies buffer size to stress test.
 *
 * @throws Exception if a stream cannot be read or closed
 */
@Test (timeout=5000)
public void testMaxLineLength() throws Exception {
  final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
  final int STRLENBYTES = STR.getBytes().length;
  Text out = new Text();
  for (int bufsz = 1; bufsz < STRLENBYTES+1; ++bufsz) {
    LineReader in = makeStream(STR, bufsz);
    try {
      int c = 0;
      c += in.readLine(out, 1);
      assertEquals("line1 length, bufsz: "+bufsz, 1, out.getLength());
      c += in.readLine(out, 1); // "bb" clipped to one byte
      assertEquals("line2 length, bufsz: "+bufsz, 1, out.getLength());
      c += in.readLine(out, 1);
      assertEquals("line3 length, bufsz: "+bufsz, 0, out.getLength());
      c += in.readLine(out, 3);
      assertEquals("line4 length, bufsz: "+bufsz, 3, out.getLength());
      c += in.readLine(out, 10);
      assertEquals("line5 length, bufsz: "+bufsz, 4, out.getLength());
      c += in.readLine(out, 8);
      // Sixth line; label corrected from a duplicated "line5".
      assertEquals("line6 length, bufsz: "+bufsz, 5, out.getLength());
      assertEquals("end of file, bufsz: " +bufsz, 0, in.readLine(out));
      // Expected value goes first so a failure message reads the right way round.
      assertEquals("total bytes, bufsz: "+bufsz, STRLENBYTES, c);
    } finally {
      // One reader is created per buffer size; close each of them.
      in.close();
    }
  }
}
/**
 * Reads an input of six lines terminated by a mix of LF, CR and CRLF and
 * checks the length of every returned line, then checks that a further read
 * reports end of file by returning 0.
 *
 * @throws Exception if the stream cannot be read or closed
 */
@Test
public void testNewLines() throws Exception {
  LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
  try {
    Text out = new Text();
    in.readLine(out);
    assertEquals("line1 length", 1, out.getLength());
    in.readLine(out);
    assertEquals("line2 length", 2, out.getLength());
    in.readLine(out);
    assertEquals("line3 length", 0, out.getLength());
    in.readLine(out);
    assertEquals("line4 length", 3, out.getLength());
    in.readLine(out);
    assertEquals("line5 length", 4, out.getLength());
    in.readLine(out);
    // Sixth line; label corrected from a duplicated "line5".
    assertEquals("line6 length", 5, out.getLength());
    assertEquals("end of file", 0, in.readLine(out));
  } finally {
    // Close the reader even when an assertion fails.
    in.close();
  }
}
/**
 * Opens the file at position {@code index} of a combined split and positions
 * a line reader at the first complete line inside that file's byte range.
 *
 * @param split   the combined split holding the paths, offsets and lengths
 * @param context task context; only its configuration is used here
 * @param index   which file of the split this reader is responsible for
 * @throws IOException if the file cannot be opened, seeked or read
 */
public CombineFileLineRecordReader(CombineFileSplit split,
    TaskAttemptContext context, Integer index) throws IOException {
  this.path = split.getPath(index);
  fs = this.path.getFileSystem(context.getConfiguration());
  this.startOffset = split.getOffset(index);
  this.end = startOffset + split.getLength(index);

  // open the file
  fileIn = fs.open(path);
  boolean initialized = false;
  try {
    final boolean skipFirstLine = startOffset != 0;
    if (skipFirstLine) {
      // Back up one byte before discarding a line: if the range begins right
      // after a line terminator, only that terminator is consumed and the
      // first full line of the range is kept.
      --startOffset;
      fileIn.seek(startOffset);
    }
    reader = new LineReader(fileIn);
    if (skipFirstLine) { // skip first line and re-establish "startOffset".
      // maxLineLength 0: nothing is stored, the bytes are only counted.
      startOffset += reader.readLine(new Text(), 0,
          (int) Math.min((long) Integer.MAX_VALUE, end - startOffset));
    }
    this.pos = startOffset;
    initialized = true;
  } finally {
    if (!initialized) {
      // Construction failed after the stream was opened; do not leak it.
      fileIn.close();
    }
  }
}
/**
 * Builds one split per line of every input file returned by
 * {@code listStatus(job)}. Each split covers the byte range of that line
 * (terminator included) and carries the hosts reported by
 * {@code getStoreDirHosts} as its locality hint.
 *
 * @param job the job whose input files are listed and split
 * @return the splits, in file order and then line order
 * @throws IOException if an input file cannot be opened or read
 */
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  final List<InputSplit> result = new ArrayList<>();
  // Scratch buffer reused for every line of every file.
  final Text lineBuf = new Text();
  for (FileStatus status : listStatus(job)) {
    final Path path = status.getPath();
    final FileSystem fs = path.getFileSystem(job.getConfiguration());
    final LineReader lines = new LineReader(fs.open(path));
    try {
      long offset = 0;
      // readLine returns the number of bytes consumed; 0 ends the file.
      for (int consumed = lines.readLine(lineBuf); consumed > 0;
          consumed = lines.readLine(lineBuf)) {
        result.add(
            new FileSplit(path, offset, consumed, getStoreDirHosts(fs, path)));
        offset += consumed;
      }
    } finally {
      lines.close();
    }
  }
  return result;
}
/**
 * Reads an input of six lines terminated by a mix of LF, CR and CRLF and
 * checks the length of every returned line, then checks that a further read
 * reports end of file by returning 0.
 *
 * @throws Exception if the stream cannot be read or closed
 */
public void testNewLines() throws Exception {
  LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
  try {
    Text out = new Text();
    in.readLine(out);
    assertEquals("line1 length", 1, out.getLength());
    in.readLine(out);
    assertEquals("line2 length", 2, out.getLength());
    in.readLine(out);
    assertEquals("line3 length", 0, out.getLength());
    in.readLine(out);
    assertEquals("line4 length", 3, out.getLength());
    in.readLine(out);
    assertEquals("line5 length", 4, out.getLength());
    in.readLine(out);
    // Sixth line; label corrected from a duplicated "line5".
    assertEquals("line6 length", 5, out.getLength());
    assertEquals("end of file", 0, in.readLine(out));
  } finally {
    // Close the reader even when an assertion fails.
    in.close();
  }
}
/**
 * Test readLine for correct interpretation of maxLineLength
 * (returned string should be clipped at maxLineLength, and the
 * remaining bytes on the same line should be thrown out).
 * Also check that the sum of the returned values matches the
 * byte length of the input.
 * Varies buffer size to stress test.
 *
 * @throws Exception if a stream cannot be read or closed
 */
public void testMaxLineLength() throws Exception {
  final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
  final int STRLENBYTES = STR.getBytes().length;
  Text out = new Text();
  for (int bufsz = 1; bufsz < STRLENBYTES+1; ++bufsz) {
    LineReader in = makeStream(STR, bufsz);
    try {
      int c = 0;
      c += in.readLine(out, 1);
      assertEquals("line1 length, bufsz: "+bufsz, 1, out.getLength());
      c += in.readLine(out, 1); // "bb" clipped to one byte
      assertEquals("line2 length, bufsz: "+bufsz, 1, out.getLength());
      c += in.readLine(out, 1);
      assertEquals("line3 length, bufsz: "+bufsz, 0, out.getLength());
      c += in.readLine(out, 3);
      assertEquals("line4 length, bufsz: "+bufsz, 3, out.getLength());
      c += in.readLine(out, 10);
      assertEquals("line5 length, bufsz: "+bufsz, 4, out.getLength());
      c += in.readLine(out, 8);
      // Sixth line; label corrected from a duplicated "line5".
      assertEquals("line6 length, bufsz: "+bufsz, 5, out.getLength());
      assertEquals("end of file, bufsz: " +bufsz, 0, in.readLine(out));
      // Expected value goes first so a failure message reads the right way round.
      assertEquals("total bytes, bufsz: "+bufsz, STRLENBYTES, c);
    } finally {
      // One reader is created per buffer size; close each of them.
      in.close();
    }
  }
}
/**
 * Constructor that reads the contents of the index file.
 * Lines are read and handed to {@code parseLine} until {@code max} bytes
 * have been consumed or the stream ends, whichever comes first.
 *
 * @param in An input stream to the index file. It is not closed here.
 * @param max The size of the index file.
 * @throws IOException if reading fails or a line cannot be decoded
 */
public HarIndex(InputStream in, long max) throws IOException {
  LineReader lineReader = new LineReader(in);
  Text text = new Text();
  long nread = 0;
  while (nread < max) {
    int n = lineReader.readLine(text);
    if (n <= 0) {
      // readLine returns 0 at end of stream. Without this check a stream
      // shorter than max would never advance nread and loop forever.
      break;
    }
    nread += n;
    String line = text.toString();
    try {
      parseLine(line);
    } catch (UnsupportedEncodingException e) {
      // Keep the original exception as the cause for diagnosis.
      throw new IOException("UnsupportedEncodingException after reading " +
          nread + " bytes", e);
    }
  }
}
/**
 * Reads the first line of the master index and returns the integer that
 * precedes its first space.
 *
 * @return the parsed version number
 * @throws IOException if the master index cannot be opened or read
 */
public int getHarVersion() throws IOException {
  FSDataInputStream masterIn = fs.open(masterIndex);
  Text line = new Text();
  try {
    LineReader lmaster = new LineReader(masterIn, getConf());
    lmaster.readLine(line);
  } finally {
    // Close in finally so a failed read does not leak the stream.
    try {
      masterIn.close();
    } catch (IOException ignored) {
      // Deliberately disregarded: the stream was only read from.
    }
  }
  String versionLine = line.toString();
  String[] arr = versionLine.split(" ");
  int version = Integer.parseInt(arr[0]);
  return version;
}
/**
 * Reads an input of six lines terminated by a mix of LF, CR and CRLF and
 * checks the length of every returned line, then checks that a further read
 * reports end of file by returning 0.
 *
 * @throws Exception if the stream cannot be read or closed
 */
public void testNewLines() throws Exception {
  LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
  try {
    Text out = new Text();
    in.readLine(out);
    assertEquals("line1 length", 1, out.getLength());
    in.readLine(out);
    assertEquals("line2 length", 2, out.getLength());
    in.readLine(out);
    assertEquals("line3 length", 0, out.getLength());
    in.readLine(out);
    assertEquals("line4 length", 3, out.getLength());
    in.readLine(out);
    assertEquals("line5 length", 4, out.getLength());
    in.readLine(out);
    // Sixth line; label corrected from a duplicated "line5".
    assertEquals("line6 length", 5, out.getLength());
    assertEquals("end of file", 0, in.readLine(out));
  } finally {
    // Close the reader even when an assertion fails.
    in.close();
  }
}
/**
 * Test readLine for correct interpretation of maxLineLength
 * (returned string should be clipped at maxLineLength, and the
 * remaining bytes on the same line should be thrown out).
 * Also check that the sum of the returned values matches the
 * byte length of the input.
 * Varies buffer size to stress test.
 *
 * @throws Exception if a stream cannot be read or closed
 */
public void testMaxLineLength() throws Exception {
  final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
  final int STRLENBYTES = STR.getBytes().length;
  Text out = new Text();
  for (int bufsz = 1; bufsz < STRLENBYTES+1; ++bufsz) {
    LineReader in = makeStream(STR, bufsz);
    try {
      int c = 0;
      c += in.readLine(out, 1);
      assertEquals("line1 length, bufsz: "+bufsz, 1, out.getLength());
      c += in.readLine(out, 1); // "bb" clipped to one byte
      assertEquals("line2 length, bufsz: "+bufsz, 1, out.getLength());
      c += in.readLine(out, 1);
      assertEquals("line3 length, bufsz: "+bufsz, 0, out.getLength());
      c += in.readLine(out, 3);
      assertEquals("line4 length, bufsz: "+bufsz, 3, out.getLength());
      c += in.readLine(out, 10);
      assertEquals("line5 length, bufsz: "+bufsz, 4, out.getLength());
      c += in.readLine(out, 8);
      // Sixth line; label corrected from a duplicated "line5".
      assertEquals("line6 length, bufsz: "+bufsz, 5, out.getLength());
      assertEquals("end of file, bufsz: " +bufsz, 0, in.readLine(out));
      // Expected value goes first so a failure message reads the right way round.
      assertEquals("total bytes, bufsz: "+bufsz, STRLENBYTES, c);
    } finally {
      // One reader is created per buffer size; close each of them.
      in.close();
    }
  }
}
/**
 * Connects the input stream, reads it line by line and appends every line,
 * followed by a newline character, to {@code buf_}. Reading stops at the
 * first call to readLine that returns 0.
 *
 * @throws RuntimeException wrapping any IOException raised while
 *         connecting, reading or closing
 */
@Override
public void run() {
  try {
    in_ = connectInputStream();
    LineReader lineReader = new LineReader((InputStream)in_, conf_);
    try {
      Text line = new Text();
      while (lineReader.readLine(line) > 0) {
        buf_.append(line.toString());
        buf_.append('\n');
        line.clear();
      }
    } finally {
      // Release the reader and the stream even when reading fails part-way.
      lineReader.close();
      in_.close();
    }
  } catch (IOException io) {
    throw new RuntimeException(io);
  }
}
/**
 * Reads the first line of the master index and returns the integer that
 * precedes its first space.
 *
 * @return the parsed version number
 * @throws IOException if the master index cannot be opened or read
 */
public int getHarVersion() throws IOException {
  FSDataInputStream masterIn = fs.open(masterIndex);
  Text line = new Text();
  try {
    LineReader lmaster = new LineReader(masterIn, getConf());
    lmaster.readLine(line);
  } finally {
    // Close in finally so a failed read does not leak the stream.
    try {
      masterIn.close();
    } catch (IOException ignored) {
      // Deliberately disregarded: the stream was only read from.
    }
  }
  String versionLine = line.toString();
  String[] arr = versionLine.split(" ");
  int version = Integer.parseInt(arr[0]);
  return version;
}
/**
 * Treats every command line argument as input text, splits it into lines
 * and prints each line to standard output.
 *
 * @param args the texts to split; each one is passed through
 *             {@code unquote} before being read
 * @throws Exception if reading or closing a stream fails
 */
public static void main(String[] args) throws Exception {
  for (String arg : args) {
    System.out.println("Working on " + arg);
    final LineReader lines = makeStream(unquote(arg));
    final Text current = new Text();
    // A return value of 0 from readLine ends the input.
    for (int consumed = lines.readLine(current); consumed > 0;
        consumed = lines.readLine(current)) {
      System.out.println("Got: " + current.toString());
    }
    lines.close();
  }
}
/**
 * Checks that readLine returns multi-byte UTF-8 text unchanged and that the
 * character U+200A is not treated as a line terminator.
 *
 * @throws Exception if a stream cannot be read or closed
 */
@Test (timeout=5000)
public void testUTF8() throws Exception {
  Text line = new Text();

  // Both readers are closed explicitly; previously neither was, and the
  // first became unreachable as soon as the variable was reassigned.
  LineReader in = makeStream("abcd\u20acbdcd\u20ac");
  try {
    in.readLine(line);
    assertEquals("readLine changed utf8 characters",
        "abcd\u20acbdcd\u20ac", line.toString());
  } finally {
    in.close();
  }

  in = makeStream("abc\u200axyz");
  try {
    in.readLine(line);
    assertEquals("split on fake newline", "abc\u200axyz", line.toString());
  } finally {
    in.close();
  }
}
/**
 * Test readLine for various kinds of line termination sequences.
 * Varies buffer size to stress test. Also check that the sum of the
 * returned values matches the byte length of the input.
 *
 * @throws Exception if a stream cannot be read or closed
 */
@Test (timeout=5000)
public void testNewLines() throws Exception {
  final String STR = "a\nbb\n\nccc\rdddd\r\r\r\n\r\neeeee";
  final int STRLENBYTES = STR.getBytes().length;
  Text out = new Text();
  for (int bufsz = 1; bufsz < STRLENBYTES+1; ++bufsz) {
    LineReader in = makeStream(STR, bufsz);
    try {
      int c = 0;
      c += in.readLine(out); //"a"\n
      assertEquals("line1 length, bufsz:"+bufsz, 1, out.getLength());
      c += in.readLine(out); //"bb"\n
      assertEquals("line2 length, bufsz:"+bufsz, 2, out.getLength());
      c += in.readLine(out); //""\n
      assertEquals("line3 length, bufsz:"+bufsz, 0, out.getLength());
      c += in.readLine(out); //"ccc"\r
      assertEquals("line4 length, bufsz:"+bufsz, 3, out.getLength());
      c += in.readLine(out); //dddd\r
      assertEquals("line5 length, bufsz:"+bufsz, 4, out.getLength());
      c += in.readLine(out); //""\r
      assertEquals("line6 length, bufsz:"+bufsz, 0, out.getLength());
      c += in.readLine(out); //""\r\n
      assertEquals("line7 length, bufsz:"+bufsz, 0, out.getLength());
      c += in.readLine(out); //""\r\n
      assertEquals("line8 length, bufsz:"+bufsz, 0, out.getLength());
      c += in.readLine(out); //"eeeee"EOF
      assertEquals("line9 length, bufsz:"+bufsz, 5, out.getLength());
      assertEquals("end of file, bufsz: "+bufsz, 0, in.readLine(out));
      // Expected value goes first so a failure message reads the right way round.
      assertEquals("total bytes, bufsz: "+bufsz, STRLENBYTES, c);
    } finally {
      // One reader is created per buffer size; close each of them.
      in.close();
    }
  }
}
/**
 * Treats every command line argument as input text, splits it into lines
 * and prints each line to standard output.
 *
 * @param args the texts to split; each one is passed through
 *             {@code unquote} before being read
 * @throws Exception if reading or closing a stream fails
 */
public static void main(String[] args) throws Exception {
  for (String arg : args) {
    System.out.println("Working on " + arg);
    final LineReader lines = makeStream(unquote(arg));
    final Text current = new Text();
    // A return value of 0 from readLine ends the input.
    for (int consumed = lines.readLine(current); consumed > 0;
        consumed = lines.readLine(current)) {
      System.out.println("Got: " + current.toString());
    }
    lines.close();
  }
}
/**
 * Checks that readLine returns multi-byte UTF-8 text unchanged and that the
 * character U+200A is not treated as a line terminator.
 *
 * @throws Exception if a stream cannot be read or closed
 */
@Test
public void testUTF8() throws Exception {
  Text line = new Text();

  // Both readers are closed explicitly; previously neither was, and the
  // first became unreachable as soon as the variable was reassigned.
  LineReader in = makeStream("abcd\u20acbdcd\u20ac");
  try {
    in.readLine(line);
    assertEquals("readLine changed utf8 characters",
        "abcd\u20acbdcd\u20ac", line.toString());
  } finally {
    in.close();
  }

  in = makeStream("abc\u200axyz");
  try {
    in.readLine(line);
    assertEquals("split on fake newline", "abc\u200axyz", line.toString());
  } finally {
    in.close();
  }
}
/**
 * Can this parser parse the input?
 *
 * We will deem a stream to be a good 0.20 job history stream if the
 * first line is exactly "Meta VERSION=\"1\" ."
 *
 * @param input the stream to probe; it is read from but not closed here
 * @return Whether this parser can parse the input. An empty stream or an
 *         EOFException while reading yields {@code false}.
 * @throws IOException if reading fails for a reason other than EOFException
 */
public static boolean canParse(InputStream input) throws IOException {
  try {
    final LineReader firstLineReader = new LineReader(input);
    final Text firstLine = new Text();
    if (firstLineReader.readLine(firstLine) == 0) {
      // Nothing could be read at all.
      return false;
    }
    return "Meta VERSION=\"1\" .".equals(firstLine.toString());
  } catch (EOFException e) {
    return false;
  }
}
/**
 * Opens {@code p} for line-oriented reading. The codec looked up for the
 * path is stored in {@code inputCodec}; when one is found, a decompressor
 * is taken from the codec pool, stored in {@code inputDecompressor}, and
 * the file is read through it.
 *
 * @param p the file to open
 * @return a line reader over the file contents, decompressed if a codec
 *         was found for the path
 * @throws FileNotFoundException declared by the original signature
 * @throws IOException if the file or the decompressing stream cannot be opened
 */
private LineReader maybeUncompressedPath(Path p)
    throws FileNotFoundException, IOException {
  inputCodec = new CompressionCodecFactory(getConf()).getCodec(p);
  final FSDataInputStream rawIn = p.getFileSystem(getConf()).open(p);

  if (inputCodec != null) {
    inputDecompressor = CodecPool.getDecompressor(inputCodec);
    return new LineReader(
        inputCodec.createInputStream(rawIn, inputDecompressor), getConf());
  }
  // No codec matched the path: read the raw bytes.
  return new LineReader(rawIn, getConf());
}
/**
 * Prepares this reader: allocates the key, value and line buffers, copies
 * the client input, configuration, key-field count and field separator
 * from {@code pipeMapRed}, and wraps the client input in a line reader.
 *
 * @param pipeMapRed the source of the client input and settings
 * @throws IOException if the superclass initialization fails
 */
@Override
public void initialize(PipeMapRed pipeMapRed) throws IOException {
  super.initialize(pipeMapRed);

  // Reusable buffers.
  this.key = new Text();
  this.value = new Text();
  this.line = new Text();

  // Settings taken from the pipe.
  this.clientIn = pipeMapRed.getClientInput();
  this.conf = pipeMapRed.getConfiguration();
  this.numKeyFields = pipeMapRed.getNumOfKeyFields();
  this.separator = pipeMapRed.getFieldSeparator();

  this.lineReader = new LineReader((InputStream) this.clientIn, this.conf);
}
/**
 * Prepares this reader: allocates the key and line buffers, copies the
 * client input and configuration from {@code pipeMapRed}, and wraps the
 * client input in a line reader.
 *
 * @param pipeMapRed the source of the client input and configuration
 * @throws IOException if the superclass initialization fails
 */
@Override
public void initialize(PipeMapRed pipeMapRed) throws IOException {
  super.initialize(pipeMapRed);

  // Reusable buffers.
  this.key = new Text();
  this.line = new Text();

  // Input and configuration taken from the pipe.
  this.clientIn = pipeMapRed.getClientInput();
  this.conf = pipeMapRed.getConfiguration();

  this.lineReader = new LineReader((InputStream) this.clientIn, this.conf);
}
/**
 * Treats every command line argument as input text, splits it into lines
 * and prints each line to standard output.
 *
 * @param args the texts to split; each one is passed through
 *             {@code unquote} before being read
 * @throws Exception if reading or closing a stream fails
 */
public static void main(String[] args) throws Exception {
  for (String arg : args) {
    System.out.println("Working on " + arg);
    final LineReader lines = makeStream(unquote(arg));
    final Text current = new Text();
    // A return value of 0 from readLine ends the input.
    for (int consumed = lines.readLine(current); consumed > 0;
        consumed = lines.readLine(current)) {
      System.out.println("Got: " + current.toString());
    }
    lines.close();
  }
}
/**
 * Checks that readLine returns multi-byte UTF-8 text unchanged and that the
 * character U+200A is not treated as a line terminator.
 *
 * @throws Exception if a stream cannot be read or closed
 */
@Test (timeout=5000)
public void testUTF8() throws Exception {
  Text line = new Text();

  // Both readers are closed explicitly; previously neither was, and the
  // first became unreachable as soon as the variable was reassigned.
  LineReader in = makeStream("abcd\u20acbdcd\u20ac");
  try {
    in.readLine(line);
    assertEquals("readLine changed utf8 characters",
        "abcd\u20acbdcd\u20ac", line.toString());
  } finally {
    in.close();
  }

  in = makeStream("abc\u200axyz");
  try {
    in.readLine(line);
    assertEquals("split on fake newline", "abc\u200axyz", line.toString());
  } finally {
    in.close();
  }
}