Listed below is example code for org.apache.hadoop.io.SequenceFile.Writer#close(), collected from several projects.
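Before the project-specific examples, here is a minimal sketch of the usual close() pattern using the Hadoop 2.x option-based createWriter API. It is not taken from any of the projects below; the class name, path argument, and record values are made up for illustration.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileCloseSketch {

  // Hypothetical helper: write one record and always close the writer.
  public static void write(Configuration conf, String pathStr) throws IOException {
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(new Path(pathStr)),
        SequenceFile.Writer.keyClass(IntWritable.class),
        SequenceFile.Writer.valueClass(Text.class));
    try {
      writer.append(new IntWritable(1), new Text("one"));
    } finally {
      // close() flushes buffered records and releases the underlying stream;
      // calling it in a finally block finalizes the file even if append() fails.
      writer.close();
    }
  }
}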
public static void writeSequenceFile(String path) throws Exception {
  Writer.Option filePath = Writer.file(new Path(path));
  Writer.Option keyClass = Writer.keyClass(IntWritable.class);
  Writer.Option valueClass = Writer.valueClass(Text.class);
  Writer.Option compression = Writer.compression(CompressionType.NONE);
  Writer writer = SequenceFile.createWriter(configuration, filePath, keyClass, valueClass, compression);
  IntWritable key = new IntWritable();
  Text value = new Text("");
  for (int i = 0; i < 100; i++) {
    key.set(i);
    value.set("value_" + i);
    writer.append(key, value);
  }
  writer.hflush();
  writer.close();
}
public void testJavaSerialization() throws Exception {
  Path file = new Path(System.getProperty("test.build.data", ".") + "/testseqser.seq");
  fs.delete(file, true);
  Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class, String.class);
  writer.append(1L, "one");
  writer.append(2L, "two");
  writer.close();
  Reader reader = new Reader(fs, file, conf);
  assertEquals(1L, reader.next((Object) null));
  assertEquals("one", reader.getCurrentValue((Object) null));
  assertEquals(2L, reader.next((Object) null));
  assertEquals("two", reader.getCurrentValue((Object) null));
  assertNull(reader.next((Object) null));
  reader.close();
}
private synchronized void storeGenerations() throws IOException {
  FileSystem fileSystem = _path.getFileSystem(_configuration);
  FileStatus[] listStatus = fileSystem.listStatus(_path);
  SortedSet<FileStatus> existing = new TreeSet<FileStatus>(Arrays.asList(listStatus));
  long currentFile;
  if (!existing.isEmpty()) {
    FileStatus last = existing.last();
    currentFile = Long.parseLong(last.getPath().getName());
  } else {
    currentFile = 0;
  }
  Path path = new Path(_path, buffer(currentFile + 1));
  LOG.info("Creating new snapshot file [{0}]", path);
  FSDataOutputStream outputStream = fileSystem.create(path, false);
  Writer writer = SequenceFile.createWriter(_configuration, outputStream, Text.class, LongWritable.class,
      CompressionType.NONE, null);
  for (Entry<String, Long> e : _namesToGenerations.entrySet()) {
    writer.append(new Text(e.getKey()), new LongWritable(e.getValue()));
  }
  writer.close();
  outputStream.close();
  cleanupOldFiles(fileSystem, existing);
}
public void testJavaSerialization() throws Exception {
  Path file = new Path(System.getProperty("test.build.data", ".") + "/test.seq");
  fs.delete(file, true);
  Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class, String.class);
  writer.append(1L, "one");
  writer.append(2L, "two");
  writer.close();
  Reader reader = new Reader(fs, file, conf);
  assertEquals(1L, reader.next((Object) null));
  assertEquals("one", reader.getCurrentValue((Object) null));
  assertEquals(2L, reader.next((Object) null));
  assertEquals("two", reader.getCurrentValue((Object) null));
  assertNull(reader.next((Object) null));
  reader.close();
}
/** Test hsync via SequenceFiles */
@Test
public void testSequenceFileSync() throws Exception {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  final FileSystem fs = cluster.getFileSystem();
  final Path p = new Path("/testSequenceFileSync/foo");
  final int len = 1 << 16;
  FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
      EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
      4096, (short) 1, len, null);
  Writer w = SequenceFile.createWriter(new Configuration(),
      Writer.stream(out),
      Writer.keyClass(RandomDatum.class),
      Writer.valueClass(RandomDatum.class),
      Writer.compression(CompressionType.NONE, new DefaultCodec()));
  w.hflush();
  checkSyncMetric(cluster, 0);
  w.hsync();
  checkSyncMetric(cluster, 1);
  int seed = new Random().nextInt();
  RandomDatum.Generator generator = new RandomDatum.Generator(seed);
  generator.next();
  w.append(generator.getKey(), generator.getValue());
  w.hsync();
  checkSyncMetric(cluster, 2);
  w.close();
  checkSyncMetric(cluster, 2);
  out.close();
  checkSyncMetric(cluster, 3);
  cluster.shutdown();
}
public static void copyTo64MB(String src, String dst) throws IOException {
  Configuration hconf = new Configuration();
  Path srcPath = new Path(src);
  Path dstPath = new Path(dst);
  FileSystem fs = FileSystem.get(hconf);
  long srcSize = fs.getFileStatus(srcPath).getLen();
  int copyTimes = (int) (67108864 / srcSize); // 64 MB
  System.out.println("Copy " + copyTimes + " times");
  Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
  Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
  Text value = new Text();
  Writer writer = SequenceFile.createWriter(hconf, Writer.file(dstPath), Writer.keyClass(key.getClass()),
      Writer.valueClass(Text.class), Writer.compression(CompressionType.BLOCK, getLZOCodec(hconf)));
  int count = 0;
  while (reader.next(key, value)) {
    for (int i = 0; i < copyTimes; i++) {
      writer.append(key, value);
      count++;
    }
  }
  System.out.println("Len: " + writer.getLength());
  System.out.println("Rows: " + count);
  reader.close();
  writer.close();
}
private void generateData(String mrIncWorkingPathStr, String rowId, String recordId, String value) throws IOException {
  Path path = new Path(new Path(mrIncWorkingPathStr), "new");
  Writer writer = new SequenceFile.Writer(miniCluster.getFileSystem(), conf,
      new Path(path, UUID.randomUUID().toString()), Text.class, BlurRecord.class);
  BlurRecord blurRecord = new BlurRecord();
  blurRecord.setRowId(rowId);
  blurRecord.setRecordId(recordId);
  blurRecord.setFamily("fam0");
  blurRecord.addColumn("col0", value);
  writer.append(new Text(rowId), blurRecord);
  writer.close();
}
private org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getMrWorkingPathWriter(
    final Configuration configuration) throws IOException {
  PrivilegedExceptionAction<org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter> privilegedExceptionAction =
      new PrivilegedExceptionAction<org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter>() {
        @Override
        public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter run() throws Exception {
          String workingPathStr = configuration.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
          Path workingPath = new Path(workingPathStr);
          Path tmpDir = new Path(workingPath, "tmp");
          FileSystem fileSystem = tmpDir.getFileSystem(configuration);
          String loadId = configuration.get(BlurSerDe.BLUR_MR_LOAD_ID);
          Path loadPath = new Path(tmpDir, loadId);
          final Writer writer = new SequenceFile.Writer(fileSystem, configuration,
              new Path(loadPath, UUID.randomUUID().toString()), Text.class, BlurRecord.class);
          return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
            @Override
            public void write(Writable w) throws IOException {
              BlurRecord blurRecord = (BlurRecord) w;
              String rowId = blurRecord.getRowId();
              writer.append(new Text(rowId), blurRecord);
            }

            @Override
            public void close(boolean abort) throws IOException {
              writer.close();
            }
          };
        }
      };
  UserGroupInformation userGroupInformation = getUGI(configuration);
  try {
    return userGroupInformation.doAs(privilegedExceptionAction);
  } catch (InterruptedException e) {
    throw new IOException(e);
  }
}
private void writeStats(Path[] jobDirectories) throws IOException {
  if (!INGEST_METRICS) {
    log.info("ingest metrics disabled");
  } else {
    long now = System.currentTimeMillis();
    for (Path p : jobDirectories)
      reporter.getCounter("MapFileLoader.EndTimes", p.getName()).increment(now);
    // Write out the metrics: we serialize the counters into a file in HDFS.
    // The context was set in the processKeyValues method below and should not be null,
    // but we guard against NPE anyway.
    FileSystem fs = getFileSystem(seqFileHdfs);
    RawLocalFileSystem rawFS = new RawLocalFileSystem();
    rawFS.setConf(conf);
    CompressionCodec cc = new GzipCodec();
    CompressionType ct = CompressionType.BLOCK;
    Counters c = reporter.getCounters();
    if (null != c && c.countCounters() > 0) {
      // Serialize the counters to a local file.
      Path src = new Path(File.createTempFile("MapFileLoader", ".metrics").getAbsolutePath());
      Writer writer = SequenceFile.createWriter(conf, Writer.file(rawFS.makeQualified(src)),
          Writer.keyClass(NullWritable.class), Writer.valueClass(Counters.class), Writer.compression(ct, cc));
      writer.append(NullWritable.get(), c);
      writer.close();
      // Now try to move the file to HDFS by copying it into the temp dir.
      try {
        Path mDir = new Path(workDir, "MapFileLoaderMetrics");
        if (!fs.exists(mDir))
          fs.mkdirs(mDir);
        Path dst = new Path(mDir, src.getName());
        log.info("Copying file " + src + " to " + dst);
        fs.copyFromLocalFile(false, true, src, dst);
        // If this worked, then remove the local file
        rawFS.delete(src, false);
        // also remove the residual crc file
        rawFS.delete(getCrcFile(src), false);
      } catch (IOException e) {
        // If an error occurs in the copy, the file is left in the local metrics directory.
        log.error("Error copying metrics file into HDFS, will remain in metrics directory.");
      }
      // reset reporter so that old metrics don't persist over time
      this.reporter = new StandaloneStatusReporter();
    }
  }
}
/**
 * @param jobFileStatusses job file statuses, sorted by modification time.
 * @param batch which batch needs to be processed (used to calculate the
 *          offset into jobFileStatusses).
 * @param batchSize process up to batchSize items (or fewer, so as not to
 *          exceed the length of jobFileStatusses).
 * @param processRecordService to be used to create ProcessRecords.
 * @throws IOException when the index file cannot be written or moved, or when
 *           the HBase records cannot be created.
 */
private void processBatch(FileStatus jobFileStatusses[], int batch,
    int batchSize, ProcessRecordService processRecordService, String cluster,
    Path outputPath) throws IOException {
  int startIndex = batch * batchSize;
  LOG.info("Batch startIndex: " + startIndex + " batchSize: " + batchSize);
  // Some protection against over and under runs.
  if ((jobFileStatusses == null) || (startIndex < 0)
      || (startIndex >= jobFileStatusses.length)) {
    return;
  }
  MinMaxJobFileTracker minMaxJobFileTracker = new MinMaxJobFileTracker();
  Path initialProcesFile =
      processRecordService.getInitialProcessFile(cluster, batch);
  Writer processFileWriter =
      processRecordService.createProcessFileWriter(initialProcesFile);
  // Make sure we don't run off the end of the array
  int endIndexExclusive =
      Math.min((startIndex + batchSize), jobFileStatusses.length);
  try {
    for (int i = startIndex; i < endIndexExclusive; i++) {
      FileStatus fileStatus = jobFileStatusses[i];
      JobFile jobFile = minMaxJobFileTracker.track(fileStatus);
      // String jobfileName = fileStatus.getPath().getName();
      // LOG.info(jobfileName + " modified: " + fileStatus.getModificationTime());
      processFileWriter.append(jobFile, fileStatus);
    }
  } finally {
    processFileWriter.close();
  }
  Path processFile =
      processRecordService.moveProcessFile(initialProcesFile, outputPath);
  int processedJobFiles = endIndexExclusive - startIndex;
  ProcessRecord processRecord = new ProcessRecord(cluster,
      ProcessState.PREPROCESSED,
      minMaxJobFileTracker.getMinModificationTimeMillis(),
      minMaxJobFileTracker.getMaxModificationTimeMillis(), processedJobFiles,
      processFile.toString(), minMaxJobFileTracker.getMinJobId(),
      minMaxJobFileTracker.getMaxJobId());
  LOG.info("Creating processRecord: " + processRecord);
  processRecordService.writeJobRecord(processRecord);
}