下面列出了怎么用org.apache.hadoop.io.LongWritable的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Runs the write phase of the test as a MapReduce job with a single reduce task.
 *
 * <p>{@code DATA_DIR} and {@code WRITE_DIR} are deleted recursively first. The job then reads
 * sequence-file input from {@code CONTROL_DIR}, uses {@code WriteMapper} and
 * {@code LongSumReducer}, and writes {@code UTF8}/{@code LongWritable} pairs to
 * {@code WRITE_DIR}.
 *
 * @param fs        file system on which the two directories are deleted
 * @param fastCheck value stored in the job under {@code fs.test.fastCheck}
 * @throws Exception if a delete or the job run fails
 */
public static void writeTest(FileSystem fs, boolean fastCheck)
    throws Exception {
  fs.delete(DATA_DIR, true);
  fs.delete(WRITE_DIR, true);

  final JobConf writeJob = new JobConf(conf, TestFileSystem.class);
  writeJob.setBoolean("fs.test.fastCheck", fastCheck);

  // Input side.
  FileInputFormat.setInputPaths(writeJob, CONTROL_DIR);
  writeJob.setInputFormat(SequenceFileInputFormat.class);

  // Processing.
  writeJob.setMapperClass(WriteMapper.class);
  writeJob.setReducerClass(LongSumReducer.class);
  writeJob.setNumReduceTasks(1);

  // Output side.
  FileOutputFormat.setOutputPath(writeJob, WRITE_DIR);
  writeJob.setOutputKeyClass(UTF8.class);
  writeJob.setOutputValueClass(LongWritable.class);

  JobClient.runJob(writeJob);
}
/**
 * Initializes this task from the job configuration.
 *
 * <p>Reads the task partition, seeds the random generator with {@code SEED + partition}, loads
 * the key/value/reducer counts, the key space and the failure chance, and finally adds one
 * entry per map task to {@code mapperSumList} (initial value 0) and
 * {@code expectedMapperSumList} (initial value -1).
 *
 * @param job configuration to read from; also stored in {@code conf}
 * @throws IllegalArgumentException if the keys-per-mapper, values-per-key, reducer count or
 *         key space setting resolves to -1
 */
@Override
public void configure(JobConf job) {
  this.conf = job;

  taskPartition = conf.getInt("mapred.task.partition", -1);
  final int seed = conf.getInt(SEED, -1) + taskPartition;
  random = new Random(seed);
  LOG.info("Starting with seed " + seed + " on partition " + taskPartition);

  numKeysPerMapper = conf.getInt(NUM_KEYS_PER_MAPPER, -1);
  numValuesPerKey = conf.getInt(NUM_VALUES_PER_KEY, -1);
  numMappers = conf.getNumMapTasks();
  numReducers = conf.getInt("mapred.reduce.tasks", -1);
  maxKeySpace = conf.getInt(MAX_KEY_SPACE, DEFAULT_MAX_KEY_SPACE);
  chanceFailure = conf.getFloat(CHANCE_FAILURE, 0.0f);

  final boolean settingMissing =
      numKeysPerMapper == -1
          || numValuesPerKey == -1
          || numReducers == -1
          || maxKeySpace == -1;
  if (settingMissing) {
    final String message = "Illegal values " + numKeysPerMapper + " " + numValuesPerKey
        + " " + numReducers + " " + maxKeySpace;
    throw new IllegalArgumentException(message);
  }

  // One running sum and one expected sum per mapper.
  for (int mapper = 0; mapper < numMappers; ++mapper) {
    mapperSumList.add(new LongWritable(0));
    expectedMapperSumList.add(new LongWritable(-1));
  }
}
/**
 * Packs the state of the aggregation buffer into an 11-element array.
 *
 * <p>Slots 0-1 hold {@code indexScore} and {@code area} as {@code DoubleWritable}; slots 2-5
 * hold {@code fp}, {@code tp}, {@code fpPrev} and {@code tpPrev} as {@code LongWritable};
 * slots 6-10 hold the five partial maps as they are stored in the buffer.
 *
 * @param agg buffer to serialize; cast to {@code ClassificationAUCAggregationBuffer}
 * @return the {@code Object[]} described above
 * @throws HiveException declared by the overridden method
 */
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
  final ClassificationAUCAggregationBuffer buffer = (ClassificationAUCAggregationBuffer) agg;
  return new Object[] {
      new DoubleWritable(buffer.indexScore),
      new DoubleWritable(buffer.area),
      new LongWritable(buffer.fp),
      new LongWritable(buffer.tp),
      new LongWritable(buffer.fpPrev),
      new LongWritable(buffer.tpPrev),
      buffer.areaPartialMap,
      buffer.fpPartialMap,
      buffer.tpPartialMap,
      buffer.fpPrevPartialMap,
      buffer.tpPrevPartialMap
  };
}
/**
 * Read a line.
 *
 * <p>While {@code pos < end}, stores {@code pos} in {@code key}, reads into {@code value}
 * through {@code in.readLine} and advances {@code pos} by the size that call returns. A read
 * whose size is at least {@code maxLineLength} is logged as skipped and the loop tries again.
 *
 * @param key receives the value of {@code pos} from before the read
 * @param value destination handed to {@code in.readLine}
 * @return {@code true} when a read returned a non-zero size below {@code maxLineLength};
 *         {@code false} when a read returned 0 or {@code pos} reached {@code end}
 * @throws IOException propagated from {@code in.readLine}
 */
public synchronized boolean next(LongWritable key, Text value)
throws IOException {
while (pos < end) {
// The key is the position at which this read starts.
key.set(pos);
// Third argument: the remaining span (end - pos, capped at Integer.MAX_VALUE), but never
// less than maxLineLength. NOTE(review): presumably the max bytes to consume — confirm
// against the readLine implementation.
int newSize = in.readLine(value, maxLineLength,
Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
maxLineLength));
// A size of 0 ends the iteration.
if (newSize == 0) {
return false;
}
pos += newSize;
if (newSize < maxLineLength) {
return true;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
}
return false;
}
/**
 * Issues up to {@code count} calls through {@code client}, each carrying a random
 * {@code LongWritable}, and checks that the reply equals the parameter sent.
 *
 * <p>A mismatching reply sets {@code failed} and stops the loop. An exception also sets
 * {@code failed} but the loop carries on with the next call.
 */
@Override
public void run() {
  for (int callIndex = 0; callIndex < count; callIndex++) {
    try {
      final LongWritable sent = new LongWritable(RANDOM.nextLong());
      final LongWritable reply =
          (LongWritable) client.call(sent, server, null, null, 0, conf);
      if (sent.equals(reply)) {
        continue;
      }
      LOG.fatal("Call failed!");
      failed = true;
      break;
    } catch (Exception e) {
      LOG.fatal("Caught: " + StringUtils.stringifyException(e));
      failed = true;
    }
  }
}
/**
 * Feeds one JSON document to the mapper driver.
 *
 * <p>The classpath resources {@code linden.properties} and {@code schema.xml} are copied to
 * the relative paths {@code lindenProperties} and {@code lindenSchema} before the run, and
 * both copies are deleted afterwards. Any exception is printed and fails the test.
 */
@Test
public void TestMapper() throws IOException {
  try {
    final ClassLoader resources = LindenMapredTest.class.getClassLoader();

    final File propertiesSource = new File(resources.getResource("linden.properties").getFile());
    Files.copy(propertiesSource.toPath(), Paths.get("lindenProperties"),
        StandardCopyOption.REPLACE_EXISTING);

    final File schemaSource = new File(resources.getResource("schema.xml").getFile());
    Files.copy(schemaSource.toPath(), Paths.get("lindenSchema"),
        StandardCopyOption.REPLACE_EXISTING);

    final String document = "{\"id\":0,\"groupid\":\"0\",\"tags\":\"hybrid,leather,moon-roof,reliable\",\"category\":\"compact\",\"mileage\":14900,\"price\":7500,\"contents\":\"yellow compact hybrid leather moon-roof reliable u.s.a. florida tampa asian acura 1.6el \",\"color\":\"yellow\",\"year\":1994,\"makemodel\":\"asian/acura/1.6el\",\"city\":\"u.s.a./florida/tampa\"}";
    final Text payload = new Text(document.getBytes());
    mDriver.withInput(new LongWritable(1L), payload);
    mDriver.run();
  } catch (Exception e) {
    e.printStackTrace();
    Assert.assertTrue(false);
  } finally {
    // Remove the copied files whether or not the run succeeded.
    FileUtils.deleteQuietly(Paths.get("lindenProperties").toFile());
    FileUtils.deleteQuietly(Paths.get("lindenSchema").toFile());
  }
}
/**
 * Passes the record through unchanged, then sleeps for 100 ms.
 *
 * <p>When the {@code ioEx} flag is set an {@link IOException} is thrown before anything is
 * collected; otherwise, when {@code rtEx} is set, a {@link RuntimeException} is thrown.
 *
 * @param key      key forwarded to {@code output}
 * @param value    value forwarded to {@code output}
 * @param output   collector receiving the pair
 * @param reporter not used by this method
 * @throws IOException if {@code ioEx} is set, or propagated from {@code output.collect}
 * @throws RuntimeException if {@code rtEx} is set, or wrapping an interruption of the sleep
 */
public void map(LongWritable key, Text value,
    OutputCollector<LongWritable, Text> output,
    Reporter reporter)
    throws IOException {
  if (ioEx) {
    throw new IOException();
  }
  if (rtEx) {
    throw new RuntimeException();
  }

  output.collect(key, value);

  try {
    Thread.sleep(100);
  } catch (InterruptedException interrupted) {
    throw new RuntimeException(interrupted);
  }
}
/**
 * Runs {@code testParallel(10, false, 2, 4, 2, 4, 100)} and then calls 127.0.0.1:10 with a
 * fresh client, expecting an {@link IOException}.
 *
 * <p>The exception message must contain the textual form of the target address, the exception
 * must have a cause, and the cause's message must also appear in the exception message.
 *
 * @throws Exception anything other than the expected {@link IOException}
 */
public void testStandAloneClient() throws Exception {
  testParallel(10, false, 2, 4, 2, 4, 100);

  final Client standalone = new Client(LongWritable.class, conf);
  final InetSocketAddress target = new InetSocketAddress("127.0.0.1", 10);
  try {
    standalone.call(new LongWritable(RANDOM.nextLong()), target, null, null, 0);
    fail("Expected an exception to have been thrown");
  } catch (IOException e) {
    final String message = e.getMessage();

    // The failing address must be reported.
    final String addressText = target.toString();
    assertTrue("Did not find "+addressText+" in "+message,
        message.contains(addressText));

    // The nested exception must be present and quoted in the outer message.
    final Throwable cause = e.getCause();
    assertNotNull("No nested exception in "+e, cause);
    final String causeText = cause.getMessage();
    assertTrue("Did not find " + causeText + " in " + message,
        message.contains(causeText));
  }
}
/**
 * Test with record length set to 0.
 *
 * <p>Creates an input file, configures a {@code FixedLengthInputFormat} with a record length
 * of 0 and expects {@code getRecordReader} to throw an {@link IOException} for at least one
 * split. A reader that is unexpectedly created is closed so the failing test does not leak it.
 *
 * @throws IOException if the set-up fails or an unexpectedly created reader cannot be closed
 */
@Test (timeout=5000)
public void testZeroRecordLength() throws IOException {
  localFs.delete(workDir, true);
  Path file = new Path(workDir, "testFormat.txt");
  createFile(file, null, 10, 10);
  // Set the fixed length record length config property
  JobConf job = new JobConf(defaultConf);
  FileInputFormat.setInputPaths(job, workDir);
  FixedLengthInputFormat format = new FixedLengthInputFormat();
  format.setRecordLength(job, 0);
  format.configure(job);
  InputSplit[] splits = format.getSplits(job, 1);
  boolean exceptionThrown = false;
  for (InputSplit split : splits) {
    RecordReader<LongWritable, BytesWritable> reader = null;
    try {
      reader = format.getRecordReader(split, job, voidReporter);
    } catch (IOException ioe) {
      exceptionThrown = true;
      LOG.info("Exception message:" + ioe.getMessage());
    } finally {
      // Closed outside the catch above so that a close failure is not mistaken for the
      // expected exception.
      if (reader != null) {
        reader.close();
      }
    }
  }
  assertTrue("Exception for zero record length:", exceptionThrown);
}
/**
 * Runs one timed IO operation and hands its statistics to the collector.
 *
 * <p>The string form of {@code key} and the long held by {@code value} are passed to
 * {@link #doIO(Reporter,String,long)}. The stream obtained from {@code getIOStream} is stored
 * in {@code this.stream} and closed once {@code doIO} returns or throws. The elapsed
 * wall-clock milliseconds and the {@code doIO} result are then passed to
 * {@link #collectStats(OutputCollector,String,long,Object)}.
 *
 * @param key      its string form names the operation target
 * @param value    its long value is forwarded to {@code doIO}
 * @param output   collector forwarded to {@code collectStats}
 * @param reporter receives a status line before and after the operation
 * @throws IOException propagated from the IO helpers or from closing the stream
 */
public void map(Text key,
    LongWritable value,
    OutputCollector<Text, Text> output,
    Reporter reporter) throws IOException {
  final String target = key.toString();
  final long argument = value.get();

  reporter.setStatus("starting " + target + " ::host = " + hostName);

  this.stream = getIOStream(target);
  T ioResult = null;
  final long startedAt = System.currentTimeMillis();
  try {
    ioResult = doIO(reporter, target, argument);
  } finally {
    if (stream != null) {
      stream.close();
    }
  }
  final long finishedAt = System.currentTimeMillis();

  collectStats(output, target, finishedAt - startedAt, ioResult);

  reporter.setStatus("finished " + target + " ::host = " + hostName);
}
/**
 * Writes the control files used by a test run.
 *
 * <p>One sequence file named {@code NNBench_Controlfile_<i>} is created per map
 * ({@code numberOfMaps} in total) under {@code baseDir/CONTROL_DIR_NAME}. Each file holds a
 * single record: the file name as key and 0 as value.
 *
 * @param config configuration used to obtain the file system and to create the writers
 * @throws IOException on error
 */
private static void createControlFiles(
    Configuration config
) throws IOException {
  final FileSystem fileSystem = FileSystem.get(config);
  LOG.info("Creating " + numberOfMaps + " control files");

  for (int fileIndex = 0; fileIndex < numberOfMaps; fileIndex++) {
    final String controlName = "NNBench_Controlfile_" + fileIndex;
    final Path controlDir = new Path(baseDir, CONTROL_DIR_NAME);
    final Path controlPath = new Path(controlDir, controlName);

    SequenceFile.Writer controlWriter = null;
    try {
      controlWriter = SequenceFile.createWriter(fileSystem, config, controlPath,
          Text.class, LongWritable.class, CompressionType.NONE);
      controlWriter.append(new Text(controlName), new LongWritable(0L));
    } finally {
      if (controlWriter != null) {
        controlWriter.close();
      }
    }
  }
}
/**
 * Emits one count keyed by graph, property and optional partition.
 *
 * <p>Nothing is written when {@code value} is not positive, or when {@code graphContext} is
 * set and differs from {@code graph}. The key is the byte sequence: graph string (UTF),
 * property string (UTF), partition length (int, 0 when there is no partition), partition bytes.
 *
 * @param output      context the record is written to
 * @param property    property whose string value goes into the key
 * @param partitionId optional partition value; may be {@code null}
 * @param value       count to emit
 * @throws IOException          on a serialization or write failure
 * @throws InterruptedException propagated from {@code output.write}
 */
private void report(Context output, IRI property, Value partitionId, long value) throws IOException, InterruptedException {
  if (value <= 0) {
    return;
  }
  if (graphContext != null && !graphContext.equals(graph)) {
    return;
  }

  final ByteArrayOutputStream keyBytes = new ByteArrayOutputStream();
  try (DataOutputStream keyOut = new DataOutputStream(keyBytes)) {
    keyOut.writeUTF(graph.stringValue());
    keyOut.writeUTF(property.stringValue());
    if (partitionId != null) {
      final byte[] encodedPartition = HalyardTableUtils.writeBytes(partitionId);
      keyOut.writeInt(encodedPartition.length);
      keyOut.write(encodedPartition);
    } else {
      keyOut.writeInt(0);
    }
  }

  final ImmutableBytesWritable outKey = new ImmutableBytesWritable(keyBytes.toByteArray());
  output.write(outKey, new LongWritable(value));
}
/**
 * Counts the records of the split assigned to this task.
 *
 * <p>When {@code _hasHeader} is set, one record is read and discarded before counting. The
 * reader is always closed through {@code IOUtilFunctions.closeSilently}.
 *
 * @return number of records read after the optional header record
 * @throws Exception propagated from creating or reading the record reader
 */
@Override
public Long call() throws Exception {
  final RecordReader<LongWritable, Text> records =
      _inputFormat.getRecordReader(_split, _jobConf, Reporter.NULL);
  final LongWritable keyHolder = new LongWritable();
  final Text valueHolder = new Text();

  long counted = 0;
  try {
    // count rows from the first non-header row
    if (_hasHeader) {
      records.next(keyHolder, valueHolder);
    }
    while (records.next(keyHolder, valueHolder)) {
      counted++;
    }
  } finally {
    IOUtilFunctions.closeSilently(records);
  }
  return counted;
}
/**
 * For every value: records a flag named {@code reduce.<name>.value.<value>}, sets the key to
 * 10, collects the pair, checks the key against 10 depending on {@code byValue}, and finally
 * sets the key to 11.
 *
 * @param key      key object; mutated by this method (set to 10, then 11, per value)
 * @param values   values to forward
 * @param output   collector receiving each (key, value) pair
 * @param reporter not used by this method
 * @throws IOException propagated from {@code writeFlag} or {@code output.collect}
 */
public void reduce(LongWritable key, Iterator<Text> values,
OutputCollector<LongWritable, Text> output,
Reporter reporter) throws IOException {
while (values.hasNext()) {
Text value = values.next();
writeFlag(conf, "reduce." + name + ".value." + value);
key.set(10);
output.collect(key, value);
// After collect, the key is expected to still be 10 in the byValue case.
if (byValue) {
assertEquals(10, key.get());
} else {
// NOTE(review): if this is JUnit's assertNotSame(Object, Object), the int and long
// arguments are boxed into distinct objects and the check can never fail — confirm
// whether a value comparison was intended.
assertNotSame(10, key.get());
}
key.set(11);
}
}
/**
 * Routes one tab-separated input line according to the {@code upper} flag.
 *
 * <p>With {@code upper} set, only lines whose first field has no comma are emitted: the key is
 * {@code (field0, "")} and the value is the second field. With {@code upper} unset, only lines
 * whose first field has a comma are emitted: the first field is split into row and column, the
 * key is {@code (row, column)} and the value is {@code "column,element"}. When the row is 1,
 * {@code total_records} extra records keyed {@code ("0", i)} are written first, with value
 * {@code "i,1"} for i = 0 and {@code "i,0"} otherwise.
 *
 * @param key     not used by this method
 * @param value   the input line
 * @param context receives the emitted records
 * @throws IOException          propagated from {@code context.write}
 * @throws InterruptedException propagated from {@code context.write}
 */
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  final String[] fields = value.toString().split("\\t");
  final boolean indexHasComma = fields[0].contains(",");

  // Processing Upper Triangular Matrix's rows
  if (this.upper && !indexHasComma) {
    final TextPair upperKey = new TextPair(fields[0], "");
    context.write(upperKey, new Text(fields[1]));
  }

  // Processing Lower Triangular Matrix's rows
  if (!this.upper && indexHasComma) {
    final String[] position = fields[0].split(",");
    final String row = position[0];

    // Sending first row of Lower Triangular Matrix to the reducer
    if (Integer.parseInt(row) == 1) {
      for (int col = 0; col < this.total_records; col++) {
        final int entry = (col == 0) ? 1 : 0;
        context.write(new TextPair("0", String.valueOf(col)), new Text(col + "," + entry));
      }
    }

    final String column = position[1];
    final String element = fields[1];
    context.write(new TextPair(row, column), new Text(column + "," + element));
  }
}
/**
 * Counts the entries of {@code dataSet} and returns the count as UTF-8 encoded decimal text.
 *
 * @param memento not used by this method
 * @return the number of entries iterated, as the bytes of its decimal string
 * @throws Exception declared by the overridden method
 */
@Override
public byte[] call(final byte[] memento) throws Exception {
  LOG.log(Level.FINER, "LineCounting task started");

  int lineCount = 0;
  for (final Pair<LongWritable, Text> ignored : dataSet) {
    lineCount++;
  }

  LOG.log(Level.FINER, "LineCounting task finished: read {0} lines", lineCount);
  final String countText = String.valueOf(lineCount);
  return countText.getBytes(StandardCharsets.UTF_8);
}
/**
 * Converts the parser's current numeric token to a {@code LongWritable}.
 *
 * <p>Integer tokens are read as a long. Floating-point tokens are read as a {@code double} and
 * truncated toward zero. A {@code double} is used rather than a {@code float} because a float
 * carries only 24 bits of mantissa, so integral values above 2^24 would be rounded before the
 * truncation.
 *
 * @param parser parser positioned on the token to convert
 * @return the converted value, or {@code null} when the current token is not a number
 * @throws IOException propagated from the parser
 */
protected LongWritable narrowToLong(JsonParser parser)
    throws IOException {
  switch (parser.getCurrentToken()) {
    case VALUE_NUMBER_INT:
      return new LongWritable(parser.getLongValue());
    case VALUE_NUMBER_FLOAT:
      return new LongWritable((long) parser.getDoubleValue());
    default:
      return null;
  }
}
/**
 * Parses the input line with {@code parseLog} and writes the result under the incoming key.
 *
 * <p>A line that fails with {@code ParseException} is not written; the stack trace is printed
 * and the method returns normally.
 *
 * @param key     key to write the parsed record under
 * @param value   the raw line
 * @param context receives the parsed record
 * @throws IOException          propagated from {@code context.write}
 * @throws InterruptedException propagated from {@code context.write}
 */
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  final String rawLine = value.toString();
  try {
    context.write(key, parseLog(rawLine));
  } catch (ParseException e) {
    e.printStackTrace();
  }
}
/**
 * Writes ten records with keys 0 through 9 and then closes the writer.
 *
 * <p>Odd keys are paired with {@code val1}, even keys with {@code val2}. The writer is closed
 * with a {@code null} argument whether or not the writes succeed.
 *
 * @param theRecordWriter writer receiving the records
 * @param context         not used by this method
 * @throws IOException          propagated from the writer
 * @throws InterruptedException declared by the signature
 */
private void writeMapFileOutput(RecordWriter theRecordWriter,
    TaskAttemptContext context) throws IOException, InterruptedException {
  try {
    for (int recordKey = 0; recordKey < 10; ++recordKey) {
      final boolean oddKey = recordKey % 2 == 1;
      final Text payload = oddKey ? val1 : val2;
      theRecordWriter.write(new LongWritable(recordKey), payload);
    }
  } finally {
    theRecordWriter.close(null);
  }
}
/**
 * Splits the input value with {@code Library.splitLine} and emits its first element as the
 * key, paired with the constant text {@code "2"}.
 *
 * @param k        not used by this method
 * @param val      the input value to split
 * @param oc       collector receiving the (first element, "2") pair
 * @param reporter not used by this method
 * @throws IOException propagated from {@code oc.collect}
 */
public void map(
LongWritable k,
Text val,
OutputCollector<Text, Text> oc,
Reporter reporter) throws IOException {
// Pull the key out
// NOTE(review): '' is an empty character literal, which is not valid Java. The delimiter
// was presumably a non-printable character lost when this snippet was copied — restore it
// from the original source before compiling.
List<Text> line = Library.splitLine(val, '');
// Prepend an index to the value so we know which file
// it came from.
oc.collect(line.get(0), new Text("2"));
}
/**
 * Runs an identity-mapper job whose job name is {@code name \Evalue]} and checks its output.
 *
 * <p>The input is a single file {@code text.txt} holding the line {@code b a}. The job must
 * produce exactly one output file whose only line is {@code 0<TAB>b a}.
 *
 * @throws Exception if the set-up, the job or the verification fails
 */
public void testComplexNameWithRegex() throws Exception {
  // Prepare the single-line input file.
  final OutputStream rawInput = getFileSystem().create(new Path(getInputDir(),
      "text.txt"));
  final Writer inputWriter = new OutputStreamWriter(rawInput);
  inputWriter.write("b a\n");
  inputWriter.close();

  // Configure and run the job.
  final JobConf jobConf = createJobConf();
  jobConf.setJobName("name \\Evalue]");

  jobConf.setInputFormat(TextInputFormat.class);

  jobConf.setOutputKeyClass(LongWritable.class);
  jobConf.setOutputValueClass(Text.class);

  jobConf.setMapperClass(IdentityMapper.class);

  FileInputFormat.setInputPaths(jobConf, getInputDir());
  FileOutputFormat.setOutputPath(jobConf, getOutputDir());

  JobClient.runJob(jobConf);

  // Verify the output.
  final Path[] producedFiles = FileUtil.stat2Paths(
      getFileSystem().listStatus(getOutputDir(),
          new Utils.OutputFileUtils.OutputFilesFilter()));
  assertEquals(1, producedFiles.length);

  final InputStream rawOutput = getFileSystem().open(producedFiles[0]);
  final BufferedReader outputLines = new BufferedReader(new InputStreamReader(rawOutput));
  assertEquals("0\tb a", outputLines.readLine());
  assertNull(outputLines.readLine());
  outputLines.close();
}
/**
 * Feeds the line {@code "1,"} to an edge reader and expects an {@link IOException}.
 *
 * <p>The record reader mock returns the line as its current value. The assertions after the
 * initialization read the source id, the target vertex id and the edge value.
 */
@Test(expected=IOException.class)
public void testInputParserWithMalformedLineAndDelimiter() throws IOException, InterruptedException {
  final String malformedLine = "1,";
  when(rr.getCurrentValue()).thenReturn(new Text(malformedLine));

  final EdgeReader edgeReader = createEdgeReader(rr);
  edgeReader.setConf(conf);
  edgeReader.initialize(null, tac);

  assertEquals(edgeReader.getCurrentSourceId(), new Text("1"));
  assertEquals(edgeReader.getCurrentEdge().getTargetVertexId(), new Text());
  assertEquals(edgeReader.getCurrentEdge().getValue(), new LongWritable(1L));
}
/**
 * Accumulates the point counts emitted by the mappers.
 *
 * @param isInside selects the accumulator: {@code numInside} when true, {@code numOutside}
 *                 otherwise
 * @param values   iterator over the point counts to add
 * @param context  not used by this method
 * @throws IOException declared by the signature
 */
public void reduce(BooleanWritable isInside,
    Iterator<LongWritable> values,
    Context context) throws IOException {
  final boolean inside = isInside.get();
  while (values.hasNext()) {
    final long count = values.next().get();
    if (inside) {
      numInside += count;
    } else {
      numOutside += count;
    }
  }
}
/**
 * Sums all values for a key and writes the total through the shared {@code result} object.
 *
 * @param key     key the values belong to; written back unchanged
 * @param values  values to add up
 * @param context receives the (key, total) pair
 * @throws IOException          propagated from {@code context.write}
 * @throws InterruptedException propagated from {@code context.write}
 */
public void reduce(KEY key, Iterable<LongWritable> values,
    Context context) throws IOException, InterruptedException {
  long total = 0;
  for (LongWritable addend : values) {
    total += addend.get();
  }

  result.set(total);
  context.write(key, result);
}
/**
 * Parses the line as JSON and, when it has a {@code name} entry, writes that name paired with
 * {@code one}.
 *
 * <p>Example input: {@code {"name":"Michael","blog":"micmiu.com"}}. Any exception raised
 * while parsing or writing is logged and not rethrown.
 *
 * @param key     not used by this method
 * @param value   the input line
 * @param context receives the (name, one) pair
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  final String line = value.toString();
  LOGGER.info(">>>> log mapper line = " + line);

  try {
    final JSONObject parsed = JSON.parseObject(line);
    if (!parsed.containsKey("name")) {
      return;
    }
    name.set(parsed.getString("name"));
    context.write(name, one);
  } catch (Exception e) {
    LOGGER.error("map error", e);
  }
}
/**
 * Creates the input, runs a one-map/one-reduce job with record skipping enabled from the first
 * attempt, and validates the output.
 *
 * <p>The maximum number of map (reduce) attempts is the configured attempts-to-start-skipping
 * plus one plus the number of bad mapper (reducer) records.
 *
 * @param conf             job configuration to complete and run
 * @param mapperBadRecords records expected to fail in the mapper
 * @param redBadRecords    records expected to fail in the reducer
 * @throws Exception if the set-up, the job or the validation fails
 */
private void runMapReduce(JobConf conf,
    List<String> mapperBadRecords, List<String> redBadRecords)
    throws Exception {
  createInput();

  conf.setJobName("mr");
  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(1);
  conf.setInt(JobContext.TASK_TIMEOUT, 30 * 1000);

  SkipBadRecords.setMapperMaxSkipRecords(conf, Long.MAX_VALUE);
  SkipBadRecords.setReducerMaxSkipGroups(conf, Long.MAX_VALUE);
  SkipBadRecords.setAttemptsToStartSkipping(conf, 0);

  //the no of attempts to successfully complete the task depends
  //on the no of bad records.
  final int maxMapAttempts =
      SkipBadRecords.getAttemptsToStartSkipping(conf) + 1 + mapperBadRecords.size();
  conf.setMaxMapAttempts(maxMapAttempts);
  final int maxReduceAttempts =
      SkipBadRecords.getAttemptsToStartSkipping(conf) + 1 + redBadRecords.size();
  conf.setMaxReduceAttempts(maxReduceAttempts);

  FileInputFormat.setInputPaths(conf, getInputDir());
  FileOutputFormat.setOutputPath(conf, getOutputDir());

  conf.setInputFormat(TextInputFormat.class);

  conf.setMapOutputKeyClass(LongWritable.class);
  conf.setMapOutputValueClass(Text.class);

  conf.setOutputFormat(TextOutputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  final RunningJob finishedJob = JobClient.runJob(conf);
  validateOutput(conf, finishedJob, mapperBadRecords, redBadRecords);
}
/**
 * Re-keys one tab-separated vertex line by its second column.
 *
 * <p>The trimmed line must have at least three columns. Column 1 becomes the output key,
 * column 2 and the optional column 3 (an empty string when the line has exactly three
 * columns) are passed to {@code LouvainVertexWritable.fromTokens} to build the output value.
 * Column 0 is not used here.
 *
 * @param key     not used by this method
 * @param value   the input line
 * @param context receives the (column 1, vertex) pair
 * @throws IllegalArgumentException if the line has fewer than three columns; the message
 *         quotes the offending line
 * @throws IOException          propagated from {@code context.write}
 * @throws InterruptedException propagated from {@code context.write}
 */
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  String line = value.toString();
  String[] tokens = line.trim().split("\t");
  if (tokens.length < 3) {
    // Report the line itself: calling toString() on the token array would only print the
    // array's type and identity hash.
    throw new IllegalArgumentException(
        "Expected at least 3 cols: got " + tokens.length + " from line: " + line);
  }

  Text outKey = new Text(tokens[1]); // group by community
  String edgeListStr = (tokens.length == 3) ? "" : tokens[3];
  LouvainVertexWritable outValue = LouvainVertexWritable.fromTokens(tokens[2], edgeListStr);
  context.write(outKey, outValue);
}
/**
 * Reads every value of a split into a list.
 *
 * <p>A fresh value object is requested from the reader after each record so that the list
 * entries are distinct objects. The reader is closed in a {@code finally} block, so it is
 * released even when reading fails part-way.
 *
 * @param format input format used to create the record reader
 * @param split  split to read
 * @param job    job configuration passed to the input format
 * @return the values of the split, in reading order
 * @throws IOException if creating, reading or closing the reader fails
 */
private static List<Text> readSplit(TextInputFormat format,
    InputSplit split,
    JobConf job) throws IOException {
  List<Text> result = new ArrayList<Text>();
  RecordReader<LongWritable, Text> reader =
      format.getRecordReader(split, job, voidReporter);
  try {
    LongWritable key = reader.createKey();
    Text value = reader.createValue();
    while (reader.next(key, value)) {
      result.add(value);
      value = reader.createValue();
    }
  } finally {
    reader.close();
  }
  return result;
}
/**
 * Adds up all values of a key and writes the total as a new {@code DoubleWritable}.
 *
 * @param key     key the values belong to; written back unchanged
 * @param values  values to add up
 * @param context receives the (key, total) pair
 * @throws IOException          propagated from {@code context.write}
 * @throws InterruptedException propagated from {@code context.write}
 */
@Override
public void reduce(
    final LongWritable key,
    final Iterable<DoubleWritable> values,
    final Context context) throws IOException, InterruptedException {
  double total = 0.0;
  for (final DoubleWritable term : values) {
    total += term.get();
  }

  final DoubleWritable sum = new DoubleWritable(total);
  context.write(key, sum);
}
/**
 * Creates {@code splitCount} input files and gathers their records into one ordered map.
 *
 * <p>File {@code i} is named {@code file<i>} and is created by {@code createInputData} with
 * the range arguments {@code i * 10} and {@code i * 10 + 10}.
 *
 * @param splitCount number of files to create
 * @param workDir    directory passed to {@code createInputData}
 * @param conf       configuration passed to {@code createInputData}
 * @param totalSize  accumulator passed to {@code createInputData}
 * @return the records of all files, in insertion order
 * @throws Exception propagated from {@code createInputData}
 */
private LinkedHashMap<LongWritable, Text> createSplits(int splitCount, Path workDir,
    Configuration conf, AtomicLong totalSize) throws Exception {
  final LinkedHashMap<LongWritable, Text> allRecords = new LinkedHashMap<LongWritable, Text>();
  for (int splitIndex = 0; splitIndex < splitCount; ++splitIndex) {
    final int rangeStart = splitIndex * 10;
    final int rangeEnd = rangeStart + 10;
    final String fileName = "file" + splitIndex;
    allRecords.putAll(
        createInputData(localFs, workDir, conf, fileName, rangeStart, rangeEnd, totalSize));
  }
  return allRecords;
}