The following lists example code showing how to use the org.apache.hadoop.io.NullWritable API class; follow the links to view the full source on GitHub.
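For context before the examples: NullWritable is a zero-length, singleton Writable used wherever a key or value slot must be filled but carries no information. A minimal sketch (the class and its names are illustrative, not taken from any project below):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits each input line as the key with a NullWritable value, so the shuffle
// carries no payload beyond the line itself (a common dedup/sort pattern).
public class LineKeyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        context.write(line, NullWritable.get()); // get() always returns the same instance
    }
}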
public static Job getHdfsJob(Configuration conf, TaskConfig taskConfig, IndexInfo indexInfo) throws Exception {
    Job job = Job.getInstance(conf, MAIN_CLASS);
    job.setJobName("DidiFastIndex_" + taskConfig.getEsTemplate());
    job.setJarByClass(FastIndex.class);
    job.setMapperClass(FastIndexMapper.class);
    job.setInputFormatClass(HCatInputFormat.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(DefaultHCatRecord.class);
    HCatInputFormat.setInput(job, taskConfig.getHiveDB(), taskConfig.getHiveTable(), taskConfig.getFilterStr());
    job.setReducerClass(FastIndexReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    job.setNumReduceTasks(indexInfo.getReducerNum());
    job.setOutputFormatClass(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(taskConfig.getHdfsMROutputPath()));
    return job;
}
public Job createJob(int numMapper, int numReducer,
                     long mapSleepTime, int mapSleepCount,
                     long reduceSleepTime, int reduceSleepCount)
        throws IOException {
    Configuration conf = getConf();
    conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
    conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
    conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
    conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
    conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
    Job job = Job.getInstance(conf, "sleep");
    job.setNumReduceTasks(numReducer);
    job.setJarByClass(SleepJob.class);
    job.setMapperClass(SleepMapper.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setReducerClass(SleepReducer.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setInputFormatClass(SleepInputFormat.class);
    job.setPartitionerClass(SleepJobPartitioner.class);
    job.setSpeculativeExecution(false);
    job.setJobName("Sleep job");
    FileInputFormat.addInputPath(job, new Path("ignored"));
    return job;
}
public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf());
    if (args.length != 2) {
        usage();
        return 2;
    }
    TeraInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setJobName("TeraSum");
    job.setJarByClass(TeraChecksum.class);
    job.setMapperClass(ChecksumMapper.class);
    job.setReducerClass(ChecksumReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Unsigned16.class);
    // force a single reducer
    job.setNumReduceTasks(1);
    job.setInputFormatClass(TeraInputFormat.class);
    return job.waitForCompletion(true) ? 0 : 1;
}
public Job createJob()
        throws IOException {
    Configuration conf = getConf();
    conf.setInt(MRJobConfig.NUM_MAPS, 1);
    Job job = Job.getInstance(conf, "test");
    job.setNumReduceTasks(1);
    job.setJarByClass(CredentialsTestJob.class);
    job.setMapperClass(CredentialsTestJob.CredentialsTestMapper.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setReducerClass(CredentialsTestJob.CredentialsTestReducer.class);
    job.setInputFormatClass(SleepJob.SleepInputFormat.class);
    job.setPartitionerClass(SleepJob.SleepJobPartitioner.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setSpeculativeExecution(false);
    job.setJobName("test job");
    FileInputFormat.addInputPath(job, new Path("ignored"));
    return job;
}
/**
 * The method that actually processes the data.
 *
 * @param clientInfo
 * @param context
 * @param event
 * @throws InterruptedException
 * @throws IOException
 */
private void handleData(Map<String, String> clientInfo, EventEnum event,
        Context context) throws IOException, InterruptedException {
    String uuid = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID);
    String memberId = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID);
    String serverTime = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME);
    if (StringUtils.isNotBlank(serverTime)) {
        // the server time is required to be non-blank
        clientInfo.remove(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT); // drop the user-agent column
        // rowkey layout: timestamp + (uuid+memberid+event).crc
        String rowkey = this.generateRowKey(uuid, memberId, event.alias, serverTime);
        Put put = new Put(Bytes.toBytes(rowkey));
        for (Map.Entry<String, String> entry : clientInfo.entrySet()) {
            if (StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue())) {
                put.addColumn(family, Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
            }
        }
        context.write(NullWritable.get(), put);
        this.outputRecords++;
    } else {
        this.filterRecords++;
    }
}
@Override
protected void configureMapper(Job job, String tableName,
        String tableClassName) throws ClassNotFoundException, IOException {
    if (isHCatJob) {
        throw new IOException("Sqoop-HCatalog Integration is not supported.");
    }
    switch (getInputFileType()) {
        case AVRO_DATA_FILE:
            throw new IOException("Avro data file is not supported.");
        case SEQUENCE_FILE:
        case UNKNOWN:
        default:
            job.setMapperClass(getMapperClass());
    }
    // Concurrent writes of the same records would be problematic.
    ConfigurationHelper.setJobMapSpeculativeExecution(job, false);
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(NullWritable.class);
}
@Override
public void map(NullWritable ignored, GridmixRecord rec,
        Context context) throws IOException, InterruptedException {
    acc += ratio;
    while (acc >= 1.0 && !reduces.isEmpty()) {
        key.setSeed(r.nextLong());
        val.setSeed(r.nextLong());
        final int idx = r.nextInt(reduces.size());
        final RecordFactory f = reduces.get(idx);
        if (!f.next(key, val)) {
            reduces.remove(idx);
            continue;
        }
        context.write(key, val);
        acc -= 1.0;
        // match inline
        try {
            matcher.match();
        } catch (Exception e) {
            LOG.debug("Error in resource usage emulation! Message: ", e);
        }
    }
}
@Override
public List<Object> call(Tuple2<AvroWrapper, NullWritable> avroTuple) {
    final GenericData.Record datum = (GenericData.Record) avroTuple._1().datum();
    List<Object> row = new ArrayList<>(this.headers.size());
    for (String header : this.headers) {
        Object value = datum.get(header);
        if (value instanceof CharSequence) { // Avro Utf8 type
            value = value.toString();
        }
        row.add(value);
    }
    return row;
}
@Override
protected void cleanup(Context context)
        throws IOException, InterruptedException {
    val.setSeed(r.nextLong());
    while (factory.next(null, val)) {
        context.write(NullWritable.get(), val);
        val.setSeed(r.nextLong());
        // match inline
        try {
            matcher.match();
        } catch (Exception e) {
            LOG.debug("Error in resource usage emulation! Message: ", e);
        }
    }
}
public synchronized void write(K key, V value)
        throws IOException {
    boolean nullKey = key == null || key instanceof NullWritable;
    boolean nullValue = value == null || value instanceof NullWritable;
    if (nullKey && nullValue) {
        return;
    }
    if (!nullKey) {
        writeObject(key);
    }
    if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
    }
    if (!nullValue) {
        writeObject(value);
    }
    out.write(newline);
}
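This writer is why a job whose output key class is NullWritable produces value-only text lines: the null key and the separator are both skipped. A hedged sketch of a reducer relying on that behavior (class and field names are illustrative):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// With TextOutputFormat, each output line is just the value: no key, no tab.
public class ValueOnlyReducer extends Reducer<Text, IntWritable, NullWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(NullWritable.get(), new Text(key + "\t" + sum));
    }
}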
/**
 * Resolves a given identifier. This method has to be called before calling
 * any of the getters.
 */
public void resolve(String identifier) {
    if (identifier.equalsIgnoreCase(RAW_BYTES_ID)) {
        setInputWriterClass(RawBytesInputWriter.class);
        setOutputReaderClass(RawBytesOutputReader.class);
        setOutputKeyClass(BytesWritable.class);
        setOutputValueClass(BytesWritable.class);
    } else if (identifier.equalsIgnoreCase(TYPED_BYTES_ID)) {
        setInputWriterClass(TypedBytesInputWriter.class);
        setOutputReaderClass(TypedBytesOutputReader.class);
        setOutputKeyClass(TypedBytesWritable.class);
        setOutputValueClass(TypedBytesWritable.class);
    } else if (identifier.equalsIgnoreCase(KEY_ONLY_TEXT_ID)) {
        setInputWriterClass(KeyOnlyTextInputWriter.class);
        setOutputReaderClass(KeyOnlyTextOutputReader.class);
        setOutputKeyClass(Text.class);
        setOutputValueClass(NullWritable.class);
    } else { // assume TEXT_ID
        setInputWriterClass(TextInputWriter.class);
        setOutputReaderClass(TextOutputReader.class);
        setOutputKeyClass(Text.class);
        setOutputValueClass(Text.class);
    }
}
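As the Javadoc states, resolve() must run before any getter. A hedged usage sketch (assuming this is Hadoop Streaming's IdentifierResolver, where the key-only-text identifier maps keys to Text and values to NullWritable):

IdentifierResolver resolver = new IdentifierResolver();
resolver.resolve(IdentifierResolver.KEY_ONLY_TEXT_ID);
// After resolve(), the getters reflect the chosen identifier:
// output key class is Text.class, output value class is NullWritable.class.
Class<?> keyClass = resolver.getOutputKeyClass();
Class<?> valueClass = resolver.getOutputValueClass();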
/**
 * Tests the class loader set by
 * {@link Configuration#setClassLoader(ClassLoader)}
 * is inherited by any {@link WrappedRecordReader}s created by
 * {@link CompositeRecordReader}
 */
public void testClassLoader() throws Exception {
    Configuration conf = new Configuration();
    Fake_ClassLoader classLoader = new Fake_ClassLoader();
    conf.setClassLoader(classLoader);
    assertTrue(conf.getClassLoader() instanceof Fake_ClassLoader);
    FileSystem fs = FileSystem.get(conf);
    Path testdir = new Path(System.getProperty("test.build.data", "/tmp"))
        .makeQualified(fs);
    Path base = new Path(testdir, "/empty");
    Path[] src = { new Path(base, "i0"), new Path("i1"), new Path("i2") };
    conf.set(CompositeInputFormat.JOIN_EXPR,
        CompositeInputFormat.compose("outer", IF_ClassLoaderChecker.class, src));
    CompositeInputFormat<NullWritable> inputFormat =
        new CompositeInputFormat<NullWritable>();
    // create dummy TaskAttemptID
    TaskAttemptID tid = new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0);
    conf.set(MRJobConfig.TASK_ATTEMPT_ID, tid.toString());
    inputFormat.createRecordReader(
        inputFormat.getSplits(Job.getInstance(conf)).get(0),
        new TaskAttemptContextImpl(conf, tid));
}
public void testTotalOrderBinarySearch() throws Exception {
    TotalOrderPartitioner<Text, NullWritable> partitioner =
        new TotalOrderPartitioner<Text, NullWritable>();
    Configuration conf = new Configuration();
    Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
        "totalorderbinarysearch", conf, splitStrings);
    conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
    conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, Text.class, Object.class);
    try {
        partitioner.setConf(conf);
        NullWritable nw = NullWritable.get();
        for (Check<Text> chk : testStrings) {
            assertEquals(chk.data.toString(), chk.part,
                partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
        }
    } finally {
        p.getFileSystem(conf).delete(p, true);
    }
}
public static boolean run(Configuration config, Map<String, String> paths)
        throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
    String jobName = "step1";
    Job job = Job.getInstance(config, jobName);
    job.setJarByClass(Step1.class);
    job.setJar("export\\ItemCF.jar");
    job.setMapperClass(Step1_Mapper.class);
    job.setReducerClass(Step1_Reducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
    Path inPath = new Path(paths.get("Step1Input"));
    Path outpath = new Path(paths.get("Step1Output"));
    FileInputFormat.addInputPath(job, inPath);
    FileOutputFormat.setOutputPath(job, outpath);
    FileSystem fs = FileSystem.get(config);
    if (fs.exists(outpath)) {
        fs.delete(outpath, true);
    }
    return job.waitForCompletion(true);
}
@Override
public boolean next(final NullWritable key, final ColumnAndIndex value) throws IOException {
    if (currentSpread == null || currentIndex == currentIndexList.size()) {
        if (!nextReader()) {
            updateCounter(reader.getReadStats());
            isEnd = true;
            return false;
        }
    }
    spreadColumn.setSpread(currentSpread);
    value.column = spreadColumn;
    value.index = currentIndexList.get(currentIndex);
    value.columnIndex = spreadCounter.get();
    currentIndex++;
    return true;
}
SequenceFileIterator(String path) throws IOException {
    final SequenceFile.Reader.Option fileOption = SequenceFile.Reader.file(new Path(path));
    this.sequenceFileReader = new SequenceFile.Reader(new Configuration(true), fileOption);
    Validate.isTrue(this.sequenceFileReader.getKeyClass().equals(NullWritable.class));
    Validate.isTrue(this.sequenceFileReader.getValueClass().equals(BytesWritable.class));
    this.tryAdvance();
}
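A hedged sketch of how such a reader is typically advanced (assuming the NullWritable/BytesWritable schema validated above; variable names are illustrative):

NullWritable key = NullWritable.get();
BytesWritable value = new BytesWritable();
while (sequenceFileReader.next(key, value)) {
    // copyBytes() trims the backing array to the record's actual length
    byte[] payload = value.copyBytes();
    // ... hand payload to the consumer ...
}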
@Override
public void map(NullWritable key, LongWritable value, Context context)
        throws IOException, InterruptedException {
    for (long bytes = value.get(); bytes > 0; bytes -= val.getLength()) {
        r.nextBytes(val.getBytes());
        val.setSize((int) Math.min(val.getLength(), bytes));
        context.write(key, val);
    }
}
public void map(LongWritable k, Text v, Context c)
        throws IOException, InterruptedException {
    int max = Integer.valueOf(v.toString());
    for (int i = 0; i < max; i++) {
        c.write(new Text("" + i), NullWritable.get());
    }
}
@Override
public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    this.processArgs(conf, args);
    Job job = Job.getInstance(conf, "analyser_logdata");
    // Code needed to submit the job locally but run it on the cluster:
    // File jarFile = EJob.createTempJar("target/classes");
    // ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
    // End of the local-submit/cluster-run block.
    job.setJarByClass(AnalyserLogDataRunner.class);
    job.setMapperClass(AnalyserLogDataMapper.class);
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(Put.class);
    // Configure the reducer:
    // 1. Run on the cluster from a packaged jar (requires addDependencyJars=true, the default):
    // TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS, null, job);
    // 2. Run locally (requires addDependencyJars=false):
    TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS, null, job, null, null, null, null, false);
    job.setNumReduceTasks(0);
    // Set the input paths.
    this.setJobInputPaths(job);
    return job.waitForCompletion(true) ? 0 : -1;
}
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
    // 1. Fetch platform, time, and url.
    String platform = this.getPlatform(value);
    String serverTime = this.getServerTime(value);
    String url = this.getCurrentUrl(value);
    // 2. Filter the data.
    if (StringUtils.isBlank(platform) || StringUtils.isBlank(url) || StringUtils.isBlank(serverTime) || !StringUtils.isNumeric(serverTime.trim())) {
        logger.warn("platform, server time, and current url must be non-blank, and the server time must be a timestamp-formatted string");
        return;
    }
    // 3. Build the platform dimensions.
    List<PlatformDimension> platforms = PlatformDimension.buildList(platform);
    // 4. Build the browser dimensions.
    String browserName = this.getBrowserName(value);
    String browserVersion = this.getBrowserVersion(value);
    List<BrowserDimension> browsers = BrowserDimension.buildList(browserName, browserVersion);
    // 5. Build the date dimension.
    DateDimension dayOfDimension = DateDimension.buildDate(Long.valueOf(serverTime.trim()), DateEnum.DAY);
    // 6. Write the output.
    StatsCommonDimension statsCommon = this.statsUserDimension.getStatsCommon();
    statsCommon.setDate(dayOfDimension); // set the date dimension
    statsCommon.setKpi(this.websitePageViewDimension); // set the kpi dimension
    for (PlatformDimension pf : platforms) {
        statsCommon.setPlatform(pf); // set the platform dimension
        for (BrowserDimension br : browsers) {
            this.statsUserDimension.setBrowser(br); // set the browser dimension
            // write out
            context.write(this.statsUserDimension, NullWritable.get());
        }
    }
}
@Override
public boolean next(FloatWritable key, NullWritable value)
        throws IOException {
    progress = key;
    index++;
    return index <= 10;
}
@SuppressWarnings("unchecked")
public RR_ClassLoaderChecker(JobConf job) {
    assertTrue("The class loader has not been inherited from "
        + CompositeRecordReader.class.getSimpleName(),
        job.getClassLoader() instanceof Fake_ClassLoader);
    keyclass = (Class<? extends K>) job.getClass("test.fakeif.keyclass",
        NullWritable.class, WritableComparable.class);
    valclass = (Class<? extends V>) job.getClass("test.fakeif.valclass",
        NullWritable.class, WritableComparable.class);
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    try {
        if (taskNumber < 0 || numTasks < 1) {
            throw new IllegalArgumentException("TaskNumber: " + taskNumber + ", numTasks: " + numTasks);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Opening stream for output (" + (taskNumber + 1) + "/" + numTasks + "). WriteMode=" + writeMode +
                ", OutputDirectoryMode=" + outputDirectoryMode);
        }
        Path p = this.outputFilePath;
        if (p == null) {
            throw new IOException("The file path is null.");
        }
        final FileSystem fs = p.getFileSystem();
        if (fs.exists(p)) {
            fs.delete(p, true);
        }
        this.fileCreated = true;
        final SequenceFile.Writer.Option fileOption = SequenceFile.Writer.file(new org.apache.hadoop.fs.Path(p.toString()));
        final SequenceFile.Writer.Option keyClassOption = SequenceFile.Writer.keyClass(NullWritable.class);
        final SequenceFile.Writer.Option valueClassOption = SequenceFile.Writer.valueClass(BytesWritable.class);
        writer = SequenceFile.createWriter(new org.apache.hadoop.conf.Configuration(true), fileOption, keyClassOption, valueClassOption);
    } catch (Exception e) {
        // Propagate instead of swallowing: a half-opened writer must not look healthy.
        throw new IOException("Could not open the SequenceFile writer.", e);
    }
}
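A hedged sketch of the matching write path for this output format (the method name and record type are illustrative):

// Each record is appended under the NullWritable singleton key, matching the
// keyClass/valueClass options passed to SequenceFile.createWriter above.
public void writeRecord(byte[] record) throws IOException {
    writer.append(NullWritable.get(), new BytesWritable(record));
}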
private void setupReducer(Path output, CubeSegment cubeSeg) throws IOException {
    int hllShardBase = MapReduceUtil.getCuboidHLLCounterReducerNum(cubeSeg.getCubeInstance());
    job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(hllShardBase);
    FileOutputFormat.setOutputPath(job, output);
    job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
    deletePath(job.getConfiguration(), output);
}
private void outputDict(TblColRef col, Dictionary<String> dict) throws IOException, InterruptedException {
    // output written to baseDir/colName/colName.rldict-r-00000 (etc)
    String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream outputStream = new DataOutputStream(baos)) {
        outputStream.writeUTF(dict.getClass().getName());
        dict.write(outputStream);
        mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(),
            new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName);
    }
}
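The mos field above is presumably a MultipleOutputs instance; a hedged sketch of the driver-side wiring such a write requires (the named output must be declared up front):

// Driver: declare the named output consumed by mos.write(...) in the reducer.
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT,
    SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
// Reducer lifecycle: mos = new MultipleOutputs<>(context) in setup(),
// and mos.close() in cleanup(), or trailing records may be lost.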
private void calculateColData(Iterator<Tuple2<SelfDefineSortableKey, Text>> tuple2Iterator) {
    while (tuple2Iterator.hasNext()) {
        Tuple2<SelfDefineSortableKey, Text> tuple = tuple2Iterator.next();
        String value = Bytes.toString(tuple._1.getText().getBytes(), 1, tuple._1.getText().getLength() - 1);
        logAFewRows(value);
        // if dimension col, compute max/min value
        // include the col which is both dict col and dim col
        if (isDimensionCol) {
            if (minValue == null || col.getType().compare(minValue, value) > 0) {
                minValue = value;
            }
            if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
                maxValue = value;
            }
        }
        // if dict column
        if (isDictCol) {
            if (buildDictInReducer) {
                builder.addValue(value);
            } else {
                // output written to baseDir/colName/-r-00000 (etc)
                result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(
                    BatchConstants.CFG_OUTPUT_COLUMN, new Tuple3<Writable, Writable, String>(
                        NullWritable.get(), new Text(value.getBytes(StandardCharsets.UTF_8)), col.getIdentity() + "/")));
            }
        }
        rowCount++;
    }
}
/**
 * Write a partition file for the given job, using the Sampler provided.
 * Queries the sampler for a sample keyset, sorts by the output key
 * comparator, selects the keys for each rank, and writes to the destination
 * returned from {@link TotalOrderPartitioner#getPartitionFile}.
 */
@SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
public static <K, V> void writePartitionFile(Job job, Sampler<K, V> sampler)
        throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = job.getConfiguration();
    final InputFormat inf =
        ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    int numPartitions = job.getNumReduceTasks();
    K[] samples = (K[]) sampler.getSample(inf, job);
    LOG.info("Using " + samples.length + " samples");
    RawComparator<K> comparator =
        (RawComparator<K>) job.getSortComparator();
    Arrays.sort(samples, comparator);
    Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
    FileSystem fs = dst.getFileSystem(conf);
    if (fs.exists(dst)) {
        fs.delete(dst, false);
    }
    SequenceFile.Writer writer = SequenceFile.createWriter(fs,
        conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
    NullWritable nullValue = NullWritable.get();
    float stepSize = samples.length / (float) numPartitions;
    int last = -1;
    for (int i = 1; i < numPartitions; ++i) {
        int k = Math.round(stepSize * i);
        while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
            ++k;
        }
        writer.append(samples[k], nullValue);
        last = k;
    }
    writer.close();
}
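The Javadoc above describes the standard total-order sort setup. A hedged sketch of the usual call site in a driver (the partition-file path is illustrative):

// Pair the sampler with TotalOrderPartitioner in the driver.
job.setPartitionerClass(TotalOrderPartitioner.class);
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
    new Path("/tmp/partitions")); // illustrative path
// Sample keys at 10% frequency, at most 10000 samples from at most 10 splits,
// then write the boundary keys the partitioner will binary-search against.
InputSampler.writePartitionFile(job,
    new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10));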
@Override
public void reduce(Text key, Iterable<Text> values, Context ctx)
        throws IOException, InterruptedException {
    count = 0;
    for (Text value : values) {
        count++;
    }
    ctx.write(NullWritable.get(), new Text(key.toString() + " " + count));
}
@Override
protected void map(NullWritable key, MapWritable value,
        Mapper<NullWritable, MapWritable, Text, MapWritable>.Context context)
        throws IOException, InterruptedException {
    Text deltaObjectId = (Text) value.get(new Text("device_id"));
    long timestamp = ((LongWritable) value.get(new Text("time_stamp"))).get();
    if (timestamp % 100000 == 0) {
        context.write(deltaObjectId, new MapWritable(value));
    }
}