The following are example usages of org.apache.hadoop.io.SequenceFile.Reader#file(). Follow the links to view the full source on GitHub, or leave a comment in the panel on the right.
// Applies the preserved file attributes (user / group / permission) to every
// destination directory recorded in the job's dst-dir list sequence file.
// No-op when no attributes were requested or none of them affect ownership.
static private void finalize(Configuration conf, JobConf jobconf,
    final Path destPath, String presevedAttributes) throws IOException {
  if (presevedAttributes == null) {
    return;
  }
  final EnumSet<FileAttribute> attrs = FileAttribute.parse(presevedAttributes);
  final boolean needsUpdate = attrs.contains(FileAttribute.USER)
      || attrs.contains(FileAttribute.GROUP)
      || attrs.contains(FileAttribute.PERMISSION);
  if (!needsUpdate) {
    return;
  }
  final FileSystem destFileSystem = destPath.getFileSystem(conf);
  final Path dirListPath = new Path(jobconf.get(DST_DIR_LIST_LABEL));
  // Each record is (Text, FilePair) describing one destination directory.
  try (SequenceFile.Reader in =
      new SequenceFile.Reader(jobconf, Reader.file(dirListPath))) {
    final Text destText = new Text();
    final FilePair filePair = new FilePair();
    while (in.next(destText, filePair)) {
      final Path absoluteDest = new Path(destPath, filePair.output);
      updateDestStatus(filePair.input,
          destFileSystem.getFileStatus(absoluteDest), attrs, destFileSystem);
    }
  }
}
// Applies the preserved attributes (user, group, permission) to every
// destination directory listed in the job's DST_DIR_LIST sequence file.
static private void finalize(Configuration conf, JobConf jobconf,
final Path destPath, String presevedAttributes) throws IOException {
// Nothing to do when no attributes were requested for preservation.
if (presevedAttributes == null) {
return;
}
EnumSet<FileAttribute> preseved = FileAttribute.parse(presevedAttributes);
// Only ownership-related attributes need a post-copy fix-up pass.
if (!preseved.contains(FileAttribute.USER)
&& !preseved.contains(FileAttribute.GROUP)
&& !preseved.contains(FileAttribute.PERMISSION)) {
return;
}
FileSystem dstfs = destPath.getFileSystem(conf);
// DST_DIR_LIST_LABEL names a sequence file of (Text, FilePair) records,
// one per destination directory created by the copy job.
Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
try (SequenceFile.Reader in =
new SequenceFile.Reader(jobconf, Reader.file(dstdirlist))) {
Text dsttext = new Text();
FilePair pair = new FilePair();
for(; in.next(dsttext, pair); ) {
Path absdst = new Path(destPath, pair.output);
// Copy the requested metadata from the source status onto absdst.
updateDestStatus(pair.input, dstfs.getFileStatus(absdst),
preseved, dstfs);
}
}
}
/**
 * Dumps each (key, image) record of a sequence file: prints the key and byte
 * length to stdout and writes the image payload as a local JPEG named
 * "image&lt;split&gt;_&lt;n&gt;.jpg".
 *
 * @param args args[0] = sequence file URI; args[1] = split tag used in the
 *             output file names
 * @throws IOException if the sequence file cannot be read or a JPEG cannot
 *                     be written
 */
public static void main(String[] args) throws IOException {
  String uri = args[0];
  String split = args[1];
  Configuration conf = new Configuration();
  Path path = new Path(uri);
  // try-with-resources replaces the manual finally/IOUtils.closeStream()
  // pattern; SequenceFile.Reader is Closeable.
  try (SequenceFile.Reader reader =
      new SequenceFile.Reader(conf, Reader.file(path))) {
    Text key = new Text();
    OrdImageWritable value = new OrdImageWritable();
    int num = 0;
    while (reader.next(key, value)) {
      System.out.println(key.toString() + " " + value.getByteLength());
      ImageIO.write(value.getImage(), "jpg",
          new File("image" + split + "_" + num++ + ".jpg"));
    }
  }
}
/**
 * Reads every (key, value) record of the sequence file at {@code path} and
 * logs it. The key and value classes are discovered from the file header via
 * reflection.
 *
 * @param path path of the sequence file to read
 * @throws Exception if the file cannot be opened or read
 */
public static void readSequenceFile(String path) throws Exception {
  Reader.Option filePath = Reader.file(new Path(path)); // file to read
  // try-with-resources: the original never closed the reader (leak).
  try (Reader sqReader = new Reader(configuration, filePath)) {
    Writable key = (Writable) ReflectionUtils.newInstance(
        sqReader.getKeyClass(), configuration);
    Writable value = (Writable) ReflectionUtils.newInstance(
        sqReader.getValueClass(), configuration);
    while (sqReader.next(key, value)) {
      logger.info("key:" + key + ",value:" + value);
    }
  }
}
/**
 * Reads every record of the given sequence file and creates one FlowFile per
 * record. Record content is produced by an OutputStreamWritableCallback that
 * wraps the open reader; the FlowFile name is taken from the record key when
 * the key looks like a file name, otherwise generated from the input name.
 *
 * @return the set of FlowFiles successfully written to the session
 */
@Override
public Set<FlowFile> readSequenceFile(final Path file, Configuration configuration, FileSystem fileSystem) throws IOException {
Set<FlowFile> flowFiles = new HashSet<>();
final SequenceFile.Reader reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
// Fallback name prefix for records whose key is not a usable file name.
final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
int counter = 0;
LOG.debug("Reading from sequence file {}", new Object[]{file});
final OutputStreamWritableCallback writer = new OutputStreamWritableCallback(reader);
Text key = new Text();
try {
while (reader.next(key)) {
String fileName = key.toString();
// the key may be a file name, and may not
if (LOOKS_LIKE_FILENAME.matcher(fileName).matches()) {
if (fileName.contains(File.separator)) {
// Path-like key: keep only the basename.
fileName = StringUtils.substringAfterLast(fileName, File.separator);
}
// Suffix with nanos so repeated keys still yield unique names.
fileName = fileName + "." + System.nanoTime();
} else {
fileName = inputfileName + ++counter;
}
FlowFile flowFile = session.create();
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
try {
flowFile = session.write(flowFile, writer);
flowFiles.add(flowFile);
} catch (ProcessException e) {
// Per-record failure: log it, drop that FlowFile, keep processing.
LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
session.remove(flowFile);
}
// The Text instance is reused; clear it before the next record.
key.clear();
}
} finally {
IOUtils.closeQuietly(reader);
}
return flowFiles;
}
/**
 * Maps an HBase row to (row key, cluster id): reads the row's yearrate and
 * repaylimittime cells, then scans the k-means clustered-points output files
 * for a point with the same two values and emits the id of its cluster.
 *
 * @param row     HBase row key
 * @param result  scan result holding the yearrate / repaylimittime columns
 * @param context MapReduce context the (key, value) pair is written to
 */
public void map(ImmutableBytesWritable row, Result result,
    Context context) throws IOException, InterruptedException {
  String yrstr = Bytes.toString(result.getValue(
      Constants.hbase_column_family.getBytes(),
      Constants.hbase_column_yearrate.getBytes()));
  String rltstr = Bytes.toString(result.getValue(
      Constants.hbase_column_family.getBytes(),
      Constants.hbase_column_repaylimittime.getBytes()));
  List<String> list = HdfsHelper
      .ls(Constants.hdfs_kmeans_point_output_path);
  String clusterid = null;
  for (String file : list) {
    // Skip Hadoop bookkeeping outputs such as _SUCCESS / _logs.
    if (file.contains("_")) {
      continue;
    }
    // try-with-resources: the original called reader.close() outside any
    // finally block, leaking the reader whenever next()/get() threw.
    try (SequenceFile.Reader reader = new SequenceFile.Reader(
        HBaseContext.config, Reader.file(new Path(file)))) {
      IntWritable clusterId = new IntWritable();
      WeightedPropertyVectorWritable value = new WeightedPropertyVectorWritable();
      while (reader.next(clusterId, value)) {
        String yearrate = String.valueOf(value.getVector().get(0));
        String repaylimittime = String.valueOf(value.getVector()
            .get(1));
        if (yrstr.equals(yearrate) && rltstr.equals(repaylimittime)) {
          clusterid = clusterId.toString();
          break;
        }
      }
    }
  }
  // NOTE: `key` and `value` below are the mapper's reusable output fields
  // declared elsewhere in the class, not the locals above.
  key.set(row.get());
  value.set(clusterid);
  clusterid = null;
  context.write(key, value);
}
// Counts the total number of key/value records across the reducer part
// files (part-r-00000 .. part-r-00009) under the test output directory.
private int readFile() throws IllegalArgumentException, IOException {
  final FileSystem fs = FileSystem.get(MapReduceTestUtils.getConfiguration());
  final Path outputDir = new Path(
      TestUtils.TEMP_DIR
          + File.separator
          + MapReduceTestEnvironment.HDFS_BASE_DIRECTORY
          + "/t1/pairs");
  int total = 0;
  for (final FileStatus status : fs.listStatus(outputDir)) {
    // Only plain part files are counted; directories and side files skip.
    final boolean isPartFile = status.isFile()
        && status.getPath().toString().matches(".*part-r-0000[0-9]");
    if (!isPartFile) {
      continue;
    }
    try (SequenceFile.Reader reader =
        new SequenceFile.Reader(
            MapReduceTestUtils.getConfiguration(),
            Reader.file(status.getPath()))) {
      final Text key = new Text();
      final Text val = new Text();
      while (reader.next(key, val)) {
        total++;
      }
    }
  }
  return total;
}
/**
 * Converts each record of a sequence file into a FlowFile. The FlowFile's
 * name comes from the record key when the key looks like a file name
 * (basename only, suffixed with nanoseconds for uniqueness); otherwise a
 * name is generated from the input file name plus a running counter.
 *
 * @return the set of FlowFiles successfully written to the session
 */
@Override
public Set<FlowFile> readSequenceFile(final Path file, Configuration configuration, FileSystem fileSystem) throws IOException {
Set<FlowFile> flowFiles = new HashSet<>();
final SequenceFile.Reader reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
// Prefix used for records whose key is not a usable file name.
final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
int counter = 0;
LOG.debug("Reading from sequence file {}", new Object[]{file});
// The callback streams the current record's value into the FlowFile.
final OutputStreamWritableCallback writer = new OutputStreamWritableCallback(reader);
Text key = new Text();
try {
while (reader.next(key)) {
String fileName = key.toString();
// the key may be a file name, and may not
if (LOOKS_LIKE_FILENAME.matcher(fileName).matches()) {
if (fileName.contains(File.separator)) {
fileName = StringUtils.substringAfterLast(fileName, File.separator);
}
fileName = fileName + "." + System.nanoTime();
} else {
fileName = inputfileName + ++counter;
}
FlowFile flowFile = session.create();
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
try {
flowFile = session.write(flowFile, writer);
flowFiles.add(flowFile);
} catch (ProcessException e) {
// Keep going on a per-record failure: log it and drop that FlowFile.
LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
session.remove(flowFile);
}
// Text is reused across records; clear before the next read.
key.clear();
}
} finally {
// closeQuietly: a close failure must not mask records already emitted.
IOUtils.closeQuietly(reader);
}
return flowFiles;
}
/**
 * Opens a sequence-file reader for protobuf-encoded records.
 *
 * @param conf     Hadoop configuration used to open the reader
 * @param filePath sequence file to read
 * @param parser   protobuf parser used to decode each record
 * @throws IOException if the reader cannot be opened
 */
public ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
this.filePath = filePath;
// The writer does not flush the length during hflush. Using length options lets us read
// past length in the FileStatus but it will throw EOFException during a read instead
// of returning null.
this.reader = new Reader(conf, Reader.file(filePath), Reader.length(Long.MAX_VALUE));
this.writable = new ProtoMessageWritable<>(parser);
}
/**
 * Reads a sequence file and creates one FlowFile per record. Content is
 * produced by a KeyValueWriterCallback bound to the open reader; the current
 * key is handed to the callback before each write.
 *
 * @return the set of FlowFiles successfully written to the session
 */
@Override
public Set<FlowFile> readSequenceFile(Path file, Configuration configuration, FileSystem fileSystem) throws IOException {
final SequenceFile.Reader reader;
Set<FlowFile> flowFiles = new HashSet<>();
reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
final Text key = new Text();
final KeyValueWriterCallback callback = new KeyValueWriterCallback(reader);
// Prefix used when a record key does not look like a file name.
final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
int counter = 0;
LOG.debug("Read from SequenceFile: {} ", new Object[]{file});
try {
while (reader.next(key)) {
String fileName = key.toString();
// the key may be a file name, and may not
if (LOOKS_LIKE_FILENAME.matcher(fileName).matches()) {
if (fileName.contains(File.separator)) {
// Path-like key: keep only the basename.
fileName = StringUtils.substringAfterLast(fileName, File.separator);
}
// Suffix with nanos so repeated keys yield unique names.
fileName = fileName + "." + System.nanoTime();
} else {
fileName = inputfileName + ++counter;
}
FlowFile flowFile = session.create();
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
// Tell the callback which record to serialize on the next write.
callback.key = key;
try {
flowFile = session.write(flowFile, callback);
flowFiles.add(flowFile);
} catch (ProcessException e) {
// Per-record failure: log, discard this FlowFile, continue.
LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
session.remove(flowFile);
}
key.clear();
}
} finally {
IOUtils.closeQuietly(reader);
}
return flowFiles;
}
/**
 * Produce splits such that each is no greater than the quotient of the
 * total size and the number of splits requested.
 *
 * @param job The handle to the JobConf object
 * @param numSplits Number of splits requested (treated as at least 1, so a
 *                  non-positive hint cannot cause a division by zero)
 * @return one FileSplit per chunk of the source file list
 * @throws IOException if the source list cannot be read
 */
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
  // Job metadata written at setup time: number of source files, their total
  // byte size, and the path of the (SequenceFile) source list.
  int cnfiles = job.getInt(SRC_COUNT_LABEL, -1);
  long cbsize = job.getLong(TOTAL_SIZE_LABEL, -1);
  String srcfilelist = job.get(SRC_LIST_LABEL, "");
  if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) {
    throw new RuntimeException("Invalid metadata: #files(" + cnfiles +
                               ") total_size(" + cbsize + ") listuri(" +
                               srcfilelist + ")");
  }
  Path src = new Path(srcfilelist);
  FileSystem fs = src.getFileSystem(job);
  FileStatus srcst = fs.getFileStatus(src);
  // Guard against a non-positive split hint: the original divided by
  // numSplits directly, which threw ArithmeticException for 0 and broke
  // the ArrayList presize for negative values.
  final int requestedSplits = Math.max(numSplits, 1);
  ArrayList<FileSplit> splits = new ArrayList<FileSplit>(requestedSplits);
  LongWritable key = new LongWritable();  // record = (file length, FilePair)
  FilePair value = new FilePair();
  final long targetsize = cbsize / requestedSplits;
  long pos = 0L;    // offset in the list file where the current split starts
  long last = 0L;   // reader position after the last consumed record
  long acc = 0L;    // source bytes accumulated into the current split
  long cbrem = srcst.getLen();  // list-file bytes not yet assigned to a split
  try (SequenceFile.Reader sl =
      new SequenceFile.Reader(job, Reader.file(src))) {
    for (; sl.next(key, value); last = sl.getPosition()) {
      // if adding this file would put this split past the target size,
      // cut the split here and start the next one with this file.
      if (acc + key.get() > targetsize && acc != 0) {
        long splitsize = last - pos;
        splits.add(new FileSplit(src, pos, splitsize, (String[])null));
        cbrem -= splitsize;
        pos = last;
        acc = 0L;
      }
      acc += key.get();
    }
  }
  // Whatever remains of the list file becomes the final split.
  if (cbrem != 0) {
    splits.add(new FileSplit(src, pos, cbrem, (String[])null));
  }
  return splits.toArray(new FileSplit[splits.size()]);
}
/**
 * Delete the dst files/dirs which do not exist in src
 *
 * Works in three phases: (1) recursively list (lsr) everything under the
 * destination root into a temporary sequence file, (2) sort that listing,
 * (3) walk the sorted lsr listing and the sorted source listing (dstsorted)
 * in lock-step, deleting destination paths that are absent from the source.
 *
 * @return total count of files and directories deleted from destination
 * @throws IOException
 */
static private long deleteNonexisting(
FileSystem dstfs, FileStatus dstroot, Path dstsorted,
FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf
) throws IOException {
if (dstroot.isFile()) {
throw new IOException("dst must be a directory when option "
+ Options.DELETE.cmd + " is set, but dst (= " + dstroot.getPath()
+ ") is not a directory.")
;
}
//write dst lsr results
final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
try (final SequenceFile.Writer writer = SequenceFile.createWriter(jobconf,
Writer.file(dstlsr), Writer.keyClass(Text.class),
Writer.valueClass(NullWritable.class), Writer.compression(
SequenceFile.CompressionType.NONE))) {
//do lsr to get all file statuses in dstroot
// Iterative depth-first traversal using an explicit stack; each visited
// child is appended as a path relative to dstroot.
final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
final FileStatus status = lsrstack.pop();
if (status.isDirectory()) {
for(FileStatus child : dstfs.listStatus(status.getPath())) {
String relative = makeRelative(dstroot.getPath(), child.getPath());
writer.append(new Text(relative), NullWritable.get());
lsrstack.push(child);
}
}
}
}
//sort lsr results
final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted");
SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs,
new Text.Comparator(), Text.class, NullWritable.class, jobconf);
sorter.sort(dstlsr, sortedlsr);
//compare lsr list and dst list
long deletedPathsCount = 0;
try (SequenceFile.Reader lsrin =
new SequenceFile.Reader(jobconf, Reader.file(sortedlsr));
SequenceFile.Reader dstin =
new SequenceFile.Reader(jobconf, Reader.file(dstsorted))) {
//compare sorted lsr list and sorted dst list
final Text lsrpath = new Text();
final Text dstpath = new Text();
final Text dstfrom = new Text();
final Trash trash = new Trash(dstfs, conf);
Path lastpath = null;
boolean hasnext = dstin.next(dstpath, dstfrom);
while (lsrin.next(lsrpath, NullWritable.get())) {
int dst_cmp_lsr = dstpath.compareTo(lsrpath);
// Advance the dst cursor until it is >= the current lsr entry.
while (hasnext && dst_cmp_lsr < 0) {
hasnext = dstin.next(dstpath, dstfrom);
dst_cmp_lsr = dstpath.compareTo(lsrpath);
}
if (dst_cmp_lsr == 0) {
//lsrpath exists in dst, skip it
hasnext = dstin.next(dstpath, dstfrom);
} else {
//lsrpath does not exist, delete it
final Path rmpath = new Path(dstroot.getPath(), lsrpath.toString());
++deletedPathsCount;
// Paths under an already-deleted ancestor are counted but not
// deleted again (the recursive delete below removed them).
if ((lastpath == null || !isAncestorPath(lastpath, rmpath))) {
// Prefer moving to trash; fall back to a permanent delete.
if (!(trash.moveToTrash(rmpath) || dstfs.delete(rmpath, true))) {
throw new IOException("Failed to delete " + rmpath);
}
lastpath = rmpath;
}
}
}
}
return deletedPathsCount;
}
/**
 * Produce splits such that each is no greater than the quotient of the
 * total size and the number of splits requested.
 * @param job The handle to the JobConf object
 * @param numSplits Number of splits requested
 */
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
// Job metadata written by the setup phase: source file count, total byte
// size, and the path of the (SequenceFile) source list.
int cnfiles = job.getInt(SRC_COUNT_LABEL, -1);
long cbsize = job.getLong(TOTAL_SIZE_LABEL, -1);
String srcfilelist = job.get(SRC_LIST_LABEL, "");
if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) {
throw new RuntimeException("Invalid metadata: #files(" + cnfiles +
") total_size(" + cbsize + ") listuri(" +
srcfilelist + ")");
}
Path src = new Path(srcfilelist);
FileSystem fs = src.getFileSystem(job);
FileStatus srcst = fs.getFileStatus(src);
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
// Each record of the source list is (file length, FilePair).
LongWritable key = new LongWritable();
FilePair value = new FilePair();
final long targetsize = cbsize / numSplits;
long pos = 0L;   // offset in the list file where the current split starts
long last = 0L;  // reader position after the last consumed record
long acc = 0L;   // source bytes accumulated into the current split
long cbrem = srcst.getLen();  // list-file bytes not yet assigned
try (SequenceFile.Reader sl =
new SequenceFile.Reader(job, Reader.file(src))) {
for (; sl.next(key, value); last = sl.getPosition()) {
// if adding this split would put this split past the target size,
// cut the last split and put this next file in the next split.
if (acc + key.get() > targetsize && acc != 0) {
long splitsize = last - pos;
splits.add(new FileSplit(src, pos, splitsize, (String[])null));
cbrem -= splitsize;
pos = last;
acc = 0L;
}
acc += key.get();
}
}
// Whatever remains of the list file becomes the final split.
if (cbrem != 0) {
splits.add(new FileSplit(src, pos, cbrem, (String[])null));
}
return splits.toArray(new FileSplit[splits.size()]);
}
/**
 * Delete the dst files/dirs which do not exist in src
 *
 * Three phases: (1) recursively list (lsr) the destination tree into a
 * temporary sequence file, (2) sort that listing, (3) merge-walk the sorted
 * lsr listing against the sorted source listing (dstsorted), deleting every
 * destination path that has no counterpart in the source.
 *
 * @return total count of files and directories deleted from destination
 * @throws IOException
 */
static private long deleteNonexisting(
FileSystem dstfs, FileStatus dstroot, Path dstsorted,
FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf
) throws IOException {
if (dstroot.isFile()) {
throw new IOException("dst must be a directory when option "
+ Options.DELETE.cmd + " is set, but dst (= " + dstroot.getPath()
+ ") is not a directory.");
}
//write dst lsr results
final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
try (final SequenceFile.Writer writer = SequenceFile.createWriter(jobconf,
Writer.file(dstlsr), Writer.keyClass(Text.class),
Writer.valueClass(NullWritable.class), Writer.compression(
SequenceFile.CompressionType.NONE))) {
//do lsr to get all file statuses in dstroot
// Explicit-stack depth-first traversal; children are recorded relative
// to the destination root.
final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
final FileStatus status = lsrstack.pop();
if (status.isDirectory()) {
for(FileStatus child : dstfs.listStatus(status.getPath())) {
String relative = makeRelative(dstroot.getPath(), child.getPath());
writer.append(new Text(relative), NullWritable.get());
lsrstack.push(child);
}
}
}
}
//sort lsr results
final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted");
SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs,
new Text.Comparator(), Text.class, NullWritable.class, jobconf);
sorter.sort(dstlsr, sortedlsr);
//compare lsr list and dst list
long deletedPathsCount = 0;
try (SequenceFile.Reader lsrin =
new SequenceFile.Reader(jobconf, Reader.file(sortedlsr));
SequenceFile.Reader dstin =
new SequenceFile.Reader(jobconf, Reader.file(dstsorted))) {
//compare sorted lsr list and sorted dst list
final Text lsrpath = new Text();
final Text dstpath = new Text();
final Text dstfrom = new Text();
final Trash trash = new Trash(dstfs, conf);
Path lastpath = null;
boolean hasnext = dstin.next(dstpath, dstfrom);
while (lsrin.next(lsrpath, NullWritable.get())) {
int dst_cmp_lsr = dstpath.compareTo(lsrpath);
// Advance the dst cursor until it catches up with the lsr entry.
while (hasnext && dst_cmp_lsr < 0) {
hasnext = dstin.next(dstpath, dstfrom);
dst_cmp_lsr = dstpath.compareTo(lsrpath);
}
if (dst_cmp_lsr == 0) {
//lsrpath exists in dst, skip it
hasnext = dstin.next(dstpath, dstfrom);
} else {
//lsrpath does not exist, delete it
final Path rmpath = new Path(dstroot.getPath(), lsrpath.toString());
++deletedPathsCount;
// Descendants of an already-deleted ancestor are counted but not
// deleted again — the earlier recursive delete removed them.
if ((lastpath == null || !isAncestorPath(lastpath, rmpath))) {
// Trash first; permanent delete as fallback.
if (!(trash.moveToTrash(rmpath) || dstfs.delete(rmpath, true))) {
throw new IOException("Failed to delete " + rmpath);
}
lastpath = rmpath;
}
}
}
}
return deletedPathsCount;
}
/**
 * Reads a sequence file and turns each record into a FlowFile whose content
 * is produced by a KeyValueWriterCallback bound to the open reader. FlowFile
 * names reuse file-name-like keys (basename, nanos-suffixed); other keys get
 * a generated "&lt;input&gt;.&lt;nanos&gt;.&lt;n&gt;" name.
 *
 * @return the set of FlowFiles successfully written to the session
 */
@Override
public Set<FlowFile> readSequenceFile(Path file, Configuration configuration, FileSystem fileSystem) throws IOException {
  final Set<FlowFile> results = new HashSet<>();
  final SequenceFile.Reader reader =
      new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
  final Text key = new Text();
  final KeyValueWriterCallback callback = new KeyValueWriterCallback(reader);
  final String fallbackPrefix = file.getName() + "." + System.nanoTime() + ".";
  int unmatchedCount = 0;
  LOG.debug("Read from SequenceFile: {} ", new Object[]{file});
  try {
    while (reader.next(key)) {
      String fileName = key.toString();
      // A key may or may not look like a file name; branch accordingly.
      if (!LOOKS_LIKE_FILENAME.matcher(fileName).matches()) {
        fileName = fallbackPrefix + ++unmatchedCount;
      } else {
        if (fileName.contains(File.separator)) {
          fileName = StringUtils.substringAfterLast(fileName, File.separator);
        }
        fileName = fileName + "." + System.nanoTime();
      }
      FlowFile flowFile = session.create();
      flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
      // Hand the current record key to the callback before writing.
      callback.key = key;
      try {
        flowFile = session.write(flowFile, callback);
        results.add(flowFile);
      } catch (ProcessException e) {
        LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
        session.remove(flowFile);
      }
      key.clear();
    }
  } finally {
    IOUtils.closeQuietly(reader);
  }
  return results;
}