Listed below are example usages of org.apache.hadoop.io.MapFile#Writer(), collected from open-source projects; follow the links to view the full source on GitHub.
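Before the project snippets, a minimal, self-contained sketch of the basic write path may help orient the reader. The output directory and record values here are invented for illustration and do not come from any of the projects below. A MapFile is a directory containing a sorted SequenceFile of data plus an index file, so keys must be appended in ascending order:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path dir = new Path("/tmp/example.map"); // hypothetical output directory
    MapFile.Writer writer = new MapFile.Writer(conf, dir,
        MapFile.Writer.keyClass(Text.class),
        MapFile.Writer.valueClass(Text.class));
    try {
      // Keys must arrive in sorted order; append() throws IOException otherwise.
      for (int i = 0; i < 10; i++) {
        writer.append(new Text(String.format("%03d", i)), new Text("value-" + i));
      }
    } finally {
      writer.close();
    }
  }
}

Because the index is loaded into memory by readers, a MapFile supports random lookups by key, which a plain SequenceFile does not.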
private void createLinkDb(Configuration config, FileSystem fs, Path linkdb,
    TreeMap<String, String[]> init) throws Exception {
  LOG.fine("* creating linkdb: " + linkdb);
  Path dir = new Path(linkdb, LinkDb.CURRENT_NAME);
  MapFile.Writer writer = new MapFile.Writer(config, fs,
      new Path(dir, "part-00000").toString(), Text.class, Inlinks.class);
  // TreeMap iterates keys in sorted order, which MapFile.Writer requires.
  for (Map.Entry<String, String[]> entry : init.entrySet()) {
    Inlinks inlinks = new Inlinks();
    for (String val : entry.getValue()) {
      inlinks.add(new Inlink(val, val));
    }
    writer.append(new Text(entry.getKey()), inlinks);
  }
  writer.close();
}
public static long readAndAppendAst(Configuration conf, FileSystem fileSystem,
    MapFile.Writer writer, String fileName, long lastKey) throws IOException {
  long newLastKey = lastKey;
  SequenceFile.Reader r = new SequenceFile.Reader(fileSystem, new Path(fileName), conf);
  LongWritable longKey = new LongWritable();
  BytesWritable value = new BytesWritable();
  try {
    while (r.next(longKey, value)) {
      // Shift each key by lastKey so records appended from successive files
      // keep ascending, non-overlapping keys in the MapFile.
      newLastKey = longKey.get() + lastKey;
      writer.append(new LongWritable(newLastKey), value);
    }
  } catch (Exception e) {
    System.err.println(fileName);
    e.printStackTrace();
  } finally {
    r.close();
  }
  return newLastKey;
}
@Override
public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(TaskAttemptContext context) throws IOException
{
  CompressionCodec codec = null;
  CompressionType compressionType = CompressionType.NONE;
  if (getCompressOutput(context))
  {
    // find the kind of compression to do
    compressionType = SequenceFileOutputFormat.getOutputCompressionType(context);
    // find the right codec
    codec = getCompressionCodec(context);
  }
  Path file = getDefaultWorkFile(context, "");
  MapFile.Writer out = createMapFileWriter(context, codec, compressionType, file);
  return new Writer(out);
}
private void createCrawlDb(Configuration config, FileSystem fs, Path crawldb,
    TreeSet<String> init, CrawlDatum cd) throws Exception {
  LOG.fine("* creating crawldb: " + crawldb);
  Path dir = new Path(crawldb, CrawlDb.CURRENT_NAME);
  MapFile.Writer writer = new MapFile.Writer(config, fs,
      new Path(dir, "part-00000").toString(), Text.class, CrawlDatum.class);
  // TreeSet iterates in sorted order, as MapFile.Writer requires.
  for (String key : init) {
    writer.append(new Text(key), cd);
  }
  writer.close();
}
private static void createMapFile(Configuration conf, FileSystem fs, Path path,
    CompressionCodec codec, CompressionType type, int records) throws IOException {
  MapFile.Writer writer =
      new MapFile.Writer(conf, path,
          MapFile.Writer.keyClass(Text.class),
          MapFile.Writer.valueClass(Text.class),
          MapFile.Writer.compression(type, codec));
  Text key = new Text();
  for (int j = 0; j < records; j++) {
    key.set(String.format("%03d", j));
    writer.append(key, key);
  }
  writer.close();
}
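The createMapFile() helper above writes Text keys formatted as "%03d" using the Options-based constructor. As a complementary sketch (the path and the looked-up key are assumptions for illustration), such a file can be read back with MapFile.Reader, whose get() method uses the in-memory index to seek close to the key before scanning:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumes a MapFile of Text keys/values like the one createMapFile() writes.
    MapFile.Reader reader = new MapFile.Reader(new Path("/tmp/example.map"), conf);
    try {
      Text value = new Text();
      // get() fills in the value for an exact key match and returns it, or null.
      if (reader.get(new Text("005"), value) != null) {
        System.out.println("005 -> " + value);
      }
    } finally {
      reader.close();
    }
  }
}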
public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job,
    String name, Progressable progress) throws IOException {
  // get the path of the temporary output file
  Path file = FileOutputFormat.getTaskOutputPath(job, name);
  FileSystem fs = file.getFileSystem(job);
  CompressionCodec codec = null;
  CompressionType compressionType = CompressionType.NONE;
  if (getCompressOutput(job)) {
    // find the kind of compression to do
    compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);
    // find the right codec
    Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
        DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  }
  // ignore the progress parameter, since MapFile is local
  final MapFile.Writer out =
      new MapFile.Writer(job, fs, file.toString(),
          job.getOutputKeyClass().asSubclass(WritableComparable.class),
          job.getOutputValueClass().asSubclass(Writable.class),
          compressionType, codec,
          progress);
  return new RecordWriter<WritableComparable, Writable>() {
    public void write(WritableComparable key, Writable value) throws IOException {
      out.append(key, value);
    }

    public void close(Reporter reporter) throws IOException {
      out.close();
    }
  };
}
public void close() {
  try {
    for (MapFile.Writer w : writers) {
      w.close();
    }
  } catch (Exception e) {
    throw new RuntimeException(e);
  } finally {
    isClosed.set(true);
  }
}
/**
 * Creates a synthetic crawldb.
 *
 * @param conf
 *          configuration to use
 * @param fs
 *          filesystem where the db will be created
 * @param crawldb
 *          path where the db will be created
 * @param init
 *          urls to be inserted, objects are of type URLCrawlDatum
 * @throws Exception
 */
public static void createCrawlDb(Configuration conf, FileSystem fs, Path crawldb,
    List<URLCrawlDatum> init) throws Exception {
  LOG.trace("* creating crawldb: " + crawldb);
  Path dir = new Path(crawldb, CrawlDb.CURRENT_NAME);
  MapFile.Writer writer = new MapFile.Writer(conf, fs,
      new Path(dir, "part-00000").toString(), Text.class, CrawlDatum.class);
  // MapFile.append() requires the urls in init to already be sorted.
  Iterator<URLCrawlDatum> it = init.iterator();
  while (it.hasNext()) {
    URLCrawlDatum row = it.next();
    LOG.info("adding:" + row.url.toString());
    writer.append(new Text(row.url), row.datum);
  }
  writer.close();
}
public synchronized void startWrite() throws IOException {
  assertNotWrite("Tried to restart write");
  cleanupRead();
  m_writer = new MapFile.Writer(m_config, m_fs, m_dirStr, Text.class, BytesWritable.class);
  m_writer.setIndexInterval(1); // index every entry for fast random lookups
  m_fs.mkdirs(m_metaPath);
  m_mode = Mode.WRITE;
}
protected MapFile.Writer createMapFileWriter(TaskAttemptContext context, CompressionCodec codec,
    CompressionType compressionType, Path file) throws IOException
{
  return new MapFile.Writer(context.getConfiguration(), file,
      MapFile.Writer.keyClass(context.getOutputKeyClass().asSubclass(WritableComparable.class)),
      MapFile.Writer.valueClass(context.getOutputValueClass().asSubclass(Writable.class)),
      MapFile.Writer.compression(compressionType, codec),
      MapFile.Writer.progressable(context));
}
public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
    final JobConf job, final String name, final Progressable progress) throws IOException {
  Path out = FileOutputFormat.getOutputPath(job);
  final Path fetch = new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
  final Path content = new Path(new Path(out, Content.DIR_NAME), name);
  final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job);
  final MapFile.Writer fetchOut =
      new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,
          compType, progress);
  return new RecordWriter<Text, NutchWritable>() {
    private MapFile.Writer contentOut;
    private RecordWriter<Text, Parse> parseOut;

    // Instance initializer: open the optional content and parse outputs once.
    {
      if (Fetcher.isStoringContent(job)) {
        contentOut = new MapFile.Writer(job, fs, content.toString(),
            Text.class, Content.class, compType, progress);
      }
      if (Fetcher.isParsing(job)) {
        parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress);
      }
    }

    public void write(Text key, NutchWritable value) throws IOException {
      Writable w = value.get();
      if (w instanceof CrawlDatum)
        fetchOut.append(key, w);
      else if (w instanceof Content && contentOut != null)
        contentOut.append(key, w);
      else if (w instanceof Parse && parseOut != null)
        parseOut.write(key, (Parse) w);
    }

    public void close(Reporter reporter) throws IOException {
      fetchOut.close();
      if (contentOut != null) {
        contentOut.close();
      }
      if (parseOut != null) {
        parseOut.close(reporter);
      }
    }
  };
}
@Override
public void run() {
  if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) {
    promise.completeExceptionally(
        new IllegalArgumentException("An empty or open ledger should never be offloaded"));
    return;
  }
  long ledgerId = readHandle.getId();
  String storagePath = getStoragePath(storageBasePath, extraMetadata.get(MANAGED_LEDGER_NAME));
  String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
  LongWritable key = new LongWritable();
  BytesWritable value = new BytesWritable();
  try {
    MapFile.Writer dataWriter = new MapFile.Writer(configuration,
        new Path(dataFilePath),
        MapFile.Writer.keyClass(LongWritable.class),
        MapFile.Writer.valueClass(BytesWritable.class));
    // store the ledger metadata at index -1 (METADATA_KEY_INDEX)
    key.set(METADATA_KEY_INDEX);
    byte[] ledgerMetadata = buildLedgerMetadataFormat(readHandle.getLedgerMetadata());
    value.set(ledgerMetadata, 0, ledgerMetadata.length);
    dataWriter.append(key, value);
    AtomicLong haveOffloadEntryNumber = new AtomicLong(0);
    long needToOffloadFirstEntryNumber = 0;
    CountDownLatch countDownLatch;
    // avoid prefetching too much data into memory
    Semaphore semaphore = new Semaphore(managedLedgerOffloadPrefetchRounds);
    do {
      long end = Math.min(needToOffloadFirstEntryNumber + ENTRIES_PER_READ - 1,
          readHandle.getLastAddConfirmed());
      log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end);
      LedgerEntries ledgerEntriesOnce = readHandle.readAsync(needToOffloadFirstEntryNumber, end).get();
      semaphore.acquire();
      countDownLatch = new CountDownLatch(1);
      assignmentScheduler.chooseThread(ledgerId)
          .submit(FileSystemWriter.create(ledgerEntriesOnce, dataWriter, semaphore,
              countDownLatch, haveOffloadEntryNumber, this))
          .addListener(() -> { }, Executors.newSingleThreadExecutor());
      needToOffloadFirstEntryNumber = end + 1;
    } while (needToOffloadFirstEntryNumber - 1 != readHandle.getLastAddConfirmed()
        && fileSystemWriteException == null);
    countDownLatch.await();
    if (fileSystemWriteException != null) {
      throw fileSystemWriteException;
    }
    IOUtils.closeStream(dataWriter);
    promise.complete(null);
  } catch (Exception e) {
    log.error("Exception when get CompletableFuture<LedgerEntries> : ManagerLedgerName: {}, "
        + "LedgerId: {}, UUID: {} ", extraMetadata.get(MANAGED_LEDGER_NAME), ledgerId, uuid, e);
    if (e instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
    promise.completeExceptionally(e);
  }
}
private synchronized MapFile.Writer getWriter() throws IOException {
  assertWrite();
  return m_writer;
}
public Writer(MapFile.Writer out)
{
  this.out = out;
  tileid = new TileIdWritable();
}