Code examples for the class org.apache.hadoop.io.SequenceFile.Writer

The examples below show how to use the org.apache.hadoop.io.SequenceFile.Writer API; you can also follow the links to view the full source code on GitHub.
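Before the per-project examples, here is a minimal self-contained sketch of the option-based createWriter API that most of them rely on. It is not taken from any project below; the output path and record values are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;

public class SequenceFileWriterDemo {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Path path = new Path("/tmp/demo.seq"); // placeholder path

		// Options describe the target file, the key/value classes, and compression.
		Writer writer = SequenceFile.createWriter(conf,
				Writer.file(path),
				Writer.keyClass(IntWritable.class),
				Writer.valueClass(Text.class),
				Writer.compression(CompressionType.NONE));
		try {
			writer.append(new IntWritable(1), new Text("value_1"));
		} finally {
			writer.close();
		}
	}
}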

Example 1  Project: alchemy  File: SequenceFileUtil.java
public static void writeSequenceFile(String path) throws Exception{
	Writer.Option filePath = Writer.file(new Path(path));
	Writer.Option keyClass = Writer.keyClass(IntWritable.class);
	Writer.Option valueClass = Writer.valueClass(Text.class);
	Writer.Option compression = Writer.compression(CompressionType.NONE);
	Writer writer = SequenceFile.createWriter(configuration, filePath, keyClass, valueClass, compression);
	IntWritable key = new IntWritable();
	Text value = new Text("");
	for(int i=0;i<100;i++){
		key.set(i);
		value.set("value_"+i);
		writer.append(key, value);
	}
	writer.hflush();
	writer.close();
}
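A hedged reading counterpart (not part of SequenceFileUtil.java; it assumes the same configuration object used by writeSequenceFile, and the method name is illustrative):

public static void readSequenceFile(String path) throws Exception {
	// Read back the 100 records written by writeSequenceFile(path).
	SequenceFile.Reader reader = new SequenceFile.Reader(configuration, SequenceFile.Reader.file(new Path(path)));
	IntWritable key = new IntWritable();
	Text value = new Text();
	while (reader.next(key, value)) {
		System.out.println(key.get() + " -> " + value);
	}
	reader.close();
}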
 
Example 4  Project: circus-train  File: CircusTrainCopyListing.java
@Override
public void doBuildListing(Path pathToListFile, DistCpOptions options) throws IOException {
  try (Writer writer = newWriter(pathToListFile)) {

    Path sourceRootPath = getRootPath(getConf());

    for (Path sourcePath : options.getSourcePaths()) {

      FileSystem fileSystem = sourcePath.getFileSystem(getConf());
      FileStatus directory = fileSystem.getFileStatus(sourcePath);

      Map<String, CopyListingFileStatus> children = new FileStatusTreeTraverser(fileSystem)
          .preOrderTraversal(directory)
          .transform(new CopyListingFileStatusFunction(fileSystem, options))
          .uniqueIndex(new RelativePathFunction(sourceRootPath));

      for (Entry<String, CopyListingFileStatus> entry : children.entrySet()) {
        LOG.debug("Adding '{}' with relative path '{}'", entry.getValue().getPath(), entry.getKey());
        writer.append(new Text(entry.getKey()), entry.getValue());
        writer.sync();
      }
    }
  }
}
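Each writer.sync() call above writes a sync marker into the listing file, which lets a reader jump to the nearest record boundary instead of scanning from the start. A hypothetical sketch, assuming the file produced by doBuildListing:

// Seek to the first sync marker past an arbitrary byte offset, then read on.
SequenceFile.Reader reader = new SequenceFile.Reader(getConf(), SequenceFile.Reader.file(pathToListFile));
reader.sync(1024L); // positions the reader at the next sync point after offset 1024
Text relativePath = new Text();
CopyListingFileStatus fileStatus = new CopyListingFileStatus();
while (reader.next(relativePath, fileStatus)) {
  // process one listing entry
}
reader.close();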
 
Example 5  Project: hadoop  File: TestSequenceFileSerialization.java
public void testJavaSerialization() throws Exception {
  Path file = new Path(System.getProperty("test.build.data",".") +
      "/testseqser.seq");
  
  fs.delete(file, true);
  Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class,
      String.class);
  
  writer.append(1L, "one");
  writer.append(2L, "two");
  
  writer.close();
  
  Reader reader = new Reader(fs, file, conf);
  assertEquals(1L, reader.next((Object) null));
  assertEquals("one", reader.getCurrentValue((Object) null));
  assertEquals(2L, reader.next((Object) null));
  assertEquals("two", reader.getCurrentValue((Object) null));
  assertNull(reader.next((Object) null));
  reader.close();
  
}
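The assertions above rely on Java serialization for plain Long and String keys and values, which Hadoop does not enable by default; the test's setUp (not shown in this listing) likely resembles the following sketch.

@Override
protected void setUp() throws Exception {
  // Register JavaSerialization so non-Writable Long/String records are accepted.
  conf = new Configuration();
  conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization");
  fs = FileSystem.getLocal(conf);
}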
 
Example 6  Project: big-c  File: TestSequenceFileSerialization.java
public void testJavaSerialization() throws Exception {
  Path file = new Path(System.getProperty("test.build.data",".") +
      "/testseqser.seq");
  
  fs.delete(file, true);
  Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class,
      String.class);
  
  writer.append(1L, "one");
  writer.append(2L, "two");
  
  writer.close();
  
  Reader reader = new Reader(fs, file, conf);
  assertEquals(1L, reader.next((Object) null));
  assertEquals("one", reader.getCurrentValue((Object) null));
  assertEquals(2L, reader.next((Object) null));
  assertEquals("two", reader.getCurrentValue((Object) null));
  assertNull(reader.next((Object) null));
  reader.close();
  
}
 
private void writeRowIds(Writer writer, SegmentReader segmentReader) throws IOException {
  Terms terms = segmentReader.terms(BlurConstants.ROW_ID);
  if (terms == null) {
    return;
  }
  TermsEnum termsEnum = terms.iterator(null);
  BytesRef rowId;
  long s = System.nanoTime();
  while ((rowId = termsEnum.next()) != null) {
    long n = System.nanoTime();
    if (n - s > _10_SECONDS) { // report progress once ~10 seconds have elapsed
      _progressable.progress();
      s = System.nanoTime();
    }
    writer.append(new Text(rowId.utf8ToString()), NullWritable.get());
  }
}
 
private synchronized void storeGenerations() throws IOException {
  FileSystem fileSystem = _path.getFileSystem(_configuration);
  FileStatus[] listStatus = fileSystem.listStatus(_path);
  SortedSet<FileStatus> existing = new TreeSet<FileStatus>(Arrays.asList(listStatus));
  long currentFile;
  if (!existing.isEmpty()) {
    FileStatus last = existing.last();
    currentFile = Long.parseLong(last.getPath().getName());
  } else {
    currentFile = 0;
  }
  Path path = new Path(_path, buffer(currentFile + 1));
  LOG.info("Creating new snapshot file [{0}]", path);
  FSDataOutputStream outputStream = fileSystem.create(path, false);
  Writer writer = SequenceFile.createWriter(_configuration, outputStream, Text.class, LongWritable.class,
      CompressionType.NONE, null);
  for (Entry<String, Long> e : _namesToGenerations.entrySet()) {
    writer.append(new Text(e.getKey()), new LongWritable(e.getValue()));
  }
  writer.close();
  outputStream.close();
  cleanupOldFiles(fileSystem, existing);
}
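A hypothetical counterpart for loading the snapshot back; no such method appears in this listing, and the sketch simply assumes the same Text/LongWritable layout written above.

private synchronized void loadGenerations(Path path) throws IOException {
  // Read back the name -> generation pairs written by storeGenerations().
  SequenceFile.Reader reader = new SequenceFile.Reader(_configuration, SequenceFile.Reader.file(path));
  Text name = new Text();
  LongWritable generation = new LongWritable();
  while (reader.next(name, generation)) {
    _namesToGenerations.put(name.toString(), generation.get());
  }
  reader.close();
}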
 
Example 9  Project: RDFS  File: TestSequenceFileSerialization.java
public void testJavaSerialization() throws Exception {
  Path file = new Path(System.getProperty("test.build.data",".") +
      "/test.seq");
  
  fs.delete(file, true);
  Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class,
      String.class);
  
  writer.append(1L, "one");
  writer.append(2L, "two");
  
  writer.close();
  
  Reader reader = new Reader(fs, file, conf);
  assertEquals(1L, reader.next((Object) null));
  assertEquals("one", reader.getCurrentValue((Object) null));
  assertEquals(2L, reader.next((Object) null));
  assertEquals("two", reader.getCurrentValue((Object) null));
  assertNull(reader.next((Object) null));
  reader.close();
  
}
 
Example 10  Project: nifi  File: ZipUnpackerSequenceFileWriter.java
@Override
protected void processInputStream(InputStream stream, final FlowFile flowFile, final Writer writer) throws IOException {

    try (final ZipInputStream zipIn = new ZipInputStream(new BufferedInputStream(stream))) {
        ZipEntry zipEntry;
        while ((zipEntry = zipIn.getNextEntry()) != null) {
            if (zipEntry.isDirectory()) {
                continue;
            }
            final File file = new File(zipEntry.getName());
            final String key = file.getName();
            long fileSize = zipEntry.getSize();
            final InputStreamWritable inStreamWritable = new InputStreamWritable(zipIn, (int) fileSize);
            writer.append(new Text(key), inStreamWritable);
            logger.debug("Appending FlowFile {} to Sequence File", new Object[]{key});
        }
    }
}
 
Example 11  Project: nifi  File: TarUnpackerSequenceFileWriter.java
@Override
protected void processInputStream(final InputStream stream, final FlowFile tarArchivedFlowFile, final Writer writer) throws IOException {
    try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(stream))) {
        TarArchiveEntry tarEntry;
        while ((tarEntry = tarIn.getNextTarEntry()) != null) {
            if (tarEntry.isDirectory()) {
                continue;
            }
            final String key = tarEntry.getName();
            final long fileSize = tarEntry.getSize();
            final InputStreamWritable inStreamWritable = new InputStreamWritable(tarIn, (int) fileSize);
            writer.append(new Text(key), inStreamWritable);
            logger.debug("Appending FlowFile {} to Sequence File", new Object[]{key});
        }
    }
}
 
Example 12  Project: emr-sample-apps  File: ConvertFastaForCloud.java
/**
 * @param args
 * @throws IOException 
 */
public static void main(String[] args) throws IOException {
	if (args.length != 2) {
		System.err.println("Usage: ConvertFastaForCloud file.fa outfile.br");
		System.exit(-1);
	}
	
	String infile = args[0];
	String outfile = args[1];
	
	System.err.println("Converting " + infile + " into " + outfile);
	
	JobConf config = new JobConf();
	
	SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(config), config,
			new Path(outfile), IntWritable.class, BytesWritable.class);
	
	convertFile(infile, writer);
	
	writer.close();
	
	System.err.println("min_seq_len: " + min_seq_len);
	System.err.println("max_seq_len: " + max_seq_len);
	System.err.println("Using DNAString version: " + DNAString.VERSION);
}
 
Example 13  Project: hadoop-gpu  File: TestSequenceFileSerialization.java
public void testJavaSerialization() throws Exception {
  Path file = new Path(System.getProperty("test.build.data",".") +
      "/test.seq");
  
  fs.delete(file, true);
  Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class,
      String.class);
  
  writer.append(1L, "one");
  writer.append(2L, "two");
  
  writer.close();
  
  Reader reader = new Reader(fs, file, conf);
  assertEquals(1L, reader.next((Object) null));
  assertEquals("one", reader.getCurrentValue((Object) null));
  assertEquals(2L, reader.next((Object) null));
  assertEquals("two", reader.getCurrentValue((Object) null));
  assertNull(reader.next((Object) null));
  reader.close();
  
}
 
Example 14  Project: ambiverse-nlu  File: SparkUimaUtils.java
public static void createSequenceFile(Object[] params, String uri)
    throws URISyntaxException, IOException, UIMAException, NoSuchMethodException, MissingSettingException, ClassNotFoundException {
  Configuration conf = new Configuration();
  Path path = new Path(uri);
  Writer writer =
      SequenceFile.createWriter(
          conf, Writer.file(path),
          Writer.keyClass(Text.class),
          Writer.valueClass(SCAS.class));

  int count = 0;

  CollectionReaderDescription readerDescription = Reader.getCollectionReaderDescription(Reader.COLLECTION_FORMAT.NYT, params);
  for (JCas jCas : SimplePipelineCasPoolIterator.iteratePipeline(20, readerDescription)) {
      if(JCasUtil.exists(jCas, DocumentMetaData.class)) {
        ++count;
        // Get the ID.
        DocumentMetaData dmd = JCasUtil.selectSingle(jCas, DocumentMetaData.class);
        String docId = "NULL";
        if (dmd != null) {
          docId = dmd.getDocumentId();
        } else {
          throw new IOException("No Document ID for xml: " + jCas.getView("xml").getDocumentText());
        }
        Text docIdText = new Text(docId);
        SCAS scas = new SCAS(jCas.getCas());
        writer.append(docIdText, scas);
      }
      jCas.release();
  }
  logger.info("Wrote " + count + " documents to " + uri);
  IOUtils.closeStream(writer);
}
 
@Override
protected void processInputStream(final InputStream stream, final FlowFile flowFileStreamPackedFlowFile, final Writer writer) throws IOException {
    final FlowFileUnpackager unpackager = new FlowFileUnpackager();
    try (final InputStream in = new BufferedInputStream(stream)) {
        while (unpackager.hasMoreData()) {
            unpackager.unpackageFlowFile(in, writer); // read from the buffered stream opened above
        }
    }
}
 
Example 16  Project: circus-train  File: CircusTrainCopyListing.java
private Writer newWriter(Path pathToListFile) throws IOException {
  FileSystem fs = pathToListFile.getFileSystem(getConf());
  if (fs.exists(pathToListFile)) {
    fs.delete(pathToListFile, false);
  }
  return createWriter(getConf(), file(pathToListFile), keyClass(Text.class), valueClass(CopyListingFileStatus.class),
      compression(NONE));
}
 
Example 17  Project: hadoop  File: TestHSync.java
/** Test hsync via SequenceFiles */
@Test
public void testSequenceFileSync() throws Exception {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();

  final FileSystem fs = cluster.getFileSystem();
  final Path p = new Path("/testSequenceFileSync/foo");
  final int len = 1 << 16;
  FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
      EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
      4096, (short) 1, len, null);
  Writer w = SequenceFile.createWriter(new Configuration(),
      Writer.stream(out),
      Writer.keyClass(RandomDatum.class),
      Writer.valueClass(RandomDatum.class),
      Writer.compression(CompressionType.NONE, new DefaultCodec()));
  w.hflush();
  checkSyncMetric(cluster, 0);
  w.hsync();
  checkSyncMetric(cluster, 1);
  int seed = new Random().nextInt();
  RandomDatum.Generator generator = new RandomDatum.Generator(seed);
  generator.next();
  w.append(generator.getKey(), generator.getValue());
  w.hsync();
  checkSyncMetric(cluster, 2);
  w.close();
  checkSyncMetric(cluster, 2);
  out.close();
  checkSyncMetric(cluster, 3);
  cluster.shutdown();
}
 
Example 18  Project: big-c  File: TestHSync.java
/** Test hsync via SequenceFiles */
@Test
public void testSequenceFileSync() throws Exception {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();

  final FileSystem fs = cluster.getFileSystem();
  final Path p = new Path("/testSequenceFileSync/foo");
  final int len = 1 << 16;
  FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
      EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
      4096, (short) 1, len, null);
  Writer w = SequenceFile.createWriter(new Configuration(),
      Writer.stream(out),
      Writer.keyClass(RandomDatum.class),
      Writer.valueClass(RandomDatum.class),
      Writer.compression(CompressionType.NONE, new DefaultCodec()));
  w.hflush();
  checkSyncMetric(cluster, 0);
  w.hsync();
  checkSyncMetric(cluster, 1);
  int seed = new Random().nextInt();
  RandomDatum.Generator generator = new RandomDatum.Generator(seed);
  generator.next();
  w.append(generator.getKey(), generator.getValue());
  w.hsync();
  checkSyncMetric(cluster, 2);
  w.close();
  checkSyncMetric(cluster, 2);
  out.close();
  checkSyncMetric(cluster, 3);
  cluster.shutdown();
}
 
Example 19  Project: beam  File: HadoopExternalSorter.java
/**
 * Initializes the hadoop sorter. Does some local file system setup, and is somewhat expensive
 * (~20 ms on local machine). Only executed when necessary.
 */
private void initHadoopSorter() throws IOException {
  if (!initialized) {
    tempDir = new Path(options.getTempLocation(), "tmp" + UUID.randomUUID().toString());
    paths = new Path[] {new Path(tempDir, "test.seq")};

    JobConf conf = new JobConf();
    // Sets directory for intermediate files created during merge of merge sort
    conf.set("io.seqfile.local.dir", tempDir.toUri().getPath());

    writer =
        SequenceFile.createWriter(
            conf,
            Writer.valueClass(BytesWritable.class),
            Writer.keyClass(BytesWritable.class),
            Writer.file(paths[0]),
            Writer.compression(CompressionType.NONE));

    FileSystem fs = FileSystem.getLocal(conf);
    // Directory has to exist for Hadoop to recognize it as deletable on exit
    fs.mkdirs(tempDir);
    fs.deleteOnExit(tempDir);

    sorter =
        new SequenceFile.Sorter(
            fs, new BytesWritable.Comparator(), BytesWritable.class, BytesWritable.class, conf);
    sorter.setMemory(options.getMemoryMB() * 1024 * 1024);

    initialized = true;
  }
}
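Once the writer is closed, the sorter configured above can sort the spilled file. A minimal sketch under the assumption that sorting into a new file in tempDir is acceptable; the actual Beam sorter drives this differently.

// Hypothetical use of the sorter: sort paths[0] into a new output file.
writer.close(); // the writer must be closed before its file can be sorted
Path sortedPath = new Path(tempDir, "sorted.seq"); // hypothetical output path
sorter.sort(paths, sortedPath, false); // false: keep the input file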
 
Example 20  Project: pulsar  File: HdfsSequentialTextSink.java
@Override
public Writer getWriter() throws IOException {
   counter = new AtomicLong(0);

   return SequenceFile
            .createWriter(
               getConfiguration(),
               getOptions().toArray(new Option[getOptions().size()]));
}
 
Example 21  Project: pulsar  File: HdfsSequentialTextSink.java
@Override
protected List<Option> getOptions() throws IllegalArgumentException, IOException {
    List<Option> opts = super.getOptions();
    opts.add(Writer.keyClass(LongWritable.class));
    opts.add(Writer.valueClass(Text.class));
    return opts;
}
 
Example 22  Project: pulsar  File: HdfsAbstractSequenceFileSink.java
protected List<Option> getOptions() throws IllegalArgumentException, IOException {
    List<Option> list = new ArrayList<Option>();
    list.add(Writer.stream(getHdfsStream()));

    if (getCompressionCodec() != null) {
        list.add(Writer.compression(SequenceFile.CompressionType.RECORD, getCompressionCodec()));
    }
    return list;
}
 
Example 23  Project: pulsar  File: HdfsTextSink.java
@Override
protected List<Option> getOptions() throws IllegalArgumentException, IOException {
    List<Option> opts = super.getOptions();
    opts.add(Writer.keyClass(Text.class));
    opts.add(Writer.valueClass(Text.class));
    return opts;
}
 
Example 28  Project: compiler  File: GitConnector.java
public GitConnector(String path, String projectName, Writer astWriter, long astWriterLen, Writer commitWriter, long commitWriterLen, Writer contentWriter, long contentWriterLen) {
	this(path, projectName);
	this.astWriter = astWriter;
	this.commitWriter = commitWriter;
	this.contentWriter = contentWriter;
	this.astWriterLen = astWriterLen;
	this.commitWriterLen = commitWriterLen;
	this.contentWriterLen = contentWriterLen;
}
 
Example 29  Project: Kylin  File: CopySeq.java
public static void copyTo64MB(String src, String dst) throws IOException {
    Configuration hconf = new Configuration();
    Path srcPath = new Path(src);
    Path dstPath = new Path(dst);

    FileSystem fs = FileSystem.get(hconf);
    long srcSize = fs.getFileStatus(srcPath).getLen();
    int copyTimes = (int) (67108864 / srcSize); // 64 MB
    System.out.println("Copy " + copyTimes + " times");

    Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
    Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
    Text value = new Text();

    Writer writer = SequenceFile.createWriter(hconf, Writer.file(dstPath), Writer.keyClass(key.getClass()), Writer.valueClass(Text.class), Writer.compression(CompressionType.BLOCK, getLZOCodec(hconf)));

    int count = 0;
    while (reader.next(key, value)) {
        for (int i = 0; i < copyTimes; i++) {
            writer.append(key, value);
            count++;
        }
    }

    System.out.println("Len: " + writer.getLength());
    System.out.println("Rows: " + count);

    reader.close();
    writer.close();
}
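getLZOCodec is a Kylin helper that requires the native LZO libraries; where those are unavailable, a built-in codec can be substituted. This variant is an assumption for illustration, not part of CopySeq.

// Same writer options, but with the built-in DefaultCodec instead of LZO.
Writer writer = SequenceFile.createWriter(hconf,
    Writer.file(dstPath),
    Writer.keyClass(key.getClass()),
    Writer.valueClass(Text.class),
    Writer.compression(CompressionType.BLOCK, new DefaultCodec()));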
 
private void createCacheFile(Path file, SegmentKey segmentKey) throws IOException {
  LOG.info("Building cache for segment [{0}] to [{1}]", segmentKey, file);
  Path tmpPath = getTmpWriterPath(file.getParent());
  try (Writer writer = createWriter(_configuration, tmpPath)) {
    DirectoryReader reader = getReader();
    for (AtomicReaderContext context : reader.leaves()) {
      SegmentReader segmentReader = AtomicReaderUtil.getSegmentReader(context.reader());
      if (segmentReader.getSegmentName().equals(segmentKey.getSegmentName())) {
        writeRowIds(writer, segmentReader);
        break;
      }
    }
  }
  commitWriter(_configuration, file, tmpPath);
}
 