The following examples show how to use the org.apache.hadoop.io.SequenceFile.Writer API class, or you can follow the links to view the full source code on GitHub.
public static void writeSequenceFile(String path) throws Exception {
    // Build the writer options: target path, key/value classes, and no compression.
    Writer.Option filePath = Writer.file(new Path(path));
    Writer.Option keyClass = Writer.keyClass(IntWritable.class);
    Writer.Option valueClass = Writer.valueClass(Text.class);
    Writer.Option compression = Writer.compression(CompressionType.NONE);
    Writer writer = SequenceFile.createWriter(configuration, filePath, keyClass, valueClass, compression);
    IntWritable key = new IntWritable();
    Text value = new Text("");
    for (int i = 0; i < 100; i++) {
        key.set(i);
        value.set("value_" + i);
        writer.append(key, value);
    }
    writer.hflush();
    writer.close();
}
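For completeness, here is a minimal sketch of reading that file back with SequenceFile.Reader. It is not taken from the linked sources; it assumes the same "configuration" field used by writeSequenceFile above.
public static void readSequenceFile(String path) throws Exception {
    // Companion sketch: iterate the (IntWritable, Text) pairs written above.
    SequenceFile.Reader reader = new SequenceFile.Reader(configuration, SequenceFile.Reader.file(new Path(path)));
    try {
        IntWritable key = new IntWritable();
        Text value = new Text();
        while (reader.next(key, value)) {
            System.out.println(key.get() + " -> " + value);
        }
    } finally {
        reader.close();
    }
}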
@Override
protected void processInputStream(InputStream stream, final FlowFile flowFile, final Writer writer) throws IOException {
    try (final ZipInputStream zipIn = new ZipInputStream(new BufferedInputStream(stream))) {
        ZipEntry zipEntry;
        while ((zipEntry = zipIn.getNextEntry()) != null) {
            if (zipEntry.isDirectory()) {
                continue;
            }
            final File file = new File(zipEntry.getName());
            final String key = file.getName();
            // Note: ZipEntry.getSize() returns -1 when the entry size is unknown,
            // in which case this cast produces a bogus length.
            long fileSize = zipEntry.getSize();
            final InputStreamWritable inStreamWritable = new InputStreamWritable(zipIn, (int) fileSize);
            writer.append(new Text(key), inStreamWritable);
            logger.debug("Appending FlowFile {} to Sequence File", new Object[]{key});
        }
    }
}
@Override
protected void processInputStream(final InputStream stream, final FlowFile tarArchivedFlowFile, final Writer writer) throws IOException {
    try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(stream))) {
        TarArchiveEntry tarEntry;
        while ((tarEntry = tarIn.getNextTarEntry()) != null) {
            if (tarEntry.isDirectory()) {
                continue;
            }
            final String key = tarEntry.getName();
            final long fileSize = tarEntry.getSize();
            final InputStreamWritable inStreamWritable = new InputStreamWritable(tarIn, (int) fileSize);
            writer.append(new Text(key), inStreamWritable);
            logger.debug("Appending FlowFile {} to Sequence File", new Object[]{key});
        }
    }
}
@Override
public void doBuildListing(Path pathToListFile, DistCpOptions options) throws IOException {
    try (Writer writer = newWriter(pathToListFile)) {
        Path sourceRootPath = getRootPath(getConf());
        for (Path sourcePath : options.getSourcePaths()) {
            FileSystem fileSystem = sourcePath.getFileSystem(getConf());
            FileStatus directory = fileSystem.getFileStatus(sourcePath);
            Map<String, CopyListingFileStatus> children = new FileStatusTreeTraverser(fileSystem)
                    .preOrderTraversal(directory)
                    .transform(new CopyListingFileStatusFunction(fileSystem, options))
                    .uniqueIndex(new RelativePathFunction(sourceRootPath));
            for (Entry<String, CopyListingFileStatus> entry : children.entrySet()) {
                LOG.debug("Adding '{}' with relative path '{}'", entry.getValue().getPath(), entry.getKey());
                writer.append(new Text(entry.getKey()), entry.getValue());
                writer.sync();
            }
        }
    }
}
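The writer.sync() call above emits a sync marker after every record, which is what lets a reader resynchronize from an arbitrary byte offset. A minimal sketch of that reader-side behavior; conf, listingPath and someByteOffset are hypothetical names, not from the source above:
SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(listingPath));
reader.sync(someByteOffset); // skip forward to the next sync marker at or after the offset
Text key = new Text();
CopyListingFileStatus value = new CopyListingFileStatus();
while (reader.next(key, value)) {
    // records from the first sync point after someByteOffset onward
}
reader.close();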
public void testJavaSerialization() throws Exception {
    Path file = new Path(System.getProperty("test.build.data", ".") +
        "/testseqser.seq");
    fs.delete(file, true);
    Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class,
        String.class);
    writer.append(1L, "one");
    writer.append(2L, "two");
    writer.close();
    Reader reader = new Reader(fs, file, conf);
    assertEquals(1L, reader.next((Object) null));
    assertEquals("one", reader.getCurrentValue((Object) null));
    assertEquals(2L, reader.next((Object) null));
    assertEquals("two", reader.getCurrentValue((Object) null));
    assertNull(reader.next((Object) null));
    reader.close();
}
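This test only works because the surrounding test case registers Hadoop's Java serialization before writing plain Long and String objects. A minimal sketch of that prerequisite; the configuration key is real, but placing it in the test's setUp is an assumption:
// Register JavaSerialization so non-Writable keys/values can be serialized.
Configuration conf = new Configuration();
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization");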
private void writeRowIds(Writer writer, SegmentReader segmentReader) throws IOException {
    Terms terms = segmentReader.terms(BlurConstants.ROW_ID);
    if (terms == null) {
        return;
    }
    TermsEnum termsEnum = terms.iterator(null);
    BytesRef rowId;
    long s = System.nanoTime();
    while ((rowId = termsEnum.next()) != null) {
        long n = System.nanoTime();
        // Report progress at most once every ten seconds.
        if (n - s > _10_SECONDS) {
            _progressable.progress();
            s = System.nanoTime();
        }
        writer.append(new Text(rowId.utf8ToString()), NullWritable.get());
    }
}
private synchronized void storeGenerations() throws IOException {
    FileSystem fileSystem = _path.getFileSystem(_configuration);
    FileStatus[] listStatus = fileSystem.listStatus(_path);
    SortedSet<FileStatus> existing = new TreeSet<FileStatus>(Arrays.asList(listStatus));
    long currentFile;
    if (!existing.isEmpty()) {
        FileStatus last = existing.last();
        currentFile = Long.parseLong(last.getPath().getName());
    } else {
        currentFile = 0;
    }
    Path path = new Path(_path, buffer(currentFile + 1));
    LOG.info("Creating new snapshot file [{0}]", path);
    FSDataOutputStream outputStream = fileSystem.create(path, false);
    Writer writer = SequenceFile.createWriter(_configuration, outputStream, Text.class, LongWritable.class,
        CompressionType.NONE, null);
    for (Entry<String, Long> e : _namesToGenerations.entrySet()) {
        writer.append(new Text(e.getKey()), new LongWritable(e.getValue()));
    }
    writer.close();
    outputStream.close();
    cleanupOldFiles(fileSystem, existing);
}
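Later Hadoop releases deprecate this stream-based createWriter overload in favor of the Writer.Option form. A sketch of the option-based equivalent, assuming the same fields as above:
Writer writer = SequenceFile.createWriter(_configuration,
    Writer.stream(outputStream),
    Writer.keyClass(Text.class),
    Writer.valueClass(LongWritable.class),
    Writer.compression(CompressionType.NONE));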
/**
 * @param args
 * @throws IOException
 */
public static void main(String[] args) throws IOException {
    if (args.length != 2) {
        System.err.println("Usage: ConvertFastaForCloud file.fa outfile.br");
        System.exit(-1);
    }
    String infile = args[0];
    String outfile = args[1];
    System.err.println("Converting " + infile + " into " + outfile);
    JobConf config = new JobConf();
    SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(config), config,
        new Path(outfile), IntWritable.class, BytesWritable.class);
    convertFile(infile, writer);
    writer.close();
    System.err.println("min_seq_len: " + min_seq_len);
    System.err.println("max_seq_len: " + max_seq_len);
    System.err.println("Using DNAString version: " + DNAString.VERSION);
}
public static void createSequenceFile(Object[] params, String uri)
        throws URISyntaxException, IOException, UIMAException, NoSuchMethodException, MissingSettingException, ClassNotFoundException {
    Configuration conf = new Configuration();
    Path path = new Path(uri);
    Writer writer =
        SequenceFile.createWriter(
            conf, Writer.file(path),
            Writer.keyClass(Text.class),
            Writer.valueClass(SCAS.class));
    int count = 0;
    CollectionReaderDescription readerDescription = Reader.getCollectionReaderDescription(Reader.COLLECTION_FORMAT.NYT, params);
    for (JCas jCas : SimplePipelineCasPoolIterator.iteratePipeline(20, readerDescription)) {
        if (JCasUtil.exists(jCas, DocumentMetaData.class)) {
            ++count;
            // Get the document ID.
            DocumentMetaData dmd = JCasUtil.selectSingle(jCas, DocumentMetaData.class);
            String docId = "NULL";
            if (dmd != null) {
                docId = dmd.getDocumentId();
            } else {
                throw new IOException("No Document ID for xml: " + jCas.getView("xml").getDocumentText());
            }
            Text docIdText = new Text(docId);
            SCAS scas = new SCAS(jCas.getCas());
            writer.append(docIdText, scas);
        }
        jCas.release();
    }
    logger.info("Wrote " + count + " documents to " + uri);
    IOUtils.closeStream(writer);
}
@Override
protected void processInputStream(final InputStream stream, final FlowFile flowFileStreamPackedFlowFile, final Writer writer) throws IOException {
    final FlowFileUnpackager unpackager = new FlowFileUnpackager();
    try (final InputStream in = new BufferedInputStream(stream)) {
        while (unpackager.hasMoreData()) {
            // Read from the buffered wrapper rather than the raw stream.
            unpackager.unpackageFlowFile(in, writer);
        }
    }
}
private Writer newWriter(Path pathToListFile) throws IOException {
    FileSystem fs = pathToListFile.getFileSystem(getConf());
    if (fs.exists(pathToListFile)) {
        fs.delete(pathToListFile, false);
    }
    // createWriter, file, keyClass, valueClass, compression and NONE are static imports
    // from SequenceFile, SequenceFile.Writer and SequenceFile.CompressionType.
    return createWriter(getConf(), file(pathToListFile), keyClass(Text.class), valueClass(CopyListingFileStatus.class),
        compression(NONE));
}
/** Test hsync via SequenceFiles */
@Test
public void testSequenceFileSync() throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
    final FileSystem fs = cluster.getFileSystem();
    final Path p = new Path("/testSequenceFileSync/foo");
    final int len = 1 << 16;
    FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
        4096, (short) 1, len, null);
    Writer w = SequenceFile.createWriter(new Configuration(),
        Writer.stream(out),
        Writer.keyClass(RandomDatum.class),
        Writer.valueClass(RandomDatum.class),
        Writer.compression(CompressionType.NONE, new DefaultCodec()));
    w.hflush();
    checkSyncMetric(cluster, 0); // hflush alone does not count as an hsync
    w.hsync();
    checkSyncMetric(cluster, 1);
    int seed = new Random().nextInt();
    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
    generator.next();
    w.append(generator.getKey(), generator.getValue());
    w.hsync();
    checkSyncMetric(cluster, 2);
    w.close();
    checkSyncMetric(cluster, 2); // closing the writer does not hsync
    out.close();
    checkSyncMetric(cluster, 3); // closing the stream syncs the block (SYNC_BLOCK)
    cluster.shutdown();
}
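The test above exercises HDFS's two flush levels. In plain client code the same calls look like this minimal sketch (the path is a hypothetical example):
FSDataOutputStream os = fs.create(new Path("/tmp/hsync-example"));
os.writeBytes("payload");
os.hflush(); // flush to the datanodes: visible to new readers, not yet forced to disk
os.hsync();  // flush and force the data to disk on the datanodes
os.close();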
/**
 * Initializes the hadoop sorter. Does some local file system setup, and is somewhat expensive
 * (~20 ms on local machine). Only executed when necessary.
 */
private void initHadoopSorter() throws IOException {
    if (!initialized) {
        tempDir = new Path(options.getTempLocation(), "tmp" + UUID.randomUUID().toString());
        paths = new Path[] {new Path(tempDir, "test.seq")};
        JobConf conf = new JobConf();
        // Sets directory for intermediate files created during merge of merge sort
        conf.set("io.seqfile.local.dir", tempDir.toUri().getPath());
        writer =
            SequenceFile.createWriter(
                conf,
                Writer.valueClass(BytesWritable.class),
                Writer.keyClass(BytesWritable.class),
                Writer.file(paths[0]),
                Writer.compression(CompressionType.NONE));
        FileSystem fs = FileSystem.getLocal(conf);
        // Directory has to exist for Hadoop to recognize it as deletable on exit
        fs.mkdirs(tempDir);
        fs.deleteOnExit(tempDir);
        sorter =
            new SequenceFile.Sorter(
                fs, new BytesWritable.Comparator(), BytesWritable.class, BytesWritable.class, conf);
        sorter.setMemory(options.getMemoryMB() * 1024 * 1024);
        initialized = true;
    }
}
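Once records have been appended and the writer closed, a sorter like the one initialized above is typically driven as in the following sketch; sortedPath is a hypothetical name, while the sort(Path[], Path, boolean) signature is SequenceFile.Sorter's real API:
// Sort the written file into a single sorted SequenceFile.
writer.close();
Path sortedPath = new Path(tempDir, "sorted.seq");
sorter.sort(paths, sortedPath, true); // true: delete the input files afterwards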
@Override
public Writer getWriter() throws IOException {
    counter = new AtomicLong(0);
    return SequenceFile
        .createWriter(
            getConfiguration(),
            getOptions().toArray(new Option[getOptions().size()]));
}
@Override
protected List<Option> getOptions() throws IllegalArgumentException, IOException {
    List<Option> opts = super.getOptions();
    opts.add(Writer.keyClass(LongWritable.class));
    opts.add(Writer.valueClass(Text.class));
    return opts;
}
protected List<Option> getOptions() throws IllegalArgumentException, IOException {
    List<Option> list = new ArrayList<Option>();
    list.add(Writer.stream(getHdfsStream()));
    if (getCompressionCodec() != null) {
        list.add(Writer.compression(SequenceFile.CompressionType.RECORD, getCompressionCodec()));
    }
    return list;
}
@Override
protected List<Option> getOptions() throws IllegalArgumentException, IOException {
    List<Option> opts = super.getOptions();
    opts.add(Writer.keyClass(Text.class));
    opts.add(Writer.valueClass(Text.class));
    return opts;
}
public GitConnector(String path, String projectName, Writer astWriter, long astWriterLen, Writer commitWriter, long commitWriterLen, Writer contentWriter, long contentWriterLen) {
    this(path, projectName);
    this.astWriter = astWriter;
    this.commitWriter = commitWriter;
    this.contentWriter = contentWriter;
    this.astWriterLen = astWriterLen;
    this.commitWriterLen = commitWriterLen;
    this.contentWriterLen = contentWriterLen;
}
public static void copyTo64MB(String src, String dst) throws IOException {
    Configuration hconf = new Configuration();
    Path srcPath = new Path(src);
    Path dstPath = new Path(dst);
    FileSystem fs = FileSystem.get(hconf);
    long srcSize = fs.getFileStatus(srcPath).getLen();
    // How many copies of the source fit into 64 MB (0 if the source is larger).
    int copyTimes = (int) (67108864 / srcSize); // 64 MB
    System.out.println("Copy " + copyTimes + " times");
    Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
    Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
    Text value = new Text();
    // getLZOCodec is a helper defined elsewhere in the same project.
    Writer writer = SequenceFile.createWriter(hconf, Writer.file(dstPath), Writer.keyClass(key.getClass()),
        Writer.valueClass(Text.class), Writer.compression(CompressionType.BLOCK, getLZOCodec(hconf)));
    int count = 0;
    while (reader.next(key, value)) {
        for (int i = 0; i < copyTimes; i++) {
            writer.append(key, value);
            count++;
        }
    }
    System.out.println("Len: " + writer.getLength());
    System.out.println("Rows: " + count);
    reader.close();
    writer.close();
}
private void createCacheFile(Path file, SegmentKey segmentKey) throws IOException {
    LOG.info("Building cache for segment [{0}] to [{1}]", segmentKey, file);
    Path tmpPath = getTmpWriterPath(file.getParent());
    try (Writer writer = createWriter(_configuration, tmpPath)) {
        DirectoryReader reader = getReader();
        for (AtomicReaderContext context : reader.leaves()) {
            SegmentReader segmentReader = AtomicReaderUtil.getSegmentReader(context.reader());
            if (segmentReader.getSegmentName().equals(segmentKey.getSegmentName())) {
                writeRowIds(writer, segmentReader);
                break;
            }
        }
    }
    commitWriter(_configuration, file, tmpPath);
}