The following are Java code examples showing how to use org.apache.spark.api.java.JavaPairRDD, collected from open-source projects.
/**
* Tags record locations for incoming records.
*
* @param inputRecordRDD {@link JavaRDD} of incoming records
* @param jsc instance of {@link JavaSparkContext} to use
* @param hoodieTable instance of {@link HoodieTable} to use
* @return {@link JavaRDD} of records with record locations set
*/
protected JavaRDD<HoodieRecord<T>> tagLocationInternal(JavaRDD<HoodieRecord<T>> inputRecordRDD, JavaSparkContext jsc,
HoodieTable<T> hoodieTable) {
if (config.getSimpleIndexUseCaching()) {
inputRecordRDD.persist(SparkConfigUtils.getSimpleIndexInputStorageLevel(config.getProps()));
}
JavaPairRDD<HoodieKey, HoodieRecord<T>> keyedInputRecordRDD = inputRecordRDD.mapToPair(record -> new Tuple2<>(record.getKey(), record));
JavaPairRDD<HoodieKey, HoodieRecordLocation> existingLocationsOnTable = fetchRecordLocationsForAffectedPartitions(keyedInputRecordRDD.keys(), jsc, hoodieTable,
config.getSimpleIndexParallelism());
JavaRDD<HoodieRecord<T>> taggedRecordRDD = keyedInputRecordRDD.leftOuterJoin(existingLocationsOnTable)
.map(entry -> {
final HoodieRecord<T> untaggedRecord = entry._2._1;
final Option<HoodieRecordLocation> location = Option.ofNullable(entry._2._2.orNull());
return HoodieIndexUtils.getTaggedRecord(untaggedRecord, location);
});
if (config.getSimpleIndexUseCaching()) {
inputRecordRDD.unpersist();
}
return taggedRecordRDD;
}
/**
* Saves the metrics to a file.
* Note: the SAMFileHeader is needed in order to include libraries that didn't have any duplicates.
* @param result metrics object, potentially pre-initialized with headers
* @param header SAM file header, used to enumerate all libraries (including those without duplicates)
* @param metricsRDD duplication metrics keyed by library name
* @param metricsOutputPath path to write the metrics file to
*/
public static void saveMetricsRDD(final MetricsFile<GATKDuplicationMetrics, Double> result, final SAMFileHeader header, final JavaPairRDD<String, GATKDuplicationMetrics> metricsRDD, final String metricsOutputPath) {
final LibraryIdGenerator libraryIdGenerator = new LibraryIdGenerator(header);
final Map<String, GATKDuplicationMetrics> nonEmptyMetricsByLibrary = metricsRDD.collectAsMap(); //Unknown Library
final Map<String, GATKDuplicationMetrics> emptyMapByLibrary = libraryIdGenerator.getMetricsByLibraryMap();//with null
final List<String> sortedListOfLibraryNames = new ArrayList<>(Sets.union(emptyMapByLibrary.keySet(), nonEmptyMetricsByLibrary.keySet()));
sortedListOfLibraryNames.sort(Utils.COMPARE_STRINGS_NULLS_FIRST);
for (final String library : sortedListOfLibraryNames) {
//if a non-empty metrics object exists, take it; otherwise take from the empties. This is done to include libraries with zero data in them.
//But not all libraries are listed in the header (esp in testing data) so we union empty and non-empty
final GATKDuplicationMetrics metricsToAdd = nonEmptyMetricsByLibrary.containsKey(library) ? nonEmptyMetricsByLibrary.get(library) : emptyMapByLibrary.get(library);
metricsToAdd.calculateDerivedFields();
result.addMetric(metricsToAdd);
}
if (nonEmptyMetricsByLibrary.size() == 1) {
result.setHistogram(nonEmptyMetricsByLibrary.values().iterator().next().calculateRoiHistogram());
}
MetricsUtils.saveMetrics(result, metricsOutputPath);
}
/**
* Given a bunch of hoodie keys, fetches all the individual records out as a data frame.
*
* @return a dataframe
*/
public Dataset<Row> readROView(JavaRDD<HoodieKey> hoodieKeys, int parallelism) {
assertSqlContext();
JavaPairRDD<HoodieKey, Option<Pair<String, String>>> lookupResultRDD =
index.fetchRecordLocation(hoodieKeys, jsc, hoodieTable);
JavaPairRDD<HoodieKey, Option<String>> keyToFileRDD =
lookupResultRDD.mapToPair(r -> new Tuple2<>(r._1, convertToDataFilePath(r._2)));
List<String> paths = keyToFileRDD.filter(keyFileTuple -> keyFileTuple._2().isPresent())
.map(keyFileTuple -> keyFileTuple._2().get()).collect();
// record locations might be the same for multiple keys, so we need a unique list
Set<String> uniquePaths = new HashSet<>(paths);
Dataset<Row> originalDF = sqlContextOpt.get().read().parquet(uniquePaths.toArray(new String[uniquePaths.size()]));
StructType schema = originalDF.schema();
JavaPairRDD<HoodieKey, Row> keyRowRDD = originalDF.javaRDD().mapToPair(row -> {
HoodieKey key = new HoodieKey(row.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD),
row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
return new Tuple2<>(key, row);
});
// Now, filter further so that only rows matching the supplied hoodie keys remain
JavaRDD<Row> rowRDD = keyRowRDD.join(keyToFileRDD, parallelism).map(tuple -> tuple._2()._1());
return sqlContextOpt.get().createDataFrame(rowRDD, schema);
}
/**
* An implementation of {@link
* org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly} for the Spark runner.
*/
public static <K, V> JavaRDD<KV<K, Iterable<WindowedValue<V>>>> groupByKeyOnly(
JavaRDD<WindowedValue<KV<K, V>>> rdd,
Coder<K> keyCoder,
WindowedValueCoder<V> wvCoder,
@Nullable Partitioner partitioner) {
// we use coders to convert objects in the PCollection to byte arrays, so they
// can be transferred over the network for the shuffle.
JavaPairRDD<ByteArray, byte[]> pairRDD =
rdd.map(new ReifyTimestampsAndWindowsFunction<>())
.mapToPair(TranslationUtils.toPairFunction())
.mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
// If no partitioner is passed, the default group by key operation is called
JavaPairRDD<ByteArray, Iterable<byte[]>> groupedRDD =
(partitioner != null) ? pairRDD.groupByKey(partitioner) : pairRDD.groupByKey();
return groupedRDD
.mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, wvCoder))
.map(new TranslationUtils.FromPairFunction<>());
}
@Override
protected void runTool(final JavaSparkContext ctx) {
String referenceFileName = addReferenceFilesForSpark(ctx, referenceArguments.getReferencePath());
List<String> localKnownSitesFilePaths = addVCFsForSpark(ctx, knownVariants);
//Should this use getUnfilteredReads? getReads() will merge default and command-line filters,
//but the code below uses other filters for other parts of the pipeline that do not honor
//the command line.
final JavaRDD<GATKRead> initialReads = getReads();
// The initial reads have already had the WellformedReadFilter applied to them, which
// is all the filtering that ApplyBQSR wants. BQSR itself wants additional filtering
// performed, so we do that here.
//NOTE: this filter doesn't honor enabled/disabled commandline filters
final ReadFilter bqsrReadFilter = ReadFilter.fromList(BaseRecalibrator.getBQSRSpecificReadFilterList(), getHeaderForReads());
final JavaRDD<GATKRead> filteredReadsForBQSR = initialReads.filter(read -> bqsrReadFilter.test(read));
JavaPairRDD<GATKRead, Iterable<GATKVariant>> readsWithVariants = JoinReadsWithVariants.join(filteredReadsForBQSR, localKnownSitesFilePaths);
//note: we use the reference dictionary from the reads themselves.
final RecalibrationReport bqsrReport = BaseRecalibratorSparkFn.apply(readsWithVariants, getHeaderForReads(), referenceFileName, bqsrArgs);
final Broadcast<RecalibrationReport> reportBroadcast = ctx.broadcast(bqsrReport);
final JavaRDD<GATKRead> finalReads = ApplyBQSRSparkFn.apply(initialReads, reportBroadcast, getHeaderForReads(), applyBqsrArgs.toApplyBQSRArgumentCollection(bqsrArgs));
writeReads(ctx, output, finalReads);
}
/**
* Broadcast variable test.
* @param args
*/
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder()
.master("local[4]").appName("AttackFind").getOrCreate();
//initialize the JavaSparkContext
JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
//define the data for the broadcast variable here
//as noted earlier, broadcast variables are read-only
final List<String> broadcastList = Arrays.asList("190099HJLL","98392QUEYY","561788LLKK");
//create the broadcast variable and broadcast it to the executors
final Broadcast<List<String>> broadcast = javaSparkContext.broadcast(broadcastList);
//define the input data
JavaPairRDD<String,String> pairRDD = javaSparkContext.parallelizePairs(Arrays.asList(new Tuple2<>("000", "000")));
JavaPairRDD<String,String> resultPairRDD = pairRDD.filter((Function<Tuple2<String, String>, Boolean>) v1 -> broadcast.value().contains(v1._2));
resultPairRDD.foreach((VoidFunction<Tuple2<String, String>>) System.out::println);
}
public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<MatrixIndexes,MatrixBlock> in, DataCharacteristics mcIn, FileFormatPropertiesCSV props, boolean strict)
{
JavaPairRDD<MatrixIndexes,MatrixBlock> input = in;
//fast path without shuffle, general case with shuffle
if( mcIn.getCols()>mcIn.getBlocksize() ) {
//create row partitioned matrix
input = input
.flatMapToPair(new SliceBinaryBlockToRowsFunction(mcIn.getBlocksize()))
.groupByKey()
.mapToPair(new ConcatenateBlocksFunction(mcIn.getCols(), mcIn.getBlocksize()));
}
//sort if required (on blocks/rows)
if( strict ) {
input = input.sortByKey(true);
}
//convert binary block to csv (from blocks/rows)
JavaRDD<String> out = input
.flatMap(new BinaryBlockToCSVFunction(props));
return out;
}
/**
* Reads the specified fraction [0,1] of randomly selected PDB entries from a Hadoop Sequence file.
*
* @param path Path to Hadoop sequence file
* @param fraction Fraction of entries to be read [0,1]
* @param seed Seed for random number generator
* @param sc Spark context
* @return structure data as keyword/value pairs
*/
public static JavaPairRDD<String, StructureDataInterface> readSequenceFile(String path, double fraction, long seed, JavaSparkContext sc) {
return sc
.sequenceFile(path, Text.class, BytesWritable.class)
.sample(false, fraction, seed)
.mapToPair(new PairFunction<Tuple2<Text, BytesWritable>,String, StructureDataInterface>() {
private static final long serialVersionUID = 3512575873287789314L;
public Tuple2<String, StructureDataInterface> call(Tuple2<Text, BytesWritable> t) throws Exception {
byte[] values = t._2.copyBytes();
// if data are gzipped, unzip them first
try {
values = ReaderUtils.deflateGzip(t._2.copyBytes()); // unzip binary MessagePack data
} catch (ZipException e) {
// data are not gzipped; fall back to the raw bytes
}
// deserialize message pack
MmtfStructure mmtf = new MessagePackSerialization().deserialize(new ByteArrayInputStream(values));
// decode message pack
return new Tuple2<String, StructureDataInterface>(t._1.toString(), new GenericDecoder(mmtf));
}
});
}
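A minimal usage sketch for the sampling reader above, assuming a local Spark context; the app name, input path, fraction, and seed are placeholders for illustration, not values from the original code.
public static void main(String[] args) {
    // hypothetical local driver setup
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("ReadSequenceFileSample");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // read roughly 1% of the entries, with a fixed seed for reproducibility (placeholder path)
    JavaPairRDD<String, StructureDataInterface> pdb = readSequenceFile("/path/to/mmtf/full", 0.01, 123L, sc);
    System.out.println("Sampled entries: " + pdb.count());
    sc.close();
}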
/**
* Calculates all vs. all structural alignments of protein chains using the
* specified alignment algorithm. The input structures must contain single
* protein chains.
*
* @param targets structures containing single protein chains
* @param alignmentAlgorithm name of the algorithm
* @return dataset with alignment metrics
*/
public static Dataset<Row> getAllVsAllAlignments(JavaPairRDD<String, StructureDataInterface> targets,
String alignmentAlgorithm) {
SparkSession session = SparkSession.builder().getOrCreate();
JavaSparkContext sc = new JavaSparkContext(session.sparkContext());
// create a list of chainName / C-alpha coordinate pairs
List<Tuple2<String, Point3d[]>> chains = targets.mapValues(
s -> new ColumnarStructureX(s,true).getcAlphaCoordinates()).collect();
// create an RDD of all pair indices (0,1), (0,2), ..., (1,2), (1,3), ...
JavaRDD<Tuple2<Integer, Integer>> pairs = getPairs(sc, chains.size());
// calculate structural alignments for all pairs.
// broadcast (copy) chains to all worker nodes for efficient processing.
// for each pair there can be zero or more solutions, therefore we flatmap the pairs.
JavaRDD<Row> rows = pairs.flatMap(new StructuralAlignmentMapper(sc.broadcast(chains), alignmentAlgorithm));
// convert rows to a dataset
return session.createDataFrame(rows, getSchema());
}
@Override
public void processInstruction(ExecutionContext ec) {
SparkExecutionContext sec = (SparkExecutionContext)ec;
//get input
JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() );
//execute unary builtin operation
JavaPairRDD<MatrixIndexes,MatrixBlock> out =
in.mapValues(new RDDBinUaggChainFunction(_bOp, _uaggOp));
//set output RDD
updateUnaryOutputDataCharacteristics(sec);
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), input1.getName());
}
@Override
public void processInstruction(ExecutionContext ec) {
// general case append (map-extend, aggregate)
SparkExecutionContext sec = (SparkExecutionContext)ec;
checkBinaryAppendInputCharacteristics(sec, _cbind, false, true);
DataCharacteristics mc1 = sec.getDataCharacteristics(input1.getName());
JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() );
JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = sec.getBinaryMatrixBlockRDDHandleForVariable( input2.getName() );
JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
// Simple changing of matrix indexes of RHS
long shiftBy = _cbind ? mc1.getNumColBlocks() : mc1.getNumRowBlocks();
out = in2.mapToPair(new ShiftColumnIndex(shiftBy, _cbind));
out = in1.union( out );
//put output RDD handle into symbol table
updateBinaryAppendOutputDataCharacteristics(sec, _cbind);
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), input1.getName());
sec.addLineageRDD(output.getName(), input2.getName());
}
/**
* Execute a join on the specified data
*
* @param join Join to execute
* @param left Left data for join
* @param right Right data for join
* @return Joined data
*/
public static JavaRDD<List<Writable>> executeJoin(Join join, JavaRDD<List<Writable>> left,
JavaRDD<List<Writable>> right) {
String[] leftColumnNames = join.getJoinColumnsLeft();
int[] leftColumnIndexes = new int[leftColumnNames.length];
for (int i = 0; i < leftColumnNames.length; i++) {
leftColumnIndexes[i] = join.getLeftSchema().getIndexOfColumn(leftColumnNames[i]);
}
JavaPairRDD<List<Writable>, List<Writable>> leftJV = left.mapToPair(new ExtractKeysFunction(leftColumnIndexes));
String[] rightColumnNames = join.getJoinColumnsRight();
int[] rightColumnIndexes = new int[rightColumnNames.length];
for (int i = 0; i < rightColumnNames.length; i++) {
rightColumnIndexes[i] = join.getRightSchema().getIndexOfColumn(rightColumnNames[i]);
}
JavaPairRDD<List<Writable>, List<Writable>> rightJV =
right.mapToPair(new ExtractKeysFunction(rightColumnIndexes));
JavaPairRDD<List<Writable>, Tuple2<Iterable<List<Writable>>, Iterable<List<Writable>>>> cogroupedJV =
leftJV.cogroup(rightJV);
return cogroupedJV.flatMap(new ExecuteJoinFromCoGroupFlatMapFunction(join));
}
/**
* Spark-level group by key operation that keeps original Beam {@link KV} pairs unchanged.
*
* @return {@link JavaPairRDD} where the first value in the pair is the serialized key, and the
* second is an iterable of the {@link KV} pairs with that key.
*/
static <K, V> JavaPairRDD<ByteArray, Iterable<WindowedValue<KV<K, V>>>> groupByKeyPair(
JavaRDD<WindowedValue<KV<K, V>>> rdd, Coder<K> keyCoder, WindowedValueCoder<V> wvCoder) {
// we use coders to convert objects in the PCollection to byte arrays, so they
// can be transferred over the network for the shuffle.
JavaPairRDD<ByteArray, byte[]> pairRDD =
rdd.map(new ReifyTimestampsAndWindowsFunction<>())
.mapToPair(TranslationUtils.toPairFunction())
.mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
JavaPairRDD<ByteArray, Iterable<Tuple2<ByteArray, byte[]>>> groupedRDD =
pairRDD.groupBy((value) -> value._1);
return groupedRDD
.mapValues(
it -> Iterables.transform(it, new CoderHelpers.FromByteFunction<>(keyCoder, wvCoder)))
.mapValues(it -> Iterables.transform(it, new TranslationUtils.FromPairFunction<>()))
.mapValues(
it -> Iterables.transform(it, new TranslationUtils.ToKVByWindowInValueFunction<>()));
}
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
try {
// find the source commit to pull
Option<String> commitToPull = findCommitToPull(lastCheckpointStr);
if (!commitToPull.isPresent()) {
return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
}
// read the files out.
List<FileStatus> commitDeltaFiles = Arrays.asList(fs.listStatus(new Path(incrPullRootPath, commitToPull.get())));
String pathStr = commitDeltaFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroKeyInputFormat.class,
AvroKey.class, NullWritable.class, sparkContext.hadoopConfiguration());
return new InputBatch<>(Option.of(avroRDD.keys().map(r -> ((GenericRecord) r.datum()))),
String.valueOf(commitToPull.get()));
} catch (IOException ioe) {
throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
}
}
public static Dataset<Row> binaryBlockToDataFrame(SparkSession sparkSession,
JavaPairRDD<Long,FrameBlock> in, DataCharacteristics mc, ValueType[] schema)
{
if( !mc.colsKnown() )
throw new RuntimeException("Number of columns needed to convert binary block to data frame.");
//convert binary block to rows rdd
JavaRDD<Row> rowRDD = in.flatMap(
new BinaryBlockToDataFrameFunction());
//create data frame schema
if( schema == null )
schema = UtilFunctions.nCopies((int)mc.getCols(), ValueType.STRING);
StructType dfSchema = convertFrameSchemaToDFSchema(schema, true);
//rdd to data frame conversion
return sparkSession.createDataFrame(rowRDD, dfSchema);
}
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
String[] ns = fst.getPath().getName().split("\\.");
//TODO: Handle also compressed files
List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);
JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);
zips.foreach( splits -> {
Path path = splits._1.getPath();
FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);
writeInterleavedSplits(fqreader, fqreader2, new Configuration(), splitDir, path.getParent().getName()+"_"+splits._1.getStart()+".fq");
});
}
public static JavaPairRDD<Integer, Geometry> generateHullsRDD(
final JavaPairRDD<Integer, Iterable<Vector>> groupedPoints) {
// Create the convex hull for each kmeans centroid
final JavaPairRDD<Integer, Geometry> hullRDD = groupedPoints.mapValues(point -> {
final Iterable<Coordinate> coordIt =
Iterables.transform(point, new com.google.common.base.Function<Vector, Coordinate>() {
@Override
public Coordinate apply(final Vector input) {
if (input != null) {
return new Coordinate(input.apply(0), input.apply(1));
}
return new Coordinate();
}
});
final Coordinate[] coordArray = Iterables.toArray(coordIt, Coordinate.class);
return new ConvexHull(coordArray, GeometryUtils.GEOMETRY_FACTORY).getConvexHull();
});
return hullRDD;
}
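A small local driver sketch for generateHullsRDD, assuming the Vector type above is Spark's mllib Vector; the cluster id, sample coordinates, and app name are made up for illustration.
public static void main(String[] args) {
    // hypothetical local driver setup
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("HullsDemo");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // one group (cluster id 0) with four sample points
    List<Vector> points = Arrays.asList(
        Vectors.dense(0, 0), Vectors.dense(2, 0), Vectors.dense(1, 2), Vectors.dense(1, 1));
    List<Tuple2<Integer, Iterable<Vector>>> grouped = Arrays.asList(new Tuple2<>(0, points));
    JavaPairRDD<Integer, Geometry> hulls = generateHullsRDD(sc.parallelizePairs(grouped));
    // print the convex hull polygon for each cluster id
    for (Tuple2<Integer, Geometry> hull : hulls.collect()) {
        System.out.println(hull._1 + " -> " + hull._2);
    }
    sc.close();
}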
public static JavaPairRDD<MatrixIndexes, MatrixBlock> sortDataByVals( JavaPairRDD<MatrixIndexes, MatrixBlock> val,
JavaPairRDD<MatrixIndexes, MatrixBlock> data, boolean asc, long rlen, long clen, long clen2, int blen )
{
//create value-index rdd from inputs
JavaPairRDD<ValuesIndexPair, double[]> dvals = val
.flatMapToPair(new ExtractDoubleValuesWithIndexFunction2(blen));
//sort (creates sorted range per partition)
int numPartitions = SparkUtils.getNumPreferredPartitions(
new MatrixCharacteristics(rlen, clen2+1, blen, blen));
JavaRDD<ValuesIndexPair> sdvals = dvals
.sortByKey(new IndexComparator2(asc), true, numPartitions)
.keys(); //workaround for index comparator
//create target indexes by original index
JavaPairRDD<MatrixIndexes, MatrixBlock> ixmap = sdvals
.zipWithIndex()
.mapToPair(new ExtractIndexFunction2())
.sortByKey()
.mapPartitionsToPair(new ConvertToBinaryBlockFunction4(rlen, blen));
ixmap = RDDAggregateUtils.mergeByKey(ixmap, false);
//actual data sort
return sortDataByIx(data, ixmap, rlen, clen, blen);
}
/**
* Tests whether the extractor can group the players data set by key.
*/
@Test
protected void testGroupByKey() {
DeepSparkContext context = getDeepSparkContext();
try {
JavaPairRDD<Long, PlayerEntity> playersRDD = preparePlayerRDD(context);
assertEquals(playersRDD.count(), 5);
assertEquals(playersRDD.groupByKey().count(), 4);
} finally {
context.stop();
}
}
public static void wordCountJava8( String filename )
{
// Define a configuration to use to interact with Spark
SparkConf conf = new SparkConf().setMaster("local").setAppName("Word Count App");
// Create a Java version of the Spark Context from the configuration
JavaSparkContext sc = new JavaSparkContext(conf);
// Load the input data, which is a text file read from the command line
JavaRDD<String> input = sc.textFile( filename );
// Java 8 with lambdas: split the input string into words
JavaRDD<String> words = input.flatMap( s -> Arrays.asList( s.split( " " ) ).iterator() );
// Java 8 with lambdas: transform the collection of words into pairs (word and 1) and then count them
JavaPairRDD<String, Integer> counts = words.mapToPair( t -> new Tuple2<>( t, 1 ) ).reduceByKey( (x, y) -> x + y );
// Save the word count back out to a text file, causing evaluation.
counts.saveAsTextFile( "output" );
}
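A possible entry point for wordCountJava8; the usage string and argument handling are assumptions, not part of the original example.
public static void main(String[] args) {
    if (args.length < 1) {
        System.err.println("Usage: wordCountJava8 <input-file>");
        System.exit(1);
    }
    // pass the text file path from the command line into the word count job
    wordCountJava8(args[0]);
}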
public static JavaPairRDD<MatrixIndexes, MatrixBlock> sortByVal( JavaPairRDD<MatrixIndexes, MatrixBlock> in,
JavaPairRDD<MatrixIndexes, MatrixBlock> in2, long rlen, int blen )
{
//create value-index rdd from inputs
JavaRDD<DoublePair> dvals = in.join(in2).values()
.flatMap(new ExtractDoubleValuesFunction2());
//sort (creates sorted range per partition)
long hdfsBlocksize = InfrastructureAnalyzer.getHDFSBlockSize();
int numPartitions = (int)Math.ceil(((double)rlen*8)/hdfsBlocksize);
JavaRDD<DoublePair> sdvals = dvals
.sortBy(new CreateDoubleKeyFunction2(), true, numPartitions);
//create binary block output
JavaPairRDD<MatrixIndexes, MatrixBlock> ret = sdvals
.zipWithIndex()
.mapPartitionsToPair(new ConvertToBinaryBlockFunction2(rlen, blen));
ret = RDDAggregateUtils.mergeByKey(ret, false);
return ret;
}
@Override
public void processInstruction(ExecutionContext ec) {
SparkExecutionContext sec = (SparkExecutionContext)ec;
//get input
JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() );
//execute tsmm instruction (always produce exactly one output block)
//(this formulation with values() requires --conf spark.driver.maxResultSize=0)
JavaRDD<MatrixBlock> tmp = in.map(new RDDTSMMFunction(_type));
MatrixBlock out = RDDAggregateUtils.sumStable(tmp);
//put output block into symbol table (no lineage because single block)
//this also includes implicit maintenance of matrix characteristics
sec.setMatrixOutput(output.getName(), out);
}
private static MatrixBlock multiBlockIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1,
DataCharacteristics mcIn, DataCharacteristics mcOut, IndexRange ixrange) {
//create list of all required matrix indexes
List<MatrixIndexes> filter = new ArrayList<>();
long rlix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getBlocksize());
long ruix = UtilFunctions.computeBlockIndex(ixrange.rowEnd, mcIn.getBlocksize());
long clix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getBlocksize());
long cuix = UtilFunctions.computeBlockIndex(ixrange.colEnd, mcIn.getBlocksize());
for( long r=rlix; r<=ruix; r++ )
for( long c=clix; c<=cuix; c++ )
filter.add( new MatrixIndexes(r,c) );
//wrap PartitionPruningRDD around input to exploit pruning for out-of-core datasets
JavaPairRDD<MatrixIndexes,MatrixBlock> out = createPartitionPruningRDD(in1, filter);
out = out.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut)) //filter unnecessary blocks
.mapToPair(new SliceBlock2(ixrange, mcOut)); //slice relevant blocks
//collect output without shuffle to avoid side-effects with custom PartitionPruningRDD
MatrixBlock mbout = SparkExecutionContext.toMatrixBlock(out, (int)mcOut.getRows(),
(int)mcOut.getCols(), mcOut.getBlocksize(), -1);
return mbout;
}
@SuppressWarnings("unchecked")
public static long writeRDDtoHDFS( RDDObject rdd, String path, OutputInfo oinfo )
{
JavaPairRDD<MatrixIndexes,MatrixBlock> lrdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD();
//piggyback nnz maintenance on write
LongAccumulator aNnz = getSparkContextStatic().sc().longAccumulator("nnz");
lrdd = lrdd.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
//save file is an action which also triggers nnz maintenance
lrdd.saveAsHadoopFile(path,
oinfo.outputKeyClass,
oinfo.outputValueClass,
oinfo.outputFormatClass);
//return nnz aggregate of all blocks
return aNnz.value();
}
/**
* Reads an MMTF-Hadoop Sequence file. The Hadoop Sequence file may contain
* either gzip compressed or uncompressed values.
* See <a href="https://mmtf.rcsb.org/download.html">file download information</a>.
*
* @param path Path to Hadoop sequence file
* @param sc Spark context
* @return structure data as keyword/value pairs
*/
public static JavaPairRDD<String, StructureDataInterface> readSequenceFile(String path, JavaSparkContext sc) {
return sc
.sequenceFile(path, Text.class, BytesWritable.class)
.mapToPair(new PairFunction<Tuple2<Text, BytesWritable>,String, StructureDataInterface>() {
private static final long serialVersionUID = 3512575873287789314L;
public Tuple2<String, StructureDataInterface> call(Tuple2<Text, BytesWritable> t) throws Exception {
byte[] values = t._2.copyBytes();
// if data are gzipped, unzip them first
try {
values = ReaderUtils.deflateGzip(t._2.copyBytes());
} catch (ZipException e) {
// data are not gzipped; fall back to the raw bytes
}
// deserialize message pack
MmtfStructure mmtf = new MessagePackSerialization().deserialize(new ByteArrayInputStream(values));
// decode message pack
return new Tuple2<String, StructureDataInterface>(t._1.toString(), new GenericDecoder(mmtf));
}
});
}
@SuppressWarnings("unchecked")
public void cacheMatrixObject( String var ) {
//get input rdd and default storage level
MatrixObject mo = getMatrixObject(var);
//double check size to avoid unnecessary spark context creation
if( !OptimizerUtils.exceedsCachingThreshold(mo.getNumColumns(),
OptimizerUtils.estimateSizeExactSparsity(mo.getDataCharacteristics())) )
return;
JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo);
//persist rdd (force rdd caching, if not already cached)
if( !isRDDCached(in.id()) )
in.count(); //trigger caching to prevent contention
}
private static void compute(JavaSparkContext javaSparkContext, Configuration conf) {
JavaPairRDD<LongWritable, JsonObject> tableData = javaSparkContext.newAPIHadoopRDD(
conf,
GsonBigQueryInputFormat.class,
LongWritable.class,
JsonObject.class);
JavaPairRDD<String, Long> wordCounts = tableData
.map(entry -> toTuple(entry._2))
.keyBy(tuple -> tuple._1)
.mapValues(tuple -> tuple._2)
.reduceByKey((count1, count2) -> count1 + count2);
wordCounts
.map(tuple -> new Text(toJson(tuple).toString()))
.keyBy(jsonText -> jsonText)
.mapValues(jsonText -> NullWritable.get()) // Values do not matter.
.saveAsNewAPIHadoopDataset(conf);
}
private static JavaRDD<String> getUniqueKmers(JavaPairRDD<Text, SequencedFragment> fastqRDD, int k) {
JavaRDD<String> rdd = fastqRDD.mapPartitions(records -> {
HashSet<String> umer_set = new HashSet<String>();
while (records.hasNext()) {
Tuple2<Text, SequencedFragment> fastq = records.next();
String seq = fastq._2.getSequence().toString();
// collect every k-length substring (k-mer) of the read sequence
for (int i = 0; i <= seq.length() - k; i++) {
String kmer = seq.substring(i, i + k);
umer_set.add(kmer);
}
}
return umer_set.iterator();
});
JavaRDD<String> umersRDD = rdd.distinct();
//umersRDD.sortBy(s -> s, true, 4);
return umersRDD;
}
private void runMLContextOutputBlocksizeTest(String format)
{
try
{
double[][] A = getRandomMatrix(rows, cols, -10, 10, sparsity, 76543);
MatrixBlock mbA = DataConverter.convertToMatrixBlock(A);
int blksz = ConfigurationManager.getBlocksize();
MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, blksz, mbA.getNonZeros());
//create input dataset
JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz);
Matrix m = new Matrix(in, new MatrixMetadata(mc));
ml.setExplain(true);
ml.setExplainLevel(ExplainLevel.HOPS);
//execute script
String s ="if( sum(X) > 0 )"
+ " X = X/2;"
+ "R = X;"
+ "write(R, \"/tmp\", format=\""+format+"\");";
Script script = dml(s).in("X", m).out("R");
MLResults results = ml.execute(script);
//compare output matrix characteristics
MatrixCharacteristics mcOut = results.getMatrix("R")
.getMatrixMetadata().asMatrixCharacteristics();
Assert.assertEquals(blksz, mcOut.getBlocksize());
}
catch(Exception ex) {
ex.printStackTrace();
throw new RuntimeException(ex);
}
}
@SuppressWarnings("unchecked")
protected JavaPairRDD<MatrixIndexes,MatrixBlock> processMatrixCSVReblockInstruction(SparkExecutionContext sec, DataCharacteristics mcOut) {
//get input rdd (needs to be longwritable/text for consistency with the meta data; in case of
//serialization issues, create longwritableser/textser as serializable wrappers)
JavaPairRDD<LongWritable, Text> in = (JavaPairRDD<LongWritable, Text>)
sec.getRDDHandleForMatrixObject(sec.getMatrixObject(input1), InputInfo.CSVInputInfo);
//reblock csv to binary block
return RDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(),
in, mcOut, _hasHeader, _delim, _fill, _fillValue);
}