org.apache.hadoop.io.LongWritable#get() source code examples

The following examples show how org.apache.hadoop.io.LongWritable#get() is used in open-source projects; you can also follow each project's link to view the full source on GitHub.
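
Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below) of what LongWritable#get() does: the Writable wraps a primitive long, set() stores a value, and get() reads it back.

import org.apache.hadoop.io.LongWritable;

public class LongWritableGetDemo {
    public static void main(String[] args) {
        // Wrap a primitive long in a LongWritable.
        LongWritable writable = new LongWritable(42L);
        // get() returns the wrapped primitive.
        long primitive = writable.get();      // 42
        // Writables are mutable; set() changes the value in place.
        writable.set(primitive + 1);
        System.out.println(writable.get());   // prints 43
    }
}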

Example 1  Project: hadoop  File: CompressionEmulationUtil.java
/**
 * Emits random words sequence of desired size. Note that the desired output
 * size is passed as the value parameter to this map.
 */
@Override
public void map(NullWritable key, LongWritable value, Context context)
throws IOException, InterruptedException {
  //TODO Control the extra data written ..
  //TODO Should the key\tvalue\n be considered for measuring size?
  //     Can counters like BYTES_WRITTEN be used? What will be the value of
  //     such counters in LocalJobRunner?
  for (long bytes = value.get(); bytes > 0;) {
    String randomKey = rtg.getRandomWord();
    String randomValue = rtg.getRandomWord();
    context.write(new Text(randomKey), new Text(randomValue));
    bytes -= (randomValue.getBytes(charsetUTF8).length +
        randomKey.getBytes(charsetUTF8).length);
  }
}
 
Example 2  Project: pulsar  File: FileStoreBackedReadHandleImpl.java
private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId) throws IOException {
    this.ledgerId = ledgerId;
    this.executor = executor;
    this.reader = reader;
    LongWritable key = new LongWritable();
    BytesWritable value = new BytesWritable();
    try {
        key.set(FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX);
        reader.get(key, value);
        this.ledgerMetadata = parseLedgerMetadata(value.copyBytes());
    } catch (IOException e) {
        log.error("Fail to read LedgerMetadata for ledgerId {}",
                ledgerId);
        throw new IOException("Fail to read LedgerMetadata for ledgerId " + key.get());
    }
}
 
Example 3  Project: aerospike-hadoop  File: AggregateIntInput.java
public void reduce(LongWritable mod,
                   Iterable<LongWritable> values,
                   Context context)
    throws IOException, InterruptedException {

    long num = 0;   // number of elements
    long sum = 0;   // sum of elements
    long min = Long.MAX_VALUE;  // minimum element
    long max = Long.MIN_VALUE;  // maximum element

    for (LongWritable val : values) {
        long vv = val.get();
        num += 1;
        sum += vv;
        if (vv < min) min = vv;
        if (vv > max) max = vv;
    }

    String rec = String.format("%d %d %d %d", num, min, max, sum);

    context.write(mod, new Text(rec));
}
 
Example 4  Project: hadoop  File: IOMapperBase.java
/**
 * Map file name and offset into statistical data.
 * <p>
 * The map task is to get the 
 * <tt>key</tt>, which contains the file name, and the 
 * <tt>value</tt>, which is the offset within the file.
 * 
 * The parameters are passed to the abstract method 
 * {@link #doIO(Reporter,String,long)}, which performs the io operation, 
 * usually read or write data, and then 
 * {@link #collectStats(OutputCollector,String,long,Object)} 
 * is called to prepare stat data for a subsequent reducer.
 */
public void map(Text key, 
                LongWritable value,
                OutputCollector<Text, Text> output, 
                Reporter reporter) throws IOException {
  String name = key.toString();
  long longValue = value.get();
  
  reporter.setStatus("starting " + name + " ::host = " + hostName);

  this.stream = getIOStream(name);
  T statValue = null;
  long tStart = System.currentTimeMillis();
  try {
    statValue = doIO(reporter, name, longValue);
  } finally {
    if(stream != null) stream.close();
  }
  long tEnd = System.currentTimeMillis();
  long execTime = tEnd - tStart;
  collectStats(output, name, execTime, statValue);
  
  reporter.setStatus("finished " + name + " ::host = " + hostName);
}
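
The Javadoc above describes the contract: the key carries the file name and the LongWritable value carries the offset or size, which the mapper unwraps with get() before timing the IO. A standalone sketch of the same flow (the class and helper names here are hypothetical, not the actual IOMapperBase API) might look like this:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical class, not part of Hadoop: key = file name, value = size/offset.
public class TimedIOMapper extends MapReduceBase
    implements Mapper<Text, LongWritable, Text, Text> {

  @Override
  public void map(Text key, LongWritable value,
                  OutputCollector<Text, Text> output,
                  Reporter reporter) throws IOException {
    String name = key.toString();
    long size = value.get();                  // the long carried by the value

    long tStart = System.currentTimeMillis();
    doSimulatedIO(name, size);                // stand-in for a real doIO(...)
    long execTime = System.currentTimeMillis() - tStart;

    // Emit the measured time keyed by file name for a downstream reducer.
    output.collect(new Text("exec_time:" + name), new Text(Long.toString(execTime)));
  }

  // Placeholder: a real implementation would read or write 'size' bytes.
  private void doSimulatedIO(String name, long size) {
  }
}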
 
Example 5  Project: incubator-hivemall  File: SessionizeUDF.java
@Nullable
public Text evaluate(@Nullable LongWritable time, @Nullable LongWritable threshold) {
    if (time == null || threshold == null) {
        return null;
    }

    final long thisTime = time.get();
    final long diff = thisTime - lastTime;
    if (diff < threshold.get()) {
        this.lastTime = thisTime;
        return sessionId;
    }

    sessionId.set(UUID.randomUUID().toString());
    this.lastTime = time.get();
    return sessionId;
}
 
Example 6
private void loadGenerations() throws IOException {
  FileSystem fileSystem = _path.getFileSystem(_configuration);
  FileStatus[] listStatus = fileSystem.listStatus(_path);
  SortedSet<FileStatus> existing = new TreeSet<FileStatus>(Arrays.asList(listStatus));
  if (existing.isEmpty()) {
    return;
  }
  FileStatus last = existing.last();
  Reader reader = new SequenceFile.Reader(fileSystem, last.getPath(), _configuration);
  Text key = new Text();
  LongWritable value = new LongWritable();
  while (reader.next(key, value)) {
    String name = key.toString();
    long gen = value.get();
    _namesToGenerations.put(name, gen);
    Set<String> names = _generationsToNames.get(gen);
    if (names == null) {
      names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
      _generationsToNames.put(gen, names);
    }
    names.add(name);
  }
  reader.close();
  existing.remove(last);
  cleanupOldFiles(fileSystem, existing);
}
 
Example 7  Project: rheem  File: PageRankAlgorithm.java
@Override
public Vertex<LongWritable, DoubleWritable, FloatWritable>
getCurrentVertex() throws IOException {
    Vertex<LongWritable, DoubleWritable, FloatWritable> vertex =
            getConf().createVertex();
    LongWritable vertexId = new LongWritable(
            (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
    DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
    long targetVertexId =
            (vertexId.get() + 1) %
                    (inputSplit.getNumSplits() * totalRecords);
    float edgeValue = vertexId.get() * 100f;
    List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
    edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
            new FloatWritable(edgeValue)));
    vertex.initialize(vertexId, vertexValue, edges);
    ++recordsRead;
    if (LOG.isInfoEnabled()) {
        LOG.info("next: Return vertexId=" + vertex.getId().get() +
                ", vertexValue=" + vertex.getValue() +
                ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
    }
    return vertex;
}
 
Example 8  Project: RDFS  File: TeraGen.java
public void map(LongWritable row, NullWritable ignored,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
  long rowId = row.get();
  if (rand == null) {
    // we use 3 random numbers per a row
    rand = new RandomGenerator(rowId*3);
  }
  addKey();
  value.clear();
  addRowId(rowId);
  addFiller(rowId);
  output.collect(key, value);
}
 
Example 9  Project: hadoop-gpu  File: LongSumReducer.java
public void reduce(KEY key, Iterable<LongWritable> values,
                   Context context) throws IOException, InterruptedException {
  long sum = 0;
  for (LongWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}
 
Example 10  Project: rya  File: AccumuloRdfCountTool.java
@Override
protected void reduce(final Text key, final Iterable<LongWritable> values, final Context context) throws IOException, InterruptedException {
    long count = 0;
    for (final LongWritable lw : values) {
        count += lw.get();
    }

    if (count <= TOO_LOW) {
        return;
    }

    valOut.set(count);
    context.write(key, valOut);
}
 
Example 11  Project: big-c  File: GenerateData.java
@Override
public void map(NullWritable key, LongWritable value, Context context)
    throws IOException, InterruptedException {
  for (long bytes = value.get(); bytes > 0; bytes -= val.getLength()) {
    r.nextBytes(val.getBytes());
    val.setSize((int)Math.min(val.getLength(), bytes));
    context.write(key, val);
  }
}
 
Example 12  Project: big-c  File: SleepJob.java
@Override
public void map(LongWritable key, LongWritable value, Context context)
throws IOException, InterruptedException {
  context.setStatus("Sleeping... " + value.get() + " ms left");
  long now = System.currentTimeMillis();
  if (now < key.get()) {
    TimeUnit.MILLISECONDS.sleep(key.get() - now);
  }
}
 
Example 13  Project: hadoop  File: TestFileSystem.java
public void map(Text key, LongWritable value,
                OutputCollector<Text, LongWritable> collector,
                Reporter reporter)
  throws IOException {
  
  String name = key.toString();
  long size = value.get();
  long seed = Long.parseLong(name);

  random.setSeed(seed);
  reporter.setStatus("creating " + name);

  // write to temp file initially to permit parallel execution
  Path tempFile = new Path(DATA_DIR, name+suffix);
  OutputStream out = fs.create(tempFile);

  long written = 0;
  try {
    while (written < size) {
      if (fastCheck) {
        Arrays.fill(buffer, (byte)random.nextInt(Byte.MAX_VALUE));
      } else {
        random.nextBytes(buffer);
      }
      long remains = size - written;
      int length = (remains<=buffer.length) ? (int)remains : buffer.length;
      out.write(buffer, 0, length);
      written += length;
      reporter.setStatus("writing "+name+"@"+written+"/"+size);
    }
  } finally {
    out.close();
  }
  // rename to final location
  fs.rename(tempFile, new Path(DATA_DIR, name));

  collector.collect(new Text("bytes"), new LongWritable(written));

  reporter.setStatus("wrote " + name);
}
 
Example 14  Project: Kylin  File: RangeKeyDistributionReducer.java
@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
    for (LongWritable v : values) {
        bytesRead += v.get();
    }
    
    if (bytesRead >= ONE_GIGA_BYTES) {
        gbPoints.add(new Text(key));
        bytesRead = 0; // reset bytesRead
    }
}
 
Example 15  Project: big-c  File: LongSumReducer.java
public void reduce(KEY key, Iterable<LongWritable> values,
                   Context context) throws IOException, InterruptedException {
  long sum = 0;
  for (LongWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}
 
Example 16  Project: ApprovalTests.Java  File: HadoopApprovalsTest.java
public void reduce(Text token, Iterator<LongWritable> counts, OutputCollector<Text, LongWritable> context,
    Reporter arg3) throws IOException
{
  long n = 0;
  for (LongWritable count : ArrayUtils.asIterable(counts))
  {
    n += count.get();
  }
  context.collect(token, new LongWritable(n));
}
 
Example 17  Project: Halyard  File: HalyardPreSplit.java
@Override
public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
    byte region = key.get()[key.getOffset()];
    if (lastRegion != region || size > splitLimit) {
        byte[] split = lastRegion != region ? new byte[]{region} : key.copyBytes();
        splits.add(split);
        context.setStatus("#" + splits.size() + " " + Arrays.toString(split));
        lastRegion = key.get()[key.getOffset()];
        size = 0;
    }
    for (LongWritable val : values) {
        size += val.get();
    }
}
 
Example 18  Project: hadoop  File: DistributedCacheEmulator.java
/**
 * Write the list of distributed cache files in the decreasing order of
 * file sizes into the sequence file. This file will be input to the job
 * {@link GenerateDistCacheData}.
 * Also validates if -generate option is missing and distributed cache files
 * are missing.
 * @return exit code
 * @throws IOException
 */
private int writeDistCacheFilesList()
    throws IOException {
  // Sort the distributed cache files in the decreasing order of file sizes.
  List dcFiles = new ArrayList(distCacheFiles.entrySet());
  Collections.sort(dcFiles, new Comparator() {
    public int compare(Object dc1, Object dc2) {
      return ((Comparable) ((Map.Entry) (dc2)).getValue())
          .compareTo(((Map.Entry) (dc1)).getValue());
    }
  });

  // write the sorted distributed cache files to the sequence file
  FileSystem fs = FileSystem.get(conf);
  Path distCacheFilesList = new Path(distCachePath, "_distCacheFiles.txt");
  conf.set(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST,
      distCacheFilesList.toString());
  SequenceFile.Writer src_writer = SequenceFile.createWriter(fs, conf,
      distCacheFilesList, LongWritable.class, BytesWritable.class,
      SequenceFile.CompressionType.NONE);

  // Total number of unique distributed cache files
  int fileCount = dcFiles.size();
  long byteCount = 0;// Total size of all distributed cache files
  long bytesSync = 0;// Bytes after previous sync;used to add sync marker

  for (Iterator it = dcFiles.iterator(); it.hasNext();) {
    Map.Entry entry = (Map.Entry)it.next();
    LongWritable fileSize =
        new LongWritable(Long.parseLong(entry.getValue().toString()));
    BytesWritable filePath =
        new BytesWritable(
        entry.getKey().toString().getBytes(charsetUTF8));

    byteCount += fileSize.get();
    bytesSync += fileSize.get();
    if (bytesSync > AVG_BYTES_PER_MAP) {
      src_writer.sync();
      bytesSync = fileSize.get();
    }
    src_writer.append(fileSize, filePath);
  }
  if (src_writer != null) {
    src_writer.close();
  }
  // Set delete on exit for 'dist cache files list' as it is not needed later.
  fs.deleteOnExit(distCacheFilesList);

  conf.setInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, fileCount);
  conf.setLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, byteCount);
  LOG.info("Number of HDFS based distributed cache files to be generated is "
      + fileCount + ". Total size of HDFS based distributed cache files "
      + "to be generated is " + byteCount);

  if (!shouldGenerateDistCacheData() && fileCount > 0) {
    LOG.error("Missing " + fileCount + " distributed cache files under the "
        + " directory\n" + distCachePath + "\nthat are needed for gridmix"
        + " to emulate distributed cache load. Either use -generate\noption"
        + " to generate distributed cache data along with input data OR "
        + "disable\ndistributed cache emulation by configuring '"
        + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
        + "' to false.");
    return Gridmix.MISSING_DIST_CACHE_FILES_ERROR;
  }
  return 0;
}
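
As the Javadoc above notes, each record written here pairs a LongWritable file size with a BytesWritable file path. A hypothetical sketch of reading the resulting sequence file back and totalling the sizes with get() (the class name and path argument are illustrative, not part of Gridmix) could look like this:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

// Hypothetical reader for the <LongWritable fileSize, BytesWritable filePath> records.
public class DistCacheFilesListReader {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path listPath = new Path(args[0]);        // e.g. the "_distCacheFiles.txt" written above

    LongWritable fileSize = new LongWritable();
    BytesWritable filePath = new BytesWritable();
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, listPath, conf);
    try {
      long total = 0;
      while (reader.next(fileSize, filePath)) {
        total += fileSize.get();              // unwrap the size with get()
        System.out.println(new String(filePath.copyBytes(), StandardCharsets.UTF_8)
            + "\t" + fileSize.get());
      }
      System.out.println("total bytes: " + total);
    } finally {
      reader.close();
    }
  }
}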
 
Example 19  Project: MLHadoop  File: total_records_reducer.java
public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
		throws IOException, InterruptedException {
	for(LongWritable val:values){
		this.countRows += val.get();
	}
}
 
Example 20  Project: hadoop-gpu  File: TestFileSystem.java
public void map(WritableComparable key, LongWritable value,
                OutputCollector<K, LongWritable> collector,
                Reporter reporter)
  throws IOException {
  String name = key.toString();
  long size = value.get();
  long seed = Long.parseLong(name);

  if (size == 0) return;

  reporter.setStatus("opening " + name);

  FSDataInputStream in = fs.open(new Path(DATA_DIR, name));
    
  try {
    for (int i = 0; i < SEEKS_PER_FILE; i++) {
      // generate a random position
      long position = Math.abs(random.nextLong()) % size;
      
      // seek file to that position
      reporter.setStatus("seeking " + name);
      in.seek(position);
      byte b = in.readByte();
      
      // check that byte matches
      byte checkByte = 0;
      // advance random state to that position
      random.setSeed(seed);
      for (int p = 0; p <= position; p+= check.length) {
        reporter.setStatus("generating data for " + name);
        if (fastCheck) {
          checkByte = (byte)random.nextInt(Byte.MAX_VALUE);
        } else {
          random.nextBytes(check);
          checkByte = check[(int)(position % check.length)];
        }
      }
      assertEquals(b, checkByte);
    }
  } finally {
    in.close();
  }
}