The following are examples showing how to use the org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat API class; you can also follow the links to view the source code on GitHub.
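Before the project-specific examples, here is a minimal, self-contained sketch of the usual wiring. The class name, the Text/Text record types, and the command-line paths are illustrative assumptions and are not taken from the linked repositories.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SequenceFileDumpDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "sequence-file-dump");
        job.setJarByClass(SequenceFileDumpDriver.class);
        // Read <Text, Text> records from SequenceFiles instead of plain text lines
        // (assumes the files really do contain Text keys and Text values).
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // The identity Mapper/Reducer defaults are used; only the formats matter here.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}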
public static DataSet parseInputPath(String inputPath, FileSystem fs, ExecutionEnvironment env, Class keyClass,
Class valueClass) throws IOException {
List<String> inputFolders = Lists.newArrayList();
Path inputHDFSPath = new Path(inputPath);
FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
boolean hasDir = false;
for (FileStatus stat : fileStatuses) {
if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
hasDir = true;
inputFolders.add(stat.getPath().toString());
}
}
if (!hasDir) {
return env.createInput(HadoopInputs.readSequenceFile(keyClass, valueClass, inputHDFSPath.toString()));
}
Job job = Job.getInstance();
FileInputFormat.setInputPaths(job, StringUtil.join(inputFolders, ","));
return env.createInput(HadoopInputs.createHadoopInput(new SequenceFileInputFormat(), keyClass, valueClass, job));
}
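A possible call site for the helper above; the HDFS path and the Text key/value classes are placeholders chosen for illustration.
// Hypothetical caller (assumes an enclosing method declared to throw Exception).
Configuration hadoopConf = new Configuration();
FileSystem fs = FileSystem.get(hadoopConf);
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Reads whatever SequenceFiles live under the given path as Tuple2<Text, Text>.
DataSet records = parseInputPath("/user/example/seq-input", fs, env, Text.class, Text.class);
records.print();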
public static DataSet<String[]> readHiveRecords(boolean isSequenceFile, ExecutionEnvironment env, String inputPath, String hiveTable, Job job) throws IOException {
DataSet<String[]> recordDataSet;
if (isSequenceFile) {
recordDataSet = env
.createInput(HadoopInputs.readHadoopFile(new SequenceFileInputFormat(), BytesWritable.class, Text.class, inputPath, job),
TypeInformation.of(new TypeHint<Tuple2<BytesWritable, Text>>() {}))
.map(new MapFunction<Tuple2<BytesWritable, Text>, String[]>() {
@Override
public String[] map(Tuple2<BytesWritable, Text> tuple2) throws Exception {
String s = Bytes.toString(tuple2.f1.getBytes(), 0, tuple2.f1.getLength());
return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
}
});
} else {
throw new UnsupportedOperationException("Currently, Flink does not support read hive table directly.");
}
return recordDataSet;
}
public static void printFinalRanks (Configuration conf, FileSystem fs, String inputPath, String outputPath) throws Exception {
Path outFile = new Path (outputPath);
if (fs.exists(outFile)) {
fs.delete(outFile, true);
}
Job job = Job.getInstance(conf);
job.setMapperClass(RankPrinter.class);
job.setReducerClass(Reducer.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setJarByClass(RankPrinter.class);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, outFile);
job.waitForCompletion(true);
}
private static void joinAs(String jointype,
Class<? extends SimpleCheckerMapBase<?>> map,
Class<? extends SimpleCheckerReduceBase> reduce) throws Exception {
final int srcs = 4;
Configuration conf = new Configuration();
Path base = cluster.getFileSystem().makeQualified(new Path("/"+jointype));
Path[] src = writeSimpleSrc(base, conf, srcs);
conf.set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose(jointype,
SequenceFileInputFormat.class, src));
conf.setInt("testdatamerge.sources", srcs);
Job job = Job.getInstance(conf);
job.setInputFormatClass(CompositeInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(base, "out"));
job.setMapperClass(map);
job.setReducerClass(reduce);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
assertTrue("Job failed", job.isSuccessful());
if ("outer".equals(jointype)) {
checkOuterConsistency(job, src);
}
base.getFileSystem(conf).delete(base, true);
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf1 = new Configuration();
Job job1 = new Job(conf1, "wiki job one");
job1.setOutputFormatClass(SequenceFileOutputFormat.class);
job1.setInputFormatClass(SequenceFileInputFormat.class);
job1.setNumReduceTasks(1);
job1.setJarByClass(Step32.class);
job1.setMapperClass(WikiMapper32.class);
job1.setMapOutputKeyClass(IntWritable.class);
job1.setMapOutputValueClass(VectorOrPrefWritable.class);
job1.setReducerClass(WiKiReducer32.class);
job1.setOutputKeyClass(IntWritable.class);
job1.setOutputValueClass(VectorOrPrefWritable.class);
// the WiKiDriver's output is this job's input
SequenceFileInputFormat.addInputPath(job1, new Path(INPUT_PATH));
SequenceFileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH));
if(!job1.waitForCompletion(true)){
System.exit(1); // job failed, exit with an error code
}
}
public static void calculateNextRanks (Configuration conf, FileSystem fs, String inputPath, String outputPath) throws Exception {
Path outFile = new Path (outputPath);
if (fs.exists(outFile)) {
fs.delete(outFile, true);
}
Job job = Job.getInstance(conf);
job.setJarByClass(PageRankMapper.class);
job.setMapperClass(PageRankMapper.class);
job.setReducerClass(PageRankReducer.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Message.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Message.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, outFile);
job.waitForCompletion(true);
}
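The two PageRank helpers shown here (calculateNextRanks and printFinalRanks, listed earlier) are typically driven by a small loop that alternates rank directories between iterations; a minimal sketch follows, where the iteration count and directory layout are assumptions.
// Hypothetical driver (assumes an enclosing method declared to throw Exception).
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
String current = "/pagerank/ranks-0";   // assumed location of the initial ranks
final int iterations = 10;              // assumed number of power iterations
for (int i = 1; i <= iterations; i++) {
    String next = "/pagerank/ranks-" + i;
    calculateNextRanks(conf, fs, current, next);
    current = next;
}
printFinalRanks(conf, fs, current, "/pagerank/final-ranks");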
public static void initSumMRJob(Job job, String inputPath, String outtable, String auths) throws AccumuloSecurityException, IOException {
Configuration conf = job.getConfiguration();
String username = conf.get(USERNAME);
String password = conf.get(PASSWORD);
String instance = conf.get(INSTANCE);
String zookeepers = conf.get(ZOOKEEPERS);
if (zookeepers != null) {
AccumuloOutputFormat.setConnectorInfo(job, username, new PasswordToken(password));
AccumuloOutputFormat.setZooKeeperInstance(job, instance, zookeepers);
} else {
throw new IllegalArgumentException("Must specify zookeepers");
}
SequenceFileInputFormat.addInputPath(job, new Path(inputPath));
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapOutputKeyClass(TripleEntry.class);
job.setMapOutputValueClass(CardList.class);
AccumuloOutputFormat.setDefaultTableName(job, outtable);
job.setOutputFormatClass(AccumuloOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Mutation.class);
}
@Test
public void testMRImportWithTransform() throws Exception {
Path sample = new Path(temp.newFile("sample.sequence").toString())
.makeQualified(getDFS().getUri(), new Path("/"));
writeSequenceFile(getDFS(), sample); // HDFS sequence file
// Reusing records is okay when running in MR
command.inFormatClass = SequenceFileInputFormat.class.getName();
command.targets = Lists.newArrayList(sample.toString(), datasetUri);
command.noCompaction = true; // no need to run reducers
command.transform = TransformMeasurement.class.getName();
int rc = command.run();
Assert.assertEquals("Should return success", 0, rc);
verify(console).info("Added {} records to \"{}\"", 3L, datasetUri);
verifyNoMoreInteractions(console);
Set<Measurement> datasetContent = materialize(
Datasets.load(datasetUri, Measurement.class));
Set<Measurement> expected = Sets.newHashSet(Iterables.transform(
measurements, new TransformMeasurement()));
Assert.assertEquals(expected, datasetContent);
}
public static void convertPointsSequenceFileToText (Configuration conf, FileSystem fs, String seqFilePath, String outputPath) throws Exception {
Path seqFile = new Path (seqFilePath);
Path output = new Path (outputPath);
if (fs.exists(output)) {
fs.delete(output, true);
}
Job job = Job.getInstance(conf);
job.setMapperClass(PointSequenceToTextConverter.class);
job.setReducerClass(Reducer.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.addInputPath(job, seqFile);
FileOutputFormat.setOutputPath(job, output);
job.waitForCompletion(true);
}
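A hypothetical invocation of the converter above; both paths are placeholders.
// Hypothetical call (assumes an enclosing method declared to throw Exception).
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
convertPointsSequenceFileToText(conf, fs, "/clustering/points-seq", "/clustering/points-text");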
@Test
public void testMRImport() throws Exception {
Path sample = new Path(temp.newFile("sample.sequence").toString())
.makeQualified(getDFS().getUri(), new Path("/"));
writeSequenceFile(getDFS(), sample); // HDFS sequence file
// Reusing records is okay when running in MR
command.inFormatClass = SequenceFileInputFormat.class.getName();
command.targets = Lists.newArrayList(sample.toString(), datasetUri);
command.noCompaction = true; // no need to run reducers
int rc = command.run();
Assert.assertEquals("Should return success", 0, rc);
verify(console).info("Added {} records to \"{}\"", 3L, datasetUri);
verifyNoMoreInteractions(console);
Set<Measurement> datasetContent = materialize(
Datasets.load(datasetUri, Measurement.class));
Assert.assertEquals(Sets.newHashSet(measurements), datasetContent);
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf1 = new Configuration();
Job job1 = new Job(conf1, "wiki job five");
job1.setNumReduceTasks(1);
job1.setJarByClass(Step5.class);
job1.setInputFormatClass(SequenceFileInputFormat.class);
job1.setMapperClass(WikiMapper5.class);
job1.setMapOutputKeyClass(VarLongWritable.class);
job1.setMapOutputValueClass(VectorWritable.class);
job1.setCombinerClass(WiKiCombiner5.class);
job1.setReducerClass(WiKiReducer5.class);
job1.setOutputKeyClass(VarLongWritable.class);
job1.setOutputValueClass(RecommendedItemsWritable.class);
// job1.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileInputFormat.addInputPath(job1, new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH));
if(!job1.waitForCompletion(true)){
System.exit(1); // job failed, exit with an error code
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf1 = new Configuration();
Job job1 = new Job(conf1, "wiki job three1");
job1.setOutputFormatClass(SequenceFileOutputFormat.class);
job1.setInputFormatClass(SequenceFileInputFormat.class);
job1.setNumReduceTasks(1);
job1.setJarByClass(Step31.class);
job1.setMapperClass(WikiMapper31.class);
job1.setMapOutputKeyClass(IntWritable.class);
job1.setMapOutputValueClass(VectorOrPrefWritable.class);
// set a reducer only to use SequenceFileOutputFormat
job1.setReducerClass(WiKiReducer31.class);
job1.setOutputKeyClass(IntWritable.class);
job1.setOutputValueClass(VectorOrPrefWritable.class);
// this MR job's input is MR2's output
SequenceFileInputFormat.addInputPath(job1, new Path(INPUT_PATH));
SequenceFileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH));
if(!job1.waitForCompletion(true)){
System.exit(1); // job failed, exit with an error code
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf1 = new Configuration();
Job job1 = new Job(conf1, "wiki job two");
job1.setNumReduceTasks(1);
job1.setJarByClass(Step2.class);
job1.setInputFormatClass(SequenceFileInputFormat.class);
job1.setMapperClass(WikiMapper2.class);
job1.setMapOutputKeyClass(IntWritable.class);
job1.setMapOutputValueClass(IntWritable.class);
job1.setReducerClass(WiKiReducer2.class);
job1.setOutputKeyClass(IntWritable.class);
job1.setOutputValueClass(VectorWritable.class);
job1.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileInputFormat.addInputPath(job1, new Path(INPUT_PATH));
SequenceFileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH));
if(!job1.waitForCompletion(true)){
System.exit(1); // job failed, exit with an error code
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf1 = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
Job job1 = new Job(conf1, "wiki job four");
job1.setNumReduceTasks(1);
job1.setJarByClass(Step4.class);
job1.setInputFormatClass(SequenceFileInputFormat.class);
job1.setMapperClass(WikiMapper4.class);
job1.setMapOutputKeyClass(IntWritable.class);
job1.setMapOutputValueClass(VectorOrPrefWritable.class);
job1.setReducerClass(WiKiReducer4.class);
job1.setOutputKeyClass(IntWritable.class);
job1.setOutputValueClass(VectorAndPrefsWritable.class);
job1.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileInputFormat.addInputPath(job1, new Path(INPUT1_PATH));
SequenceFileInputFormat.addInputPath(job1, new Path(INPUT2_PATH));
SequenceFileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH));
if(!job1.waitForCompletion(true)){
System.exit(1); // job failed, exit with an error code
}
}
private void setupMapper(Path input) throws IOException {
FileInputFormat.setInputPaths(job, input);
job.setMapperClass(CalculateStatsFromBaseCuboidMapper.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
}
@Override
public void configureJob(Job job) {
job.setInputFormatClass(SequenceFileInputFormat.class);
String jobId = job.getConfiguration().get(BatchConstants.ARG_CUBING_JOB_ID);
IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc,
JobBuilderSupport.getJobWorkingDir(conf, jobId));
try {
FileInputFormat.addInputPath(job, new Path(inputPath));
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
@SuppressWarnings("unchecked")
public RecordReader<K, V> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
InputFormat<K, V> indirIF = (InputFormat)ReflectionUtils.newInstance(
conf.getClass(INDIRECT_INPUT_FORMAT,
SequenceFileInputFormat.class), conf);
IndirectSplit is = ((IndirectSplit)split);
return indirIF.createRecordReader(new FileSplit(is.getPath(), 0,
is.getLength(), (String[])null), context);
}
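Since the wrapped format is resolved from the configuration and only falls back to SequenceFileInputFormat, callers can swap it out; a sketch of that override follows, assuming INDIRECT_INPUT_FORMAT is the configuration key used above.
// Hypothetical override: make the indirect reader wrap TextInputFormat instead of the
// SequenceFileInputFormat default that createRecordReader falls back to.
Configuration conf = new Configuration();
conf.setClass(INDIRECT_INPUT_FORMAT, TextInputFormat.class, InputFormat.class);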
@SuppressWarnings("unchecked")
public RecordReader<K, V> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
InputFormat<K, V> indirIF = (InputFormat)ReflectionUtils.newInstance(
conf.getClass(INDIRECT_INPUT_FORMAT,
SequenceFileInputFormat.class), conf);
IndirectSplit is = ((IndirectSplit)split);
return indirIF.createRecordReader(new FileSplit(is.getPath(), 0,
is.getLength(), (String[])null), context);
}
/**
* Restore a {@code JavaPairRDD<Long,List<Writable>>} previously saved with {@link #saveMapFile(String, JavaRDD)}<br>
* Note that if the keys are not required, simply use {@code restoreMapFile(...).values()}
*
* @param path Path of the MapFile
* @param sc Spark context
* @return The restored RDD, with their unique indices as the key
*/
public static JavaPairRDD<Long, List<Writable>> restoreMapFile(String path, JavaSparkContext sc) {
Configuration c = new Configuration();
c.set(FileInputFormat.INPUT_DIR, FilenameUtils.normalize(path, true));
JavaPairRDD<LongWritable, RecordWritable> pairRDD =
sc.newAPIHadoopRDD(c, SequenceFileInputFormat.class, LongWritable.class, RecordWritable.class);
return pairRDD.mapToPair(new RecordLoadPairFunction());
}
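A hypothetical caller for the MapFile restore above; the application name, master setting, and path are placeholders.
// Hypothetical usage: restore the records and drop the Long indices, as the javadoc suggests.
JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setMaster("local[*]").setAppName("restore-mapfile"));
JavaPairRDD<Long, List<Writable>> restored = restoreMapFile("/data/mapfile", sc);
JavaRDD<List<Writable>> records = restored.values();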
/**
* Restore a {@code JavaPairRDD<Long,List<List<Writable>>>} previously saved with {@link #saveMapFile(String, JavaRDD)}<br>
* Note that if the keys are not required, simply use {@code restoreMapFileSequences(...).values()}
*
* @param path Path of the MapFile
* @param sc Spark context
* @return The restored RDD, with their unique indices as the key
*/
public static JavaPairRDD<Long, List<List<Writable>>> restoreMapFileSequences(String path, JavaSparkContext sc) {
Configuration c = new Configuration();
c.set(FileInputFormat.INPUT_DIR, FilenameUtils.normalize(path, true));
JavaPairRDD<LongWritable, SequenceRecordWritable> pairRDD = sc.newAPIHadoopRDD(c, SequenceFileInputFormat.class,
LongWritable.class, SequenceRecordWritable.class);
return pairRDD.mapToPair(new SequenceRecordLoadPairFunction());
}
/**
* Creates, but does not submit, the job. This is the core MapReduce job. An empty input path
* results in null being returned instead of a created job.
*/
public Job createJob(Optional<String> jobName
,Path basePath
, Path jobOutputPath
, long beginNS
, long endNS
, int numReducers
, T fields
, Configuration conf
, FileSystem fs
, PcapFilterConfigurator<T> filterImpl
) throws IOException
{
Iterable<String> filteredPaths = FileFilterUtil.getPathsInTimeRange(beginNS, endNS, listFiles(fs, basePath));
String inputPaths = Joiner.on(',').join(filteredPaths);
if (StringUtils.isEmpty(inputPaths)) {
return null;
}
conf.set(START_TS_CONF, Long.toUnsignedString(beginNS));
conf.set(END_TS_CONF, Long.toUnsignedString(endNS));
conf.set(WIDTH_CONF, "" + findWidth(beginNS, endNS, numReducers));
filterImpl.addToConfig(fields, conf);
Job job = Job.getInstance(conf);
jobName.ifPresent(job::setJobName);
job.setJarByClass(PcapJob.class);
job.setMapperClass(PcapJob.PcapMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setNumReduceTasks(numReducers);
job.setReducerClass(PcapReducer.class);
job.setPartitionerClass(PcapPartitioner.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(BytesWritable.class);
SequenceFileInputFormat.addInputPaths(job, inputPaths);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job, jobOutputPath);
return job;
}
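Because an empty time range makes createJob return null rather than a Job, callers should guard before submitting; a minimal sketch follows, where every argument value is a placeholder.
// Hypothetical submission; all argument values here are placeholders.
Job job = createJob(Optional.of("pcap-query"), basePath, jobOutputPath,
        beginNS, endNS, numReducers, fields, conf, fs, filterImpl);
if (job == null) {
    // No pcap files fell inside [beginNS, endNS]; nothing to run.
    return;
}
boolean succeeded = job.waitForCompletion(true);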
@Override
public <K, V> Iterator<KeyValue<K, V>> head(final String location, final String memoryKey, final Class readerClass, final int totalLines) {
if (!readerClass.equals(SequenceFileInputFormat.class))
throw new IllegalArgumentException("Only " + SequenceFileInputFormat.class.getCanonicalName() + " memories are supported");
final Configuration configuration = new Configuration();
try {
return IteratorUtils.limit((Iterator) new ObjectWritableIterator(configuration, new Path(Constants.getMemoryLocation(location, memoryKey))), totalLines);
} catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@Override
public <K, V> JavaPairRDD<K, V> readMemoryRDD(final Configuration configuration, final String memoryKey, final JavaSparkContext sparkContext) {
return sparkContext.newAPIHadoopRDD(ConfUtil.makeHadoopConfiguration(configuration),
SequenceFileInputFormat.class,
ObjectWritable.class,
ObjectWritable.class)
.mapToPair(tuple -> new Tuple2<>((K) ((Tuple2<ObjectWritable, ObjectWritable>) tuple)._1().get(), (V) ((Tuple2<ObjectWritable, ObjectWritable>) tuple)._2().get()));
}
@Test
@LoadGraphWith(LoadGraphWith.GraphData.MODERN)
public void shouldSupportHeadMethods() throws Exception {
// Make sure Spark is shut down before deleting its files and directories,
// which are locked under Windows and fail the tests. See FileSystemStorageCheck
graph.configuration().setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);
final Storage storage = FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration()));
final String inputLocation = Constants.getSearchGraphLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION), storage).get();
final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
// TestHelper creates the directory and we need it not to exist
deleteDirectory(outputLocation);
super.checkHeadMethods(storage, inputLocation, outputLocation, InputOutputHelper.getInputFormat((Class) Class.forName(graph.configuration().getString(Constants.GREMLIN_HADOOP_GRAPH_WRITER))), SequenceFileInputFormat.class);
}
@Test
@LoadGraphWith(LoadGraphWith.GraphData.MODERN)
public void shouldSupportCopyMethods() throws Exception {
// Make sure Spark is shut down before deleting its files and directories,
// which are locked under Windows and fail the tests. See FileSystemStorageCheck
graph.configuration().setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);
final Storage storage = FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration()));
final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
final String newOutputLocation = TestHelper.makeTestDataDirectory(FileSystemStorageCheck.class, "new-location-for-copy");
// TestHelper creates the directory and we need it not to exist
deleteDirectory(newOutputLocation);
super.checkCopyMethods(storage, outputLocation, newOutputLocation, InputOutputHelper.getInputFormat((Class) Class.forName(graph.configuration().getString(Constants.GREMLIN_HADOOP_GRAPH_WRITER))), SequenceFileInputFormat.class);
}
public static void initJoinMRJob(Job job, String prospectsPath, String spoPath, Class<? extends Mapper<CompositeType,TripleCard,?,?>> mapperClass,
String outPath, String auths) throws AccumuloSecurityException {
MultipleInputs.addInputPath(job, new Path(prospectsPath), SequenceFileInputFormat.class, mapperClass);
MultipleInputs.addInputPath(job, new Path(spoPath), SequenceFileInputFormat.class, mapperClass);
job.setMapOutputKeyClass(CompositeType.class);
job.setMapOutputValueClass(TripleCard.class);
SequenceFileOutputFormat.setOutputPath(job, new Path(outPath));
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(TripleEntry.class);
job.setOutputValueClass(CardList.class);
}