The following examples show how to use the org.apache.hadoop.io.UTF8 API class; follow the links through to GitHub to view the full source.
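Before the project snippets, here is a minimal self-contained round-trip through the UTF8 wire format (note that org.apache.hadoop.io.UTF8 is deprecated in favor of org.apache.hadoop.io.Text; the byte-array streams below are just one way to exercise the API):

import java.io.*;
import org.apache.hadoop.io.UTF8;

public class UTF8RoundTrip {
  public static void main(String[] args) throws IOException {
    // Write a String in UTF8's length-prefixed wire format ...
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);
    UTF8.writeString(out, "hello, hadoop");
    // ... and read it back with the matching static reader.
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
    System.out.println(UTF8.readString(in)); // prints "hello, hadoop"
  }
}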
/**
 * Create the job configuration.
 */
private JobConf setupJob(int numMaps, int numReduces, String jarFile) {
  JobConf jobConf = new JobConf(getConf());
  jobConf.setJarByClass(MRBench.class);
  FileInputFormat.addInputPath(jobConf, INPUT_DIR);
  jobConf.setInputFormat(TextInputFormat.class);
  jobConf.setOutputFormat(TextOutputFormat.class);
  jobConf.setOutputValueClass(UTF8.class);
  jobConf.setMapOutputKeyClass(UTF8.class);
  jobConf.setMapOutputValueClass(UTF8.class);
  if (null != jarFile) {
    jobConf.setJar(jarFile);
  }
  jobConf.setMapperClass(Map.class);
  jobConf.setReducerClass(Reduce.class);
  jobConf.setNumMapTasks(numMaps);
  jobConf.setNumReduceTasks(numReduces);
  jobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
  return jobConf;
}
@Override
public void readFields(DataInput in) throws IOException {
  this.leaderId = UTF8.readString(in);
  this.partition = in.readInt();
  this.beginOffset = in.readLong();
  this.offset = in.readLong();
  this.checksum = in.readLong();
  this.topic = in.readUTF();
  this.time = in.readLong();
  this.server = in.readUTF(); // left for legacy
  this.service = in.readUTF(); // left for legacy
  this.partitionMap = new MapWritable();
  try {
    this.partitionMap.readFields(in);
  } catch (IOException e) {
    // older records have no partition map; fall back to the legacy fields
    this.setServer(this.server);
    this.setService(this.service);
  }
}
@Test
public void testIgnoreStatsWithSignedSortOrder() {
  ParquetMetadataConverter converter = new ParquetMetadataConverter();
  BinaryStatistics stats = new BinaryStatistics();
  stats.incrementNumNulls();
  stats.updateStats(Binary.fromString("A"));
  stats.incrementNumNulls();
  stats.updateStats(Binary.fromString("z"));
  stats.incrementNumNulls();
  PrimitiveType binaryType = Types.required(PrimitiveTypeName.BINARY)
      .as(OriginalType.UTF8).named("b");
  Statistics convertedStats = converter.fromParquetStatistics(
      Version.FULL_VERSION,
      StatsHelper.V1.toParquetStatistics(stats),
      binaryType);
  Assert.assertFalse("Stats should not include min/max: " + convertedStats,
      convertedStats.hasNonNullValue());
  Assert.assertTrue("Stats should have null count: " + convertedStats,
      convertedStats.isNumNullsSet());
  Assert.assertEquals("Stats should have 3 nulls: " + convertedStats,
      3L, convertedStats.getNumNulls());
}
private void testStillUseStatsWithSignedSortOrderIfSingleValue(StatsHelper helper) {
  ParquetMetadataConverter converter = new ParquetMetadataConverter();
  BinaryStatistics stats = new BinaryStatistics();
  stats.incrementNumNulls();
  stats.updateStats(Binary.fromString("A"));
  stats.incrementNumNulls();
  stats.updateStats(Binary.fromString("A"));
  stats.incrementNumNulls();
  PrimitiveType binaryType = Types.required(PrimitiveTypeName.BINARY)
      .as(OriginalType.UTF8).named("b");
  Statistics convertedStats = converter.fromParquetStatistics(
      Version.FULL_VERSION,
      helper.toParquetStatistics(stats), // use the passed-in helper, not a fixed version
      binaryType);
  Assert.assertFalse("Stats should not be empty: " + convertedStats,
      convertedStats.isEmpty());
  Assert.assertArrayEquals("min == max: " + convertedStats,
      convertedStats.getMaxBytes(), convertedStats.getMinBytes());
}
public static void writeTest(FileSystem fs, boolean fastCheck)
    throws Exception {
  fs.delete(DATA_DIR, true);
  fs.delete(WRITE_DIR, true);
  JobConf job = new JobConf(conf, TestFileSystem.class);
  job.setBoolean("fs.test.fastCheck", fastCheck);
  FileInputFormat.setInputPaths(job, CONTROL_DIR);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(WriteMapper.class);
  job.setReducerClass(LongSumReducer.class);
  FileOutputFormat.setOutputPath(job, WRITE_DIR);
  job.setOutputKeyClass(UTF8.class);
  job.setOutputValueClass(LongWritable.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
public static void readTest(FileSystem fs, boolean fastCheck)
    throws Exception {
  fs.delete(READ_DIR, true);
  JobConf job = new JobConf(conf, TestFileSystem.class);
  job.setBoolean("fs.test.fastCheck", fastCheck);
  FileInputFormat.setInputPaths(job, CONTROL_DIR);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(ReadMapper.class);
  job.setReducerClass(LongSumReducer.class);
  FileOutputFormat.setOutputPath(job, READ_DIR);
  job.setOutputKeyClass(UTF8.class);
  job.setOutputValueClass(LongWritable.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
public static void seekTest(FileSystem fs, boolean fastCheck)
    throws Exception {
  fs.delete(READ_DIR, true);
  JobConf job = new JobConf(conf, TestFileSystem.class);
  job.setBoolean("fs.test.fastCheck", fastCheck);
  FileInputFormat.setInputPaths(job, CONTROL_DIR);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(SeekMapper.class);
  job.setReducerClass(LongSumReducer.class);
  FileOutputFormat.setOutputPath(job, READ_DIR);
  job.setOutputKeyClass(UTF8.class);
  job.setOutputValueClass(LongWritable.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
/**
 * Create the job configuration.
 */
private JobConf setupJob(int numMaps, int numReduces, String jarFile) {
  JobConf jobConf = new JobConf(getConf());
  jobConf.setJarByClass(MRBench.class);
  FileInputFormat.addInputPath(jobConf, INPUT_DIR);
  jobConf.setInputFormat(TextInputFormat.class);
  jobConf.setOutputFormat(TextOutputFormat.class);
  jobConf.setOutputValueClass(UTF8.class);
  jobConf.setMapOutputKeyClass(UTF8.class);
  jobConf.setMapOutputValueClass(UTF8.class);
  if (null != jarFile) {
    jobConf.setJar(jarFile);
  }
  jobConf.setMapperClass(Map.class);
  jobConf.setReducerClass(Reduce.class);
  jobConf.setNumMapTasks(numMaps);
  jobConf.setNumReduceTasks(numReduces);
  return jobConf;
}
/**
 * Map file name and offset into statistical data.
 * <p>
 * The map task receives the
 * <tt>key</tt>, which contains the file name, and the
 * <tt>value</tt>, which is the offset within the file.
 *
 * These parameters are passed to the abstract method
 * {@link #doIO(Reporter,String,long)}, which performs the I/O operation
 * (usually reading or writing data), and then
 * {@link #collectStats(OutputCollector,String,long,Object)}
 * is called to prepare stat data for a subsequent reducer.
 */
public void map(UTF8 key,
                LongWritable value,
                OutputCollector<UTF8, UTF8> output,
                Reporter reporter) throws IOException {
  String name = key.toString();
  long longValue = value.get();
  reporter.setStatus("starting " + name + " ::host = " + hostName);
  long tStart = System.currentTimeMillis();
  Object statValue = doIO(reporter, name, longValue);
  long tEnd = System.currentTimeMillis();
  long execTime = tEnd - tStart;
  collectStats(output, name, execTime, statValue);
  reporter.setStatus("finished " + name + " ::host = " + hostName);
}
/**
 * Create the job configuration.
 */
private static JobConf setupJob(int numMaps, int numReduces, String jarFile) {
  JobConf jobConf = new JobConf(MRBench.class);
  FileInputFormat.addInputPath(jobConf, INPUT_DIR);
  jobConf.setInputFormat(TextInputFormat.class);
  jobConf.setOutputFormat(TextOutputFormat.class);
  jobConf.setOutputValueClass(UTF8.class);
  jobConf.setMapOutputKeyClass(UTF8.class);
  jobConf.setMapOutputValueClass(UTF8.class);
  if (null != jarFile) {
    jobConf.setJar(jarFile);
  }
  jobConf.setMapperClass(Map.class);
  jobConf.setReducerClass(Reduce.class);
  jobConf.setNumMapTasks(numMaps);
  jobConf.setNumReduceTasks(numReduces);
  return jobConf;
}
public void map(WritableComparable key, Text value,
                OutputCollector<UTF8, UTF8> output,
                Reporter reporter) throws IOException {
  String line = value.toString();
  output.collect(new UTF8(process(line)), new UTF8(""));
}
public void reduce(UTF8 key, Iterator<UTF8> values,
                   OutputCollector<UTF8, UTF8> output,
                   Reporter reporter) throws IOException {
  while (values.hasNext()) {
    output.collect(key, new UTF8(values.next().toString()));
  }
}
@Override
public void readFields(DataInput in) throws IOException {
  topic = UTF8.readString(in);
  leaderId = UTF8.readString(in);
  String str = UTF8.readString(in);
  if (!str.isEmpty()) {
    try {
      uri = new URI(str);
    } catch (URISyntaxException e) {
      throw new RuntimeException(e);
    }
  }
  partition = in.readInt();
  offset = in.readLong();
  latestOffset = in.readLong();
}
@Override
public void write(DataOutput out) throws IOException {
  UTF8.writeString(out, topic);
  UTF8.writeString(out, leaderId);
  if (uri != null) {
    UTF8.writeString(out, uri.toString());
  } else {
    UTF8.writeString(out, "");
  }
  out.writeInt(partition);
  out.writeLong(offset);
  out.writeLong(latestOffset);
}
@Override
public void write(DataOutput out) throws IOException {
  UTF8.writeString(out, this.leaderId);
  out.writeInt(this.partition);
  out.writeLong(this.beginOffset);
  out.writeLong(this.offset);
  out.writeLong(this.checksum);
  out.writeUTF(this.topic);
  out.writeLong(this.time);
  out.writeUTF(this.server); // left for legacy
  out.writeUTF(this.service); // left for legacy
  this.partitionMap.write(out);
}
private void testUseStatsWithSignedSortOrder(StatsHelper helper) {
  // override defaults and use stats that were accumulated using signed order
  Configuration conf = new Configuration();
  conf.setBoolean("parquet.strings.signed-min-max.enabled", true);
  ParquetMetadataConverter converter = new ParquetMetadataConverter(conf);
  BinaryStatistics stats = new BinaryStatistics();
  stats.incrementNumNulls();
  stats.updateStats(Binary.fromString("A"));
  stats.incrementNumNulls();
  stats.updateStats(Binary.fromString("z"));
  stats.incrementNumNulls();
  PrimitiveType binaryType = Types.required(PrimitiveTypeName.BINARY)
      .as(OriginalType.UTF8).named("b");
  Statistics convertedStats = converter.fromParquetStatistics(
      Version.FULL_VERSION,
      helper.toParquetStatistics(stats),
      binaryType);
  Assert.assertFalse("Stats should not be empty", convertedStats.isEmpty());
  Assert.assertTrue(convertedStats.isNumNullsSet());
  Assert.assertEquals("Should have 3 nulls", 3, convertedStats.getNumNulls());
  if (helper == StatsHelper.V1) {
    Assert.assertFalse("Min-max should be null for V1 stats",
        convertedStats.hasNonNullValue());
  } else {
    Assert.assertEquals("Should have correct min (unsigned sort)",
        Binary.fromString("A"), convertedStats.genericGetMin());
    Assert.assertEquals("Should have correct max (unsigned sort)",
        Binary.fromString("z"), convertedStats.genericGetMax());
  }
}
public static void createControlFile(FileSystem fs,
                                     long megaBytes, int numFiles,
                                     long seed) throws Exception {
  LOG.info("creating control file: " + megaBytes + " bytes, " + numFiles + " files");
  Path controlFile = new Path(CONTROL_DIR, "files");
  fs.delete(controlFile, true);
  Random random = new Random(seed);
  SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, conf, controlFile,
                                UTF8.class, LongWritable.class, CompressionType.NONE);
  long totalSize = 0;
  long maxSize = ((megaBytes / numFiles) * 2) + 1;
  try {
    while (totalSize < megaBytes) {
      UTF8 name = new UTF8(Long.toString(random.nextLong()));
      long size = random.nextLong();
      if (size < 0)
        size = -size;
      size = size % maxSize;
      //LOG.info(" adding: name="+name+" size="+size);
      writer.append(name, new LongWritable(size));
      totalSize += size;
    }
  } finally {
    writer.close();
  }
  LOG.info("created control file for: " + totalSize + " bytes");
}
public void map(UTF8 key, LongWritable value,
                OutputCollector<UTF8, LongWritable> collector,
                Reporter reporter) throws IOException {
  String name = key.toString();
  long size = value.get();
  long seed = Long.parseLong(name);
  random.setSeed(seed);
  reporter.setStatus("creating " + name);
  // write to temp file initially to permit parallel execution
  Path tempFile = new Path(DATA_DIR, name + suffix);
  OutputStream out = fs.create(tempFile);
  long written = 0;
  try {
    while (written < size) {
      if (fastCheck) {
        Arrays.fill(buffer, (byte) random.nextInt(Byte.MAX_VALUE));
      } else {
        random.nextBytes(buffer);
      }
      long remains = size - written;
      int length = (remains <= buffer.length) ? (int) remains : buffer.length;
      out.write(buffer, 0, length);
      written += length;
      reporter.setStatus("writing " + name + "@" + written + "/" + size);
    }
  } finally {
    out.close();
  }
  // rename to final location
  fs.rename(tempFile, new Path(DATA_DIR, name));
  collector.collect(new UTF8("bytes"), new LongWritable(written));
  reporter.setStatus("wrote " + name);
}
/** {@inheritDoc} */
public void readFields(DataInput in) throws IOException {
  name = UTF8.readString(in);
  storageID = UTF8.readString(in);
  // The infoPort read back could be negative if the port is a large number
  // (more than 15 bits in storage size, but less than 16 bits). So chop off
  // the upper two bytes of the int (and hence the sign-extended bits) before
  // setting the field.
  this.infoPort = in.readShort() & 0x0000ffff;
}
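A quick worked example of that masking, since the sign extension is easy to trip over (the port value here is illustrative):

// A port above 32767 wraps negative when stored in a 16-bit short;
// masking with 0x0000ffff chops the sign-extended upper bytes back off.
short stored = (short) 40000;        // wraps to -25536
int signExtended = stored;           // widening keeps the sign: -25536
int infoPort = stored & 0x0000ffff;  // recovers 40000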
@SuppressWarnings("deprecation")
public static byte[] readBytes(DataInputStream in) throws IOException {
  UTF8 ustr = TL_DATA.get().U_STR;
  ustr.readFields(in);
  int len = ustr.getLength();
  byte[] bytes = new byte[len];
  System.arraycopy(ustr.getBytes(), 0, bytes, 0, len);
  return bytes;
}
/**
 * Read the path from the image and convert it to byte[][] directly;
 * this saves an array copy and the conversions to and from String.
 * @param in the input stream to read the path from
 * @return an array whose elements are the byte[] representations
 *         of the path components
 * @throws IOException
 */
@SuppressWarnings("deprecation")
public static byte[][] readPathComponents(DataInputStream in)
    throws IOException {
  UTF8 ustr = TL_DATA.get().U_STR;
  ustr.readFields(in);
  return DFSUtil.bytes2byteArray(ustr.getBytes(),
      ustr.getLength(), (byte) Path.SEPARATOR_CHAR);
}
/** Serialization for FSEditLog */
void readFieldsFromFSEditLog(DataInput in) throws IOException {
  this.name = UTF8.readString(in);
  this.storageID = UTF8.readString(in);
  this.infoPort = in.readShort() & 0x0000ffff;
  this.capacity = in.readLong();
  this.dfsUsed = in.readLong();
  this.remaining = in.readLong();
  this.lastUpdate = in.readLong();
  this.xceiverCount = in.readInt();
  this.location = Text.readString(in);
  this.hostName = Text.readString(in);
  setAdminState(WritableUtils.readEnum(in, AdminStates.class));
}