org.apache.hadoop.mapred.Reporter#setStatus(): Source Code Examples

The following are example usages of org.apache.hadoop.mapred.Reporter#setStatus(), collected from the open-source projects named in each entry.
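
Reporter.setStatus() lets a running map or reduce task publish a short, human-readable progress string that appears in the job's web UI; like the other Reporter methods, calling it also counts as progress and helps keep the task from being timed out. The snippet below is a minimal sketch of the typical pattern using the old org.apache.hadoop.mapred API; the class name and the record-counting logic are illustrative only and are not taken from any of the projects listed below.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Illustrative mapper: periodically refreshes the task's status string while processing records.
public class StatusReportingMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, LongWritable> {

  private long processed = 0;

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, LongWritable> output,
                  Reporter reporter) throws IOException {
    output.collect(value, new LongWritable(1));

    // Update the status string every 1000 records; the string shows up in the
    // task UI, and the call itself signals that the task is still making progress.
    if (++processed % 1000 == 0) {
      reporter.setStatus("processed " + processed + " records");
    }
  }
}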

Example 1  Project: hadoop  File: TestDFSIO.java
@Override // IOMapperBase
public Long doIO(Reporter reporter, 
                   String name, 
                   long totalSize // in bytes
                 ) throws IOException {
  OutputStream out = (OutputStream)this.stream;
  // write to the file
  long nrRemaining;
  for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
    int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
    out.write(buffer, 0, curSize);
    reporter.setStatus("writing " + name + "@" + 
                       (totalSize - nrRemaining) + "/" + totalSize 
                       + " ::host = " + hostName);
  }
  return Long.valueOf(totalSize);
}
 
Example 2  Project: hadoop  File: TestDFSIO.java
@Override // IOMapperBase
public Long doIO(Reporter reporter, 
                   String name, 
                   long totalSize // in bytes
                 ) throws IOException {
  OutputStream out = (OutputStream)this.stream;
  // write to the file
  long nrRemaining;
  for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
    int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
    out.write(buffer, 0, curSize);
    reporter.setStatus("writing " + name + "@" + 
                       (totalSize - nrRemaining) + "/" + totalSize 
                       + " ::host = " + hostName);
  }
  return Long.valueOf(totalSize);
}
 
Example 3  Project: hadoop-book  File: RandomWriter.java
/**
 * Given an output filename, write a bunch of random records to it.
 */
public void map(WritableComparable key,
        Writable value,
        OutputCollector<BytesWritable, BytesWritable> output,
        Reporter reporter) throws IOException {
    int itemCount = 0;
    while (numBytesToWrite > 0) {
        int keyLength = minKeySize
                + (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
        randomKey.setSize(keyLength);
        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
        int valueLength = minValueSize
                + (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
        randomValue.setSize(valueLength);
        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
        output.collect(randomKey, randomValue);
        numBytesToWrite -= keyLength + valueLength;
        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
        if (++itemCount % 200 == 0) {
            reporter.setStatus("wrote record " + itemCount + ". "
                    + numBytesToWrite + " bytes left.");
        }
    }
    reporter.setStatus("done with " + itemCount + " records.");
}
 
Example 4  Project: hadoop-gpu  File: IOMapperBase.java
/**
 * Map file name and offset into statistical data.
 * <p>
 * The map task is to get the 
 * <tt>key</tt>, which contains the file name, and the 
 * <tt>value</tt>, which is the offset within the file.
 * 
 * The parameters are passed to the abstract method 
 * {@link #doIO(Reporter,String,long)}, which performs the I/O operation, 
 * usually reading or writing data, and then 
 * {@link #collectStats(OutputCollector,String,long,Object)} 
 * is called to prepare the statistics for a subsequent reducer.
 */
public void map(UTF8 key, 
                LongWritable value,
                OutputCollector<UTF8, UTF8> output, 
                Reporter reporter) throws IOException {
  String name = key.toString();
  long longValue = value.get();
  
  reporter.setStatus("starting " + name + " ::host = " + hostName);
  
  long tStart = System.currentTimeMillis();
  Object statValue = doIO(reporter, name, longValue);
  long tEnd = System.currentTimeMillis();
  long execTime = tEnd - tStart;
  collectStats(output, name, execTime, statValue);
  
  reporter.setStatus("finished " + name + " ::host = " + hostName);
}
 
Example 5  Project: big-c  File: DistCh.java
/** Run a FileOperation */
public void map(Text key, FileOperation value,
    OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
    ) throws IOException {
  try {
    value.run(jobconf);
    ++succeedcount;
    reporter.incrCounter(Counter.SUCCEED, 1);
  } catch (IOException e) {
    ++failcount;
    reporter.incrCounter(Counter.FAIL, 1);

    String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
    out.collect(null, new Text(s));
    LOG.info(s);
  } finally {
    reporter.setStatus(getCountString());
  }
}
 
Example 6  Project: RDFS  File: DistCh.java
/** Run a FileOperation */
public void map(Text key, FileOperation value,
    OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
    ) throws IOException {
  try {
    value.run(jobconf);
    ++succeedcount;
    reporter.incrCounter(Counter.SUCCEED, 1);
  } catch (IOException e) {
    ++failcount;
    reporter.incrCounter(Counter.FAIL, 1);

    String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
    out.collect(null, new Text(s));
    LOG.info(s);
  } finally {
    reporter.setStatus(getCountString());
  }
}
 
Example 7  Project: spork  File: L8.java
public void reduce(
        Text key,
        Iterator<Text> iter, 
        OutputCollector<Text, Text> oc,
        Reporter reporter) throws IOException {
    int tsSum = 0, erCnt = 0;
    double erSum = 0.0;
    while (iter.hasNext()) {
        List<Text> vals = Library.splitLine(iter.next(), '\u0001');
        try {
            tsSum += Integer.valueOf(vals.get(0).toString());
            erSum += Double.valueOf(vals.get(1).toString());
            erCnt += Integer.valueOf(vals.get(2).toString());
        } catch (NumberFormatException nfe) {
        }
    }
    double erAvg = erSum / erCnt;
    StringBuffer sb = new StringBuffer();
    sb.append((new Integer(tsSum)).toString());
    sb.append("");
    sb.append((new Double(erAvg)).toString());
    oc.collect(key, new Text(sb.toString()));
    reporter.setStatus("OK");
}
 
Example 8  Project: spork  File: L14.java
public void reduce(
        Text key,
        Iterator<Text> iter, 
        OutputCollector<Text, Text> oc,
        Reporter reporter) throws IOException {
    // For each value, figure out which file it's from and store it
    // accordingly.
    List<String> first = new ArrayList<String>();
    List<String> second = new ArrayList<String>();

    while (iter.hasNext()) {
        Text t = iter.next();
        String value = t.toString();
        if (value.charAt(0) == '1') first.add(value.substring(1));
        else second.add(value.substring(1));
        reporter.setStatus("OK");
    }

    reporter.setStatus("OK");

    if (first.size() == 0 || second.size() == 0) return;

    // Do the cross product, and calculate the sum
    for (String s1 : first) {
        for (String s2 : second) {
            try {
                oc.collect(null, new Text(key.toString() + "\t" + s1 + "\t" + key.toString()));
            } catch (NumberFormatException nfe) {
            }
        }
    }
}
 
Example 9  Project: hadoop-gpu  File: NLineInputFormat.java
public RecordReader<LongWritable, Text> getRecordReader(
                                          InputSplit genericSplit,
                                          JobConf job,
                                          Reporter reporter) 
throws IOException {
  reporter.setStatus(genericSplit.toString());
  return new LineRecordReader(job, (FileSplit) genericSplit);
}
 
Example 10  Project: RDFS  File: TestFileSystem.java
public void map(UTF8 key, LongWritable value,
                OutputCollector<UTF8, LongWritable> collector,
                Reporter reporter)
  throws IOException {
  
  String name = key.toString();
  long size = value.get();
  long seed = Long.parseLong(name);

  random.setSeed(seed);
  reporter.setStatus("creating " + name);

  // write to temp file initially to permit parallel execution
  Path tempFile = new Path(DATA_DIR, name+suffix);
  OutputStream out = fs.create(tempFile);

  long written = 0;
  try {
    while (written < size) {
      if (fastCheck) {
        Arrays.fill(buffer, (byte)random.nextInt(Byte.MAX_VALUE));
      } else {
        random.nextBytes(buffer);
      }
      long remains = size - written;
      int length = (remains<=buffer.length) ? (int)remains : buffer.length;
      out.write(buffer, 0, length);
      written += length;
      reporter.setStatus("writing "+name+"@"+written+"/"+size);
    }
  } finally {
    out.close();
  }
  // rename to final location
  fs.rename(tempFile, new Path(DATA_DIR, name));

  collector.collect(new UTF8("bytes"), new LongWritable(written));

  reporter.setStatus("wrote " + name);
}
 
Example 11  Project: RDFS  File: RandomTextWriter.java
/**
 * Given an output filename, write a bunch of random records to it.
 */
public void map(Text key, Text value,
                OutputCollector<Text, Text> output, 
                Reporter reporter) throws IOException {
  int itemCount = 0;
  while (numBytesToWrite > 0) {
    // Generate the key/value 
    int noWordsKey = minWordsInKey + 
      (wordsInKeyRange != 0 ? random.nextInt(wordsInKeyRange) : 0);
    int noWordsValue = minWordsInValue + 
      (wordsInValueRange != 0 ? random.nextInt(wordsInValueRange) : 0);
    Text keyWords = generateSentence(noWordsKey);
    Text valueWords = generateSentence(noWordsValue);
    
    // Write the sentence 
    output.collect(keyWords, valueWords);
    
    numBytesToWrite -= (keyWords.getLength() + valueWords.getLength());
    
    // Update counters, progress etc.
    reporter.incrCounter(Counters.BYTES_WRITTEN, 
                         (keyWords.getLength()+valueWords.getLength()));
    reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
    if (++itemCount % 200 == 0) {
      reporter.setStatus("wrote record " + itemCount + ". " + 
                         numBytesToWrite + " bytes left.");
    }
  }
  reporter.setStatus("done with " + itemCount + " records.");
}
 
Example 12  Project: hadoop  File: NLineInputFormat.java
public RecordReader<LongWritable, Text> getRecordReader(
                                          InputSplit genericSplit,
                                          JobConf job,
                                          Reporter reporter) 
throws IOException {
  reporter.setStatus(genericSplit.toString());
  return new LineRecordReader(job, (FileSplit) genericSplit);
}
 
Example 13  Project: RDFS  File: NNBench.java
/**
 * Open operation
 * @param name of the prefix of the output file to be read
 * @param reporter an instance of {@link Reporter} to be used for
 *   status updates
 */
private void doOpenReadOp(String name,
        Reporter reporter) {
  FSDataInputStream input;
  byte[] buffer = new byte[bytesToWrite];
  
  for (long l = 0l; l < numberOfFiles; l++) {
    Path filePath = new Path(new Path(baseDir, dataDirName), 
            name + "_" + l);

    boolean successfulOp = false;
    while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
      try {
        // Set up timer for measuring AL
        startTimeAL = System.currentTimeMillis();
        input = filesystem.open(filePath);
        totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
        
        // If the file needs to be read (specified at command line)
        if (readFile) {
          startTimeAL = System.currentTimeMillis();
          input.readFully(buffer);

          totalTimeAL2 += (System.currentTimeMillis() - startTimeAL);
        }
        input.close();
        successfulOp = true;
        successfulFileOps ++;

        reporter.setStatus("Finish "+ l + " files");
      } catch (IOException e) {
        LOG.info("Exception recorded in op: OpenRead " + e);
        numOfExceptions++;
      }
    }
  }
}
 
Example 14  Project: RDFS  File: NNBench.java
/**
 * Rename operation
 * @param name of the prefix of the file to be renamed
 * @param reporter an instance of {@link Reporter} to be used for
 *   status updates
 */
private void doRenameOp(String name,
        Reporter reporter) {
  for (long l = 0l; l < numberOfFiles; l++) {
    Path filePath = new Path(new Path(baseDir, dataDirName), 
            name + "_" + l);
    Path filePathR = new Path(new Path(baseDir, dataDirName), 
            name + "_r_" + l);

    boolean successfulOp = false;
    while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
      try {
        // Set up timer for measuring AL
        startTimeAL = System.currentTimeMillis();
        filesystem.rename(filePath, filePathR);
        totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
        
        successfulOp = true;
        successfulFileOps ++;

        reporter.setStatus("Finish "+ l + " files");
      } catch (IOException e) {
        LOG.info("Exception recorded in op: Rename");

        numOfExceptions++;
      }
    }
  }
}
 
Example 15
/**
 * @param frame
 * @param intTokNums
 * @param data
 * @param reporter
 * @return
 * @see LRIdentificationModelSingleNode#getFormulaForFrame(String, int[], String[][])
 */
protected LogFormula getFormulaForFrame(String frame, int[] intTokNums, String[][] data, Reporter reporter)	
{
	THashSet<String> hiddenUnits = mFrameMap.get(frame);
	LogFormula result = getFormulaObject(LogFormula.Op.PLUS);
	DependencyParse parse = DependencyParse.processFN(data, 0.0);
	for (String unit : hiddenUnits)
	{
		FeatureExtractor featex = new FeatureExtractor();
		IntCounter<String> valMap = featex.extractFeatures(frame, intTokNums, unit, data, mWNR, mTrainOrTest, mWnRelationsCache,null,parse);	// last arg different from superclass method's call
		Set<String> features = valMap.keySet();
		LogFormula featSum = getFormulaObject(LogFormula.Op.PLUS);
		
		for (String feat : features)
		{
			double val = valMap.getT(feat);
			LogFormula prod = getFormulaObject(LogFormula.Op.TIMES);
			LogFormula featVal = getFormulaObject(LDouble.convertToLogDomain(val));
			prod.add_arg(featVal);
			LogFormula paramFormula = getLazyLookupParam(feat, mTrainOrTest);
			prod.add_arg(paramFormula);
			featSum.add_arg(prod);
			if(reporter!=null)
				reporter.setStatus("Found feature:"+feat);	// not in superclass method
		}
		LogFormula expFormula = getFormulaObject(LogFormula.Op.EXP);
		expFormula.add_arg(featSum);
		result.add_arg(expFormula);
	}
	return result;
}
 
Example 16  Project: hadoop  File: DistCpV1.java
/**
 * Copies single file to the path specified by tmpfile.
 * @param srcstat  src path and metadata
 * @param tmpfile  temporary file to which copy is to be done
 * @param absdst   actual destination path to which copy is to be done
 * @param reporter
 * @return Number of bytes copied
 */
private long doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst,
                        Reporter reporter) throws IOException {
  long bytesCopied = 0L;
  Path srcPath = srcstat.getPath();
  // open src file
  try (FSDataInputStream in = srcPath.getFileSystem(job).open(srcPath)) {
    reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
    // open tmp file
    try (FSDataOutputStream out = create(tmpfile, reporter, srcstat)) {
      LOG.info("Copying file " + srcPath + " of size " +
               srcstat.getLen() + " bytes...");
    
      // copy file
      for(int bytesRead; (bytesRead = in.read(buffer)) >= 0; ) {
        out.write(buffer, 0, bytesRead);
        bytesCopied += bytesRead;
        reporter.setStatus(
            String.format("%.2f ", bytesCopied*100.0/srcstat.getLen())
            + absdst + " [ " +
            TraditionalBinaryPrefix.long2String(bytesCopied, "", 1) + " / "
            + TraditionalBinaryPrefix.long2String(srcstat.getLen(), "", 1)
            + " ]");
      }
    }
  }
  return bytesCopied;
}
 
Example 17  Project: RDFS  File: PiEstimator.java
/** Map method.
 * @param offset samples starting from the (offset+1)th sample.
 * @param size the number of samples for this map
 * @param out output {true->numInside, false->numOutside}
 * @param reporter
 */
public void map(LongWritable offset,
                LongWritable size,
                OutputCollector<BooleanWritable, LongWritable> out,
                Reporter reporter) throws IOException {

  final HaltonSequence haltonsequence = new HaltonSequence(offset.get());
  long numInside = 0L;
  long numOutside = 0L;

  for(long i = 0; i < size.get(); ) {
    //generate points in a unit square
    final double[] point = haltonsequence.nextPoint();

    //count points inside/outside of the inscribed circle of the square
    final double x = point[0] - 0.5;
    final double y = point[1] - 0.5;
    if (x*x + y*y > 0.25) {
      numOutside++;
    } else {
      numInside++;
    }

    //report status
    i++;
    if (i % 1000 == 0) {
      reporter.setStatus("Generated " + i + " samples.");
    }
  }

  //output map results
  out.collect(new BooleanWritable(true), new LongWritable(numInside));
  out.collect(new BooleanWritable(false), new LongWritable(numOutside));
}
 
Example 18  Project: RDFS  File: DistRaid.java
/** Run a FileOperation */
public void map(Text key, PolicyInfo policy,
    OutputCollector<WritableComparable, Text> out, Reporter reporter)
    throws IOException {
  this.reporter = reporter;
  try {
    Codec.initializeCodecs(jobconf);

    LOG.info("Raiding file=" + key.toString() + " policy=" + policy);
    Path p = new Path(key.toString());
    FileStatus fs = p.getFileSystem(jobconf).getFileStatus(p);
    st.clear();
    RaidNode.doRaid(jobconf, policy, fs, st, reporter);

    ++succeedcount;

    reporter.incrCounter(Counter.PROCESSED_BLOCKS, st.numProcessedBlocks);
    reporter.incrCounter(Counter.PROCESSED_SIZE, st.processedSize);
    reporter.incrCounter(Counter.META_BLOCKS, st.numMetaBlocks);
    reporter.incrCounter(Counter.META_SIZE, st.metaSize);
    reporter.incrCounter(Counter.SAVING_SIZE,
        st.processedSize - st.remainingSize - st.metaSize);
    reporter.incrCounter(Counter.FILES_SUCCEEDED, 1);
  } catch (IOException e) {
    ++failcount;
    reporter.incrCounter(Counter.FILES_FAILED, 1);

    String s = "FAIL: " + policy + ", " + key + " "
        + StringUtils.stringifyException(e);
    out.collect(null, new Text(s));
    LOG.info(s);
  } finally {
    reporter.setStatus(getCountString());
  }
}
 
Example 19  Project: RDFS  File: TestFileSystem.java
public void map(WritableComparable key, LongWritable value,
                OutputCollector<K, LongWritable> collector,
                Reporter reporter)
  throws IOException {
  String name = key.toString();
  long size = value.get();
  long seed = Long.parseLong(name);

  if (size == 0) return;

  reporter.setStatus("opening " + name);

  FSDataInputStream in = fs.open(new Path(DATA_DIR, name));
    
  try {
    for (int i = 0; i < SEEKS_PER_FILE; i++) {
      // generate a random position
      long position = Math.abs(random.nextLong()) % size;
      
      // seek file to that position
      reporter.setStatus("seeking " + name);
      in.seek(position);
      byte b = in.readByte();
      
      // check that byte matches
      byte checkByte = 0;
      // advance random state to that position
      random.setSeed(seed);
      for (int p = 0; p <= position; p+= check.length) {
        reporter.setStatus("generating data for " + name);
        if (fastCheck) {
          checkByte = (byte)random.nextInt(Byte.MAX_VALUE);
        } else {
          random.nextBytes(check);
          checkByte = check[(int)(position % check.length)];
        }
      }
      assertEquals(b, checkByte);
    }
  } finally {
    in.close();
  }
}
 
Example 20  Project: hadoop  File: SliveMapper.java
/**
 * Logs to the given reporter and logs to the internal logger at info level
 * 
 * @param r
 *          the reporter to set status on
 * @param msg
 *          the message to log
 */
private void logAndSetStatus(Reporter r, String msg) {
  r.setStatus(msg);
  LOG.info(msg);
}