org.apache.hadoop.io.SequenceFile.Reader#file ( )源码实例Demo

下面列出了org.apache.hadoop.io.SequenceFile.Reader#file ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: hadoop   文件: DistCpV1.java
static private void finalize(Configuration conf, JobConf jobconf,
    final Path destPath, String presevedAttributes) throws IOException {
  if (presevedAttributes == null) {
    return;
  }
  EnumSet<FileAttribute> preseved = FileAttribute.parse(presevedAttributes);
  if (!preseved.contains(FileAttribute.USER)
      && !preseved.contains(FileAttribute.GROUP)
      && !preseved.contains(FileAttribute.PERMISSION)) {
    return;
  }

  FileSystem dstfs = destPath.getFileSystem(conf);
  Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
  try (SequenceFile.Reader in =
      new SequenceFile.Reader(jobconf, Reader.file(dstdirlist))) {
    Text dsttext = new Text();
    FilePair pair = new FilePair(); 
    for(; in.next(dsttext, pair); ) {
      Path absdst = new Path(destPath, pair.output);
      updateDestStatus(pair.input, dstfs.getFileStatus(absdst),
          preseved, dstfs);
    }
  }
}
 
源代码2 项目: big-c   文件: DistCpV1.java
static private void finalize(Configuration conf, JobConf jobconf,
    final Path destPath, String presevedAttributes) throws IOException {
  if (presevedAttributes == null) {
    return;
  }
  EnumSet<FileAttribute> preseved = FileAttribute.parse(presevedAttributes);
  if (!preseved.contains(FileAttribute.USER)
      && !preseved.contains(FileAttribute.GROUP)
      && !preseved.contains(FileAttribute.PERMISSION)) {
    return;
  }

  FileSystem dstfs = destPath.getFileSystem(conf);
  Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
  try (SequenceFile.Reader in =
      new SequenceFile.Reader(jobconf, Reader.file(dstdirlist))) {
    Text dsttext = new Text();
    FilePair pair = new FilePair(); 
    for(; in.next(dsttext, pair); ) {
      Path absdst = new Path(destPath, pair.output);
      updateDestStatus(pair.input, dstfs.getFileStatus(absdst),
          preseved, dstfs);
    }
  }
}
 
源代码3 项目: big-data-lite   文件: SequenceFileRead.java
public static void main(String[] args) throws IOException {
String uri = args[0];
       String split = args[1];
Configuration conf = new Configuration();
Path path = new Path(uri);
SequenceFile.Reader reader = null;
try {		
reader = new SequenceFile.Reader(conf, Reader.file(path));
       Text key = new Text();
       OrdImageWritable value = new OrdImageWritable();

       int num = 0;

while (reader.next(key, value)) {
           System.out.println(key.toString() + " " + value.getByteLength());
           ImageIO.write(value.getImage(), "jpg", new File("image" +split+"_" + num++ + ".jpg"));
}
} finally {
	IOUtils.closeStream(reader);
	}		
}
 
源代码4 项目: alchemy   文件: SequenceFileUtil.java
public static void readSequenceFile(String path) throws Exception{
	Reader.Option filePath = Reader.file(new Path(path));//指定文件路径
	Reader sqReader = new Reader(configuration,filePath);//构造read而对象
	
	Writable key = (Writable)ReflectionUtils.newInstance(sqReader.getKeyClass(), configuration);
	Writable value = (Writable)ReflectionUtils.newInstance(sqReader.getValueClass(), configuration);
	
	while(sqReader.next(key, value)){
		logger.info("key:"+key+",value:"+value);
	}
}
 
源代码5 项目: localization_nifi   文件: ValueReader.java
@Override
public Set<FlowFile> readSequenceFile(final Path file, Configuration configuration, FileSystem fileSystem) throws IOException {

    Set<FlowFile> flowFiles = new HashSet<>();
    final SequenceFile.Reader reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
    final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
    int counter = 0;
    LOG.debug("Reading from sequence file {}", new Object[]{file});
    final OutputStreamWritableCallback writer = new OutputStreamWritableCallback(reader);
    Text key = new Text();
    try {
        while (reader.next(key)) {
            String fileName = key.toString();
            // the key may be a file name, and may not
            if (LOOKS_LIKE_FILENAME.matcher(fileName).matches()) {
                if (fileName.contains(File.separator)) {
                    fileName = StringUtils.substringAfterLast(fileName, File.separator);
                }
                fileName = fileName + "." + System.nanoTime();
            } else {
                fileName = inputfileName + ++counter;
            }
            FlowFile flowFile = session.create();
            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
            try {
                flowFile = session.write(flowFile, writer);
                flowFiles.add(flowFile);
            } catch (ProcessException e) {
                LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
                session.remove(flowFile);
            }
            key.clear();
        }
    } finally {
        IOUtils.closeQuietly(reader);
    }

    return flowFiles;
}
 
源代码6 项目: recsys-offline   文件: UpdateClusterJob.java
public void map(ImmutableBytesWritable row, Result result,
		Context context) throws IOException, InterruptedException {
	String yrstr = Bytes.toString(result.getValue(
			Constants.hbase_column_family.getBytes(),
			Constants.hbase_column_yearrate.getBytes()));
	String rltstr = Bytes.toString(result.getValue(
			Constants.hbase_column_family.getBytes(),
			Constants.hbase_column_repaylimittime.getBytes()));

	List<String> list = HdfsHelper
			.ls(Constants.hdfs_kmeans_point_output_path);
	String clusterid = null;
	for (String file : list) {
		if (file.contains("_")) {
			continue;
		}
		SequenceFile.Reader reader = new SequenceFile.Reader(
				HBaseContext.config, Reader.file(new Path(file)));
		IntWritable clusterId = new IntWritable();
		WeightedPropertyVectorWritable value = new WeightedPropertyVectorWritable();
		while (reader.next(clusterId, value)) {
			String yearrate = String.valueOf(value.getVector().get(0));
			String repaylimittime = String.valueOf(value.getVector()
					.get(1));
			if (yrstr.equals(yearrate) && rltstr.equals(repaylimittime)) {
				clusterid = clusterId.toString();
				break;
			}
		}

		reader.close();
	}

	key.set(row.get());
	value.set(clusterid);
	clusterid = null;
	context.write(key, value);
}
 
源代码7 项目: geowave   文件: GeoWaveNNIT.java
private int readFile() throws IllegalArgumentException, IOException {
  int count = 0;
  final FileSystem fs = FileSystem.get(MapReduceTestUtils.getConfiguration());
  final FileStatus[] fss =
      fs.listStatus(
          new Path(
              TestUtils.TEMP_DIR
                  + File.separator
                  + MapReduceTestEnvironment.HDFS_BASE_DIRECTORY
                  + "/t1/pairs"));
  for (final FileStatus ifs : fss) {
    if (ifs.isFile() && ifs.getPath().toString().matches(".*part-r-0000[0-9]")) {
      try (SequenceFile.Reader reader =
          new SequenceFile.Reader(
              MapReduceTestUtils.getConfiguration(),
              Reader.file(ifs.getPath()))) {

        final Text key = new Text();
        final Text val = new Text();

        while (reader.next(key, val)) {
          count++;
        }
      }
    }
  }
  return count;
}
 
源代码8 项目: nifi   文件: ValueReader.java
@Override
public Set<FlowFile> readSequenceFile(final Path file, Configuration configuration, FileSystem fileSystem) throws IOException {

    Set<FlowFile> flowFiles = new HashSet<>();
    final SequenceFile.Reader reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
    final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
    int counter = 0;
    LOG.debug("Reading from sequence file {}", new Object[]{file});
    final OutputStreamWritableCallback writer = new OutputStreamWritableCallback(reader);
    Text key = new Text();
    try {
        while (reader.next(key)) {
            String fileName = key.toString();
            // the key may be a file name, and may not
            if (LOOKS_LIKE_FILENAME.matcher(fileName).matches()) {
                if (fileName.contains(File.separator)) {
                    fileName = StringUtils.substringAfterLast(fileName, File.separator);
                }
                fileName = fileName + "." + System.nanoTime();
            } else {
                fileName = inputfileName + ++counter;
            }
            FlowFile flowFile = session.create();
            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
            try {
                flowFile = session.write(flowFile, writer);
                flowFiles.add(flowFile);
            } catch (ProcessException e) {
                LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
                session.remove(flowFile);
            }
            key.clear();
        }
    } finally {
        IOUtils.closeQuietly(reader);
    }

    return flowFiles;
}
 
源代码9 项目: tez   文件: ProtoMessageReader.java
public ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
  this.filePath = filePath;
  // The writer does not flush the length during hflush. Using length options lets us read
  // past length in the FileStatus but it will throw EOFException during a read instead
  // of returning null.
  this.reader = new Reader(conf, Reader.file(filePath), Reader.length(Long.MAX_VALUE));
  this.writable = new ProtoMessageWritable<>(parser);
}
 
源代码10 项目: localization_nifi   文件: KeyValueReader.java
@Override
public Set<FlowFile> readSequenceFile(Path file, Configuration configuration, FileSystem fileSystem) throws IOException {

    final SequenceFile.Reader reader;

    Set<FlowFile> flowFiles = new HashSet<>();
    reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
    final Text key = new Text();
    final KeyValueWriterCallback callback = new KeyValueWriterCallback(reader);
    final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
    int counter = 0;
    LOG.debug("Read from SequenceFile: {} ", new Object[]{file});
    try {
        while (reader.next(key)) {
            String fileName = key.toString();
            // the key may be a file name, and may not
            if (LOOKS_LIKE_FILENAME.matcher(fileName).matches()) {
                if (fileName.contains(File.separator)) {
                    fileName = StringUtils.substringAfterLast(fileName, File.separator);
                }
                fileName = fileName + "." + System.nanoTime();
            } else {
                fileName = inputfileName + ++counter;
            }

            FlowFile flowFile = session.create();
            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
            callback.key = key;
            try {
                flowFile = session.write(flowFile, callback);
                flowFiles.add(flowFile);
            } catch (ProcessException e) {
                LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
                session.remove(flowFile);
            }
            key.clear();
        }
    } finally {
        IOUtils.closeQuietly(reader);
    }

    return flowFiles;
}
 
源代码11 项目: hadoop   文件: DistCpV1.java
/**
 * Produce splits such that each is no greater than the quotient of the
 * total size and the number of splits requested.
 * @param job The handle to the JobConf object
 * @param numSplits Number of splits requested
 */
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
  int cnfiles = job.getInt(SRC_COUNT_LABEL, -1);
  long cbsize = job.getLong(TOTAL_SIZE_LABEL, -1);
  String srcfilelist = job.get(SRC_LIST_LABEL, "");
  if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) {
    throw new RuntimeException("Invalid metadata: #files(" + cnfiles +
                               ") total_size(" + cbsize + ") listuri(" +
                               srcfilelist + ")");
  }
  Path src = new Path(srcfilelist);
  FileSystem fs = src.getFileSystem(job);
  FileStatus srcst = fs.getFileStatus(src);

  ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  LongWritable key = new LongWritable();
  FilePair value = new FilePair();
  final long targetsize = cbsize / numSplits;
  long pos = 0L;
  long last = 0L;
  long acc = 0L;
  long cbrem = srcst.getLen();
  try (SequenceFile.Reader sl =
      new SequenceFile.Reader(job, Reader.file(src))) {
    for (; sl.next(key, value); last = sl.getPosition()) {
      // if adding this split would put this split past the target size,
      // cut the last split and put this next file in the next split.
      if (acc + key.get() > targetsize && acc != 0) {
        long splitsize = last - pos;
        splits.add(new FileSplit(src, pos, splitsize, (String[])null));
        cbrem -= splitsize;
        pos = last;
        acc = 0L;
      }
      acc += key.get();
    }
  }
  if (cbrem != 0) {
    splits.add(new FileSplit(src, pos, cbrem, (String[])null));
  }

  return splits.toArray(new FileSplit[splits.size()]);
}
 
源代码12 项目: hadoop   文件: DistCpV1.java
/**
 * Delete the dst files/dirs which do not exist in src
 * 
 * @return total count of files and directories deleted from destination
 * @throws IOException
 */
static private long deleteNonexisting(
    FileSystem dstfs, FileStatus dstroot, Path dstsorted,
    FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf
    ) throws IOException {
  if (dstroot.isFile()) {
    throw new IOException("dst must be a directory when option "
        + Options.DELETE.cmd + " is set, but dst (= " + dstroot.getPath()
        + ") is not a directory.");
  }

  //write dst lsr results
  final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
  try (final SequenceFile.Writer writer = SequenceFile.createWriter(jobconf,
      Writer.file(dstlsr), Writer.keyClass(Text.class),
      Writer.valueClass(NullWritable.class), Writer.compression(
      SequenceFile.CompressionType.NONE))) {
    //do lsr to get all file statuses in dstroot
    final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
    for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
      final FileStatus status = lsrstack.pop();
      if (status.isDirectory()) {
        for(FileStatus child : dstfs.listStatus(status.getPath())) {
          String relative = makeRelative(dstroot.getPath(), child.getPath());
          writer.append(new Text(relative), NullWritable.get());
          lsrstack.push(child);
        }
      }
    }
  }

  //sort lsr results
  final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted");
  SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs,
      new Text.Comparator(), Text.class, NullWritable.class, jobconf);
  sorter.sort(dstlsr, sortedlsr);

  //compare lsr list and dst list  
  long deletedPathsCount = 0;
  try (SequenceFile.Reader lsrin =
           new SequenceFile.Reader(jobconf, Reader.file(sortedlsr));
       SequenceFile.Reader  dstin =
           new SequenceFile.Reader(jobconf, Reader.file(dstsorted))) {
    //compare sorted lsr list and sorted dst list
    final Text lsrpath = new Text();
    final Text dstpath = new Text();
    final Text dstfrom = new Text();
    final Trash trash = new Trash(dstfs, conf);
    Path lastpath = null;

    boolean hasnext = dstin.next(dstpath, dstfrom);
    while (lsrin.next(lsrpath, NullWritable.get())) {
      int dst_cmp_lsr = dstpath.compareTo(lsrpath);
      while (hasnext && dst_cmp_lsr < 0) {
        hasnext = dstin.next(dstpath, dstfrom);
        dst_cmp_lsr = dstpath.compareTo(lsrpath);
      }
      
      if (dst_cmp_lsr == 0) {
        //lsrpath exists in dst, skip it
        hasnext = dstin.next(dstpath, dstfrom);
      } else {
        //lsrpath does not exist, delete it
        final Path rmpath = new Path(dstroot.getPath(), lsrpath.toString());
        ++deletedPathsCount;
        if ((lastpath == null || !isAncestorPath(lastpath, rmpath))) {
          if (!(trash.moveToTrash(rmpath) || dstfs.delete(rmpath, true))) {
            throw new IOException("Failed to delete " + rmpath);
          }
          lastpath = rmpath;
        }
      }
    }
  }
  return deletedPathsCount;
}
 
源代码13 项目: big-c   文件: DistCpV1.java
/**
 * Produce splits such that each is no greater than the quotient of the
 * total size and the number of splits requested.
 * @param job The handle to the JobConf object
 * @param numSplits Number of splits requested
 */
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
  int cnfiles = job.getInt(SRC_COUNT_LABEL, -1);
  long cbsize = job.getLong(TOTAL_SIZE_LABEL, -1);
  String srcfilelist = job.get(SRC_LIST_LABEL, "");
  if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) {
    throw new RuntimeException("Invalid metadata: #files(" + cnfiles +
                               ") total_size(" + cbsize + ") listuri(" +
                               srcfilelist + ")");
  }
  Path src = new Path(srcfilelist);
  FileSystem fs = src.getFileSystem(job);
  FileStatus srcst = fs.getFileStatus(src);

  ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  LongWritable key = new LongWritable();
  FilePair value = new FilePair();
  final long targetsize = cbsize / numSplits;
  long pos = 0L;
  long last = 0L;
  long acc = 0L;
  long cbrem = srcst.getLen();
  try (SequenceFile.Reader sl =
      new SequenceFile.Reader(job, Reader.file(src))) {
    for (; sl.next(key, value); last = sl.getPosition()) {
      // if adding this split would put this split past the target size,
      // cut the last split and put this next file in the next split.
      if (acc + key.get() > targetsize && acc != 0) {
        long splitsize = last - pos;
        splits.add(new FileSplit(src, pos, splitsize, (String[])null));
        cbrem -= splitsize;
        pos = last;
        acc = 0L;
      }
      acc += key.get();
    }
  }
  if (cbrem != 0) {
    splits.add(new FileSplit(src, pos, cbrem, (String[])null));
  }

  return splits.toArray(new FileSplit[splits.size()]);
}
 
源代码14 项目: big-c   文件: DistCpV1.java
/**
 * Delete the dst files/dirs which do not exist in src
 * 
 * @return total count of files and directories deleted from destination
 * @throws IOException
 */
static private long deleteNonexisting(
    FileSystem dstfs, FileStatus dstroot, Path dstsorted,
    FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf
    ) throws IOException {
  if (dstroot.isFile()) {
    throw new IOException("dst must be a directory when option "
        + Options.DELETE.cmd + " is set, but dst (= " + dstroot.getPath()
        + ") is not a directory.");
  }

  //write dst lsr results
  final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
  try (final SequenceFile.Writer writer = SequenceFile.createWriter(jobconf,
      Writer.file(dstlsr), Writer.keyClass(Text.class),
      Writer.valueClass(NullWritable.class), Writer.compression(
      SequenceFile.CompressionType.NONE))) {
    //do lsr to get all file statuses in dstroot
    final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
    for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
      final FileStatus status = lsrstack.pop();
      if (status.isDirectory()) {
        for(FileStatus child : dstfs.listStatus(status.getPath())) {
          String relative = makeRelative(dstroot.getPath(), child.getPath());
          writer.append(new Text(relative), NullWritable.get());
          lsrstack.push(child);
        }
      }
    }
  }

  //sort lsr results
  final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted");
  SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs,
      new Text.Comparator(), Text.class, NullWritable.class, jobconf);
  sorter.sort(dstlsr, sortedlsr);

  //compare lsr list and dst list  
  long deletedPathsCount = 0;
  try (SequenceFile.Reader lsrin =
           new SequenceFile.Reader(jobconf, Reader.file(sortedlsr));
       SequenceFile.Reader  dstin =
           new SequenceFile.Reader(jobconf, Reader.file(dstsorted))) {
    //compare sorted lsr list and sorted dst list
    final Text lsrpath = new Text();
    final Text dstpath = new Text();
    final Text dstfrom = new Text();
    final Trash trash = new Trash(dstfs, conf);
    Path lastpath = null;

    boolean hasnext = dstin.next(dstpath, dstfrom);
    while (lsrin.next(lsrpath, NullWritable.get())) {
      int dst_cmp_lsr = dstpath.compareTo(lsrpath);
      while (hasnext && dst_cmp_lsr < 0) {
        hasnext = dstin.next(dstpath, dstfrom);
        dst_cmp_lsr = dstpath.compareTo(lsrpath);
      }
      
      if (dst_cmp_lsr == 0) {
        //lsrpath exists in dst, skip it
        hasnext = dstin.next(dstpath, dstfrom);
      } else {
        //lsrpath does not exist, delete it
        final Path rmpath = new Path(dstroot.getPath(), lsrpath.toString());
        ++deletedPathsCount;
        if ((lastpath == null || !isAncestorPath(lastpath, rmpath))) {
          if (!(trash.moveToTrash(rmpath) || dstfs.delete(rmpath, true))) {
            throw new IOException("Failed to delete " + rmpath);
          }
          lastpath = rmpath;
        }
      }
    }
  }
  return deletedPathsCount;
}
 
源代码15 项目: nifi   文件: KeyValueReader.java
@Override
public Set<FlowFile> readSequenceFile(Path file, Configuration configuration, FileSystem fileSystem) throws IOException {

    final SequenceFile.Reader reader;

    Set<FlowFile> flowFiles = new HashSet<>();
    reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
    final Text key = new Text();
    final KeyValueWriterCallback callback = new KeyValueWriterCallback(reader);
    final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
    int counter = 0;
    LOG.debug("Read from SequenceFile: {} ", new Object[]{file});
    try {
        while (reader.next(key)) {
            String fileName = key.toString();
            // the key may be a file name, and may not
            if (LOOKS_LIKE_FILENAME.matcher(fileName).matches()) {
                if (fileName.contains(File.separator)) {
                    fileName = StringUtils.substringAfterLast(fileName, File.separator);
                }
                fileName = fileName + "." + System.nanoTime();
            } else {
                fileName = inputfileName + ++counter;
            }

            FlowFile flowFile = session.create();
            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
            callback.key = key;
            try {
                flowFile = session.write(flowFile, callback);
                flowFiles.add(flowFile);
            } catch (ProcessException e) {
                LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
                session.remove(flowFile);
            }
            key.clear();
        }
    } finally {
        IOUtils.closeQuietly(reader);
    }

    return flowFiles;
}