The following examples show how to use the org.apache.hadoop.io.IntWritable API class in real projects; you can also follow the links to view the full source code on GitHub.
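As a quick orientation before the longer examples, here is a minimal, self-contained sketch of the core IntWritable operations (construct, set/get, compare, and round-trip through Hadoop's Writable serialization); the class and variable names are illustrative only.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.IntWritable;

public class IntWritableBasics {
  public static void main(String[] args) throws Exception {
    IntWritable a = new IntWritable(7);   // construct with an initial value
    IntWritable b = new IntWritable();    // no-arg constructor, value defaults to 0
    b.set(9);                             // Writables are mutable and meant to be reused
    System.out.println(a.get() + " vs " + b.get() + " -> " + a.compareTo(b)); // negative: 7 < 9
    // Round-trip through the Writable serialization used by MapReduce
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    a.write(new DataOutputStream(bytes));
    IntWritable copy = new IntWritable();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println("deserialized: " + copy.get()); // prints 7
  }
}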
@Override
public void collect(Configuration conf, BaseDimension key, BaseStatsValueWritable value,
PreparedStatement preparedStatement, IDimensionConverter converter)
throws SQLException, IOException {
StatsUserDimension statsUser = (StatsUserDimension)key;
MapWritableValue mapWritableValue = (MapWritableValue)value;
IntWritable activeUserValue = (IntWritable) mapWritableValue.getValue().get(new IntWritable(-1));
int i = 0;
preparedStatement.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getPlatform()));
preparedStatement.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getDate()));
preparedStatement.setInt(++i, activeUserValue.get());
preparedStatement.setString(++i, conf.get(GlobalConstants.RUNNING_DATE_PARAMS));
preparedStatement.setInt(++i, activeUserValue.get());
preparedStatement.addBatch();
}
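In this collector the active-user count is bound twice (parameters 3 and 5), which makes sense if the PreparedStatement was built from an upsert. A hypothetical SQL template matching this parameter order could look like the following; the table and column names are assumptions, not taken from the source project.
// Hypothetical upsert whose placeholder order matches the bindings above.
String sql = "INSERT INTO stats_user (platform_dimension_id, date_dimension_id, active_users, created) "
    + "VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE active_users = ?";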
/** Queries the job tracker for a set of outputs ready to be copied
* @param fromEventId the first event ID we want to start from, this is
* modified by the call to this method
* @param jobClient the job tracker
* @return a set of locations to copy outputs from
* @throws IOException
*/
private List<TaskCompletionEvent> queryJobTracker(IntWritable fromEventId,
JobID jobId,
InterTrackerProtocol jobClient)
throws IOException {
TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
jobId,
fromEventId.get(),
probe_sample_size);
//we are interested in map task completion events only. So store
//only those
List <TaskCompletionEvent> recentMapEvents =
new ArrayList<TaskCompletionEvent>();
for (int i = 0; i < t.length; i++) {
if (t[i].isMapTask()) {
recentMapEvents.add(t[i]);
}
}
fromEventId.set(fromEventId.get() + t.length);
return recentMapEvents;
}
@Test
public void testConfigurableMapper() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
JobConf conf = new JobConf();
conf.set("my.filterPrefix", "Hello");
DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
DataSet<Tuple2<IntWritable, Text>> hellos = ds.
flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new ConfigurableMapper(), conf));
String resultPath = tempFolder.newFile().toURI().toString();
hellos.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
String expected = "(2,Hello)\n" +
"(3,Hello world)\n" +
"(4,Hello world, how are you?)\n";
compareResultsByLinesInMemory(expected, resultPath);
}
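The ConfigurableMapper referenced by this Flink Hadoop-compatibility test is not shown here. A plausible sketch, assuming it reads my.filterPrefix from the JobConf in configure() and forwards only records whose text starts with that prefix (which is what the expected output above implies), is:
// Hypothetical mapper with the behaviour the test expects: keep only values
// starting with the prefix configured under "my.filterPrefix".
public static class ConfigurableMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
  private String filterPrefix;

  @Override
  public void configure(JobConf conf) {
    filterPrefix = conf.get("my.filterPrefix");
  }

  @Override
  public void map(IntWritable key, Text value, OutputCollector<IntWritable, Text> out, Reporter reporter)
      throws IOException {
    if (value.toString().startsWith(filterPrefix)) {
      out.collect(key, value);
    }
  }

  @Override
  public void close() throws IOException {
    // nothing to clean up
  }
}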
@Override
public void collect(Configuration conf, BaseDimension key, BaseStatsValueWritable value, PreparedStatement pstmt, IDimensionConverter converter) throws SQLException, IOException {
// Cast to the concrete types and extract the values we need
StatsUserDimension statsUser = (StatsUserDimension) key;
IntWritable activeUserValue = (IntWritable) ((MapWritableValue) value).getValue().get(new IntWritable(-1));
// Bind the statement parameters
int i = 0;
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getPlatform()));
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getDate()));
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getBrowser()));
pstmt.setInt(++i, activeUserValue.get());
pstmt.setString(++i, conf.get(GlobalConstants.RUNNING_DATE_PARAMES));
pstmt.setInt(++i, activeUserValue.get());
// Add to the batch
pstmt.addBatch();
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf1 = new Configuration();
Job job1 = new Job(conf1, "wiki job one");
job1.setOutputFormatClass(SequenceFileOutputFormat.class);
job1.setInputFormatClass(SequenceFileInputFormat.class);
job1.setNumReduceTasks(1);
job1.setJarByClass(Step32.class);
job1.setMapperClass(WikiMapper32.class);
job1.setMapOutputKeyClass(IntWritable.class);
job1.setMapOutputValueClass(VectorOrPrefWritable.class);
job1.setReducerClass(WiKiReducer32.class);
job1.setOutputKeyClass(IntWritable.class);
job1.setOutputValueClass(VectorOrPrefWritable.class);
// the WiKiDriver's output is this one's input
SequenceFileInputFormat.addInputPath(job1, new Path(INPUT_PATH));
SequenceFileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH));
if(!job1.waitForCompletion(true)){
System.exit(1); // run error then exit
}
}
public void run() throws Exception{
long startTime = System.currentTimeMillis();
Configuration conf = new Configuration();
conf.set(TableOutputFormat.OUTPUT_TABLE, Constants.hbase_user_item_pref_table);
Job job = Job.getInstance(conf, "hbasewriter"+System.currentTimeMillis());
job.setJarByClass(UpdateCFJob.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(HBaseWriteReducer.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(input));
boolean isFinish = job.waitForCompletion(true);
long endTime = System.currentTimeMillis(); // measure after the job has actually finished
if(isFinish){
logger.info("UpdateCFJob job ["+job.getJobName()+"] run finished. It cost "+ (endTime - startTime) / 1000 +"s.");
} else {
logger.error("UpdateCFJob job ["+job.getJobName()+"] run failed.");
}
}
public static Job createJob() throws IOException {
final Configuration conf = new Configuration();
final Job baseJob = Job.getInstance(conf);
baseJob.setOutputKeyClass(Text.class);
baseJob.setOutputValueClass(IntWritable.class);
baseJob.setMapperClass(NewMapTokenizer.class);
baseJob.setCombinerClass(NewSummer.class);
baseJob.setReducerClass(NewSummer.class);
baseJob.setNumReduceTasks(1);
baseJob.getConfiguration().setInt(JobContext.IO_SORT_MB, 1);
baseJob.getConfiguration().set(JobContext.MAP_SORT_SPILL_PERCENT, "0.50");
baseJob.getConfiguration().setInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setMinInputSplitSize(
baseJob, Long.MAX_VALUE);
return baseJob;
}
@Test
public void TestIntParse() throws Exception {
Configuration config = new Configuration();
Text value = new Text();
AbstractSerDe jserde = new EsriJsonSerDe();
Properties proptab = new Properties();
proptab.setProperty(HiveShims.serdeConstants.LIST_COLUMNS, "num");
proptab.setProperty(HiveShims.serdeConstants.LIST_COLUMN_TYPES, "int");
jserde.initialize(config, proptab);
StructObjectInspector rowOI = (StructObjectInspector)jserde.getObjectInspector();
//value.set("{\"attributes\":{\"num\":7},\"geometry\":null}");
value.set("{\"attributes\":{\"num\":7}}");
Object row = jserde.deserialize(value);
StructField f0 = rowOI.getStructFieldRef("num");
Object fieldData = rowOI.getStructFieldData(row, f0);
Assert.assertEquals(7, ((IntWritable)fieldData).get());
value.set("{\"attributes\":{\"num\":9}}");
row = jserde.deserialize(value);
f0 = rowOI.getStructFieldRef("num");
fieldData = rowOI.getStructFieldData(row, f0);
Assert.assertEquals(9, ((IntWritable)fieldData).get());
}
@Test
//Test appendValue with DataInputBuffer
public void testAppendValueWithDataInputBuffer() throws IOException {
List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
Text.class, IntWritable.class, codec, null, null);
final DataInputBuffer previousKey = new DataInputBuffer();
DataInputBuffer key = new DataInputBuffer();
DataInputBuffer value = new DataInputBuffer();
for (KVPair kvp : data) {
populateData(kvp, key, value);
if ((previousKey != null && BufferUtils.compare(key, previousKey) == 0)) {
writer.appendValue(value);
} else {
writer.append(key, value);
}
previousKey.reset(key.getData(), 0, key.getLength());
}
writer.close();
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
}
private DAG createDAG(int numGenTasks, int totalSourceDataSize, int numFetcherTasks) {
int bytesPerSource = totalSourceDataSize / numGenTasks;
LOG.info("DataPerSourceTask(bytes)=" + bytesPerSource);
ByteBuffer payload = ByteBuffer.allocate(4);
payload.putInt(0, bytesPerSource);
Vertex broadcastVertex = Vertex.create("DataGen",
ProcessorDescriptor.create(InputGenProcessor.class.getName())
.setUserPayload(UserPayload.create(payload)), numGenTasks);
Vertex fetchVertex = Vertex.create("FetchVertex",
ProcessorDescriptor.create(InputFetchProcessor.class.getName()), numFetcherTasks);
UnorderedKVEdgeConfig edgeConf = UnorderedKVEdgeConfig.newBuilder(NullWritable.class
.getName(), IntWritable.class.getName()).setCompression(false, null, null).build();
DAG dag = DAG.create("BroadcastLoadGen");
dag.addVertex(broadcastVertex).addVertex(fetchVertex).addEdge(
Edge.create(broadcastVertex, fetchVertex, edgeConf.createDefaultBroadcastEdgeProperty()));
return dag;
}
@Test(timeout = 5000)
// Test empty file case
public void testEmptyFileBackedInMemIFileWriter() throws IOException {
List<KVPair> data = new ArrayList<>();
TezTaskOutputFiles
tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(defaultConf, localFs, tezTaskOutput,
Text.class, IntWritable.class, codec, null, null,
100);
// empty ifile
writer.close();
byte[] bytes = new byte[(int) writer.getRawLength()];
IFile.Reader.readToMemory(bytes,
new ByteArrayInputStream(ByteString.copyFrom(writer.getData()).toByteArray()),
(int) writer.getCompressedLength(), codec, false, -1);
readUsingInMemoryReader(bytes, data);
}
@Override
public void collect(Configuration conf, BaseDimension key, BaseStatsValueWritable value,
PreparedStatement pstmt, IDimensionConverter converter)
throws SQLException, IOException {
StatsUserDimension statsUserDimension = (StatsUserDimension) key;
MapWritableValue mapWritableValue = (MapWritableValue) value;
IntWritable newInstallUsers = (IntWritable) mapWritableValue.getValue().get(new IntWritable(-1));
int i = 0;
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUserDimension.getStatsCommon().getPlatform()));
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUserDimension.getStatsCommon().getDate()));
pstmt.setInt(++i, newInstallUsers.get());
pstmt.setString(++i, conf.get(GlobalConstants.RUNNING_DATE_PARAMES));
pstmt.setInt(++i, newInstallUsers.get());
pstmt.addBatch(); // add the row to the batch
}
private List<TezMerger.Segment> createInMemorySegments(int segmentCount, int keysPerSegment)
throws IOException {
List<TezMerger.Segment> segmentList = Lists.newLinkedList();
Random rnd = new Random();
DataInputBuffer key = new DataInputBuffer();
DataInputBuffer value = new DataInputBuffer();
for (int i = 0; i < segmentCount; i++) {
BoundedByteArrayOutputStream stream = new BoundedByteArrayOutputStream(10000);
InMemoryWriter writer = new InMemoryWriter(stream);
for (int j = 0; j < keysPerSegment; j++) {
populateData(new IntWritable(rnd.nextInt()), new LongWritable(rnd.nextLong()), key, value);
writer.append(key, value);
}
writer.close();
InMemoryReader reader = new InMemoryReader(merger, null, stream.getBuffer(), 0, stream.getLimit());
segmentList.add(new TezMerger.Segment(reader, null));
}
return segmentList;
}
public void testNestedIterable() throws Exception {
Random r = new Random();
Writable[] writs = {
new BooleanWritable(r.nextBoolean()),
new FloatWritable(r.nextFloat()),
new FloatWritable(r.nextFloat()),
new IntWritable(r.nextInt()),
new LongWritable(r.nextLong()),
new BytesWritable("dingo".getBytes()),
new LongWritable(r.nextLong()),
new IntWritable(r.nextInt()),
new BytesWritable("yak".getBytes()),
new IntWritable(r.nextInt())
};
TupleWritable sTuple = makeTuple(writs);
assertTrue("Bad count", writs.length == verifIter(writs, sTuple, 0));
}
public void map(IntWritable key, IntWritable value,
OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
throws IOException {
//it is expected that every map processes mapSleepCount number of records.
try {
reporter.setStatus("Sleeping... ("
+ (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
Thread.sleep(mapSleepDuration);
} catch (InterruptedException ex) {
throw (IOException) new IOException(
"Interrupted while sleeping").initCause(ex);
}
++count;
// output reduceSleepCount * numReduce number of random values, so that
// each reducer will get reduceSleepCount number of keys.
int k = key.get();
for (int i = 0; i < value.get(); ++i) {
output.collect(new IntWritable(k + i), NullWritable.get());
}
}
public DoubleWritable evaluate(Map<IntWritable, FloatWritable> map, List<IntWritable> keys) {
double sum = 0d;
for (IntWritable k : keys) {
FloatWritable v = map.get(k);
if (v != null) {
sum += (double) v.get();
}
}
return val(sum);
}
/** {@inheritDoc} */
@Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
InterruptedException {
while (reduceLockFile.exists())
Thread.sleep(50);
int wordCnt = 0;
for (IntWritable value : values)
wordCnt += value.get();
totalWordCnt.set(wordCnt);
ctx.write(key, totalWordCnt);
}
@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
if (key.equals(OPEN_EXECTIME)){
executionTime[OPEN] = sum;
} else if (key.equals(NUMOPS_OPEN)){
numOfOps[OPEN] = sum;
} else if (key.equals(LIST_EXECTIME)){
executionTime[LIST] = sum;
} else if (key.equals(NUMOPS_LIST)){
numOfOps[LIST] = sum;
} else if (key.equals(DELETE_EXECTIME)){
executionTime[DELETE] = sum;
} else if (key.equals(NUMOPS_DELETE)){
numOfOps[DELETE] = sum;
} else if (key.equals(CREATE_EXECTIME)){
executionTime[CREATE] = sum;
} else if (key.equals(NUMOPS_CREATE)){
numOfOps[CREATE] = sum;
} else if (key.equals(WRITE_CLOSE_EXECTIME)){
System.out.println(WRITE_CLOSE_EXECTIME + " = " + sum);
executionTime[WRITE_CLOSE]= sum;
} else if (key.equals(NUMOPS_WRITE_CLOSE)){
numOfOps[WRITE_CLOSE] = sum;
} else if (key.equals(TOTALOPS)){
totalOps = sum;
} else if (key.equals(ELAPSED_TIME)){
totalTime = sum;
}
result.set(sum);
output.collect(key, result);
// System.out.println("Key = " + key + " Sum is =" + sum);
// printResults(System.out);
}
@Override
public void collect(Configuration conf, BaseDimension key, BaseStatsValueWritable value, PreparedStatement pstmt, IDimensionConverter converter) throws SQLException, IOException {
StatsUserDimension statsUser = (StatsUserDimension) key;
MapWritableValue mapWritableValue = (MapWritableValue) value;
int i = 0;
// Bind parameters according to the KPI type
switch (mapWritableValue.getKpi()) {
case NEW_MEMBER: // KPI counting new members
IntWritable v1 = (IntWritable) mapWritableValue.getValue().get(new IntWritable(-1));
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getPlatform()));
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getDate()));
pstmt.setInt(++i, v1.get());
pstmt.setString(++i, conf.get(GlobalConstants.RUNNING_DATE_PARAMES));
pstmt.setInt(++i, v1.get());
break;
case BROWSER_NEW_MEMBER: // KPI counting new members per browser
IntWritable v2 = (IntWritable) mapWritableValue.getValue().get(new IntWritable(-1));
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getPlatform()));
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getDate()));
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getBrowser()));
pstmt.setInt(++i, v2.get());
pstmt.setString(++i, conf.get(GlobalConstants.RUNNING_DATE_PARAMES));
pstmt.setInt(++i, v2.get());
break;
case INSERT_MEMBER_INFO: // insert member info records
Text v3 = (Text) mapWritableValue.getValue().get(new IntWritable(-1));
pstmt.setString(++i, v3.toString());
pstmt.setString(++i, conf.get(GlobalConstants.RUNNING_DATE_PARAMES));
pstmt.setString(++i, conf.get(GlobalConstants.RUNNING_DATE_PARAMES));
pstmt.setString(++i, conf.get(GlobalConstants.RUNNING_DATE_PARAMES));
break;
default:
throw new RuntimeException("Unsupported KPI output operation: " + mapWritableValue.getKpi());
}
// Add to the batch
pstmt.addBatch();
}
public void reduce(IntWritable key, Iterator<IntWritable> it,
OutputCollector<IntWritable, IntWritable> out,
Reporter reporter) throws IOException {
while (it.hasNext()) {
out.collect(it.next(), null);
}
}
public int run(String[] args) throws Exception {
GfxdDataSerializable.initTypes();
JobConf conf = new JobConf(getConf());
conf.setJobName("Busy Airport Count");
Path outputPath = new Path(args[0]);
String hdfsHomeDir = args[1];
String tableName = args[2];
outputPath.getFileSystem(conf).delete(outputPath, true);
conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
conf.set(RowInputFormat.INPUT_TABLE, tableName);
conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
conf.setInputFormat(RowInputFormat.class);
conf.setMapperClass(SampleMapper.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(IntWritable.class);
conf.setReducerClass(SampleReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
FileOutputFormat.setOutputPath(conf, outputPath);
JobClient.runJob(conf);
return 0;
}
@Override
public IntWritable reduce(IntWritable k1, IntWritable k2) {
if (k1 != null && k2 != null) {
k1.set(k1.get() + k2.get());
}
return k1;
}
private void handleBrowserSessions(StatsUserDimension key, Iterable<TimeOutputValue> values,
Context context)
throws IOException, InterruptedException {
// Start counting member ids by building a TimeChain for each one
for (TimeOutputValue value : values) {
// this.unique.add(value.getId());
TimeChain chain = this.timeChainMap.get(value.getId());
if (chain == null) {
chain = new TimeChain(value.getTime());
this.timeChainMap.put(value.getId(), chain); // save the new chain
}
chain.addTime(value.getTime());
}
// Compute the session length in seconds
int sessionsLength = 0;
// 1. Sum the intervals in milliseconds
for (Map.Entry<String, TimeChain> entry : this.timeChainMap.entrySet()) {
long tmp = entry.getValue().getTimeOfMillis();
if (tmp < 0 || tmp > GlobalConstants.DAY_OF_MILLISECONDS) {
continue; // filter out intervals that are negative or exceed one day in milliseconds
}
sessionsLength += tmp;
}
// 2. Convert milliseconds to seconds, rounding up
if (sessionsLength % 1000 == 0) {
sessionsLength = sessionsLength / 1000;
} else {
sessionsLength = sessionsLength / 1000 + 1;
}
// Set the output values: -1 holds the member count, -2 holds the total session length
this.map.put(new IntWritable(-1), new IntWritable(this.timeChainMap.size()));
this.map.put(new IntWritable(-2), new IntWritable(sessionsLength));
outputValue.setValue(this.map);
// Set the KPI type
outputValue.setKpi(KpiType.BROWSER_SESSIONS);
context.write(key, outputValue);
}
private static void parseLine(@Nonnull String line, @Nonnull IntWritable user,
@Nonnull IntWritable posItem, @Nonnull IntWritable negItem) {
String[] cols = StringUtils.split(line, ' ');
Assert.assertEquals(3, cols.length);
user.set(Integer.parseInt(cols[0]));
posItem.set(Integer.parseInt(cols[1]));
negItem.set(Integer.parseInt(cols[2]));
}
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, one);
}
}
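This is the classic word-count map function; word and one are fields of the enclosing mapper class that the snippet does not show. They are presumably declared along the lines of the standard WordCount example (an assumption, not part of the original source):
// Assumed field declarations for the map method above.
private final static IntWritable one = new IntWritable(1); // constant value emitted for every token
private Text word = new Text();                             // reusable key buffer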
static private IntWritable deduceInputFile(JobConf job) {
Path[] inputPaths = FileInputFormat.getInputPaths(job);
Path inputFile = new Path(job.get(JobContext.MAP_INPUT_FILE));
// value == one for sort-input; value == two for sort-output
return (inputFile.getParent().equals(inputPaths[0])) ?
sortInput : sortOutput;
}
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
for (Map.Entry<Integer, List<VectorWritable>> entry : referencePoints.entrySet()) {
IntWritable iw = new IntWritable(entry.getKey());
for (VectorWritable vw : entry.getValue()) {
context.write(iw, vw);
}
}
super.cleanup(context);
}
@SuppressWarnings("unchecked")
@Test
public void testMapperOnComma() throws IOException {
mapDriver.clearInput();
LongWritable inputKey1 = new LongWritable(1);
LongWritable inputKey2 = new LongWritable(2);
LongWritable inputKey3 = new LongWritable(3);
LongWritable inputKey4 = new LongWritable(4);
LongWritable inputKey5 = new LongWritable(5);
LongWritable inputKey6 = new LongWritable(6);
LongWritable inputKey7 = new LongWritable(7);
mapDriver.addInput(inputKey1, new Text());
mapDriver.addInput(inputKey2, new Text(strArr));
mapDriver.addInput(inputKey3, new Text(strArr));
mapDriver.addInput(inputKey4, new Text(strArr));
mapDriver.addInput(inputKey5, new Text(strArr));
mapDriver.addInput(inputKey6, new Text(strArr));
mapDriver.addInput(inputKey7, new Text(strArr));
List<Pair<IntWritable, BytesWritable>> result = mapDriver.run();
assertEquals(9, result.size());
int key1 = result.get(0).getFirst().get();
BytesWritable value1 = result.get(0).getSecond();
byte[] bytes = value1.getBytes();
HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
hllc.readRegisters(ByteBuffer.wrap(bytes));
System.out.println("ab\177ab".length());
assertTrue(key1 > 0);
assertEquals(1, hllc.getCountEstimate());
}
public void reduce(IntWritable key, Iterable<VectorOrPrefWritable> values, Context context) throws IOException, InterruptedException {
for (VectorOrPrefWritable va : values) {
context.write(key, va);
System.err.println("key=" + key.toString() + ", value=" + va);
}
}