Listed below are example usages of org.apache.hadoop.io.MapFile.Reader, drawn from open-source projects.
private void codecTestMapFile(Class<? extends CompressionCodec> clazz,
    CompressionType type, int records) throws Exception {
  FileSystem fs = FileSystem.get(conf);
  LOG.info("Creating MapFiles with " + records +
      " records using codec " + clazz.getSimpleName());
  Path path = new Path(new Path(
      System.getProperty("test.build.data", "/tmp")),
      clazz.getSimpleName() + "-" + type + "-" + records);
  LOG.info("Writing " + path);
  createMapFile(conf, fs, path, clazz.newInstance(), type, records);
  MapFile.Reader reader = new MapFile.Reader(path, conf);
  Text key1 = new Text("002");
  assertNotNull(reader.get(key1, new Text()));
  Text key2 = new Text("004");
  assertNotNull(reader.get(key2, new Text()));
}
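The createMapFile helper invoked above is not part of this snippet. Here is a minimal sketch of what it might look like, assuming zero-padded Text keys (the test then looks up "002" and "004") and Hadoop's option-based MapFile.Writer constructor; the "value-" payload is an invented placeholder:

private static void createMapFile(Configuration conf, FileSystem fs, Path path,
    CompressionCodec codec, CompressionType type, int records) throws IOException {
  // fs is unused with the option-based constructor but kept to match the call above.
  // MapFile requires keys to be appended in ascending order; zero-padding makes
  // lexicographic Text order agree with numeric order.
  MapFile.Writer writer = new MapFile.Writer(conf, path,
      MapFile.Writer.keyClass(Text.class),
      SequenceFile.Writer.valueClass(Text.class),
      SequenceFile.Writer.compression(type, codec));
  try {
    for (int i = 1; i <= records; i++) {
      writer.append(new Text(String.format("%03d", i)), new Text("value-" + i));
    }
  } finally {
    IOUtils.closeStream(writer); // org.apache.hadoop.io.IOUtils
  }
}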
public MapFileReader(List<String> paths, IndexToKey indexToKey, Class<? extends Writable> recordClass)
    throws IOException {
  this.indexToKey = indexToKey;
  this.recordClass = recordClass;
  this.readers = new MapFile.Reader[paths.size()];
  SequenceFile.Reader.Option[] opts = new SequenceFile.Reader.Option[0];
  Configuration config = new Configuration();
  for (int i = 0; i < paths.size(); i++) {
    readers[i] = new MapFile.Reader(new Path(paths.get(i)), config, opts);
    if (readers[i].getValueClass() != recordClass) {
      throw new UnsupportedOperationException("MapFile value class is " + readers[i].getValueClass()
          + " but expected " + recordClass + ", path = " + paths.get(i));
    }
  }
  recordIndexesEachReader = indexToKey.initialize(readers, recordClass);
}
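A hypothetical usage sketch of this constructor. The paths and the record class below are invented for illustration, and LongIndexToKey is assumed to be an available IndexToKey implementation (the initialize method later in this section is consistent with one):

// Hypothetical part-file paths and record class, for illustration only.
List<String> parts = Arrays.asList("/data/mapfiles/part-r-00000",
    "/data/mapfiles/part-r-00001");
MapFileReader reader = new MapFileReader(parts, new LongIndexToKey(),
    RecordWritable.class);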
/** Open the output generated by this format. */
public static MapFile.Reader[] getReaders(FileSystem ignored, Path dir,
    Configuration conf)
    throws IOException {
  FileSystem fs = dir.getFileSystem(conf);
  Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));
  // sort names, so that hash partitioning works
  Arrays.sort(names);
  MapFile.Reader[] parts = new MapFile.Reader[names.length];
  for (int i = 0; i < names.length; i++) {
    parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
  }
  return parts;
}
private static void closeMap(MapFile.Reader map) {
  if (map != null) {
    try {
      map.close();
    } catch (final IOException e) {
      e.printStackTrace();
    }
  }
}
/** Get an entry from output generated by this class. */
public static <K extends WritableComparable, V extends Writable>
    Writable getEntry(MapFile.Reader[] readers,
        Partitioner<K, V> partitioner,
        K key,
        V value) throws IOException {
  int part = partitioner.getPartition(key, value, readers.length);
  return readers[part].get(key, value);
}
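The lookup works because getReaders sorts the part files by name: the same partitioner that routed a key to a reduce task at write time now selects the matching reader. For the common HashPartitioner, getPartition is effectively:

int part = (key.hashCode() & Integer.MAX_VALUE) % readers.length;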
public static void main(String[] args)
    throws Exception {
  if (args == null || args.length < 2) {
    System.out.println("LinkDumper$Reader usage: <webgraphdb> <url>");
    return;
  }
  // open the readers for the linkdump directory
  Configuration conf = NutchConfiguration.create();
  FileSystem fs = FileSystem.get(conf);
  Path webGraphDb = new Path(args[0]);
  String url = args[1];
  MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(
      webGraphDb, DUMP_DIR), conf);
  // get the link nodes for the url
  Text key = new Text(url);
  LinkNodes nodes = new LinkNodes();
  MapFileOutputFormat.getEntry(readers,
      new HashPartitioner<Text, LinkNodes>(), key, nodes);
  // print out the link nodes
  LinkNode[] linkNodesAr = nodes.getLinks();
  System.out.println(url + ":");
  for (LinkNode node : linkNodesAr) {
    System.out.println("  " + node.getUrl() + " - "
        + node.getNode().toString());
  }
  // close the readers
  FSUtils.closeReaders(readers);
}
/**
 * Closes a group of MapFile readers.
 *
 * @param readers The MapFile readers to close.
 * @throws IOException If an error occurs while closing a reader.
 */
public static void closeReaders(MapFile.Reader[] readers)
    throws IOException {
  // loop through the readers closing one by one
  if (readers != null) {
    for (int i = 0; i < readers.length; i++) {
      MapFile.Reader reader = readers[i];
      if (reader != null) {
        reader.close();
      }
    }
  }
}
/** Get an entry from output generated by this class. */
public static <K extends WritableComparable<?>, V extends Writable>
    Writable getEntry(MapFile.Reader[] readers,
        Partitioner<K, V> partitioner, K key, V value) throws IOException {
  int part = partitioner.getPartition(key, value, readers.length);
  return readers[part].get(key, value);
}
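This overload differs from the getEntry shown earlier only in bounding the key type with a wildcard, WritableComparable<?>, which avoids raw-type warnings at call sites.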
/**
 * Checks the merged segment and removes the stuff again.
 *
 * @param testDir the test directory
 * @param mergedSegment the merged segment
 * @return the final fetch status
 */
protected byte checkMergedSegment(Path testDir, Path mergedSegment) throws Exception {
  // Get a MapFile reader for the <Text,CrawlDatum> pairs
  MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(mergedSegment, CrawlDatum.FETCH_DIR_NAME), conf);
  Text key = new Text();
  CrawlDatum value = new CrawlDatum();
  byte finalStatus = 0x0;
  for (MapFile.Reader reader : readers) {
    while (reader.next(key, value)) {
      LOG.info("Reading status for: " + key.toString() + " > " + CrawlDatum.getStatusName(value.getStatus()));
      // Only consider fetch status
      if (CrawlDatum.hasFetchStatus(value) && key.toString().equals("http://nutch.apache.org/")) {
        finalStatus = value.getStatus();
      }
    }
    // Close the reader again
    reader.close();
  }
  // Remove the test directory again
  fs.delete(testDir, true);
  LOG.info("Final fetch status for: http://nutch.apache.org/ > " + CrawlDatum.getStatusName(finalStatus));
  // Return the final status
  return finalStatus;
}
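Note that reader.next(key, value) fills the Text and CrawlDatum instances passed in rather than allocating new ones, which is why a single key/value pair can be reused across the entire scan.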
/** Open the output generated by this format. */
private MapFile.Reader[] getReaders(String subDir) throws IOException {
  Path dir = new Path(segmentDir, subDir);
  FileSystem fs = dir.getFileSystem(conf);
  Path[] names = FileUtil.stat2Paths(fs.listStatus(dir, SegmentPathFilter.INSTANCE));
  // sort names, so that hash partitioning works
  Arrays.sort(names);
  MapFile.Reader[] parts = new MapFile.Reader[names.length];
  for (int i = 0; i < names.length; i++) {
    parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
  }
  return parts;
}
public void generateSplits(Path parent, Configuration conf) throws IOException
{
  List<FileSplitInfo> list = new ArrayList<>();
  // get a Hadoop file system handle
  FileSystem fs = getFileSystem(parent);
  // get the list of paths of the subdirectories of the parent
  Path[] paths = FileUtil.stat2Paths(fs.listStatus(parent));
  Arrays.sort(paths);
  int partition = 0;
  // look inside each subdirectory for a data dir and keep track
  for (Path p : paths)
  {
    Path mapfile = null;
    FileStatus[] dirFiles = fs.listStatus(p);
    for (FileStatus dirFile : dirFiles)
    {
      if (dirFile.getPath().getName().equals("data"))
      {
        mapfile = dirFile.getPath().getParent();
        break;
      }
    }
    if (mapfile != null)
    {
      RasterWritable val = new RasterWritable();
      MapFile.Reader reader = createMapFileReader(conf, mapfile);
      // getClosest(key, val) returns the smallest key at or after the given
      // key, so probing with 0 yields the first key in the file...
      TileIdWritable firstKey = (TileIdWritable) reader.getClosest(new TileIdWritable(0), val);
      // ...and with before=true it returns the largest key at or before the
      // given key, so probing with Long.MAX_VALUE yields the last key.
      TileIdWritable lastKey = (TileIdWritable) reader.getClosest(new TileIdWritable(Long.MAX_VALUE), val, true);
      if (firstKey != null && lastKey != null)
      {
        list.add(new FileSplitInfo(firstKey.get(), lastKey.get(), mapfile.getName(), partition));
      }
      // done probing this file; release its handles
      reader.close();
      partition++;
    }
  }
  splits = list.toArray(new FileSplitInfo[list.size()]);
}
@Test
@Category(UnitTest.class)
public void testGenerateSplitsFromPath() throws Exception
{
  // Setup a mock directory structure
  Path rootPath = new Path(FileSplitTest.class.getName() + "-testRootPath");
  Path path1 = new Path(rootPath, FileSplitTest.class.getName() + "-testPath1");
  Path path2 = new Path(rootPath, FileSplitTest.class.getName() + "-testPath2");
  Path path3 = new Path(rootPath, FileSplitTest.class.getName() + "-testPath3");
  Path path1_1 = new Path(path1, "notDataDir");
  Path path1_2 = new Path(path1, "data");
  Path path2_1 = new Path(path2, "data");
  Path path3_1 = new Path(path3, "notDataDir");
  // Setup the FileSystem
  FileSystem mockFS = new FileSystemBuilder()
      .fileStatus(rootPath, new FileStatusBuilder().path(path1).build())
      .fileStatus(rootPath, new FileStatusBuilder().path(path2).build())
      .fileStatus(rootPath, new FileStatusBuilder().path(path3).build())
      .fileStatus(path1, new FileStatusBuilder().path(path1_1).build())
      .fileStatus(path1, new FileStatusBuilder().path(path1_2).build())
      .fileStatus(path2, new FileStatusBuilder().path(path2_1).build())
      .fileStatus(path3, new FileStatusBuilder().path(path3_1).build())
      .build();
  // setup map file readers for each of the data directories
  RasterWritable mockValue = new RasterWritable();
  TileIdWritable[] path1Keys = {new TileIdWritable(2L), new TileIdWritable(4L), new TileIdWritable(6L)};
  RasterWritable[] path1Values = {mockValue, mockValue, mockValue};
  TileIdWritable[] path2Keys = {new TileIdWritable(5L), new TileIdWritable(6L), new TileIdWritable(7L)};
  RasterWritable[] path2Values = {mockValue, mockValue, mockValue};
  MapFile.Reader mockMapFileReaderPath1 = new MapFileReaderBuilder()
      .keyClass(TileIdWritable.class)
      .valueClass(RasterWritable.class)
      .keys(path1Keys)
      .values(path1Values)
      .build();
  MapFile.Reader mockMapFileReaderPath2 = new MapFileReaderBuilder()
      .keyClass(TileIdWritable.class)
      .valueClass(RasterWritable.class)
      .keys(path2Keys)
      .values(path2Values)
      .build();
  // Setup a Configuration
  Configuration mockConfiguration = new ConfigurationBuilder().build();
  FileSplit spySubject = new FileSplit();
  subject = spy(spySubject);
  doReturn(mockFS).when(subject).getFileSystem(rootPath);
  doReturn(mockMapFileReaderPath1).when(subject).createMapFileReader(mockConfiguration, path1);
  doReturn(mockMapFileReaderPath2).when(subject).createMapFileReader(mockConfiguration, path2);
  subject.generateSplits(rootPath, mockConfiguration);
  // Verify we got splits for path 1 and 2
  SplitInfo[] splits = subject.getSplits();
  Assert.assertEquals(2, splits.length);
  verifySplit(path1, path1Keys, splits, 0);
  verifySplit(path2, path2Keys, splits, 1);
}
@Override
public void close() throws IOException {
  for (MapFile.Reader r : readers) {
    r.close();
  }
}
@Override
public List<Pair<Long, Long>> initialize(MapFile.Reader[] readers, Class<? extends Writable> valueClass)
    throws IOException {
  List<Pair<Long, Long>> l = new ArrayList<>(readers.length);
  for (MapFile.Reader r : readers) {
    // Get the first and last keys:
    long first = -1;
    long last = -1;
    // First key: no method for this for some inexplicable reason :/
    LongWritable k = new LongWritable();
    Writable v = ReflectionUtils.newInstance(valueClass, null);
    boolean hasNext = r.next(k, v);
    if (!hasNext) {
      // This map file is empty - no data
      l.add(new Pair<>(-1L, -1L));
      continue;
    }
    first = k.get();
    // Last key: easy
    r.reset();
    r.finalKey(k);
    last = k.get();
    l.add(new Pair<>(first, last));
  }
  // Check that things are actually contiguous:
  List<Pair<Long, Long>> sorted = new ArrayList<>(l.size());
  for (Pair<Long, Long> p : l) {
    if (p.getLeft() >= 0) {
      sorted.add(p);
    }
  }
  Collections.sort(sorted, new Comparator<Pair<Long, Long>>() {
    @Override
    public int compare(Pair<Long, Long> o1, Pair<Long, Long> o2) {
      return Long.compare(o1.getFirst(), o2.getFirst());
    }
  });
  if (sorted.size() == 0) {
    throw new IllegalStateException("All map files are empty - no data available");
  }
  if (sorted.get(0).getFirst() != 0L) {
    throw new UnsupportedOperationException("Minimum key value is not 0: got " + sorted.get(0).getFirst());
  }
  for (int i = 0; i < sorted.size() - 1; i++) {
    long currLast = sorted.get(i).getSecond();
    long nextFirst = sorted.get(i + 1).getFirst();
    if (nextFirst == -1) {
      // Skip empty map file (defensive; empty files were already filtered out above)
      continue;
    }
    if (currLast + 1 != nextFirst) {
      throw new IllegalStateException(
          "Keys are not contiguous between readers: first/last indices (inclusive) are "
              + sorted
              + ".\n LongIndexKey assumes unique and contiguous LongWritable keys");
    }
  }
  readerIndices = l;
  return readerIndices;
}
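For example, part files whose LongWritable keys span [0, 99], [100, 249], and [250, 299] pass this check; if the middle file instead began at 101, the contiguity check above would throw.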
private Writable getEntry(MapFile.Reader[] readers, Text url,
    Writable entry) throws IOException {
  return MapFileOutputFormat.getEntry(readers, PARTITIONER, url, entry);
}
protected MapFile.Reader createMapFileReader(Configuration conf, Path mapfile) throws IOException
{
  return new MapFile.Reader(mapfile, conf);
}
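Keeping this factory method protected is what lets the unit test shown earlier stub it out: the doReturn(...).when(subject).createMapFileReader(...) calls replace the real MapFile.Reader with a builder-produced mock.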
private void closeReaders(MapFile.Reader[] readers) throws IOException {
  for (int i = 0; i < readers.length; i++) {
    readers[i].close();
  }
}