Listed below are example usages of the org.apache.hadoop.mapreduce.TaskAttemptID API class; you can also follow the links to view the source code on GitHub.
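Before the examples, here is a minimal sketch (identifier values made up for illustration) of how a TaskAttemptID is composed from a JobID, a TaskID, and an attempt number, and how it round-trips through its canonical string form:

import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;

public class TaskAttemptIdDemo {
  public static void main(String[] args) {
    // jobtracker id "jt", job 1, map task 3, first attempt
    TaskAttemptID taid = new TaskAttemptID("jt", 1, TaskType.MAP, 3, 0);
    System.out.println(taid);                      // attempt_jt_0001_m_000003_0
    System.out.println(taid.getJobID());           // job_jt_0001
    System.out.println(taid.getTaskID().getId());  // 3 (task number within the job)
    System.out.println(taid.getId());              // 0 (attempt number of that task)

    // Parse the canonical string form back into an ID
    TaskAttemptID parsed = TaskAttemptID.forName("attempt_jt_0001_m_000003_0");
    System.out.println(parsed.equals(taid));       // true
  }
}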
public synchronized void addKnownMapOutput(String hostName,
                                           String hostUrl,
                                           TaskAttemptID mapId) {
  MapHost host = mapLocations.get(hostName);
  if (host == null) {
    host = new MapHost(hostName, hostUrl);
    mapLocations.put(hostName, host);
  }
  host.addKnownMap(mapId);

  // Mark the host as pending
  if (host.getState() == State.PENDING) {
    pendingHosts.add(host);
    notifyAll();
  }
}
@Override
public void open(HadoopInputSplit split) throws IOException {
  // enforce sequential open() calls
  synchronized (OPEN_MUTEX) {
    TaskAttemptContext context = new TaskAttemptContextImpl(configuration, new TaskAttemptID());
    try {
      this.recordReader = this.mapreduceInputFormat
          .createRecordReader(split.getHadoopInputSplit(), context);
      this.recordReader.initialize(split.getHadoopInputSplit(), context);
    } catch (InterruptedException e) {
      throw new IOException("Could not create RecordReader.", e);
    } finally {
      this.fetched = false;
    }
  }
}
public void reduce(Text key, Iterable<IntWritable> values,
                   Context context) throws IOException, InterruptedException {
  // Make one reducer slower for speculative execution
  TaskAttemptID taid = context.getTaskAttemptID();
  long sleepTime = 100;
  Configuration conf = context.getConfiguration();
  boolean test_speculate_reduce =
      conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);

  // IF TESTING REDUCE SPECULATIVE EXECUTION:
  //   Make the "*_r_000000_0" attempt take much longer than the others.
  //   When speculative execution is enabled, this should cause the attempt
  //   to be killed and restarted. At that point, the attempt ID will be
  //   "*_r_000000_1", so sleepTime will still remain 100ms.
  if ((taid.getTaskType() == TaskType.REDUCE) && test_speculate_reduce
      && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) {
    sleepTime = 10000;
  }
  try {
    Thread.sleep(sleepTime);
  } catch (InterruptedException ie) {
    // Ignore
  }
  context.write(key, new IntWritable(0));
}
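A note on the two IDs compared above: taid.getTaskID().getId() is the task's number within the job, while taid.getId() is the attempt number for that task, so only the first attempt of the first reducer is slowed down. A quick hand-built illustration (the values are hypothetical):

TaskAttemptID taid = new TaskAttemptID("jt", 1, TaskType.REDUCE, 0, 1); // attempt_jt_0001_r_000000_1
taid.getTaskID().getId(); // 0 -> first reduce task of the job
taid.getId();             // 1 -> second attempt of that task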
public void setDatum(Object oDatum) {
  this.datum = (MapAttemptFinished) oDatum;
  this.attemptId = TaskAttemptID.forName(datum.attemptId.toString());
  this.taskType = TaskType.valueOf(datum.taskType.toString());
  this.taskStatus = datum.taskStatus.toString();
  this.mapFinishTime = datum.mapFinishTime;
  this.finishTime = datum.finishTime;
  this.hostname = datum.hostname.toString();
  this.rackName = datum.rackname.toString();
  this.port = datum.port;
  this.state = datum.state.toString();
  this.counters = EventReader.fromAvro(datum.counters);
  this.clockSplits = AvroArrayUtils.fromAvro(datum.clockSplits);
  this.cpuUsages = AvroArrayUtils.fromAvro(datum.cpuUsages);
  this.vMemKbytes = AvroArrayUtils.fromAvro(datum.vMemKbytes);
  this.physMemKbytes = AvroArrayUtils.fromAvro(datum.physMemKbytes);
}
public void setDatum(Object odatum) {
  this.datum = (TaskFailed) odatum;
  this.id = TaskID.forName(datum.taskid.toString());
  this.taskType = TaskType.valueOf(datum.taskType.toString());
  this.finishTime = datum.finishTime;
  this.error = datum.error.toString();
  this.failedDueToAttempt =
      datum.failedDueToAttempt == null
          ? null
          : TaskAttemptID.forName(datum.failedDueToAttempt.toString());
  this.status = datum.status.toString();
  this.counters = EventReader.fromAvro(datum.counters);
}
@Test
public void readExcelInputFormatExcel2003SingleSheetEncryptedNegativeLowFootprint()
    throws IOException, InterruptedException {
  Configuration conf = new Configuration(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "excel2003encrypt.xls";
  String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
  Path file = new Path(fileNameSpreadSheet);
  // set locale to the one of the test data
  conf.set("hadoopoffice.read.locale.bcp47", "de");
  // low footprint
  conf.set("hadoopoffice.read.lowFootprint", "true");
  // for decryption simply set the password
  conf.set("hadoopoffice.read.security.crypt.password", "test2");
  Job job = Job.getInstance(conf);
  FileInputFormat.setInputPaths(job, file);
  TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
  ExcelFileInputFormat format = new ExcelFileInputFormat();
  List<InputSplit> splits = format.getSplits(job);
  assertEquals(1, splits.size(), "Only one split generated for Excel file");
  RecordReader<Text, ArrayWritable> reader = format.createRecordReader(splits.get(0), context);
  InterruptedException ex = assertThrows(InterruptedException.class,
      () -> reader.initialize(splits.get(0), context), "Exception is thrown in case of wrong password");
}
@Override
public void open(String uId) throws Exception {
  this.hash = uId.hashCode();
  Job job = ((ConfigurableHDFSFileSink<K, V>) getWriteOperation().getSink()).jobInstance();
  FileOutputFormat.setOutputPath(job, new Path(path));

  // Each Writer is responsible for writing one bundle of elements and is represented by one
  // unique Hadoop task based on uId/hash. All tasks share the same job ID. Since Dataflow
  // handles retrying of failed bundles, each task has one attempt only.
  JobID jobId = job.getJobID();
  TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
  configure(job);
  context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));

  FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
  recordWriter = outputFormat.getRecordWriter(context);
  outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
}
public LocalFetcher(JobConf job, TaskAttemptID reduceId,
                    ShuffleSchedulerImpl<K, V> scheduler,
                    MergeManager<K, V> merger,
                    Reporter reporter, ShuffleClientMetrics metrics,
                    ExceptionReporter exceptionReporter,
                    SecretKey shuffleKey,
                    Map<TaskAttemptID, MapOutputFile> localMapFiles) {
  super(job, reduceId, scheduler, merger, reporter, metrics,
      exceptionReporter, shuffleKey);
  this.job = job;
  this.localMapFiles = localMapFiles;

  setName("localfetcher#" + id);
  setDaemon(true);
}
private void printTaskAttempts(TaskReport report) {
  if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
    System.out.println(report.getSuccessfulTaskAttemptId());
  } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
    for (TaskAttemptID t : report.getRunningTaskAttemptIds()) {
      System.out.println(t);
    }
  }
}
private void setContextTaskId(final int taskId) {
  Context context = reduceDriver.getContext();
  when(context.getTaskAttemptID()).thenAnswer(new Answer<TaskAttemptID>() {
    @Override
    public TaskAttemptID answer(InvocationOnMock invocation) throws Throwable {
      return TaskAttemptID.forName("attempt__0000_r_" + taskId + "_0");
    }
  });
}
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }
  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");

  if (finishTime != null && status != null
      && status.equalsIgnoreCase("success")) {
    String hostName = line.get("HOSTNAME");
    String counters = line.get("COUNTERS");
    String state = line.get("STATE_STRING");
    MapAttempt20LineHistoryEventEmitter that =
        (MapAttempt20LineHistoryEventEmitter) thatg;

    if ("success".equalsIgnoreCase(status)) {
      return new MapAttemptFinishedEvent(taskAttemptID,
          that.originalTaskType, status,
          Long.parseLong(finishTime),
          Long.parseLong(finishTime),
          hostName, -1, null, state, maybeParseCounters(counters),
          null);
    }
  }
  return null;
}
@SuppressWarnings("deprecation")
public TaskAttemptInfo getTaskAttemptInfo(TaskTracker taskTracker,
TaskAttemptID taskAttemptId) {
JobID jobid = (JobID) taskAttemptId.getJobID();
assert (jobid == getJobID());
return (taskAttemptId.isMap()) ? getMapTaskAttemptInfo(
taskTracker, taskAttemptId)
: getReduceTaskAttemptInfo(taskTracker, taskAttemptId);
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Test (timeout=30000)
public void testSleepMapper() throws Exception {
SleepJob.SleepMapper test = new SleepJob.SleepMapper();
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
FakeRecordLLReader reader = new FakeRecordLLReader();
LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
SleepSplit split = getSleepSplit();
MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext = new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>(
conf, taskId, reader, writer, committer, reporter, split);
Context context = new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>()
.getMapContext(mapcontext);
long start = System.currentTimeMillis();
LOG.info("start:" + start);
LongWritable key = new LongWritable(start + 2000);
LongWritable value = new LongWritable(start + 2000);
// should slip 2 sec
test.map(key, value, context);
LOG.info("finish:" + System.currentTimeMillis());
assertTrue(System.currentTimeMillis() >= (start + 2000));
test.cleanup(context);
assertEquals(1, writer.getData().size());
}
public TaskAttemptUnsuccessfulCompletionEvent
    (TaskAttemptID id, TaskType taskType,
     String status, long finishTime,
     String hostname, int port, String rackName,
     String error, int[][] allSplits) {
  this(id, taskType, status, finishTime, hostname, port,
      rackName, error, EMPTY_COUNTERS, allSplits);
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Test (timeout=10000)
public void testLoadMapper() throws Exception {
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
RecordReader<NullWritable, GridmixRecord> reader = new FakeRecordReader();
LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
LoadSplit split = getLoadSplit();
MapContext<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> mapContext = new MapContextImpl<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>(
conf, taskId, reader, writer, committer, reporter, split);
// context
Context ctx = new WrappedMapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>()
.getMapContext(mapContext);
reader.initialize(split, ctx);
ctx.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
CompressionEmulationUtil.setCompressionEmulationEnabled(
ctx.getConfiguration(), true);
LoadJob.LoadMapper mapper = new LoadJob.LoadMapper();
// setup, map, clean
mapper.run(ctx);
Map<GridmixKey, GridmixRecord> data = writer.getData();
// check result
assertEquals(2, data.size());
}
@Test(expected = RuntimeException.class)
public void testProblemGettingLocalCacheFiles() throws IOException, URISyntaxException {
  final URL url = createUrl("full_splits.txt");

  MultiTableRangePartitioner.setContext(new MapContextImpl<Key, Value, Text, Mutation>(
      configuration, new TaskAttemptID(), null, null, null, null, null) {
    @Override
    public org.apache.hadoop.fs.Path[] getLocalCacheFiles() throws IOException {
      throw new IOException("Local cache files failure");
    }
  });

  getPartition("23432");
}
private JsonRecordReader getJsonRecordReader(String file) throws IOException, URISyntaxException {
  InputSplit split = ColumnBasedHandlerTestUtil.getSplit(file);
  TaskAttemptContext ctx = new TaskAttemptContextImpl(conf, new TaskAttemptID());
  TypeRegistry.reset();
  TypeRegistry.getInstance(ctx.getConfiguration());
  log.debug(TypeRegistry.getContents());
  JsonRecordReader reader = new JsonRecordReader();
  reader.initialize(split, ctx);
  return reader;
}
public IllustratorContextImpl(Job job,
    List<Pair<PigNullableWritable, Writable>> input,
    POPackage pkg) throws IOException, InterruptedException {
  super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()),
      null, null, null, null, new IllustrateDummyReporter(), null, PigNullableWritable.class, NullableTuple.class);
  bos = new ByteArrayOutputStream();
  dos = new DataOutputStream(bos);
  org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf());
  sortComparator = nwJob.getSortComparator();
  groupingComparator = nwJob.getGroupingComparator();

  Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
    @Override
    public int compare(Pair<PigNullableWritable, Writable> o1,
                       Pair<PigNullableWritable, Writable> o2) {
      try {
        o1.first.write(dos);
        int l1 = bos.size();
        o2.first.write(dos);
        int l2 = bos.size();
        byte[] bytes = bos.toByteArray();
        bos.reset();
        return sortComparator.compare(bytes, 0, l1, bytes, l1, l2 - l1);
      } catch (IOException e) {
        throw new RuntimeException("Serialization exception in sort: " + e.getMessage());
      }
    }
  });

  currentValues = new ArrayList<NullableTuple>();
  it = input.iterator();
  if (it.hasNext()) {
    Pair<PigNullableWritable, Writable> entry = it.next();
    nextKey = entry.first;
    nextValue = (NullableTuple) entry.second;
  }
  pack = pkg;
}
@Override
public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
    throws IOException {
  try {
    return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
/**
 * Mask the job ID part in a {@link TaskAttemptID}.
 *
 * @param attemptId
 *          raw {@link TaskAttemptID} read from trace
 * @return masked {@link TaskAttemptID} with empty {@link JobID}.
 */
private TaskAttemptID maskAttemptID(TaskAttemptID attemptId) {
  JobID jobId = new JobID();
  TaskType taskType = attemptId.getTaskType();
  TaskID taskId = attemptId.getTaskID();
  return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), taskType,
      taskId.getId(), attemptId.getId());
}
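As a quick check (a hypothetical input, run through the method above), masking keeps the task type, task number, and attempt number but swaps in the empty default JobID:

TaskAttemptID raw = TaskAttemptID.forName("attempt_200707121733_0003_m_000005_0");
TaskAttemptID masked = maskAttemptID(raw);
System.out.println(masked); // attempt__0000_m_000005_0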
@Test
public void testReinit() throws Exception {
  // Test that a split containing multiple files works correctly,
  // with the child RecordReader getting its initialize() method
  // called a second time.
  TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
  Configuration conf = new Configuration();
  TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId);

  // This will create a CombineFileRecordReader that itself contains a
  // DummyRecordReader.
  InputFormat inputFormat = new ChildRRInputFormat();

  Path[] files = { new Path("file1"), new Path("file2") };
  long[] lengths = { 1, 1 };
  CombineFileSplit split = new CombineFileSplit(files, lengths);
  RecordReader rr = inputFormat.createRecordReader(split, context);
  assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);

  // first initialize() call comes from MapTask. We'll do it here.
  rr.initialize(split, context);

  // First value is first filename.
  assertTrue(rr.nextKeyValue());
  assertEquals("file1", rr.getCurrentValue().toString());

  // The inner RR will return false, because it only emits one (k, v) pair.
  // But there's another sub-split to process. This returns true to us.
  assertTrue(rr.nextKeyValue());

  // And the 2nd rr will have its initialize method called correctly.
  assertEquals("file2", rr.getCurrentValue().toString());

  // But after both child RR's have returned their singleton (k, v), this
  // should also return false.
  assertFalse(rr.nextKeyValue());
}
@BeforeClass
public void setUp() throws Exception {
  m_workdir = new Path(
      System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
  m_conf = new JobConf();
  m_rand = Utils.createRandom();
  unsafe = Utils.getUnsafe();

  try {
    m_fs = FileSystem.getLocal(m_conf).getRaw();
    m_fs.delete(m_workdir, true);
    m_fs.mkdirs(m_workdir);
  } catch (IOException e) {
    throw new IllegalStateException("bad fs init", e);
  }

  m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
  m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);

  MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
  MneConfigHelper.setBaseOutputName(m_conf, null, "chunk-data");

  MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
  MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
  MneConfigHelper.setDurableTypes(m_conf,
      MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[] {DurableType.CHUNK});
  MneConfigHelper.setEntityFactoryProxies(m_conf,
      MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[] {});
  MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
  MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
  MneConfigHelper.setMemPoolSize(m_conf,
      MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
  MneConfigHelper.setDurableTypes(m_conf,
      MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[] {DurableType.CHUNK});
  MneConfigHelper.setEntityFactoryProxies(m_conf,
      MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[] {});
}
/**
 * Do some basic verification on the input received -- Being defensive.
 *
 * @param compressedLength compressed length of the map output
 * @param decompressedLength decompressed length of the map output
 * @param forReduce reduce partition this output is destined for
 * @param remaining map outputs still expected by this reducer
 * @param mapId attempt ID of the map that produced this output
 * @return true/false, based on if the verification succeeded or not
 */
private boolean verifySanity(long compressedLength, long decompressedLength,
    int forReduce, Set<TaskAttemptID> remaining, TaskAttemptID mapId) {
  if (compressedLength < 0 || decompressedLength < 0) {
    wrongLengthErrs.increment(1);
    LOG.warn(getName() + " invalid lengths in map output header: id: " +
        mapId + " len: " + compressedLength + ", decomp len: " +
        decompressedLength);
    return false;
  }

  if (forReduce != reduce) {
    wrongReduceErrs.increment(1);
    LOG.warn(getName() + " data for the wrong reduce map: " +
        mapId + " len: " + compressedLength + " decomp len: " +
        decompressedLength + " for reduce " + forReduce);
    return false;
  }

  // Sanity check
  if (!remaining.contains(mapId)) {
    wrongMapErrs.increment(1);
    LOG.warn("Invalid map-output! Received output for " + mapId);
    return false;
  }

  return true;
}
@VisibleForTesting
OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                MergeManagerImpl<K, V> merger, long size,
                JobConf conf,
                MapOutputFile mapOutputFile,
                int fetcher, boolean primaryMapOutput,
                FileSystem fs, Path outputPath) throws IOException {
  super(mapId, size, primaryMapOutput);
  this.fs = fs;
  this.merger = merger;
  this.outputPath = outputPath;
  tmpOutputPath = getTempPath(outputPath, fetcher);
  disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
  this.conf = conf;
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Test (timeout=10000)
public void testLoadMapper() throws Exception {
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
RecordReader<NullWritable, GridmixRecord> reader = new FakeRecordReader();
LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
LoadSplit split = getLoadSplit();
MapContext<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> mapContext = new MapContextImpl<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>(
conf, taskId, reader, writer, committer, reporter, split);
// context
Context ctx = new WrappedMapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>()
.getMapContext(mapContext);
reader.initialize(split, ctx);
ctx.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
CompressionEmulationUtil.setCompressionEmulationEnabled(
ctx.getConfiguration(), true);
LoadJob.LoadMapper mapper = new LoadJob.LoadMapper();
// setup, map, clean
mapper.run(ctx);
Map<GridmixKey, GridmixRecord> data = writer.getData();
// check result
assertEquals(2, data.size());
}
public IllustratorContext(Configuration conf, DataBag input,
    List<Pair<PigNullableWritable, Writable>> output,
    InputSplit split) throws IOException, InterruptedException {
  super(conf, new TaskAttemptID(), null, null, null, new IllustrateDummyReporter(), split);
  conf.set("inIllustrator", "true");
  if (output == null) {
    throw new IOException("Null output can not be used");
  }
  this.input = input;
  this.output = output;
}
public MapContextImpl(Configuration conf, TaskAttemptID taskid,
                      RecordReader<KEYIN, VALUEIN> reader,
                      RecordWriter<KEYOUT, VALUEOUT> writer,
                      OutputCommitter committer,
                      TaskContext context,
                      InputSplit split, Reporter reporter) {
  super(conf, taskid, writer, committer, context, reporter);
  this.reader = reader;
  this.split = split;
}
@SuppressWarnings("unchecked")
@Test
public void testProgressIsReportedIfInputASeriesOfEmptyFiles() throws IOException, InterruptedException {
JobConf conf = new JobConf();
Path[] paths = new Path[3];
File[] files = new File[3];
long[] fileLength = new long[3];
try {
for(int i=0;i<3;i++){
File dir = new File(outDir.toString());
dir.mkdir();
files[i] = new File(dir,"testfile"+i);
FileWriter fileWriter = new FileWriter(files[i]);
fileWriter.flush();
fileWriter.close();
fileLength[i] = i;
paths[i] = new Path(outDir+"/testfile"+i);
}
CombineFileSplit combineFileSplit = new CombineFileSplit(paths, fileLength);
TaskAttemptID taskAttemptID = Mockito.mock(TaskAttemptID.class);
TaskReporter reporter = Mockito.mock(TaskReporter.class);
TaskAttemptContextImpl taskAttemptContext =
new TaskAttemptContextImpl(conf, taskAttemptID,reporter);
CombineFileRecordReader cfrr = new CombineFileRecordReader(combineFileSplit,
taskAttemptContext, TextRecordReaderWrapper.class);
cfrr.initialize(combineFileSplit,taskAttemptContext);
verify(reporter).progress();
Assert.assertFalse(cfrr.nextKeyValue());
verify(reporter, times(3)).progress();
} finally {
FileUtil.fullyDelete(new File(outDir.toString()));
}
}
@Test
public void readEthereumBlockInputFormatBlock3510000to3510010() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  Configuration conf = new Configuration(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth351000to3510010.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  Job job = Job.getInstance(conf);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  List<InputSplit> splits = format.getSplits(job);
  TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
  assertEquals(1, splits.size(), "Only one split generated for block 3510000 .. 3510010");

  RecordReader<BytesWritable, EthereumBlock> reader = format.createRecordReader(splits.get(0), context);
  assertNotNull(reader, "Format returned null RecordReader");
  reader.initialize(splits.get(0), context);
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  int count = 0;
  while (count < 11) {
    if (reader.nextKeyValue()) {
      count++;
    }
  }
  assertEquals(11, count, "Block 3510000 .. 3510010 contains 11 blocks");
  assertFalse(reader.nextKeyValue(), "No further blocks in block 3510000 .. 3510010");
  reader.close();
}
@Override
public void open(HadoopInputSplit split) throws IOException {
  TaskAttemptContext context = new TaskAttemptContextImpl(configuration, new TaskAttemptID());

  try {
    this.recordReader = this.hCatInputFormat
        .createRecordReader(split.getHadoopInputSplit(), context);
    this.recordReader.initialize(split.getHadoopInputSplit(), context);
  } catch (InterruptedException e) {
    throw new IOException("Could not create RecordReader.", e);
  } finally {
    this.fetched = false;
  }
}