下面列出了 org.apache.hadoop.io.SequenceFile.CompressionType#BLOCK 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Adds bookkeeping counters to {@code counters}, serializes them into a block-compressed (gzip) sequence file on
 * the local file system, copies that file into {@code statsDir} on {@code fs}, and finally forwards the counters
 * to statsd when statsd is configured.
 *
 * @param log                  logger used for progress and error messages
 * @param job                  job whose configuration, name and working directory are read
 * @param jobId                id written as the key of the single sequence-file record
 * @param counters             counters to augment and serialize (mutated in place)
 * @param start                value added to the {@code START_TIME} counter
 * @param stop                 value added to the {@code END_TIME} counter
 * @param outputMutations      when {@code true} the {@code LIVE_INGEST} counter is bumped; otherwise a counter named
 *                             after the job's working directory is bumped
 * @param fs                   destination file system for the metrics file
 * @param statsDir             destination directory on {@code fs}; created if it does not exist
 * @param metricsLabelOverride optional label; when non-null a counter with this name is bumped in the
 *                             {@code METRICS_LABEL_OVERRIDE} group
 * @return {@code false} if copying the metrics file to {@code fs} raised an {@link IOException} (the local file is
 *         left in place and statsd is not contacted); {@code true} otherwise
 * @throws IOException          if writing the local metrics file fails
 * @throws InterruptedException declared for callers; presumably raised by {@code createFileWithRetries} - confirm
 */
protected boolean writeStats(Logger log, Job job, JobID jobId, Counters counters, long start, long stop, boolean outputMutations, FileSystem fs,
                Path statsDir, String metricsLabelOverride) throws IOException, InterruptedException {
    final Configuration jobConf = job.getConfiguration();

    try (RawLocalFileSystem localFs = new RawLocalFileSystem()) {
        localFs.setConf(jobConf);

        // Record how the job delivered its output plus its start/end timestamps.
        if (outputMutations) {
            counters.findCounter(IngestProcess.LIVE_INGEST).increment(1);
        } else {
            counters.findCounter(IngestProcess.OUTPUT_DIRECTORY.name(), job.getWorkingDirectory().getName()).increment(1);
        }
        counters.findCounter(IngestProcess.START_TIME).increment(start);
        counters.findCounter(IngestProcess.END_TIME).increment(stop);
        if (null != metricsLabelOverride) {
            counters.getGroup(IngestProcess.METRICS_LABEL_OVERRIDE.name()).findCounter(metricsLabelOverride).increment(1);
        }

        // Stage the counters in a local file first; it is shipped to the target file system afterwards.
        final Path localMetrics = localFs.makeQualified(new Path("/tmp" + File.separator + job.getJobName() + ".metrics"));
        createFileWithRetries(localFs, localMetrics);

        final CompressionCodec gzip = new GzipCodec();
        try (Writer out = SequenceFile.createWriter(jobConf, Writer.file(localMetrics), Writer.keyClass(Text.class),
                        Writer.valueClass(Counters.class), Writer.compression(CompressionType.BLOCK, gzip))) {
            out.append(new Text(jobId.toString()), counters);
        }

        // Ship the staged file to the stats directory and clean up the local copy on success.
        try {
            if (!fs.exists(statsDir)) {
                fs.mkdirs(statsDir);
            }
            final Path remoteMetrics = new Path(statsDir, localMetrics.getName());
            log.info("Copying file " + localMetrics + " to " + remoteMetrics);
            fs.copyFromLocalFile(false, true, localMetrics, remoteMetrics);
            // The copy succeeded: drop the local file and its residual crc companion.
            localFs.delete(localMetrics, false);
            localFs.delete(getCrcFile(localMetrics), false);
        } catch (IOException copyFailure) {
            // Leave the local metrics file where it is so it can be recovered later.
            log.error("Error copying metrics file into HDFS, will remain in metrics directory.", copyFailure);
            return false;
        }
    }

    // Optionally publish the final counters through statsd.
    final CounterToStatsDConfiguration statsDConfig = new CounterToStatsDConfiguration(jobConf);
    if (!statsDConfig.isConfigured()) {
        return true;
    }
    log.info("Sending final counters via statsd: " + statsDConfig);
    final CounterStatsDClient client = statsDConfig.getClient();
    try {
        client.sendFinalStats(counters);
    } finally {
        client.close();
    }
    return true;
}
/**
 * Stamps an end-time counter for every job directory and, if the reporter holds any counters, serializes them to a
 * block-compressed (gzip) sequence file in a local temp file and copies that file into the
 * {@code MapFileLoaderMetrics} directory under {@code workDir}. The reporter is replaced with a fresh instance
 * afterwards so old metrics do not accumulate. Does nothing except log when ingest metrics are disabled.
 *
 * @param jobDirectories directories whose names become counter names in the {@code MapFileLoader.EndTimes} group
 * @throws IOException if the file system cannot be obtained, or if writing the local metrics file or closing the
 *                     local file system fails; a failure while copying to the destination is logged, not thrown
 */
private void writeStats(Path[] jobDirectories) throws IOException {
    if (!INGEST_METRICS) {
        log.info("ingest metrics disabled");
        return;
    }

    long now = System.currentTimeMillis();
    for (Path p : jobDirectories) {
        reporter.getCounter("MapFileLoader.EndTimes", p.getName()).increment(now);
    }

    FileSystem fs = getFileSystem(seqFileHdfs);
    Counters c = reporter.getCounters();
    if (null == c || c.countCounters() <= 0) {
        // Nothing to persist.
        return;
    }

    // The local file system is scoped so that it is always closed, even if serialization fails.
    try (RawLocalFileSystem rawFS = new RawLocalFileSystem()) {
        rawFS.setConf(conf);
        CompressionCodec cc = new GzipCodec();
        CompressionType ct = CompressionType.BLOCK;

        // Serialize the counters to a local temp file first.
        Path src = new Path(File.createTempFile("MapFileLoader", ".metrics").getAbsolutePath());
        // try-with-resources guarantees the writer is closed when append() throws.
        try (Writer writer = SequenceFile.createWriter(conf, Writer.file(rawFS.makeQualified(src)), Writer.keyClass(NullWritable.class),
                        Writer.valueClass(Counters.class), Writer.compression(ct, cc))) {
            writer.append(NullWritable.get(), c);
        }

        // Now try to move the file to the metrics directory on the target file system.
        try {
            Path mDir = new Path(workDir, "MapFileLoaderMetrics");
            if (!fs.exists(mDir)) {
                fs.mkdirs(mDir);
            }
            Path dst = new Path(mDir, src.getName());
            log.info("Copying file " + src + " to " + dst);
            fs.copyFromLocalFile(false, true, src, dst);
            // If this worked, then remove the local file
            rawFS.delete(src, false);
            // also remove the residual crc file
            rawFS.delete(getCrcFile(src), false);
        } catch (IOException e) {
            // Best effort: the local file is left in place; include the cause so the failure can be diagnosed.
            log.error("Error copying metrics file into HDFS, will remain in metrics directory.", e);
        }

        // reset reporter so that old metrics don't persist over time
        this.reporter = new StandaloneStatusReporter();
    }
}
/**
 * Exercises each {@code BloomMapFile.Writer} constructor overload used below (BLOCK and RECORD compression, with
 * and without codec/progress arguments, plus the comparator-based overload), closing every writer after it is
 * created. Any exception fails the test, and the exception is included in the failure message.
 */
@SuppressWarnings("deprecation")
public void testBloomMapFileConstructors() {
    BloomMapFile.Writer writer = null;
    try {
        FileSystem ts = FileSystem.get(conf);
        String testFileName = TEST_FILE.toString();

        // BLOCK compression variants
        writer = new BloomMapFile.Writer(conf, ts,
                testFileName, IntWritable.class, Text.class, CompressionType.BLOCK,
                defaultCodec, defaultProgress);
        assertNotNull("testBloomMapFileConstructors error !!!", writer);
        writer.close();
        writer = new BloomMapFile.Writer(conf, ts,
                testFileName, IntWritable.class, Text.class, CompressionType.BLOCK,
                defaultProgress);
        assertNotNull("testBloomMapFileConstructors error !!!", writer);
        writer.close();
        writer = new BloomMapFile.Writer(conf, ts,
                testFileName, IntWritable.class, Text.class, CompressionType.BLOCK);
        assertNotNull("testBloomMapFileConstructors error !!!", writer);
        writer.close();

        // RECORD compression variants
        writer = new BloomMapFile.Writer(conf, ts,
                testFileName, IntWritable.class, Text.class, CompressionType.RECORD,
                defaultCodec, defaultProgress);
        assertNotNull("testBloomMapFileConstructors error !!!", writer);
        writer.close();
        writer = new BloomMapFile.Writer(conf, ts,
                testFileName, IntWritable.class, Text.class, CompressionType.RECORD,
                defaultProgress);
        assertNotNull("testBloomMapFileConstructors error !!!", writer);
        writer.close();
        writer = new BloomMapFile.Writer(conf, ts,
                testFileName, IntWritable.class, Text.class, CompressionType.RECORD);
        assertNotNull("testBloomMapFileConstructors error !!!", writer);
        writer.close();

        // Comparator-based overload
        writer = new BloomMapFile.Writer(conf, ts,
                testFileName, WritableComparator.get(Text.class), Text.class);
        assertNotNull("testBloomMapFileConstructors error !!!", writer);
        writer.close();
    } catch (Exception ex) {
        // Surface the underlying exception instead of discarding it, so a failure is diagnosable.
        fail("testBloomMapFileConstructors error !!! " + ex);
    } finally {
        // Closes the last writer if an exception interrupted the sequence above.
        IOUtils.cleanup(null, writer);
    }
}
/**
 * Opens {@code filePath} using a {@link DefaultCodec} and block compression by delegating to the
 * three-argument {@code open} overload.
 *
 * @param filePath path handed through unchanged to the delegate
 * @throws IOException if the delegate fails
 */
@Override
public void open(String filePath) throws IOException {
    open(filePath, new DefaultCodec(), CompressionType.BLOCK);
}
/**
 * Exercises each {@code BloomMapFile.Writer} constructor overload used below (BLOCK and RECORD compression, with
 * and without codec/progress arguments, plus the comparator-based overload), closing every writer after it is
 * created. Any exception fails the test, and the exception is included in the failure message.
 */
@SuppressWarnings("deprecation")
public void testBloomMapFileConstructors() {
    BloomMapFile.Writer writer = null;
    try {
        FileSystem ts = FileSystem.get(conf);
        String testFileName = TEST_FILE.toString();

        // BLOCK compression variants
        writer = new BloomMapFile.Writer(conf, ts,
                testFileName, IntWritable.class, Text.class, CompressionType.BLOCK,
                defaultCodec, defaultProgress);
        assertNotNull("testBloomMapFileConstructors error !!!", writer);
        writer.close();
        writer = new BloomMapFile.Writer(conf, ts,
                testFileName, IntWritable.class, Text.class, CompressionType.BLOCK,
                defaultProgress);
        assertNotNull("testBloomMapFileConstructors error !!!", writer);
        writer.close();
        writer = new BloomMapFile.Writer(conf, ts,
                testFileName, IntWritable.class, Text.class, CompressionType.BLOCK);
        assertNotNull("testBloomMapFileConstructors error !!!", writer);
        writer.close();

        // RECORD compression variants
        writer = new BloomMapFile.Writer(conf, ts,
                testFileName, IntWritable.class, Text.class, CompressionType.RECORD,
                defaultCodec, defaultProgress);
        assertNotNull("testBloomMapFileConstructors error !!!", writer);
        writer.close();
        writer = new BloomMapFile.Writer(conf, ts,
                testFileName, IntWritable.class, Text.class, CompressionType.RECORD,
                defaultProgress);
        assertNotNull("testBloomMapFileConstructors error !!!", writer);
        writer.close();
        writer = new BloomMapFile.Writer(conf, ts,
                testFileName, IntWritable.class, Text.class, CompressionType.RECORD);
        assertNotNull("testBloomMapFileConstructors error !!!", writer);
        writer.close();

        // Comparator-based overload
        writer = new BloomMapFile.Writer(conf, ts,
                testFileName, WritableComparator.get(Text.class), Text.class);
        assertNotNull("testBloomMapFileConstructors error !!!", writer);
        writer.close();
    } catch (Exception ex) {
        // Surface the underlying exception instead of discarding it, so a failure is diagnosable.
        fail("testBloomMapFileConstructors error !!! " + ex);
    } finally {
        // Closes the last writer if an exception interrupted the sequence above.
        IOUtils.cleanup(null, writer);
    }
}
/**
 * Opens {@code filePath} using a {@link DefaultCodec} and block compression by delegating to the
 * three-argument {@code open} overload.
 *
 * @param filePath path handed through unchanged to the delegate
 * @throws IOException if the delegate fails
 */
@Override
public void open(String filePath) throws IOException {
    open(filePath, new DefaultCodec(), CompressionType.BLOCK);
}