The following examples show how org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl is used in practice, including how the java.lang.InterruptedException thrown by the surrounding MapReduce APIs is typically handled.
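Before the individual examples, here is a minimal sketch of the pattern most of them share: build a TaskAttemptContextImpl from a Configuration and a TaskAttemptID, pass it to an InputFormat's RecordReader, and rethrow any InterruptedException as an IOException. The class and helper names here (TaskAttemptContextImplSketch, readFirstSplit) are illustrative only; the Hadoop types and calls are the same ones used in the snippets below.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class TaskAttemptContextImplSketch {

  // Hypothetical helper: count the records in the first split of any InputFormat.
  static <K, V> long readFirstSplit(InputFormat<K, V> format, Configuration conf) throws IOException {
    try {
      Job job = Job.getInstance(conf);
      List<InputSplit> splits = format.getSplits(job);
      // The context that createRecordReader() and initialize() expect.
      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
      RecordReader<K, V> reader = format.createRecordReader(splits.get(0), context);
      reader.initialize(splits.get(0), context);
      long records = 0;
      while (reader.nextKeyValue()) {
        records++;
      }
      reader.close();
      return records;
    } catch (InterruptedException e) {
      // Common idiom in the examples below: wrap and rethrow as IOException.
      throw new IOException(e.getMessage(), e);
    }
  }
}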
@Override
public void recoverTask(int taskIndex, int attemptId) throws IOException {
if (!initialized) {
throw new RuntimeException("Committer not initialized");
}
TaskAttemptID taskAttemptID = new TaskAttemptID(
Long.toString(getContext().getApplicationId().getClusterTimestamp())
+ String.valueOf(getContext().getVertexIndex()),
getContext().getApplicationId().getId(),
((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ?
TaskType.MAP : TaskType.REDUCE)),
taskIndex, attemptId);
TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
taskAttemptID);
committer.recoverTask(taskContext);
}
@Override
public void init() throws IOException {
super.init();
Configuration taskConf = new Configuration();
Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
taskConf.set(FileOutputFormat.OUTDIR, stagingResultDir.toString());
ExecutionBlockId ebId = taskAttemptId.getTaskId().getExecutionBlockId();
writerContext = new TaskAttemptContextImpl(taskConf,
new TaskAttemptID(ebId.getQueryId().toString(), ebId.getId(), TaskType.MAP,
taskAttemptId.getTaskId().getId(), taskAttemptId.getId()));
HFileOutputFormat2 hFileOutputFormat2 = new HFileOutputFormat2();
try {
writer = hFileOutputFormat2.getRecordWriter(writerContext);
committer = new FileOutputCommitter(FileOutputFormat.getOutputPath(writerContext), writerContext);
workingFilePath = committer.getWorkPath();
} catch (InterruptedException e) {
throw new IOException(e.getMessage(), e);
}
LOG.info("Created hbase file writer: " + workingFilePath);
}
@Override
public void recoverTask(int taskIndex, int attemptId) throws IOException {
if (!initialized) {
throw new RuntimeException("Committer not initialized");
}
TaskAttemptID taskAttemptID = new TaskAttemptID(
Long.toString(context.getApplicationId().getClusterTimestamp())
+ String.valueOf(context.getVertexIndex()),
context.getApplicationId().getId(),
((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ?
TaskType.MAP : TaskType.REDUCE)),
taskIndex, attemptId);
TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
taskAttemptID);
committer.recoverTask(taskContext);
}
@Override
public IPentahoRecordReader createRecordReader( IPentahoInputSplit split ) throws Exception {
return inClassloader( () -> {
PentahoInputSplitImpl pentahoInputSplit = (PentahoInputSplitImpl) split;
InputSplit inputSplit = pentahoInputSplit.getInputSplit();
ReadSupport<RowMetaAndData> readSupport = new PentahoParquetReadSupport();
ParquetRecordReader<RowMetaAndData> nativeRecordReader =
new ParquetRecordReader<>( readSupport, ParquetInputFormat.getFilter( job
.getConfiguration() ) );
TaskAttemptContextImpl task = new TaskAttemptContextImpl( job.getConfiguration(), new TaskAttemptID() );
nativeRecordReader.initialize( inputSplit, task );
return new PentahoParquetRecordReader( nativeRecordReader );
} );
}
@Before
public void setUp() throws Exception {
conf = new Configuration();
conf.addResource("config/all-config.xml");
ctx = new TaskAttemptContextImpl(conf, new TaskAttemptID());
ctx.getConfiguration().setInt(ContentIndexingColumnBasedHandler.NUM_SHARDS, 131);
ctx.getConfiguration().set(ContentIndexingColumnBasedHandler.SHARD_TNAME, "shard");
ctx.getConfiguration().set(ContentIndexingColumnBasedHandler.SHARD_GIDX_TNAME, "shardIndex");
ctx.getConfiguration().set(ContentIndexingColumnBasedHandler.SHARD_GRIDX_TNAME, "shardIndex");
ctx.getConfiguration().set(TypeRegistry.INGEST_DATA_TYPES, "test");
ctx.getConfiguration().set("data.name", "test");
ctx.getConfiguration().set("test.data.auth.id.mode", "NEVER");
ctx.getConfiguration().set("test" + BaseIngestHelper.DEFAULT_TYPE, LcNoDiacriticsType.class.getName());
ctx.getConfiguration().set("test" + TypeRegistry.HANDLER_CLASSES, TestContentIndexingColumnBasedHandler.class.getName());
ctx.getConfiguration().set("test" + TypeRegistry.RAW_READER, TestEventRecordReader.class.getName());
ctx.getConfiguration().set("test" + TypeRegistry.INGEST_HELPER, TestContentBaseIngestHelper.class.getName());
ctx.getConfiguration().set(TypeRegistry.EXCLUDED_HANDLER_CLASSES, "FAKE_HANDLER_CLASS"); // it will die if this field is not faked
helper = new TestContentBaseIngestHelper();
colVis = new ColumnVisibility("");
}
@Before
public void setup() throws Exception {
conf = new Configuration();
conf.set("num.shards", "11");
conf.set("data.name", "testdatatype");
conf.set("testdatatype.ingest.helper.class", TestBaseIngestHelper.class.getName());
conf.set("testdatatype.handler.classes", DateIndexDataTypeHandler.class.getName());
// date index configuration
conf.set("date.index.table.name", TableName.DATE_INDEX);
conf.set("date.index.table.loader.priority", "30");
conf.set("DateIndex.table.config.class", DateIndexTableConfigHelper.class.getName());
conf.set("date.index.table.locality.groups", "activity:ACTIVITY,loaded:LOADED");
// some date index configuration for our "testdatatype" events
conf.set("testdatatype.date.index.type.to.field.map", "ACTIVITY=ACTIVITY_DATE,LOADED=LOAD_DATE");
conf.set("all" + Properties.INGEST_POLICY_ENFORCER_CLASS, IngestPolicyEnforcer.NoOpIngestPolicyEnforcer.class.getName());
TypeRegistry.reset();
TypeRegistry.getInstance(conf);
handler = new DateIndexDataTypeHandler<>();
handler.setup(new TaskAttemptContextImpl(conf, new TaskAttemptID()));
}
@Override
public void loadTestData(SequenceFile.Writer seqFile) throws IOException {
TypeRegistry.reset();
TypeRegistry.getInstance(this.conf);
Path path = new Path(this.uri);
File file = new File(this.uri);
FileSplit split = new FileSplit(path, 0, file.length(), null);
TaskAttemptContext ctx = new TaskAttemptContextImpl(this.conf, new TaskAttemptID());
try (JsonRecordReader reader = new JsonRecordReader()) {
reader.initialize(split, ctx);
while (reader.nextKeyValue()) {
RawRecordContainer raw = reader.getEvent();
seqFile.append(new Text(), raw);
}
}
}
@Test
public void readBitcoinBlockInputFormatBlockVersion4() throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="version4.blk";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
BitcoinBlockFileInputFormat format = new BitcoinBlockFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for block version 4");
RecordReader<BytesWritable, BitcoinBlock> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
BytesWritable key = new BytesWritable();
BitcoinBlock block = new BitcoinBlock();
assertTrue( reader.nextKeyValue(),"Input Split for block version contains at least one block");
block=reader.getCurrentValue();
assertEquals( 936, block.getTransactions().size(),"Random block version 4 must contain exactly 936 transactions");
assertFalse( reader.nextKeyValue(),"No further blocks in block version 4");
reader.close();
}
@Test
public void readBitcoinTransactionInputFormatBlockVersion3() throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="version3.blk";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
BitcoinTransactionFileInputFormat format = new BitcoinTransactionFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for block version 3");
RecordReader<BytesWritable, BitcoinTransaction> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
int transactCount=0;
while (reader.nextKeyValue()) {
transactCount++;
}
assertEquals( 1645, transactCount,"Block version 3 must contain exactly 1645 transactions");
reader.close();
}
@Test
public void readBitcoinBlockInputFormatGenesisBlock() throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="genesis.blk";
String fileNameGenesis=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameGenesis);
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
BitcoinBlockFileInputFormat format = new BitcoinBlockFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for genesis block");
RecordReader<BytesWritable, BitcoinBlock> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
BytesWritable genesisKey = new BytesWritable();
BitcoinBlock genesisBlock = new BitcoinBlock();
assertTrue( reader.nextKeyValue(),"Input Split for genesis block contains at least one block");
genesisBlock=reader.getCurrentValue();
assertEquals( 1, genesisBlock.getTransactions().size(),"Genesis Block must contain exactly one transaction");
assertFalse( reader.nextKeyValue(),"No further blocks in genesis Block");
reader.close();
}
protected List<StatementPatternStorage> createStorages(final String location) throws IOException, InterruptedException {
final List<StatementPatternStorage> storages = new ArrayList<StatementPatternStorage>();
StatementPatternStorage storage = new StatementPatternStorage();
final InputFormat<?, ?> inputFormat = storage.getInputFormat();
Job job = Job.getInstance(new Configuration());
storage.setLocation(location, job);
final List<InputSplit> splits = inputFormat.getSplits(job);
assertNotNull(splits);
for (final InputSplit inputSplit : splits) {
storage = new StatementPatternStorage();
job = Job.getInstance(new Configuration());
storage.setLocation(location, job);
final TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(),
new TaskAttemptID("jtid", 0, TaskType.REDUCE, 0, 0));
final RecordReader<?, ?> recordReader = inputFormat.createRecordReader(inputSplit,
taskAttemptContext);
recordReader.initialize(inputSplit, taskAttemptContext);
storage.prepareToRead(recordReader, null);
storages.add(storage);
}
return storages;
}
@Test
public void readBitcoinTransactionInputFormatBlockVersion1ReqSeek() throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="reqseekversion1.blk";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
BitcoinTransactionFileInputFormat format = new BitcoinTransactionFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for block version 1 requiring seek");
RecordReader<BytesWritable, BitcoinTransaction> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
int transactCount=0;
while (reader.nextKeyValue()) {
transactCount++;
}
assertEquals( 2, transactCount,"Block version 1 requiring seek must contain exactly two transactions");
reader.close();
}
@Test
public void readBitcoinRawBlockInputFormatGzipCompressed() throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
Job job = Job.getInstance(conf);
CompressionCodec gzip = new GzipCodec();
ReflectionUtils.setConf(gzip, conf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="version4comp.blk.gz";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
FileInputFormat.setInputPaths(job, file);
BitcoinRawBlockFileInputFormat format = new BitcoinRawBlockFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for compressed block");
RecordReader<BytesWritable, BytesWritable> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
BytesWritable key = new BytesWritable();
BytesWritable block = new BytesWritable();
assertTrue( reader.nextKeyValue(),"Input Split for block version contains at least one block");
block=reader.getCurrentValue();
assertEquals( 998039, block.getLength(),"Compressed block must have a size of 998.039 bytes");
assertFalse( reader.nextKeyValue(),"No further blocks in compressed block");
reader.close();
}
@Test
public void readEthereumBlockInputFormatBlock1() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="eth1.bin";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for block 1");
RecordReader<BytesWritable, EthereumBlock> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
BytesWritable key = new BytesWritable();
EthereumBlock block = new EthereumBlock();
assertTrue( reader.nextKeyValue(),"Input Split for block 1 contains at least one block");
key=reader.getCurrentKey();
block=reader.getCurrentValue();
assertEquals( 0, block.getEthereumTransactions().size(),"Block 1 must have 0 transactions");
assertFalse( reader.nextKeyValue(),"No further blocks in block 1");
reader.close();
}
@Test
public void readEthereumBlockInputFormatGenesisBlock() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="ethgenesis.bin";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for genesis block");
RecordReader<BytesWritable, EthereumBlock> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
BytesWritable key = new BytesWritable();
EthereumBlock block = new EthereumBlock();
assertTrue( reader.nextKeyValue(),"Input Split for genesis block contains at least one block");
key=reader.getCurrentKey();
block=reader.getCurrentValue();
assertEquals( 0, block.getEthereumTransactions().size(),"Genesis Block must have 0 transactions");
assertFalse( reader.nextKeyValue(),"No further blocks in genesis Block");
reader.close();
}
@Test
public void readEthereumBlockInputFormatBlock1346406GzipCompressed() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="eth1346406.bin.gz";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for block 1346406");
RecordReader<BytesWritable, EthereumBlock> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
BytesWritable key = new BytesWritable();
EthereumBlock block = new EthereumBlock();
assertTrue( reader.nextKeyValue(),"Input Split for block 1346406 contains at least one block");
key=reader.getCurrentKey();
block=reader.getCurrentValue();
assertEquals( 6, block.getEthereumTransactions().size(),"Block 1346406 must have 6 transactions");
assertFalse( reader.nextKeyValue(),"No further blocks in block 1346406");
reader.close();
}
@Test
public void readBitcoinRawBlockInputFormatReqSeekBlockVersion1() throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="reqseekversion1.blk";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
BitcoinRawBlockFileInputFormat format = new BitcoinRawBlockFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for block requiring seek version 1");
RecordReader<BytesWritable, BytesWritable> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
BytesWritable key = new BytesWritable();
BytesWritable block = new BytesWritable();
assertTrue( reader.nextKeyValue(),"Input Split for block version contains at least one block");
block=reader.getCurrentValue();
assertEquals( 482, block.getLength(),"Random block requiring seek version 1 must have a size of 482 bytes");
assertFalse( reader.nextKeyValue(),"No further blocks in block requiring seek version 1");
reader.close();
}
@Test
public void readBitcoinRawBlockInputFormatBlockVersion1() throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="version1.blk";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
BitcoinRawBlockFileInputFormat format = new BitcoinRawBlockFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for block version 1");
RecordReader<BytesWritable, BytesWritable> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
BytesWritable key = new BytesWritable();
BytesWritable block = new BytesWritable();
assertTrue( reader.nextKeyValue(),"Input Split for block version contains at least one block");
block=reader.getCurrentValue();
assertEquals( 482, block.getLength(),"Random block version 1 must have size of 482 bytes");
assertFalse( reader.nextKeyValue(),"No further blocks in block version 1");
reader.close();
}
@Test
public void readExcelInputFormatExcel2003SingleSheetEncryptedNegativeLowFootprint()
throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName = "excel2003encrypt.xls";
String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
Path file = new Path(fileNameSpreadSheet);
// set locale to the one of the test data
conf.set("hadoopoffice.read.locale.bcp47", "de");
// low footprint
conf.set("hadoopoffice.read.lowFootprint", "true");
// for decryption simply set the password
conf.set("hadoopoffice.read.security.crypt.password", "test2");
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
ExcelFileInputFormat format = new ExcelFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
assertEquals(1, splits.size(), "Only one split generated for Excel file");
RecordReader<Text, ArrayWritable> reader = format.createRecordReader(splits.get(0), context);
InterruptedException ex = assertThrows(InterruptedException.class,
() -> reader.initialize(splits.get(0), context), "Exception is thrown in case of wrong password");
}
public void testEmptyOutput() throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
// setup
committer.setupJob(jContext);
committer.setupTask(tContext);
// Do not write any output
// do commit
committer.commitTask(tContext);
committer.commitJob(jContext);
FileUtil.fullyDelete(new File(outDir.toString()));
}
@Before
public void setup() throws Exception {
Configuration conf = new Configuration();
input = ClassLoader.getSystemClassLoader().getResource("test.cram").getFile();
reference = ClassLoader.getSystemClassLoader().getResource("auxf.fa").toURI().toString();
String referenceIndex = ClassLoader.getSystemClassLoader().getResource("auxf.fa.fai")
.toURI().toString();
conf.set("mapred.input.dir", "file://" + input);
URI hdfsRef = clusterUri.resolve("/tmp/auxf.fa");
URI hdfsRefIndex = clusterUri.resolve("/tmp/auxf.fa.fai");
Files.copy(Paths.get(URI.create(reference)), Paths.get(hdfsRef));
Files.copy(Paths.get(URI.create(referenceIndex)), Paths.get(hdfsRefIndex));
conf.set(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY, hdfsRef.toString());
taskAttemptContext = new TaskAttemptContextImpl(conf, mock(TaskAttemptID.class));
jobContext = new JobContextImpl(conf, taskAttemptContext.getJobID());
}
/**
* Creates an instance of the InputFormat class. The InputFormat class name is specified in the
* Hadoop configuration.
*/
@SuppressWarnings("WeakerAccess")
protected void createInputFormatInstance() throws IOException {
if (inputFormatObj == null) {
try {
taskAttemptContext = new TaskAttemptContextImpl(conf.get(), new TaskAttemptID());
inputFormatObj =
(InputFormat<?, ?>)
conf.get()
.getClassByName(conf.get().get("mapreduce.job.inputformat.class"))
.getConstructor()
.newInstance();
/*
* If InputFormat explicitly implements interface {@link Configurable}, then setConf()
* method of {@link Configurable} needs to be explicitly called to set all the
* configuration parameters. For example: InputFormat classes which implement Configurable
* are {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat DBInputFormat}, {@link
* org.apache.hadoop.hbase.mapreduce.TableInputFormat TableInputFormat}, etc.
*/
if (Configurable.class.isAssignableFrom(inputFormatObj.getClass())) {
((Configurable) inputFormatObj).setConf(conf.get());
}
} catch (InstantiationException
| IllegalAccessException
| ClassNotFoundException
| NoSuchMethodException
| InvocationTargetException e) {
throw new IOException("Unable to create InputFormat object: ", e);
}
}
}
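A brief usage note for the method above: the class name it reads comes from the standard MapReduce job property, so a caller would typically set it on the Configuration beforehand. A minimal sketch, with TextInputFormat chosen purely for illustration:

// Assumes a plain Hadoop Configuration; TextInputFormat is just an illustrative choice.
Configuration hadoopConf = new Configuration();
hadoopConf.set("mapreduce.job.inputformat.class",
    org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class.getName());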
@Test
public void readBitcoinRawBlockInputFormatMultiBlock() throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="multiblock.blk";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
BitcoinRawBlockFileInputFormat format = new BitcoinRawBlockFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for multiblock");
RecordReader<BytesWritable, BytesWritable> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
BytesWritable key = new BytesWritable();
BytesWritable block = new BytesWritable();
assertTrue( reader.nextKeyValue(),"Input Split for multi block contains the genesis block");
block=reader.getCurrentValue();
assertEquals( 293, block.getLength(),"Genesis Block must have size of 293");
assertTrue( reader.nextKeyValue(),"Input Split for block version contains block version 1");
block=reader.getCurrentValue();
assertEquals( 482, block.getLength(),"Random block version 1 must have size of 482 bytes");
assertTrue( reader.nextKeyValue(),"Input Split for block version contains block version 2");
block=reader.getCurrentValue();
assertEquals( 191198, block.getLength(),"Random block version 2 must have size of 191.198 bytes");
assertFalse( reader.nextKeyValue(),"No further blocks in multi block");
reader.close();
}
private void testCommitterInternal(int version) throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new SafeFileOutputCommitter(outDir, tContext);
// setup
committer.setupJob(jContext);
committer.setupTask(tContext);
// write output
TextOutputFormat theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
writeOutput(theRecordWriter, tContext);
// do commit
committer.commitTask(tContext);
committer.commitJob(jContext);
// validate output
validateContent(outDir);
FileUtil.fullyDelete(new File(outDir.toString()));
}
@Override
public void open(HadoopInputSplit split) throws IOException {
TaskAttemptContext context = new TaskAttemptContextImpl(configuration, new TaskAttemptID());
try {
this.recordReader = this.hCatInputFormat
.createRecordReader(split.getHadoopInputSplit(), context);
this.recordReader.initialize(split.getHadoopInputSplit(), context);
} catch (InterruptedException e) {
throw new IOException("Could not create RecordReader.", e);
} finally {
this.fetched = false;
}
}
private void testSafety(int commitVersion) throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
conf.setInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, commitVersion);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new SafeFileOutputCommitter(outDir, tContext);
// setup
committer.setupJob(jContext);
committer.setupTask(tContext);
// write output
TextOutputFormat theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
writeOutput(theRecordWriter, tContext);
// commit the job prior to committing the task (the task output is still in the temporary dir, so this should fail)
try {
committer.commitJob(jContext);
Assert.fail("Expected commit job to fail");
} catch (Exception e) {
committer.commitTask(tContext);
committer.commitJob(jContext);
}
validateContent(outDir);
FileUtil.fullyDelete(new File(outDir.toString()));
}
private void validateIoRegistryGraph(final HadoopGraph graph,
final Class<? extends GraphComputer> graphComputerClass,
final RecordWriter<NullWritable, VertexWritable> writer) throws Exception {
for (int i = 0; i < NUMBER_OF_VERTICES; i++) {
final StarGraph starGraph = StarGraph.open();
Vertex vertex = starGraph.addVertex(T.label, "place", T.id, i, "point", new ToyPoint(i, i * 10), "message", "I'm " + i, "triangle", new ToyTriangle(i, i * 10, i * 100));
vertex.addEdge("connection", starGraph.addVertex(T.id, i > 0 ? i - 1 : NUMBER_OF_VERTICES - 1));
writer.write(NullWritable.get(), new VertexWritable(starGraph.getStarVertex()));
}
writer.close(new TaskAttemptContextImpl(ConfUtil.makeHadoopConfiguration(graph.configuration()), new TaskAttemptID()));
// OLAP TESTING //
validatePointTriangles(graph.traversal().withComputer(graphComputerClass).V().project("point", "triangle").by("point").by("triangle").toList());
validatePointTriangles(graph.traversal().withComputer(graphComputerClass).V().out().project("point", "triangle").by("point").by("triangle").toList());
validatePointTriangles(graph.traversal().withComputer(graphComputerClass).V().out().out().project("point", "triangle").by("point").by("triangle").toList());
// OLTP TESTING //
validatePointTriangles(graph.traversal().V().project("point", "triangle").by("point").by("triangle").toList());
// HDFS TESTING //
/*validatePointTriangles(IteratorUtils.<Map<String, Object>>asList(IteratorUtils.<Vertex, Map<String, Object>>map(FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration())).head(graph.configuration().getInputLocation(), graph.configuration().getGraphReader()),
vertex -> {
return new HashMap<String, Object>() {{
put("point", vertex.value("point"));
put("triangle", vertex.value("triangle"));
}};
})));*/
}
private JsonRecordReader getJsonRecordReader(String file) throws IOException, URISyntaxException {
InputSplit split = ColumnBasedHandlerTestUtil.getSplit(file);
TaskAttemptContext ctx = new TaskAttemptContextImpl(conf, new TaskAttemptID());
TypeRegistry.reset();
TypeRegistry.getInstance(ctx.getConfiguration());
log.debug(TypeRegistry.getContents());
JsonRecordReader reader = new JsonRecordReader();
reader.initialize(split, ctx);
return reader;
}
@Test
public void testJsonContentHandlers() throws Exception {
conf.addResource(ClassLoader.getSystemResource("config/ingest/all-config.xml"));
conf.addResource(ClassLoader.getSystemResource("config/ingest/tvmaze-ingest-config.xml"));
conf.addResource(ClassLoader.getSystemResource("config/ingest/edge-ingest-config.xml"));
conf.addResource(ClassLoader.getSystemResource("config/ingest/metadata-config.xml"));
TypeRegistry.getInstance(conf);
JsonDataTypeHelper helper = new JsonDataTypeHelper();
helper.setup(conf);
// Set up the IngestHelper
JsonIngestHelper ingestHelper = new JsonIngestHelper();
ingestHelper.setup(conf);
// Set up the ColumnBasedHandler
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
ContentJsonColumnBasedHandler<Text> jsonHandler = new ContentJsonColumnBasedHandler<>();
jsonHandler.setup(context);
// Set up the Reader
JsonRecordReader reader = getJsonRecordReader("/input/tvmaze-seinfeld.json");
Assert.assertTrue("First Record did not read properly?", reader.nextKeyValue());
RawRecordContainer event = reader.getEvent();
Assert.assertNotNull("Event 1 was null.", event);
Assert.assertTrue("Event 1 has parsing errors", event.getErrors().isEmpty());
// Set up the edge
ProtobufEdgeDataTypeHandler<Text,BulkIngestKey,Value> edgeHandler = new ProtobufEdgeDataTypeHandler<>();
edgeHandler.setup(context);
ColumnBasedHandlerTestUtil.processEvent(jsonHandler, edgeHandler, event, 231, 90, 4, 38, PRINT_GENERATED_KEYS_ONLY_ON_FAIL);
reader.close();
}
private void writeOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attempt.getID()));
TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat
.getRecordWriter(tContext);
NullWritable nullWritable = NullWritable.get();
try {
theRecordWriter.write(key1, val1);
theRecordWriter.write(null, nullWritable);
theRecordWriter.write(null, val1);
theRecordWriter.write(nullWritable, val2);
theRecordWriter.write(key2, nullWritable);
theRecordWriter.write(key1, null);
theRecordWriter.write(null, null);
theRecordWriter.write(key2, val2);
} finally {
theRecordWriter.close(tContext);
}
OutputFormat outputFormat = ReflectionUtils.newInstance(
tContext.getOutputFormatClass(), conf);
OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
committer.commitTask(tContext);
}