Listed below are example usages of org.apache.hadoop.io.SequenceFile; follow the link on each example to view its full source on GitHub.
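Before the individual snippets, here is a minimal, self-contained sketch of the basic SequenceFile round trip (write, then read) using the non-deprecated option-based API; the file path and the IntWritable/Text key and value types are illustrative assumptions, not taken from any particular example below.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileRoundTrip {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/demo.seq"); // hypothetical path, resolved against the default FileSystem

    // Write a few <IntWritable, Text> records; Writer is Closeable, so try-with-resources closes it.
    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(path),
        SequenceFile.Writer.keyClass(IntWritable.class),
        SequenceFile.Writer.valueClass(Text.class))) {
      for (int i = 0; i < 10; i++) {
        writer.append(new IntWritable(i), new Text("value_" + i));
      }
    }

    // Read the records back, reusing a single key and value instance.
    IntWritable key = new IntWritable();
    Text value = new Text();
    SequenceFile.Reader reader = null;
    try {
      reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
      while (reader.next(key, value)) {
        System.out.println(key + "\t" + value);
      }
    } finally {
      IOUtils.closeStream(reader);
    }
  }
}

Most of the examples that follow are variations on this pattern, differing mainly in key/value types, compression settings, and whether the older FileSystem-based constructors are used.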
private void createInputFile(String rootName) throws IOException {
cleanup(); // clean up if previous run failed
Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
SequenceFile.Writer writer =
SequenceFile.createWriter(fs, fsConfig, inputFile,
Text.class, LongWritable.class, CompressionType.NONE);
try {
nrFiles = 0;
listSubtree(new Path(rootName), writer);
} finally {
writer.close();
}
LOG.info("Created map input files.");
}
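// Test setup: writes the DATA array as <IntWritable, Text> pairs into a local temp SequenceFile that a PigServer test then reads.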
@Override
public void setUp() throws Exception {
pigServer = new PigServer(LOCAL);
File tmpFile = File.createTempFile("test", ".txt");
tmpFileName = tmpFile.getAbsolutePath();
System.err.println("fileName: "+tmpFileName);
Path path = new Path("file:///"+tmpFileName);
JobConf conf = new JobConf();
FileSystem fs = FileSystem.get(path.toUri(), conf);
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, conf, path,
key.getClass(), value.getClass());
for (int i=0; i < DATA.length; i++) {
key.set(i);
value.set(DATA[i]);
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
}
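// Appends every <LongWritable, BytesWritable> entry of the given SequenceFile to a MapFile.Writer, shifting each key by lastKey, and returns the last shifted key.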
public static long readAndAppendAst(Configuration conf, FileSystem fileSystem, MapFile.Writer writer, String fileName, long lastKey) throws IOException {
long newLastKey = lastKey;
SequenceFile.Reader r = new SequenceFile.Reader(fileSystem, new Path(fileName), conf);
LongWritable longKey = new LongWritable();
BytesWritable value = new BytesWritable();
try {
while (r.next(longKey, value)) {
newLastKey = longKey.get() + lastKey;
writer.append(new LongWritable(newLastKey), value);
}
} catch (Exception e) {
System.err.println(fileName);
e.printStackTrace();
} finally {
r.close();
}
return newLastKey;
}
/**
* Get token from the token sequence file.
* @param authPath path of the token sequence file
* @param proxyUserName name of the proxy user whose token is requested
* @return Token for proxyUserName if it exists.
* @throws IOException
*/
private static Optional<Token<?>> getTokenFromSeqFile(String authPath, String proxyUserName) throws IOException {
try (Closer closer = Closer.create()) {
FileSystem localFs = FileSystem.getLocal(new Configuration());
SequenceFile.Reader tokenReader =
closer.register(new SequenceFile.Reader(localFs, new Path(authPath), localFs.getConf()));
Text key = new Text();
Token<?> value = new Token<>();
while (tokenReader.next(key, value)) {
LOG.info("Found token for " + key);
if (key.toString().equals(proxyUserName)) {
return Optional.<Token<?>> of(value);
}
}
}
return Optional.absent();
}
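// Reads <Text routingKey, MessageWritable> records, asserts the routing key and the deserialized payload of each one, and returns the record count.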
private int checkFileContents(String filePath, String message) throws IOException {
SequenceFile.Reader r = new SequenceFile.Reader(
FileSystem.get(new Configuration()),
new Path(filePath),
new Configuration());
Text routingKey = new Text();
MessageWritable value = new MessageWritable();
StringSerDe serde = new StringSerDe();
int i = 0;
while (r.next(routingKey, value)) {
assertEquals(routingKey.toString(), "routingKey");
assertEquals(serde.deserialize(value.getMessage().getPayload()), message + i);
++i;
}
r.close();
return i;
}
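// On the first reduce call only, opens the reduce-side input file and asserts whether it is compressed as expected.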
public void reduce(WritableComparable key, Iterator values,
OutputCollector output, Reporter reporter
) throws IOException {
if (first) {
first = false;
MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
mapOutputFile.setConf(conf);
Path input = mapOutputFile.getInputFile(0, taskId);
FileSystem fs = FileSystem.get(conf);
assertTrue("reduce input exists " + input, fs.exists(input));
SequenceFile.Reader rdr =
new SequenceFile.Reader(fs, input, conf);
assertEquals("is reduce input compressed " + input,
compressInput,
rdr.isCompressed());
rdr.close();
}
}
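// Writes 100 <IntWritable, Text> records through the option-based createWriter API without compression, flushing before close.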
public static void writeSequenceFile(String path) throws Exception{
Writer.Option filePath = Writer.file(new Path(path));
Writer.Option keyClass = Writer.keyClass(IntWritable.class);
Writer.Option valueClass = Writer.valueClass(Text.class);
Writer.Option compression = Writer.compression(CompressionType.NONE);
Writer writer = SequenceFile.createWriter(configuration, filePath, keyClass, valueClass, compression);
IntWritable key = new IntWritable();
Text value = new Text("");
for(int i=0;i<100;i++){
key.set(i);
value.set("value_"+i);
writer.append(key, value);
}
writer.hflush();
writer.close();
}
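// Uses a mocked SequenceFile.Writer whose close() throws and asserts that SimpleCopyListing.doBuildListing propagates the exception.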
@Test
public void testFailOnCloseError() throws IOException {
File inFile = File.createTempFile("TestCopyListingIn", null);
inFile.deleteOnExit();
File outFile = File.createTempFile("TestCopyListingOut", null);
outFile.deleteOnExit();
List<Path> srcs = new ArrayList<Path>();
srcs.add(new Path(inFile.toURI()));
Exception expectedEx = new IOException("boom");
SequenceFile.Writer writer = mock(SequenceFile.Writer.class);
doThrow(expectedEx).when(writer).close();
SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
DistCpOptions options = new DistCpOptions(srcs, new Path(outFile.toURI()));
Exception actualEx = null;
try {
listing.doBuildListing(writer, options);
} catch (Exception e) {
actualEx = e;
}
Assert.assertNotNull("close writer didn't fail", actualEx);
Assert.assertEquals(expectedEx, actualEx);
}
private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
long lastEnd = 0;
//Verify that each split's start matches the previous split's end,
//so no part of the listing file is skipped
for (InputSplit split : splits) {
FileSplit fileSplit = (FileSplit) split;
long start = fileSplit.getStart();
Assert.assertEquals(lastEnd, start);
lastEnd = start + fileSplit.getLength();
}
//Verify there is nothing more to read from the input file
SequenceFile.Reader reader
= new SequenceFile.Reader(cluster.getFileSystem().getConf(),
SequenceFile.Reader.file(listFile));
try {
reader.seek(lastEnd);
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
Text srcRelPath = new Text();
Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
} finally {
IOUtils.closeStream(reader);
}
}
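// Prints the representative points produced by each clustering iteration, skipping hidden files in every output directory.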
private void checkRefPoints(int numIterations) throws IOException {
for (int i = 0; i <= numIterations; i++) {
Path out = new Path(getTestTempDirPath("output"), "representativePoints-" + i);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
for (FileStatus file : fs.listStatus(out)) {
if (!file.getPath().getName().startsWith(".")) {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
try {
Writable clusterId = new IntWritable(0);
VectorWritable point = new VectorWritable();
while (reader.next(clusterId, point)) {
System.out.println("\tC-" + clusterId + ": " + AbstractCluster.formatVector(point.get(), null));
}
} finally {
reader.close();
}
}
}
}
}
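// Reducer cleanup: overwrites the configured centroid path with the newly computed cluster centers.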
@SuppressWarnings("deprecation")
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
super.cleanup(context);
Configuration conf = context.getConfiguration();
Path outPath = new Path(conf.get("centroid.path"));
FileSystem fs = FileSystem.get(conf);
fs.delete(outPath, true);
try (SequenceFile.Writer out = SequenceFile.createWriter(fs, context.getConfiguration(), outPath,
ClusterCenter.class, IntWritable.class)) {
final IntWritable value = new IntWritable(0);
for (ClusterCenter center : centers) {
out.append(center, value);
}
}
}
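// Counts how often the given key occurs in each source file and returns the product of the non-zero counts.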
private static int countProduct(IntWritable key, Path[] src,
Configuration conf) throws IOException {
int product = 1;
for (Path p : src) {
int count = 0;
SequenceFile.Reader r = new SequenceFile.Reader(
cluster.getFileSystem(), p, conf);
IntWritable k = new IntWritable();
IntWritable v = new IntWritable();
while (r.next(k, v)) {
if (k.equals(key)) {
count++;
}
}
r.close();
if (count != 0) {
product *= count;
}
}
return product;
}
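// Reads the generated copy listing back and checks that each absolute path maps to the expected relative path, ignoring the root entry.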
private void verifyContents(Path listingPath) throws Exception {
SequenceFile.Reader reader = new SequenceFile.Reader(cluster.getFileSystem(),
listingPath, new Configuration());
Text key = new Text();
CopyListingFileStatus value = new CopyListingFileStatus();
Map<String, String> actualValues = new HashMap<String, String>();
while (reader.next(key, value)) {
if (value.isDirectory() && key.toString().equals("")) {
// ignore root with empty relPath, which is an entry to be
// used for preserving root attributes etc.
continue;
}
actualValues.put(value.getPath().toString(), key.toString());
}
Assert.assertEquals(expectedValues.size(), actualValues.size());
for (Map.Entry<String, String> entry : actualValues.entrySet()) {
Assert.assertEquals(entry.getValue(), expectedValues.get(entry.getKey()));
}
}
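// Recursively walks the output directory (skipping names that start with "_") and streams every <rowId, TableBlurRecord> pair to the ResultReader.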
private void walkOutput(Path output, Configuration conf, ResultReader resultReader) throws IOException {
FileSystem fileSystem = output.getFileSystem(conf);
FileStatus fileStatus = fileSystem.getFileStatus(output);
if (fileStatus.isDir()) {
FileStatus[] listStatus = fileSystem.listStatus(output, new PathFilter() {
@Override
public boolean accept(Path path) {
return !path.getName().startsWith("_");
}
});
for (FileStatus fs : listStatus) {
walkOutput(fs.getPath(), conf, resultReader);
}
} else {
Reader reader = new SequenceFile.Reader(fileSystem, output, conf);
Text rowId = new Text();
TableBlurRecord tableBlurRecord = new TableBlurRecord();
while (reader.next(rowId, tableBlurRecord)) {
resultReader.read(rowId, tableBlurRecord);
}
reader.close();
}
}
@Override
public Object call() throws Exception {
TensorBlock value = new TensorBlock();
TensorIndexes key = new TensorIndexes();
//directly read from sequence files (individual partfiles)
try(SequenceFile.Reader reader = new SequenceFile.Reader(_job, SequenceFile.Reader.file(_path))) {
//note: next(key, value) does not yet exploit the given serialization classes,
//record reader does but is generally slower.
while (reader.next(key, value)) {
if( value.isEmpty(false) )
continue;
int[] lower = new int[_dims.length];
int[] upper = new int[lower.length];
UtilFunctions.getBlockBounds(key, value.getLongDims(), _blen, lower, upper);
_dest.copy(lower, upper, value);
}
}
return null;
}
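// Writes 100 records with descending integer keys, printing the writer's current position before each append.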
public static void write_seq() throws IOException {
String uri = "test_file.seq";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, conf, path,
key.getClass(), value.getClass());
for (int i = 0; i < 100; i++) {
key.set(100 - i);
value.set(DATA[i % DATA.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
}
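// Collects every <LongWritable, Text> record of the file into a list of Tuple2<Long, String>.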
private List<Tuple2<Long, String>> readSequenceFile(File file) throws IOException {
SequenceFile.Reader reader = new SequenceFile.Reader(
configuration, SequenceFile.Reader.file(new org.apache.hadoop.fs.Path(file.toURI())));
LongWritable key = new LongWritable();
Text val = new Text();
ArrayList<Tuple2<Long, String>> results = new ArrayList<>();
while (reader.next(key, val)) {
results.add(new Tuple2<>(key.get(), val.toString()));
}
reader.close();
return results;
}
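// Opens a writer on the underlying output stream, resolving the configured compression codec by name unless it is "None".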
@Override
public void open(FileSystem fs, Path path) throws IOException {
super.open(fs, path);
if (keyClass == null) {
throw new IllegalStateException("Key Class has not been initialized.");
}
if (valueClass == null) {
throw new IllegalStateException("Value Class has not been initialized.");
}
CompressionCodec codec = null;
Configuration conf = fs.getConf();
if (!compressionCodecName.equals("None")) {
CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
codec = codecFactory.getCodecByName(compressionCodecName);
if (codec == null) {
throw new RuntimeException("Codec " + compressionCodecName + " not found.");
}
}
// the non-deprecated constructor syntax is only available in recent hadoop versions...
writer = SequenceFile.createWriter(conf,
getStream(),
keyClass,
valueClass,
compressionType,
codec);
}
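// Dumps every row of a SequenceFile using the untyped next(key)/getCurrentValue(value) reader API.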
private static void printSequenceFile(FileSystem fs, Path p,
Configuration conf) throws IOException {
SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf);
Object key = null;
Object value = null;
while ((key = r.next(key)) != null) {
value = r.getCurrentValue(value);
System.out.println(" Row: " + key + ", " + value);
}
r.close();
}
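// Wraps a SequenceFile as a text record stream, instantiating key and value objects from the reader's declared classes.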
public TextRecordInputStream(FileStatus f) throws IOException {
final Path fpath = f.getPath();
final Configuration lconf = getConf();
r = new SequenceFile.Reader(lconf,
SequenceFile.Reader.file(fpath));
key = ReflectionUtils.newInstance(
r.getKeyClass().asSubclass(WritableComparable.class), lconf);
val = ReflectionUtils.newInstance(
r.getValueClass().asSubclass(Writable.class), lconf);
inbuf = new DataInputBuffer();
outbuf = new DataOutputBuffer();
}
/**
* Write a partition file for the given job, using the Sampler provided.
* Queries the sampler for a sample keyset, sorts by the output key
* comparator, selects the keys for each rank, and writes to the destination
* returned from {@link TotalOrderPartitioner#getPartitionFile}.
*/
@SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler)
throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = job.getConfiguration();
final InputFormat inf =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
int numPartitions = job.getNumReduceTasks();
K[] samples = (K[])sampler.getSample(inf, job);
LOG.info("Using " + samples.length + " samples");
RawComparator<K> comparator =
(RawComparator<K>) job.getSortComparator();
Arrays.sort(samples, comparator);
Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
FileSystem fs = dst.getFileSystem(conf);
if (fs.exists(dst)) {
fs.delete(dst, false);
}
SequenceFile.Writer writer = SequenceFile.createWriter(fs,
conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
NullWritable nullValue = NullWritable.get();
float stepSize = samples.length / (float) numPartitions;
int last = -1;
for(int i = 1; i < numPartitions; ++i) {
int k = Math.round(stepSize * i);
while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
++k;
}
writer.append(samples[k], nullValue);
last = k;
}
writer.close();
}
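The method above is the sampling-based partition-file writer used with TotalOrderPartitioner. Assuming it behaves like InputSampler.writePartitionFile from org.apache.hadoop.mapreduce.lib.partition, a caller typically wires it up roughly as follows; the job name, input path, partition-file path, reducer count, and sampler parameters are all illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class PartitionFileUsage {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "total-order-sort"); // illustrative job name
    job.setNumReduceTasks(4);
    // KeyValueTextInputFormat yields <Text, Text>, matching the sampler's type parameters.
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/tmp/sort-input")); // hypothetical input path
    job.setMapOutputKeyClass(Text.class);
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
        new Path("/tmp/partitions.seq")); // hypothetical partition file
    // Sample roughly 10% of the input, keeping at most 1000 keys from at most 10 splits.
    InputSampler.Sampler<Text, Text> sampler =
        new InputSampler.RandomSampler<>(0.1, 1000, 10);
    // Sorts the sample and writes numReduceTasks - 1 boundary keys to the partition file.
    InputSampler.writePartitionFile(job, sampler);
  }
}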
@Override
public SequenceFileWriter<K, V> create(FSDataOutputStream out) throws IOException {
org.apache.hadoop.fs.FSDataOutputStream stream = new org.apache.hadoop.fs.FSDataOutputStream(out, null);
CompressionCodec compressionCodec = getCompressionCodec(serializableHadoopConfig.get(), compressionCodecName);
SequenceFile.Writer writer = SequenceFile.createWriter(
serializableHadoopConfig.get(),
SequenceFile.Writer.stream(stream),
SequenceFile.Writer.keyClass(keyClass),
SequenceFile.Writer.valueClass(valueClass),
SequenceFile.Writer.compression(compressionType, compressionCodec));
return new SequenceFileWriter<>(writer);
}
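// Counts the <Text, Text> records across every part-r-0000x file under the test output directory.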
private int readFile() throws IllegalArgumentException, IOException {
int count = 0;
final FileSystem fs = FileSystem.get(MapReduceTestUtils.getConfiguration());
final FileStatus[] fss =
fs.listStatus(
new Path(
TestUtils.TEMP_DIR
+ File.separator
+ MapReduceTestEnvironment.HDFS_BASE_DIRECTORY
+ "/t1/pairs"));
for (final FileStatus ifs : fss) {
if (ifs.isFile() && ifs.getPath().toString().matches(".*part-r-0000[0-9]")) {
try (SequenceFile.Reader reader =
new SequenceFile.Reader(
MapReduceTestUtils.getConfiguration(),
Reader.file(ifs.getPath()))) {
final Text key = new Text();
final Text val = new Text();
while (reader.next(key, val)) {
count++;
}
}
}
}
return count;
}
private void create100EntrySequenceFile(Configuration conf) throws Exception {
try (SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(new Path(INPUT_DIRECTORY_PATH, "sip_info.txt")),
SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(DoubleWritable.class))) {
for (int i = 0; i < 100; i++) {
writer.append(new Text("sip_" + i), new DoubleWritable(i / 100.0));
}
}
}
@Test
public void testDuplicate() {
SequenceFileWriter<Text, Text> writer = new SequenceFileWriter("BZ", SequenceFile.CompressionType.BLOCK);
writer.setSyncOnFlush(true);
SequenceFileWriter<Text, Text> other = writer.duplicate();
assertTrue(StreamWriterBaseComparator.equals(writer, other));
writer.setSyncOnFlush(false);
assertFalse(StreamWriterBaseComparator.equals(writer, other));
}
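// Builds a copy listing from one source file and two source directories, then verifies the listing contains exactly the expected relative paths.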
@Test(timeout = 10000)
public void buildListingForMultipleSources() throws Exception {
FileSystem fs = FileSystem.get(config);
String testRootString = temporaryRoot + "/source";
Path testRoot = new Path(testRootString);
if (fs.exists(testRoot)) {
delete(fs, testRootString);
}
Path sourceDir1 = new Path(testRoot, "foo/baz/");
Path sourceDir2 = new Path(testRoot, "foo/bang/");
Path sourceFile1 = new Path(testRoot, "foo/bar/source.txt");
URI target = URI.create("s3://bucket/target/moo/");
fs.mkdirs(sourceDir1);
fs.mkdirs(sourceDir2);
createFile(fs, new Path(sourceDir1, "baz_1.dat"));
createFile(fs, new Path(sourceDir1, "baz_2.dat"));
createFile(fs, new Path(sourceDir2, "bang_0.dat"));
createFile(fs, sourceFile1.toString());
final Path listFile = new Path(testRoot, temporaryRoot + "/fileList.seq");
listing.buildListing(listFile, options(Arrays.asList(sourceFile1, sourceDir1, sourceDir2), target));
Set<String> expectedRelativePaths = Sets.newHashSet("/source.txt", "/baz_1.dat", "/baz_2.dat", "/bang_0.dat");
try (SequenceFile.Reader reader = new SequenceFile.Reader(config, SequenceFile.Reader.file(listFile))) {
CopyListingFileStatus fileStatus = new CopyListingFileStatus();
Text relativePath = new Text();
int relativePathCount = expectedRelativePaths.size();
for (int i = 0; i < relativePathCount; i++) {
assertThat(reader.next(relativePath, fileStatus), is(true));
assertThat("Expected path not found " + relativePath.toString(),
expectedRelativePaths.remove(relativePath.toString()), is(true));
}
}
assertThat("Expected relativePaths to be empty but was: " + expectedRelativePaths, expectedRelativePaths.isEmpty(),
is(true));
}
public MessagePackSequenceFileReader(LogFilePath path) throws Exception {
Configuration config = new Configuration();
Path fsPath = new Path(path.getLogFilePath());
FileSystem fs = FileUtil.getFileSystem(path.getLogFilePath());
this.mReader = new SequenceFile.Reader(fs, fsPath, config);
this.mKey = (BytesWritable) mReader.getKeyClass().newInstance();
this.mValue = (BytesWritable) mReader.getValueClass().newInstance();
}
private void testSeqFile(CompressionCodec compressionCodec, SequenceFile.CompressionType compressionType)
throws Exception {
RecordWriterManager mgr = managerBuilder()
.dirPathTemplate(getTestDir().toString() + "/${YYYY()}")
.compressionCodec(compressionCodec)
.compressionType(compressionType)
.fileType(HdfsFileType.SEQUENCE_FILE)
.build();
FileSystem fs = FileSystem.get(uri, hdfsConf);
Path file = new Path(getTestDir(), UUID.randomUUID().toString());
long expires = System.currentTimeMillis() + 50000;
RecordWriter writer = mgr.createWriter(fs, file, 50000);
Assert.assertTrue(expires <= writer.getExpiresOn());
Assert.assertFalse(writer.isTextFile());
Assert.assertTrue(writer.isSeqFile());
Record record = RecordCreator.create();
record.set(Field.create("a"));
writer.write(record);
writer.close();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, new HdfsConfiguration());
Text key = new Text();
Text value = new Text();
Assert.assertTrue(reader.next(key, value));
Assert.assertNotNull(UUID.fromString(key.toString()));
Assert.assertEquals("a", value.toString().trim());
Assert.assertFalse(reader.next(key, value));
reader.close();
}
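// Positions the reader at the start of its split (syncing forward when necessary) and records the byte range this binary record reader covers.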
public SequenceFileAsBinaryRecordReader(Configuration conf, FileSplit split)
throws IOException {
Path path = split.getPath();
FileSystem fs = path.getFileSystem(conf);
this.in = new SequenceFile.Reader(fs, path, conf);
this.end = split.getStart() + split.getLength();
if (split.getStart() > in.getPosition())
in.sync(split.getStart()); // sync to start
this.start = in.getPosition();
vbytes = in.createValueBytes();
done = start >= end;
}