The following examples show how to use org.apache.hadoop.io.LongWritable#get(). Each snippet is taken from a real open-source project; follow the source links to view the full code on GitHub.
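Before the project examples, a minimal standalone sketch of the method itself (the demo class name is ours, not from any project below): LongWritable is a mutable, serializable wrapper around a primitive long, and get() simply returns the current value.

import org.apache.hadoop.io.LongWritable;

public class LongWritableGetDemo {
  public static void main(String[] args) {
    // LongWritable instances are reusable: set() mutates in place and
    // get() reads the current value, which is why Hadoop code typically
    // reuses a single instance across records instead of allocating.
    LongWritable w = new LongWritable(42L);
    System.out.println(w.get()); // 42
    w.set(w.get() + 1);
    System.out.println(w.get()); // 43
  }
}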
/**
* Emits a random word sequence of the desired size. Note that the desired
* output size is passed as the value parameter to this map.
*/
@Override
public void map(NullWritable key, LongWritable value, Context context)
throws IOException, InterruptedException {
//TODO Control the extra data written ..
//TODO Should the key\tvalue\n be considered for measuring size?
// Can counters like BYTES_WRITTEN be used? What will be the value of
// such counters in LocalJobRunner?
for (long bytes = value.get(); bytes > 0;) {
String randomKey = rtg.getRandomWord();
String randomValue = rtg.getRandomWord();
context.write(new Text(randomKey), new Text(randomValue));
bytes -= (randomValue.getBytes(charsetUTF8).length +
randomKey.getBytes(charsetUTF8).length);
}
}
private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId) throws IOException {
this.ledgerId = ledgerId;
this.executor = executor;
this.reader = reader;
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
try {
key.set(FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX);
reader.get(key, value);
this.ledgerMetadata = parseLedgerMetadata(value.copyBytes());
} catch (IOException e) {
  log.error("Failed to read LedgerMetadata for ledgerId {}", ledgerId, e);
  throw new IOException("Failed to read LedgerMetadata for ledgerId " + ledgerId, e);
}
}
public void reduce(LongWritable mod,
Iterable<LongWritable> values,
Context context)
throws IOException, InterruptedException {
long num = 0; // number of elements
long sum = 0; // sum of elements
long min = Long.MAX_VALUE; // minimum element
long max = Long.MIN_VALUE; // maximum element
for (LongWritable val : values) {
long vv = val.get();
num += 1;
sum += vv;
if (vv < min) min = vv;
if (vv > max) max = vv;
}
String rec = String.format("%d %d %d %d", num, min, max, sum);
context.write(mod, new Text(rec));
}
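The reducer above expects (residue, value) pairs. As a hedged illustration, a mapper that could feed it might bucket each parsed number by a modulus; ModMapper and NUM_BUCKETS below are our names, not from the original project:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ModMapper
    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
  private static final long NUM_BUCKETS = 16; // assumption for illustration
  private final LongWritable mod = new LongWritable();
  private final LongWritable out = new LongWritable();

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    // Assumes one integer per input line; bucket each number by its
    // residue so the reducer can compute count/min/max/sum per class.
    long v = Long.parseLong(line.toString().trim());
    mod.set(v % NUM_BUCKETS);
    out.set(v);
    context.write(mod, out);
  }
}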
/**
* Map file name and offset into statistical data.
* <p>
* The map task receives the
* <tt>key</tt>, which contains the file name, and the
* <tt>value</tt>, which is the offset within the file.
*
* Both are passed to the abstract method
* {@link #doIO(Reporter,String,long)}, which performs the I/O operation
* (usually reading or writing data), and then
* {@link #collectStats(OutputCollector,String,long,Object)}
* is called to prepare the statistics for a subsequent reducer.
*/
public void map(Text key,
LongWritable value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String name = key.toString();
long longValue = value.get();
reporter.setStatus("starting " + name + " ::host = " + hostName);
this.stream = getIOStream(name);
T statValue = null;
long tStart = System.currentTimeMillis();
try {
statValue = doIO(reporter, name, longValue);
} finally {
if(stream != null) stream.close();
}
long tEnd = System.currentTimeMillis();
long execTime = tEnd - tStart;
collectStats(output, name, execTime, statValue);
reporter.setStatus("finished " + name + " ::host = " + hostName);
}
@Nullable
public Text evaluate(@Nullable LongWritable time, @Nullable LongWritable threshold) {
if (time == null || threshold == null) {
return null;
}
final long thisTime = time.get();
final long diff = thisTime - lastTime;
if (diff < threshold.get()) {
this.lastTime = thisTime;
return sessionId;
}
sessionId.set(UUID.randomUUID().toString());
this.lastTime = time.get();
return sessionId;
}
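Exercising the method in isolation makes the session-rollover behavior concrete. A minimal sketch, assuming the method above lives in a class we will call SessionizeUDF (a hypothetical name) and that lastTime starts at 0:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// SessionizeUDF is a hypothetical stand-in for the class containing
// evaluate() above; times are in milliseconds.
SessionizeUDF udf = new SessionizeUDF();
LongWritable gap = new LongWritable(30_000); // a 30 s silence starts a new session
Text a = udf.evaluate(new LongWritable(100_000), gap); // 100 s since lastTime (0): fresh UUID
Text b = udf.evaluate(new LongWritable(110_000), gap); // 10 s gap < 30 s: same session id
Text c = udf.evaluate(new LongWritable(200_000), gap); // 90 s gap >= 30 s: fresh UUID

Note that the same mutable Text instance is returned from every call; a caller that needs to keep a session id beyond the current row should copy it, for example with new Text(a).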
private void loadGenerations() throws IOException {
FileSystem fileSystem = _path.getFileSystem(_configuration);
FileStatus[] listStatus = fileSystem.listStatus(_path);
SortedSet<FileStatus> existing = new TreeSet<FileStatus>(Arrays.asList(listStatus));
if (existing.isEmpty()) {
return;
}
FileStatus last = existing.last();
Reader reader = new SequenceFile.Reader(fileSystem, last.getPath(), _configuration);
try {
  Text key = new Text();
  LongWritable value = new LongWritable();
  while (reader.next(key, value)) {
    String name = key.toString();
    long gen = value.get();
    _namesToGenerations.put(name, gen);
    Set<String> names = _generationsToNames.get(gen);
    if (names == null) {
      names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
      _generationsToNames.put(gen, names);
    }
    names.add(name);
  }
} finally {
  // close the reader even if next() throws, so the stream is not leaked
  reader.close();
}
existing.remove(last);
cleanupOldFiles(fileSystem, existing);
}
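The writer side of this generations file is not shown in the snippet. A minimal sketch of what it might look like, reusing the fields above (the file name and the map iteration are our assumptions), writing Text name to LongWritable generation pairs:

// Hypothetical counterpart to loadGenerations(): persist the
// name -> generation map as a SequenceFile of <Text, LongWritable>.
Path next = new Path(_path, "generation-00002"); // assumed naming scheme
SequenceFile.Writer writer = SequenceFile.createWriter(
    fileSystem, _configuration, next, Text.class, LongWritable.class);
try {
  Text key = new Text();
  LongWritable value = new LongWritable();
  for (Map.Entry<String, Long> e : _namesToGenerations.entrySet()) {
    key.set(e.getKey());
    value.set(e.getValue()); // LongWritable#set mirrors the get() above
    writer.append(key, value);
  }
} finally {
  writer.close();
}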
@Override
public Vertex<LongWritable, DoubleWritable, FloatWritable>
getCurrentVertex() throws IOException {
Vertex<LongWritable, DoubleWritable, FloatWritable> vertex =
getConf().createVertex();
LongWritable vertexId = new LongWritable(
(inputSplit.getSplitIndex() * totalRecords) + recordsRead);
DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
long targetVertexId =
(vertexId.get() + 1) %
(inputSplit.getNumSplits() * totalRecords);
float edgeValue = vertexId.get() * 100f;
List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
new FloatWritable(edgeValue)));
vertex.initialize(vertexId, vertexValue, edges);
++recordsRead;
if (LOG.isInfoEnabled()) {
LOG.info("next: Return vertexId=" + vertex.getId().get() +
", vertexValue=" + vertex.getValue() +
", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
}
return vertex;
}
public void map(LongWritable row, NullWritable ignored,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
long rowId = row.get();
if (rand == null) {
// we use 3 random numbers per row
rand = new RandomGenerator(rowId*3);
}
addKey();
value.clear();
addRowId(rowId);
addFiller(rowId);
output.collect(key, value);
}
public void reduce(KEY key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
long sum = 0;
for (LongWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
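Because addition is associative and commutative, a sum reducer like this can safely double as a combiner. A hedged driver sketch (LongSumDriver and CountingMapper are hypothetical names; the reducer above is assumed to be declared as LongSumReducer with Text keys):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LongSumDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "long sum");
    job.setJarByClass(LongSumDriver.class);
    job.setMapperClass(CountingMapper.class);   // hypothetical mapper emitting <Text, LongWritable>
    job.setCombinerClass(LongSumReducer.class); // safe: addition is associative and commutative
    job.setReducerClass(LongSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}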
@Override
protected void reduce(final Text key, final Iterable<LongWritable> values, final Context context) throws IOException, InterruptedException {
long count = 0;
for (final LongWritable lw : values) {
count += lw.get();
}
if (count <= TOO_LOW) {
return;
}
valOut.set(count);
context.write(key, valOut);
}
@Override
public void map(NullWritable key, LongWritable value, Context context)
throws IOException, InterruptedException {
for (long bytes = value.get(); bytes > 0; bytes -= val.getLength()) {
r.nextBytes(val.getBytes());
val.setSize((int)Math.min(val.getLength(), bytes));
context.write(key, val);
}
}
@Override
public void map(LongWritable key, LongWritable value, Context context)
throws IOException, InterruptedException {
context.setStatus("Sleeping... " + value.get() + " ms left");
long now = System.currentTimeMillis();
if (now < key.get()) {
TimeUnit.MILLISECONDS.sleep(key.get() - now);
}
}
public void map(Text key, LongWritable value,
OutputCollector<Text, LongWritable> collector,
Reporter reporter)
throws IOException {
String name = key.toString();
long size = value.get();
long seed = Long.parseLong(name);
random.setSeed(seed);
reporter.setStatus("creating " + name);
// write to temp file initially to permit parallel execution
Path tempFile = new Path(DATA_DIR, name+suffix);
OutputStream out = fs.create(tempFile);
long written = 0;
try {
while (written < size) {
if (fastCheck) {
Arrays.fill(buffer, (byte)random.nextInt(Byte.MAX_VALUE));
} else {
random.nextBytes(buffer);
}
long remains = size - written;
int length = (remains<=buffer.length) ? (int)remains : buffer.length;
out.write(buffer, 0, length);
written += length;
reporter.setStatus("writing "+name+"@"+written+"/"+size);
}
} finally {
out.close();
}
// rename to final location
fs.rename(tempFile, new Path(DATA_DIR, name));
collector.collect(new Text("bytes"), new LongWritable(written));
reporter.setStatus("wrote " + name);
}
@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
for (LongWritable v : values) {
bytesRead += v.get();
}
if (bytesRead >= ONE_GIGA_BYTES) {
gbPoints.add(new Text(key));
bytesRead = 0; // reset bytesRead
}
}
public void reduce(Text token, Iterator<LongWritable> counts, OutputCollector<Text, LongWritable> context,
Reporter arg3) throws IOException
{
long n = 0;
for (LongWritable count : ArrayUtils.asIterable(counts))
{
n += count.get();
}
context.collect(token, new LongWritable(n));
}
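This snippet uses the old org.apache.hadoop.mapred API, which hands the reducer an Iterator rather than an Iterable, hence the adapter around counts. Without a helper, the equivalent loop over the raw Iterator is:

// Equivalent accumulation using the raw Iterator from the old mapred API
long n = 0;
while (counts.hasNext()) {
  n += counts.next().get();
}
context.collect(token, new LongWritable(n));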
@Override
public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
byte region = key.get()[key.getOffset()];
if (lastRegion != region || size > splitLimit) {
byte[] split = lastRegion != region ? new byte[]{region} : key.copyBytes();
splits.add(split);
context.setStatus("#" + splits.size() + " " + Arrays.toString(split));
lastRegion = key.get()[key.getOffset()];
size = 0;
}
for (LongWritable val : values) {
size += val.get();
}
}
/**
* Write the list of distributed cache files in the decreasing order of
* file sizes into the sequence file. This file will be input to the job
* {@link GenerateDistCacheData}.
* Also reports an error if the -generate option was not specified but the
* distributed cache files are missing.
* @return exit code
* @throws IOException
*/
private int writeDistCacheFilesList()
throws IOException {
// Sort the distributed cache files in the decreasing order of file sizes.
List dcFiles = new ArrayList(distCacheFiles.entrySet());
Collections.sort(dcFiles, new Comparator() {
public int compare(Object dc1, Object dc2) {
return ((Comparable) ((Map.Entry) (dc2)).getValue())
.compareTo(((Map.Entry) (dc1)).getValue());
}
});
// write the sorted distributed cache files to the sequence file
FileSystem fs = FileSystem.get(conf);
Path distCacheFilesList = new Path(distCachePath, "_distCacheFiles.txt");
conf.set(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST,
distCacheFilesList.toString());
SequenceFile.Writer src_writer = SequenceFile.createWriter(fs, conf,
distCacheFilesList, LongWritable.class, BytesWritable.class,
SequenceFile.CompressionType.NONE);
// Total number of unique distributed cache files
int fileCount = dcFiles.size();
long byteCount = 0; // total size of all distributed cache files
long bytesSync = 0; // bytes written since the previous sync; used to decide when to insert a sync marker
for (Iterator it = dcFiles.iterator(); it.hasNext();) {
Map.Entry entry = (Map.Entry)it.next();
LongWritable fileSize =
new LongWritable(Long.parseLong(entry.getValue().toString()));
BytesWritable filePath =
new BytesWritable(
entry.getKey().toString().getBytes(charsetUTF8));
byteCount += fileSize.get();
bytesSync += fileSize.get();
if (bytesSync > AVG_BYTES_PER_MAP) {
src_writer.sync();
bytesSync = fileSize.get();
}
src_writer.append(fileSize, filePath);
}
if (src_writer != null) {
src_writer.close();
}
// Set delete on exit for 'dist cache files list' as it is not needed later.
fs.deleteOnExit(distCacheFilesList);
conf.setInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, fileCount);
conf.setLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, byteCount);
LOG.info("Number of HDFS based distributed cache files to be generated is "
+ fileCount + ". Total size of HDFS based distributed cache files "
+ "to be generated is " + byteCount);
if (!shouldGenerateDistCacheData() && fileCount > 0) {
LOG.error("Missing " + fileCount + " distributed cache files under the "
+ " directory\n" + distCachePath + "\nthat are needed for gridmix"
+ " to emulate distributed cache load. Either use -generate\noption"
+ " to generate distributed cache data along with input data OR "
+ "disable\ndistributed cache emulation by configuring '"
+ DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+ "' to false.");
return Gridmix.MISSING_DIST_CACHE_FILES_ERROR;
}
return 0;
}
public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
for (LongWritable val : values) {
this.countRows += val.get();
}
}
public void map(WritableComparable key, LongWritable value,
OutputCollector<K, LongWritable> collector,
Reporter reporter)
throws IOException {
String name = key.toString();
long size = value.get();
long seed = Long.parseLong(name);
if (size == 0) return;
reporter.setStatus("opening " + name);
FSDataInputStream in = fs.open(new Path(DATA_DIR, name));
try {
for (int i = 0; i < SEEKS_PER_FILE; i++) {
// generate a random position
long position = Math.abs(random.nextLong()) % size;
// seek file to that position
reporter.setStatus("seeking " + name);
in.seek(position);
byte b = in.readByte();
// check that byte matches
byte checkByte = 0;
// advance random state to that position
random.setSeed(seed);
for (int p = 0; p <= position; p+= check.length) {
reporter.setStatus("generating data for " + name);
if (fastCheck) {
checkByte = (byte)random.nextInt(Byte.MAX_VALUE);
} else {
random.nextBytes(check);
checkByte = check[(int)(position % check.length)];
}
}
assertEquals(checkByte, b); // JUnit convention: expected value first
}
} finally {
in.close();
}
}