The following examples show how to use the org.apache.hadoop.mapreduce.lib.input.CombineFileSplit API, collected from real projects; you can also follow the links to GitHub to view the full source.
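Before the project examples, here is a minimal, self-contained sketch of the class itself: a CombineFileSplit packs several (path, offset, length) triples into a single input split, and the getters read them back. All paths, sizes, and the host name below are placeholder values invented for illustration.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineFileSplitSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder inputs: two files, each read from its start.
    Path[] paths   = { new Path("/data/a.txt"), new Path("/data/b.txt") };
    long[] offsets = { 0L, 0L };      // start offset within each file
    long[] lengths = { 128L, 256L };  // bytes to read from each file
    String[] hosts = { "host1" };     // preferred locations for the whole split

    CombineFileSplit split = new CombineFileSplit(paths, offsets, lengths, hosts);
    System.out.println("files: " + split.getNumPaths());
    System.out.println("bytes: " + split.getLength()); // sum of all lengths
    System.out.println("first: " + split.getPath(0) + " @ " + split.getOffset(0));
  }
}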
public CombineFileLineRecordReader(CombineFileSplit split,
    TaskAttemptContext context, Integer index) throws IOException {
  this.path = split.getPath(index);
  fs = this.path.getFileSystem(context.getConfiguration());
  this.startOffset = split.getOffset(index);
  this.end = startOffset + split.getLength(index);
  boolean skipFirstLine = false;
  // open the file
  fileIn = fs.open(path);
  if (startOffset != 0) {
    skipFirstLine = true;
    --startOffset;
    fileIn.seek(startOffset);
  }
  reader = new LineReader(fileIn);
  if (skipFirstLine) { // skip first line and re-establish "startOffset"
    startOffset += reader.readLine(new Text(), 0,
        (int) Math.min((long) Integer.MAX_VALUE, end - startOffset));
  }
  this.pos = startOffset;
}
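The (split, context, index) constructor above is exactly the shape CombineFileRecordReader expects when it instantiates one reader per file in a combined split. As a hedged sketch of how such a reader is usually wired in (the format class name here is made up, and the LongWritable/Text key-value types are assumed):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombinedLineInputFormat extends CombineFileInputFormat<LongWritable, Text> {
  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // CombineFileRecordReader calls the (CombineFileSplit, TaskAttemptContext,
    // Integer) constructor once per (path, offset, length) in the split.
    return new CombineFileRecordReader<LongWritable, Text>(
        (CombineFileSplit) split, context, CombineFileLineRecordReader.class);
  }
}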
public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
    long inputBytes, long inputRecords, long outputBytes,
    long outputRecords, double[] reduceBytes, double[] reduceRecords,
    long[] reduceOutputBytes, long[] reduceOutputRecords)
    throws IOException {
  super(cfsplit);
  this.id = id;
  this.maps = maps;
  reduces = reduceBytes.length;
  this.inputRecords = inputRecords;
  this.outputBytes = outputBytes;
  this.outputRecords = outputRecords;
  this.reduceBytes = reduceBytes;
  this.reduceRecords = reduceRecords;
  nSpec = reduceOutputBytes.length;
  this.reduceOutputBytes = reduceOutputBytes;
  this.reduceOutputRecords = reduceOutputRecords;
}
public LoadSplit(CombineFileSplit cfsplit, int maps, int id, long inputBytes,
    long inputRecords, long outputBytes, long outputRecords,
    double[] reduceBytes, double[] reduceRecords,
    long[] reduceOutputBytes, long[] reduceOutputRecords,
    ResourceUsageMetrics metrics,
    ResourceUsageMetrics[] rMetrics)
    throws IOException {
  super(cfsplit);
  this.id = id;
  this.maps = maps;
  reduces = reduceBytes.length;
  this.inputRecords = inputRecords;
  this.outputBytes = outputBytes;
  this.outputRecords = outputRecords;
  this.reduceBytes = reduceBytes;
  this.reduceRecords = reduceRecords;
  nSpec = reduceOutputBytes.length;
  this.reduceOutputBytes = reduceOutputBytes;
  this.reduceOutputRecords = reduceOutputRecords;
  this.mapMetrics = metrics;
  this.reduceMetrics = rMetrics;
}
@Test
public void testRepeat() throws Exception {
  final Configuration conf = new Configuration();
  Arrays.fill(loc, "");
  Arrays.fill(start, 0L);
  Arrays.fill(len, BLOCK);
  final ByteArrayOutputStream out = fillVerif();
  final FileQueue q =
      new FileQueue(new CombineFileSplit(paths, start, len, loc), conf);
  final byte[] verif = out.toByteArray();
  final byte[] check = new byte[2 * NFILES * BLOCK];
  q.read(check, 0, NFILES * BLOCK);
  assertArrayEquals(verif, Arrays.copyOf(check, NFILES * BLOCK));
  final byte[] verif2 = new byte[2 * NFILES * BLOCK];
  System.arraycopy(verif, 0, verif2, 0, verif.length);
  System.arraycopy(verif, 0, verif2, verif.length, verif.length);
  q.read(check, 0, 2 * NFILES * BLOCK);
  assertArrayEquals(verif2, check);
}
private LoadSplit getLoadSplit() throws Exception {
  Path[] files = {new Path("one"), new Path("two")};
  long[] start = {1, 2};
  long[] lengths = {100, 200};
  String[] locations = {"locOne", "loctwo"};
  CombineFileSplit cfSplit = new CombineFileSplit(files, start, lengths,
      locations);
  ResourceUsageMetrics metrics = new ResourceUsageMetrics();
  metrics.setCumulativeCpuUsage(200);
  ResourceUsageMetrics[] rMetrics = {metrics};
  double[] reduceBytes = {8.1d, 8.2d};
  double[] reduceRecords = {9.1d, 9.2d};
  long[] reduceOutputBytes = {101L, 102L};
  long[] reduceOutputRecords = {111L, 112L};
  return new LoadSplit(cfSplit, 2, 1, 4L, 5L, 6L, 7L,
      reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords,
      metrics, rMetrics);
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException {
  Configuration conf = context.getConfiguration();
  CombineFileSplit cSplit = (CombineFileSplit) split;
  Path[] path = cSplit.getPaths();
  long[] start = cSplit.getStartOffsets();
  long[] len = cSplit.getLengths();
  FileSystem fs = cSplit.getPath(0).getFileSystem(conf);
  long startTS = conf.getLong(RowInputFormat.START_TIME_MILLIS, 0L);
  long endTS = conf.getLong(RowInputFormat.END_TIME_MILLIS, 0L);
  this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, startTS, endTS);
  instantiateGfxdLoner(conf);
}
public HDFSSplitIterator(FileSystem fs, Path[] paths, long[] offsets,
    long[] lengths, long startTime, long endTime) throws IOException {
  this.fs = fs;
  this.split = new CombineFileSplit(paths, offsets, lengths, null);
  // skip over hoplog files that no longer exist (cleaned up by the janitor)
  while (currentHopIndex < split.getNumPaths() && !fs.exists(split.getPath(currentHopIndex))) {
    logger.warning(LocalizedStrings.HOPLOG_CLEANED_UP_BY_JANITOR, split.getPath(currentHopIndex));
    currentHopIndex++;
  }
  if (currentHopIndex == split.getNumPaths()) {
    this.hoplog = null;
    iterator = null;
  } else {
    this.hoplog = getHoplog(fs, split.getPath(currentHopIndex));
    iterator = hoplog.getReader().scan(split.getOffset(currentHopIndex),
        split.getLength(currentHopIndex));
  }
  this.startTime = startTime;
  this.endTime = endTime;
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  if (delegate != null) {
    delegate.close();
  }
  if (split instanceof CombineFileSplit) {
    CombineFileSplit combineSplit = (CombineFileSplit) split;
    // delegate to a plain FileSplit covering the idx-th file of the combined split
    FileSplit fileSplit = new FileSplit(combineSplit.getPath(idx), combineSplit.getOffset(idx),
        combineSplit.getLength(idx), combineSplit.getLocations());
    delegate = getInputFormat().createRecordReader(fileSplit, context);
    delegate.initialize(fileSplit, context);
  } else {
    throw new DatasetOperationException(
        "Split is not a CombineFileSplit: %s:%s",
        split.getClass().getCanonicalName(), split);
  }
}
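With initialize() building a per-file delegate as above, the rest of such a wrapper reader usually just forwards to it. A sketch of the assumed shape (the delegate field is the one set above; the other method bodies are illustrative, not from this project):

// Sketch: remaining RecordReader methods forward to the per-file delegate.
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  return delegate.nextKeyValue();
}

@Override
public void close() throws IOException {
  if (delegate != null) {
    delegate.close();
  }
}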
@Test
public void testUneven() throws Exception {
  final Configuration conf = new Configuration();
  Arrays.fill(loc, "");
  Arrays.fill(start, 0L);
  Arrays.fill(len, BLOCK);
  final int B2 = BLOCK / 2;
  for (int i = 0; i < NFILES; i += 2) {
    start[i] += B2;
    len[i] -= B2;
  }
  final FileQueue q =
      new FileQueue(new CombineFileSplit(paths, start, len, loc), conf);
  final ByteArrayOutputStream out = fillVerif();
  final byte[] verif = out.toByteArray();
  final byte[] check = new byte[NFILES / 2 * BLOCK + NFILES / 2 * B2];
  q.read(check, 0, verif.length);
  assertArrayEquals(verif, Arrays.copyOf(check, verif.length));
  q.read(check, 0, verif.length);
  assertArrayEquals(verif, Arrays.copyOf(check, verif.length));
}
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> splits = super.getSplits(job);
  long maxSize = MAX_SINGLE_FILE_MULTIPLIER
      * job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
  if (maxSize > 0) {
    List<InputSplit> newSplits = new ArrayList<>();
    for (InputSplit spl : splits) {
      CombineFileSplit cfs = (CombineFileSplit) spl;
      for (int i = 0; i < cfs.getNumPaths(); i++) {
        long length = cfs.getLength(i); // length of the i-th file, not the whole split
        if (length > maxSize) {
          int replicas = (int) Math.ceil((double) length / (double) maxSize);
          Path path = cfs.getPath(i);
          for (int r = 1; r < replicas; r++) {
            // the start-offset slot carries the replica index
            newSplits.add(new CombineFileSplit(new Path[]{path}, new long[]{r},
                new long[]{length}, cfs.getLocations()));
          }
        }
      }
    }
    splits.addAll(newSplits);
  }
  return splits;
}
public CombinedFileRecordReader(InputFormat<K, V> inputFormat,
    CombineFileSplit combineFileSplit, TaskAttemptContext context) {
  this.inputFormat = inputFormat;
  this.combineFileSplit = combineFileSplit;
  this.context = context;
  long[] lengths = combineFileSplit.getLengths();
  long totalLength = 0;
  for (long length : lengths) {
    totalLength += length;
  }
  fractionLength = new float[lengths.length];
  for (int i = 0; i < lengths.length; i++) {
    fractionLength[i] = ((float) lengths[i]) / totalLength;
  }
}
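The fractionLength array computed above lends itself to progress reporting. A sketch of what a matching getProgress() could look like, assuming hypothetical idx and curReader fields that track the file currently being read (neither appears in the snippet above):

// Sketch: overall progress is the sum of fractions for finished files
// plus the current reader's partial progress, clamped to 1.0.
@Override
public float getProgress() throws IOException, InterruptedException {
  float done = 0f;
  for (int i = 0; i < idx; i++) {
    done += fractionLength[i]; // files already fully consumed
  }
  if (curReader != null) {
    done += fractionLength[idx] * curReader.getProgress(); // current file
  }
  return Math.min(1.0f, done);
}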
@Override
protected void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
    throws IOException, InterruptedException {
  if (context.getNumReduceTasks() == 0) {
    context.write(key, NullWritable.get());
  } else {
    populateComparableKeyRecord(key.datum(), this.outKey.datum());
    this.outValue.datum(key.datum());
    try {
      context.write(this.outKey, this.outValue);
    } catch (AvroRuntimeException e) {
      final Path[] paths = ((CombineFileSplit) context.getInputSplit()).getPaths();
      throw new IOException("Unable to process paths " + StringUtils.join(paths, ','), e);
    }
  }
  context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
}
/**
 * Cap the number of locations in each split at SPLIT_MAX_NUM_LOCATIONS
 * (MAPREDUCE-5186).
 */
private static List<InputSplit> cleanSplits(List<InputSplit> splits) throws IOException {
  if (VersionInfo.getVersion().compareTo("2.3.0") >= 0) {
    // This issue was fixed in 2.3.0; on newer versions there is no need to clean up splits.
    return splits;
  }
  List<InputSplit> cleanedSplits = Lists.newArrayList();
  for (int i = 0; i < splits.size(); i++) {
    CombineFileSplit oldSplit = (CombineFileSplit) splits.get(i);
    String[] locations = oldSplit.getLocations();
    Preconditions.checkNotNull(locations, "CombineFileSplit.getLocations() returned null");
    if (locations.length > SPLIT_MAX_NUM_LOCATIONS) {
      locations = Arrays.copyOf(locations, SPLIT_MAX_NUM_LOCATIONS);
    }
    cleanedSplits.add(new CombineFileSplit(oldSplit.getPaths(), oldSplit.getStartOffsets(),
        oldSplit.getLengths(), locations));
  }
  return cleanedSplits;
}
/**
 * Create a single split from the list of blocks specified in validBlocks.
 * Add this new split into splitList.
 */
private void addCreatedSplit(List<InputSplit> splitList,
    List<String> locations,
    ArrayList<OneBlockInfo> validBlocks) {
  // create an input split
  Path[] fl = new Path[validBlocks.size()];
  long[] offset = new long[validBlocks.size()];
  long[] length = new long[validBlocks.size()];
  for (int i = 0; i < validBlocks.size(); i++) {
    fl[i] = validBlocks.get(i).onepath;
    offset[i] = validBlocks.get(i).offset;
    length[i] = validBlocks.get(i).length;
  }
  // add this split to the list that is returned
  CombineFileSplit thissplit = new CombineFileSplit(fl, offset,
      length, locations.toArray(new String[0]));
  splitList.add(thissplit);
}
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> kiteCombineSplits = Lists.newArrayList();
  for (InputSplit inputSplit : super.getSplits(job)) {
    kiteCombineSplits.add(new KiteCombineFileSplit((CombineFileSplit) inputSplit));
  }
  return kiteCombineSplits;
}
public MDSCombineSpreadReader(final CombineFileSplit split,
    final TaskAttemptContext context, final Integer index) throws IOException {
  Configuration config = context.getConfiguration();
  Path path = split.getPath(index);
  FileSystem fs = path.getFileSystem(config);
  long fileLength = fs.getLength(path);
  InputStream in = fs.open(path);
  innerReader = new MDSSpreadReader();
  innerReader.setStream(in, fileLength, 0, fileLength);
}
/**
 * @param split Description of input sources.
 * @param conf Used to resolve FileSystem instances.
 */
public FileQueue(CombineFileSplit split, Configuration conf)
    throws IOException {
  this.conf = conf;
  paths = split.getPaths();
  startoffset = split.getStartOffsets();
  lengths = split.getLengths();
  nextSource();
}
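Going by this constructor and the tests earlier on this page, a FileQueue concatenates its sources and is drained like an InputStream, looping back to the first source when the last is exhausted. A minimal usage sketch; the paths and lengths are placeholders and would need to point at real files:

// Sketch: draining a FileQueue built over a two-file CombineFileSplit.
CombineFileSplit split = new CombineFileSplit(
    new Path[] { new Path("part-0"), new Path("part-1") },
    new long[] { 0L, 0L }, new long[] { 1024L, 1024L }, new String[0]);
FileQueue queue = new FileQueue(split, new Configuration());
byte[] buf = new byte[2048];
queue.read(buf, 0, buf.length); // fills buf from part-0, then part-1
queue.close();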
@Test
public void testEmpty() throws Exception {
  final Configuration conf = new Configuration();
  // verify OK if unused
  final FileQueue q = new FileQueue(new CombineFileSplit(
      new Path[0], new long[0], new long[0], new String[0]), conf);
}
@Test(timeout = 1000)
public void testGridmixSplit() throws Exception {
  Path[] files = {new Path("one"), new Path("two")};
  long[] start = {1, 2};
  long[] lengths = {100, 200};
  String[] locations = {"locOne", "loctwo"};
  CombineFileSplit cfSplit = new CombineFileSplit(files, start, lengths,
      locations);
  ResourceUsageMetrics metrics = new ResourceUsageMetrics();
  metrics.setCumulativeCpuUsage(200);
  double[] reduceBytes = {8.1d, 8.2d};
  double[] reduceRecords = {9.1d, 9.2d};
  long[] reduceOutputBytes = {101L, 102L};
  long[] reduceOutputRecords = {111L, 112L};
  GridmixSplit test = new GridmixSplit(cfSplit, 2, 3, 4L, 5L, 6L, 7L,
      reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords);
  ByteArrayOutputStream data = new ByteArrayOutputStream();
  DataOutputStream out = new DataOutputStream(data);
  test.write(out);
  GridmixSplit copy = new GridmixSplit();
  copy.readFields(new DataInputStream(new ByteArrayInputStream(data
      .toByteArray())));
  // data should be the same
  assertEquals(test.getId(), copy.getId());
  assertEquals(test.getMapCount(), copy.getMapCount());
  assertEquals(test.getInputRecords(), copy.getInputRecords());
  assertEquals(test.getOutputBytes()[0], copy.getOutputBytes()[0]);
  assertEquals(test.getOutputRecords()[0], copy.getOutputRecords()[0]);
  assertEquals(test.getReduceBytes(0), copy.getReduceBytes(0));
  assertEquals(test.getReduceRecords(0), copy.getReduceRecords(0));
}
void checkSplitEq(FileSystem fs, CombineFileSplit split, long bytes)
    throws Exception {
  long splitBytes = 0L;
  HashSet<Path> uniq = new HashSet<Path>();
  for (int i = 0; i < split.getNumPaths(); ++i) {
    splitBytes += split.getLength(i);
    assertTrue(
        split.getLength(i) <= fs.getFileStatus(split.getPath(i)).getLen());
    assertFalse(uniq.contains(split.getPath(i)));
    uniq.add(split.getPath(i));
  }
  assertEquals(bytes, splitBytes);
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  CombineFileSplit cSplit = (CombineFileSplit) split;
  Path[] path = cSplit.getPaths();
  long[] start = cSplit.getStartOffsets();
  long[] len = cSplit.getLengths();
  Configuration conf = context.getConfiguration();
  FileSystem fs = cSplit.getPath(0).getFileSystem(conf);
  this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, 0L, 0L);
}
public void testHfileSplitCompleteness() throws Exception {
  cluster = initMiniCluster(CLUSTER_PORT, 1);
  int count = 40;
  HdfsSortedOplogOrganizer bucket1 = new HdfsSortedOplogOrganizer(
      regionManager, 1);
  ArrayList<TestEvent> items = new ArrayList<TestEvent>();
  for (int i = 0; i < count; i++) {
    items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
  }
  bucket1.flush(items.iterator(), count);
  Configuration conf = hdfsStore.getFileSystem().getConf();
  GFInputFormat gfInputFormat = new GFInputFormat();
  Job job = Job.getInstance(conf, "test");
  conf = job.getConfiguration();
  conf.set(GFInputFormat.INPUT_REGION, getName());
  conf.set(GFInputFormat.HOME_DIR, testDataDir.getName());
  conf.setBoolean(GFInputFormat.CHECKPOINT, false);
  List<InputSplit> splits = gfInputFormat.getSplits(job);
  assertTrue(1 < splits.size());
  // the splits must tile the hoplog file with no gaps or overlaps
  long lastBytePositionOfPrevious = 0;
  for (InputSplit inputSplit : splits) {
    CombineFileSplit split = (CombineFileSplit) inputSplit;
    assertEquals(1, split.getPaths().length);
    assertEquals(lastBytePositionOfPrevious, split.getOffset(0));
    lastBytePositionOfPrevious += split.getLength();
    assertEquals(1, split.getLocations().length);
  }
  Path bucketPath = new Path(regionPath, "1");
  Path hopPath = new Path(bucketPath, bucket1.getSortedOplogs().iterator()
      .next().get().getFileName());
  FileStatus status = hdfsStore.getFileSystem().getFileStatus(hopPath);
  assertEquals(status.getLen(), lastBytePositionOfPrevious);
}