The following are example usages of the org.apache.hadoop.io.MapFile API class; you can also follow the links to view the source code on GitHub.
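Before the project snippets, here is a minimal hand-written sketch of the basic MapFile write/read cycle. It is not taken from any of the projects below; the output path, key format, and values are illustrative placeholders.
// Minimal sketch (assumes org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.Path,
// org.apache.hadoop.io.MapFile and org.apache.hadoop.io.Text are imported).
private void writeAndReadMapFileExample() throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/mapfile-example"); // hypothetical output directory
    MapFile.Writer writer = new MapFile.Writer(conf, path,
        MapFile.Writer.keyClass(Text.class),
        MapFile.Writer.valueClass(Text.class));
    try {
        for (int i = 0; i < 10; i++) {
            // MapFile requires keys to be appended in sorted order
            writer.append(new Text(String.format("%03d", i)), new Text("value-" + i));
        }
    } finally {
        writer.close();
    }
    MapFile.Reader reader = new MapFile.Reader(path, conf);
    try {
        Text value = new Text();
        reader.get(new Text("005"), value); // index lookup, then seek into the data file
    } finally {
        reader.close();
    }
}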
private void validateMapFileOutputContent(FileSystem fs, Path dir) throws IOException {
// map output is a directory with index and data files
Path expectedMapDir = new Path(dir, partFile);
assert (fs.getFileStatus(expectedMapDir).isDirectory());
FileStatus[] files = fs.listStatus(expectedMapDir);
int fileCount = 0;
boolean dataFileFound = false;
boolean indexFileFound = false;
for (FileStatus f : files) {
if (f.isFile()) {
++fileCount;
if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
indexFileFound = true;
} else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
dataFileFound = true;
}
}
}
assert (fileCount > 0);
assert (dataFileFound && indexFileFound);
}
@Override
protected List<FileStatus> listStatus(JobContext job
)throws IOException {
List<FileStatus> files = super.listStatus(job);
int len = files.size();
for(int i=0; i < len; ++i) {
FileStatus file = files.get(i);
if (file.isDirectory()) { // it's a MapFile
Path p = file.getPath();
FileSystem fs = p.getFileSystem(job.getConfiguration());
// use the data file
files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
}
}
return files;
}
private void codecTestMapFile(Class<? extends CompressionCodec> clazz,
CompressionType type, int records) throws Exception {
FileSystem fs = FileSystem.get(conf);
LOG.info("Creating MapFiles with " + records +
" records using codec " + clazz.getSimpleName());
Path path = new Path(new Path(
System.getProperty("test.build.data", "/tmp")),
clazz.getSimpleName() + "-" + type + "-" + records);
LOG.info("Writing " + path);
createMapFile(conf, fs, path, clazz.newInstance(), type, records);
MapFile.Reader reader = new MapFile.Reader(path, conf);
Text key1 = new Text("002");
assertNotNull(reader.get(key1, new Text()));
Text key2 = new Text("004");
assertNotNull(reader.get(key2, new Text()));
}
/**
*
* @param outputDir Output directory for the map file(s)
* @param mapFileSplitSize Split size for the map file: if 0, use a single map file for all output. If > 0,
* multiple map files will be used: each will contain a maximum of mapFileSplitSize records.
* This can be used to avoid having a single multi-gigabyte map file, which may be
* undesirable in some cases (transfer across the network, for example)
* @param convertTextTo If null: make no changes to Text writable objects. If non-null, Text writable instances
* will be converted to this type. This is useful when you would rather store numerical values
* even if the original record reader produces strings/text.
* @param indexInterval Index interval for the map file. Defaults to 1, which is suitable for most cases
* @param filenamePattern The naming pattern for the map files. Used with String.format(pattern, int)
* @param hadoopConfiguration Hadoop configuration.
*/
public AbstractMapFileWriter(@NonNull File outputDir, int mapFileSplitSize, WritableType convertTextTo,
int indexInterval, String filenamePattern,
org.apache.hadoop.conf.Configuration hadoopConfiguration) {
if(indexInterval <= 0){
throw new UnsupportedOperationException("Index interval: must be > 0 (got: " + indexInterval + ")");
}
this.outputDir = outputDir;
this.mapFileSplitSize = mapFileSplitSize;
if (convertTextTo == WritableType.Text) {
convertTextTo = null;
}
this.convertTextTo = convertTextTo;
this.indexInterval = indexInterval;
this.filenamePattern = filenamePattern;
this.hadoopConfiguration = hadoopConfiguration;
if(this.hadoopConfiguration.get(MAP_FILE_INDEX_INTERVAL_KEY) != null){
this.hadoopConfiguration.set(MAP_FILE_INDEX_INTERVAL_KEY, String.valueOf(indexInterval));
}
opts = new SequenceFile.Writer.Option[]{MapFile.Writer.keyClass(KEY_CLASS),
SequenceFile.Writer.valueClass(getValueClass())};
}
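A concrete subclass would typically open one writer per split by combining the opts built in the constructor above with an output path. The following is only a rough sketch under that assumption; the method name and part-file naming pattern are hypothetical, not part of the actual class.
// Sketch only: how the opts array from the constructor might be used to open a split's writer.
// filenamePattern is assumed to look like "part-r-%05d"; openWriterForSplit is a hypothetical helper.
protected MapFile.Writer openWriterForSplit(int splitIndex) throws IOException {
    String name = String.format(filenamePattern, splitIndex);
    Path outPath = new Path(outputDir.getAbsolutePath(), name);
    // MapFile.Writer accepts SequenceFile.Writer.Option varargs, so the prepared opts can be passed directly
    return new MapFile.Writer(hadoopConfiguration, outPath, opts);
}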
public MapFileReader(List<String> paths, IndexToKey indexToKey, Class<? extends Writable> recordClass)
throws IOException {
this.indexToKey = indexToKey;
this.recordClass = recordClass;
this.readers = new MapFile.Reader[paths.size()];
SequenceFile.Reader.Option[] opts = new SequenceFile.Reader.Option[0];
Configuration config = new Configuration();
for (int i = 0; i < paths.size(); i++) {
readers[i] = new MapFile.Reader(new Path(paths.get(i)), config, opts);
if (readers[i].getValueClass() != recordClass) {
throw new UnsupportedOperationException("MapFile record class: " + readers[i].getValueClass()
+ ", but got class " + recordClass + ", path = " + paths.get(i));
}
}
recordIndexesEachReader = indexToKey.initialize(readers, recordClass);
}
@Override
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid, Map<String, String> offloadDriverMetadata) {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get(MANAGED_LEDGER_NAME));
String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
scheduler.chooseThread(ledgerId).submit(() -> {
try {
MapFile.Reader reader = new MapFile.Reader(new Path(dataFilePath),
configuration);
promise.complete(FileStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId), reader, ledgerId));
} catch (Throwable t) {
log.error("Failed to open FileStoreBackedReadHandleImpl: ManagerLedgerName: {}, " +
"LegerId: {}, UUID: {}", offloadDriverMetadata.get(MANAGED_LEDGER_NAME), ledgerId, uuid, t);
promise.completeExceptionally(t);
}
});
return promise;
}
private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId) throws IOException {
this.ledgerId = ledgerId;
this.executor = executor;
this.reader = reader;
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
try {
key.set(FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX);
reader.get(key, value);
this.ledgerMetadata = parseLedgerMetadata(value.copyBytes());
} catch (IOException e) {
log.error("Fail to read LedgerMetadata for ledgerId {}",
ledgerId);
throw new IOException("Fail to read LedgerMetadata for ledgerId " + key.get());
}
}
private void createLinkDb(Configuration config, FileSystem fs, Path linkdb, TreeMap init) throws Exception {
LOG.fine("* creating linkdb: " + linkdb);
Path dir = new Path(linkdb, LinkDb.CURRENT_NAME);
MapFile.Writer writer = new MapFile.Writer(config, fs, new Path(dir, "part-00000").toString(), Text.class, Inlinks.class);
Iterator it = init.keySet().iterator();
while (it.hasNext()) {
String key = (String)it.next();
Inlinks inlinks = new Inlinks();
String[] vals = (String[])init.get(key);
for (int i = 0; i < vals.length; i++) {
Inlink in = new Inlink(vals[i], vals[i]);
inlinks.add(in);
}
writer.append(new Text(key), inlinks);
}
writer.close();
}
public static long readAndAppendCommit(Configuration conf, FileSystem fileSystem, MapFile.Writer writer, String fileName, long lastAstKey, long lastCommitKey) throws IOException {
long newLastKey = lastCommitKey;
SequenceFile.Reader r = new SequenceFile.Reader(fileSystem, new Path(fileName), conf);
LongWritable longKey = new LongWritable();
BytesWritable value = new BytesWritable();
try {
while (r.next(longKey, value)) {
newLastKey = longKey.get() + lastCommitKey;
Revision rev = Revision.parseFrom(CodedInputStream.newInstance(value.getBytes(), 0, value.getLength()));
Revision.Builder rb = Revision.newBuilder(rev);
for (ChangedFile.Builder cfb : rb.getFilesBuilderList()) {
long key = cfb.getKey();
if (key > 0)
cfb.setKey(lastAstKey + key);
}
writer.append(new LongWritable(newLastKey), new BytesWritable(rb.build().toByteArray()));
}
} catch (Exception e) {
System.err.println(fileName);
e.printStackTrace();
} finally {
r.close();
}
return newLastKey;
}
/** Open the output generated by this format. */
public static MapFile.Reader[] getReaders(FileSystem ignored, Path dir,
Configuration conf)
throws IOException {
FileSystem fs = dir.getFileSystem(conf);
Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));
// sort names, so that hash partitioning works
Arrays.sort(names);
MapFile.Reader[] parts = new MapFile.Reader[names.length];
for (int i = 0; i < names.length; i++) {
parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
}
return parts;
}
@Override
public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(TaskAttemptContext context) throws IOException
{
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
if (getCompressOutput(context))
{
// find the kind of compression to do
compressionType = SequenceFileOutputFormat.getOutputCompressionType(context);
// find the right codec
codec = getCompressionCodec(context);
}
Path file = getDefaultWorkFile(context, "");
MapFile.Writer out = createMapFileWriter(context, codec, compressionType, file);
return new Writer(out);
}
@Override
protected List<LocatedFileStatus> listLocatedStatus(JobContext job
)throws IOException {
List<LocatedFileStatus> files = super.listLocatedStatus(job);
int len = files.size();
for(int i=0; i < len; ++i) {
FileStatus file = files.get(i);
if (file.isDirectory()) { // it's a MapFile
Path p = file.getPath();
FileSystem fs = p.getFileSystem(job.getConfiguration());
// use the data file
files.set(i, fs.listLocatedStatus(
new Path(p, MapFile.DATA_FILE_NAME)).next());
}
}
return files;
}
@Override
protected FileStatus[] listStatus(JobConf job) throws IOException {
FileStatus[] files = super.listStatus(job);
for (int i = 0; i < files.length; i++) {
FileStatus file = files[i];
if (file.isDirectory()) { // it's a MapFile
Path dataFile = new Path(file.getPath(), MapFile.DATA_FILE_NAME);
FileSystem fs = file.getPath().getFileSystem(job);
// use the data file
files[i] = fs.getFileStatus(dataFile);
}
}
return files;
}
public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress)
throws IOException {
// get the path of the temporary output file
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
if (getCompressOutput(job)) {
// find the kind of compression to do
compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);
// find the right codec
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
}
// ignore the progress parameter, since MapFile is local
final MapFile.Writer out =
new MapFile.Writer(job, fs, file.toString(),
job.getOutputKeyClass().asSubclass(WritableComparable.class),
job.getOutputValueClass().asSubclass(Writable.class),
compressionType, codec,
progress);
return new RecordWriter<WritableComparable, Writable>() {
public void write(WritableComparable key, Writable value)
throws IOException {
out.append(key, value);
}
public void close(Reporter reporter) throws IOException { out.close();}
};
}
/** Open the output generated by this format. */
public static MapFile.Reader[] getReaders(FileSystem ignored, Path dir,
Configuration conf)
throws IOException {
return org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat.
getReaders(dir, conf);
}
/** Get an entry from output generated by this class. */
public static <K extends WritableComparable, V extends Writable>
Writable getEntry(MapFile.Reader[] readers,
Partitioner<K, V> partitioner,
K key,
V value) throws IOException {
int part = partitioner.getPartition(key, value, readers.length);
return readers[part].get(key, value);
}
/** Open the output generated by this format. */
public static MapFile.Reader[] getReaders(Path dir,
Configuration conf) throws IOException {
FileSystem fs = dir.getFileSystem(conf);
Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));
// sort names, so that hash partitioning works
Arrays.sort(names);
MapFile.Reader[] parts = new MapFile.Reader[names.length];
for (int i = 0; i < names.length; i++) {
parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
}
return parts;
}
/** Get an entry from output generated by this class. */
public static <K extends WritableComparable<?>, V extends Writable>
Writable getEntry(MapFile.Reader[] readers,
Partitioner<K, V> partitioner, K key, V value) throws IOException {
int part = partitioner.getPartition(key, value, readers.length);
return readers[part].get(key, value);
}
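For context, here is a hedged sketch of how the getReaders/getEntry pair above might be called from client code. The output directory and key are placeholders, and the HashPartitioner must match the partitioner the job actually used.
// Sketch: look up a single key in MapFileOutputFormat output (path and key are illustrative).
Configuration conf = new Configuration();
MapFile.Reader[] readers = MapFileOutputFormat.getReaders(new Path("/tmp/job-output"), conf);
try {
    Text value = new Text();
    Writable found = MapFileOutputFormat.getEntry(readers,
        new HashPartitioner<Text, Text>(), new Text("some-key"), value); // returns null if the key is absent
} finally {
    for (MapFile.Reader r : readers) {
        r.close();
    }
}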
/**
* Creates synthetic crawldb
*
* @param conf
* configuration to use
* @param fs
* filesystem where db will be created
* @param crawldb
* path where db will be created
* @param init
* urls to be inserted, objects are of type URLCrawlDatum
* @throws Exception
*/
public static void createCrawlDb(Configuration conf, FileSystem fs, Path crawldb, List<URLCrawlDatum> init)
throws Exception {
LOG.trace("* creating crawldb: " + crawldb);
Path dir = new Path(crawldb, CrawlDb.CURRENT_NAME);
MapFile.Writer writer = new MapFile.Writer(conf, fs, new Path(dir, "part-00000")
.toString(), Text.class, CrawlDatum.class);
Iterator<URLCrawlDatum> it = init.iterator();
while (it.hasNext()) {
URLCrawlDatum row = it.next();
LOG.info("adding:" + row.url.toString());
writer.append(new Text(row.url), row.datum);
}
writer.close();
}
private static void createMapFile(Configuration conf, FileSystem fs, Path path,
CompressionCodec codec, CompressionType type, int records) throws IOException {
MapFile.Writer writer =
new MapFile.Writer(conf, path,
MapFile.Writer.keyClass(Text.class),
MapFile.Writer.valueClass(Text.class),
MapFile.Writer.compression(type, codec));
Text key = new Text();
for (int j = 0; j < records; j++) {
key.set(String.format("%03d", j));
writer.append(key, key);
}
writer.close();
}