The following are example usages of org.apache.spark.api.java.JavaSparkContext; follow each snippet's link to view the full source on GitHub.
@Test
public void testPathExists() throws Exception {
MiniClusterUtils.runOnIsolatedMiniCluster( cluster -> {
//use the HDFS on the mini cluster
final Path workingDirectory = MiniClusterUtils.getWorkingDir(cluster);
final Path tempPath = new Path(workingDirectory, "testFileExists.txt");
final JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
Assert.assertFalse(SparkUtils.hadoopPathExists(ctx, tempPath.toUri()));
final FileSystem fs = tempPath.getFileSystem(ctx.hadoopConfiguration());
final FSDataOutputStream fsOutStream = fs.create(tempPath);
fsOutStream.close();
fs.deleteOnExit(tempPath);
Assert.assertTrue(SparkUtils.hadoopPathExists(ctx, tempPath.toUri()));
});
}
public static void main(String[] args) {
if (args.length != 2) {
System.err.println("Usage: " + PdbRedoToMmtf.class.getSimpleName() + " <pdb-redo-path> <mmtf-path");
System.exit(1);
}
long start = System.nanoTime();
// instantiate Spark
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName(PdbRedoToMmtf.class.getSimpleName());
JavaSparkContext sc = new JavaSparkContext(conf);
// import PDB-REDO from a local copy
JavaPairRDD<String, StructureDataInterface> pdbredo = MmtfImporter.importPdbRedo(args[0], sc);
// save PDB-REDO as an MMTF-Hadoop Sequence file
MmtfWriter.writeSequenceFile(args[1], sc, pdbredo);
long end = System.nanoTime();
System.out.println("time: " + (end-start)/1E9 + " sec.");
// close Spark
sc.close();
}
@Test(groups = "spark")
public void testDoSetPairFlags() {
final JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
final SAMSequenceDictionary seq = new SAMSequenceDictionary();
seq.addSequence(new SAMSequenceRecord("test_seq", 1000));
final SAMFileHeader header = new SAMFileHeader(seq);
final List<GATKRead> readList = makeReadSet(header);
final JavaRDD<GATKRead> reads = ctx.parallelize(readList);
final List<GATKRead> result = PSFilter.setPairFlags(reads, 100).collect();
Assert.assertEquals(result.size(), 6);
for (final GATKRead read : result) {
if (read.getName().equals("paired_1") || read.getName().equals("paired_2")) {
Assert.assertTrue(read.isPaired());
} else {
Assert.assertFalse(read.isPaired());
}
}
}
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaKernelDensityEstimationExample");
JavaSparkContext jsc = new JavaSparkContext(conf);
// $example on$
// an RDD of sample data
JavaRDD<Double> data = jsc.parallelize(
Arrays.asList(1.0, 1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 5.0, 6.0, 7.0, 8.0, 9.0, 9.0));
// Construct the density estimator with the sample data
// and a standard deviation for the Gaussian kernels
KernelDensity kd = new KernelDensity().setSample(data).setBandwidth(3.0);
// Find density estimates for the given values
double[] densities = kd.estimate(new double[]{-1.0, 2.0, 5.0});
System.out.println(Arrays.toString(densities));
// $example off$
jsc.stop();
}
public static void main(String[] args) throws Exception {
System.out.println(System.getProperty("hadoop.home.dir"));
String inputPath = args[0];
String outputPath = args[1];
FileUtils.deleteQuietly(new File(outputPath));
JavaSparkContext sc = new JavaSparkContext("local", "sparkwordcount");
JavaRDD<String> rdd = sc.textFile(inputPath);
JavaPairRDD<String, Integer> counts = rdd
.flatMap(x -> Arrays.asList(x.split(" ")).iterator())
.mapToPair(x -> new Tuple2<>(x, 1))
.reduceByKey((x, y) -> x + y);
counts.saveAsTextFile(outputPath);
sc.close();
}
private void assertSingleShardedWritingWorks(String vcf, String outputPath, boolean writeTabixIndex) throws IOException {
JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
VariantsSparkSource variantsSparkSource = new VariantsSparkSource(ctx);
JavaRDD<VariantContext> variants = variantsSparkSource.getParallelVariantContexts(vcf, null);
if (variants.getNumPartitions() == 1) {
variants = variants.repartition(3); // repartition to more than 1 partition
}
VCFHeader header = getHeader(vcf);
VariantsSparkSink.writeVariants(ctx, outputPath, variants, header, writeTabixIndex);
checkFileExtensionConsistentWithContents(outputPath, writeTabixIndex);
JavaRDD<VariantContext> variants2 = variantsSparkSource.getParallelVariantContexts(outputPath, null);
final List<VariantContext> writtenVariants = variants2.collect();
VariantContextTestUtils.assertEqualVariants(readVariants(vcf), writtenVariants);
}
public void run(String peerServiceTag) {
long microsLower = day.toInstant().toEpochMilli() * 1000;
long microsUpper = day.plus(Period.ofDays(1)).toInstant().toEpochMilli() * 1000 - 1;
log.info("Running Dependencies job for {}: {} ≤ Span.timestamp {}", day, microsLower, microsUpper);
JavaSparkContext sc = new JavaSparkContext(conf);
try {
JavaPairRDD<String, Iterable<Span>> traces = javaFunctions(sc)
.cassandraTable(keyspace, "traces", mapRowTo(CassandraSpan.class))
.where("start_time < ? AND start_time > ?", microsUpper, microsLower)
.mapToPair(span -> new Tuple2<>(span.getTraceId(), span))
.mapValues(span -> (Span) span)
.groupByKey();
List<Dependency> dependencyLinks = DependenciesSparkHelper.derive(traces, peerServiceTag);
store(sc, dependencyLinks);
log.info("Done, {} dependency objects created", dependencyLinks.size());
} finally {
sc.stop();
}
}
@Test
public void testIdentifySamplesWithSuspiciousContigsDelsWithSpark() {
final JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
final Set<String> gtBlacklistSamples = new HashSet<>();
gtBlacklistSamples.add("sample_1");
gtBlacklistSamples.add("sample_2");
gtBlacklistSamples.add("sample_3");
ReadCountCollection allCoverageProfiles = null;
try {
allCoverageProfiles = ReadCountCollectionUtils.parse(TEST_FILE_DEL);
} catch (final IOException ioe) {
Assert.fail("Could not load test file: " + TEST_FILE_DEL, ioe);
}
final JavaRDD<ReadCountCollection> allSampleTangentNormalizedReadCounts = CoveragePoNQCUtils.createParallelIndividualReadCountCollections(allCoverageProfiles, ctx);
// By the time we are here, input is assumed to have been tangent normalized.
final List<String> blacklistSamples = CoveragePoNQCUtils.identifySamplesWithSuspiciousContigs(allSampleTangentNormalizedReadCounts, ctx, CoveragePoNQCUtils.getContigToMedianCRMap(allCoverageProfiles));
final Set<String> resultSamples = new HashSet<>(blacklistSamples);
Assert.assertEquals(resultSamples.size(), gtBlacklistSamples.size());
Assert.assertEquals(Sets.difference(resultSamples, gtBlacklistSamples).size(), 0);
}
public static void wordCountJava8( String filename )
{
// Define a configuration to use to interact with Spark
SparkConf conf = new SparkConf().setMaster("local").setAppName("Work Count App");
// Create a Java version of the Spark Context from the configuration
JavaSparkContext sc = new JavaSparkContext(conf);
// Load the input data, which is a text file read from the command line
JavaRDD<String> input = sc.textFile( filename );
// Java 8 with lambdas: split the input string into words
// TODO here a change has happened
JavaRDD<String> words = input.flatMap( s -> Arrays.asList( s.split( " " ) ).iterator() );
// Java 8 with lambdas: transform the collection of words into pairs (word and 1) and then count them
JavaPairRDD<String, Integer> counts = words.mapToPair( t -> new Tuple2<>( t, 1 ) ).reduceByKey( (x, y) -> x + y );
// Save the word count back out to a text file, causing evaluation.
counts.saveAsTextFile( "output" );
}
@Override
protected void runTool(final JavaSparkContext ctx) {
// TODO: 5/9/18 getback sample name in output files
final SAMFileHeader headerForReads = getHeaderForReads();
final Set<VCFHeaderLine> defaultToolVCFHeaderLines = getDefaultToolVCFHeaderLines();
final SvDiscoveryInputMetaData svDiscoveryInputMetaData =
new SvDiscoveryInputMetaData(ctx, discoverStageArgs, nonCanonicalChromosomeNamesFile,
derivedSimpleVCFPrefix,
null, null, null, null,
headerForReads, getReference(), defaultToolVCFHeaderLines, localLogger);
final JavaRDD<VariantContext> complexVariants = new VariantsSparkSource(ctx)
.getParallelVariantContexts(complexVCF, getIntervals());
final JavaRDD<GATKRead> assemblyRawAlignments = getReads();
final SegmentedCpxVariantSimpleVariantExtractor.ExtractedSimpleVariants extract =
SegmentedCpxVariantSimpleVariantExtractor.extract(complexVariants, svDiscoveryInputMetaData, assemblyRawAlignments);
final String derivedOneSegmentSimpleVCF = derivedSimpleVCFPrefix + "_1_seg.vcf";
final String derivedMultiSegmentSimpleVCF = derivedSimpleVCFPrefix + "_multi_seg.vcf";
final VCFHeader vcfHeader = VariantsSparkSource.getHeader(complexVCF);
SVVCFWriter.writeVCF(extract.getReInterpretZeroOrOneSegmentCalls(), derivedOneSegmentSimpleVCF, vcfHeader.getSequenceDictionary(), defaultToolVCFHeaderLines, logger);
SVVCFWriter.writeVCF(extract.getReInterpretMultiSegmentsCalls(), derivedMultiSegmentSimpleVCF, vcfHeader.getSequenceDictionary(), defaultToolVCFHeaderLines, logger);
}
/**
* List of the files in the given directory (path), as a {@code JavaRDD<String>}
*
* @param sc Spark context
* @param path Path to list files in
* @param recursive Whether to walk the directory tree recursively (i.e., include subdirectories)
* @param allowedExtensions If null: all files will be accepted. If non-null: only files with the specified extension will be allowed.
* Exclude the extension separator - i.e., use "txt" not ".txt" here.
* @param config Hadoop configuration to use. Must not be null.
* @return Paths in the directory
* @throws IOException If error occurs getting directory contents
*/
public static JavaRDD<String> listPaths(@NonNull JavaSparkContext sc, String path, boolean recursive,
Set<String> allowedExtensions, @NonNull Configuration config) throws IOException {
List<String> paths = new ArrayList<>();
FileSystem hdfs = FileSystem.get(URI.create(path), config);
RemoteIterator<LocatedFileStatus> fileIter = hdfs.listFiles(new org.apache.hadoop.fs.Path(path), recursive);
while (fileIter.hasNext()) {
String filePath = fileIter.next().getPath().toString();
if(allowedExtensions == null){
paths.add(filePath);
} else {
String ext = FilenameUtils.getExtension(filePath);
if(allowedExtensions.contains(ext)){
paths.add(filePath);
}
}
}
return sc.parallelize(paths);
}
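A brief usage sketch for listPaths (hedged: "SparkUtils" is only a placeholder for the class hosting the method, and the HDFS path is illustrative; the call also declares IOException): list only "csv" files under a directory, recursively.
// Hypothetical call site; "SparkUtils" stands in for whatever class hosts listPaths().
Set<String> allowedExtensions = new HashSet<>(Arrays.asList("csv"));
JavaRDD<String> csvPaths = SparkUtils.listPaths(sc, "hdfs:///data/input", true, allowedExtensions, sc.hadoopConfiguration());
// Materialize and print the matching paths.
csvPaths.collect().forEach(System.out::println);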
public static void main(String[] args) {
String master = args[0];
String inFile = args[1];
String outFile = args[2];
String[] patterns = new String[args.length-3];
System.arraycopy(args,3,patterns,0,args.length-3);
System.err.println("Starting spark with master="+master+" in="+inFile);
System.err.println("Using patterns: "+ Arrays.toString(patterns));
SparkConf conf = new SparkConf().setAppName("Grep job").setMaster(master).set("spark.hadoop.validateOutputSpecs", "false");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> file = sc.textFile(inFile);
for(int p = 0; p < patterns.length; p++) {
final String pattern = patterns[p];
JavaRDD<String> res = file.filter(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
Pattern p = Pattern.compile(pattern);
@Override
public Boolean call(String value) throws Exception {
if (value == null || value.length() == 0) {
return false;
}
final Matcher m = p.matcher(value);
if (m.find()) {
return true;
}
return false;
}
});
res.saveAsTextFile(outFile+"_"+pattern);
}
}
protected void broadcastMemory(final JavaSparkContext sparkContext) {
this.broadcast.destroy(true); // do we need to block?
final Map<String, Object> toBroadcast = new HashMap<>();
this.sparkMemory.forEach((key, object) -> {
if (!object.value().isEmpty() && this.memoryComputeKeys.get(key).isBroadcast())
toBroadcast.put(key, object.value());
});
this.broadcast = sparkContext.broadcast(toBroadcast);
}
public static void main(String[] args) throws ParseException {
final Validator validator = new Validator(args);
ValidatorParameters params = validator.getParameters();
validator.setDoPrintInProcessRecord(false);
logger.info("Input file is " + params.getDetailsFileName());
SparkConf conf = new SparkConf().setAppName("MarcCompletenessCount");
JavaSparkContext context = new JavaSparkContext(conf);
System.err.println(validator.getParameters().formatParameters());
JavaRDD<String> inputFile = context.textFile(validator.getParameters().getArgs()[0]);
JavaRDD<String> baseCountsRDD = inputFile
.flatMap(content -> {
MarcReader reader = ReadMarc.getMarcStringReader(content);
Record marc4jRecord = reader.next();
MarcRecord marcRecord = MarcFactory.createFromMarc4j(
marc4jRecord, params.getDefaultRecordType(), params.getMarcVersion(), params.fixAlephseq());
validator.processRecord(marcRecord, 1);
return ValidationErrorFormatter
.formatForSummary(marcRecord.getValidationErrors(), params.getFormat())
.iterator();
}
);
baseCountsRDD.saveAsTextFile(validator.getParameters().getDetailsFileName());
}
/**
* Closes all instances in all the VMs involved in the spark context provided.
* @param ctx the spark context.
*/
public static void closeAllDistributedInstances( final JavaSparkContext ctx ) {
Utils.nonNull(ctx, "the context provided cannot be null");
int nJobs = ctx.defaultParallelism();
final List<Integer> jobList = new ArrayList<>(nJobs);
for ( int idx = 0; idx != nJobs; ++idx ) jobList.add(idx);
ctx.parallelize(jobList, nJobs).foreach(idx -> closeInstances());
}
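The method above uses a common pattern: parallelize one element per default-parallelism slot and run a side-effecting foreach so that cleanup code executes on every executor JVM. A minimal, self-contained sketch of the same idea, with a hypothetical cleanup() standing in for closeInstances():
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaSparkContext;

public final class ExecutorCleanup {
    // Hypothetical per-JVM cleanup hook; closeInstances() plays this role in the snippet above.
    private static void cleanup() { /* release native handles, caches, etc. */ }

    public static void runOnAllExecutors(final JavaSparkContext ctx) {
        final int nTasks = ctx.defaultParallelism();
        final List<Integer> dummies = new ArrayList<>(nTasks);
        for (int i = 0; i < nTasks; ++i) dummies.add(i);
        // One task per parallelism slot; each task triggers cleanup() in the executor that runs it.
        ctx.parallelize(dummies, nTasks).foreach(i -> cleanup());
    }
}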
/**
* Kmerize reads having template names in a given set,
* filter out low complexity kmers and kmers that appear too often in the genome to be helpful in localizing reads,
* kill intervals that have too few surviving kmers.
* The return is a Tuple2 in which
* _1 describes the intervals that have been killed for having too few kmers (as a map from intervalId onto an explanatory string),
* and _2 describes the good kmers that we want to use in local assemblies (as a multimap from kmer onto intervalId).
*/
private static Tuple2<List<AlignedAssemblyOrExcuse>, HopscotchUniqueMultiMap<SVKmer, Integer, KmerAndInterval>> getKmerAndIntervalsSet(
final FindBreakpointEvidenceSparkArgumentCollection params,
final ReadMetadata readMetadata,
final JavaSparkContext ctx,
final HopscotchUniqueMultiMap<String, Integer, QNameAndInterval> qNamesMultiMap,
final int nIntervals,
final JavaRDD<GATKRead> unfilteredReads,
final SVReadFilter filter,
final Logger logger)
{
final Set<SVKmer> kmerKillSet =
SVFileUtils.readKmersFile(params.kmersToIgnoreFile, params.kSize);
if ( params.adapterSequence != null ) {
SVKmerizer.stream(params.adapterSequence, params.kSize, 0, new SVKmerLong())
.forEach(kmer -> kmerKillSet.add(kmer.canonical(params.kSize)));
}
log("Ignoring " + kmerKillSet.size() + " genomically common kmers.", logger);
final Tuple2<List<AlignedAssemblyOrExcuse>, List<KmerAndInterval>> kmerIntervalsAndDispositions =
getKmerIntervals(params, readMetadata, ctx, qNamesMultiMap, nIntervals, kmerKillSet,
unfilteredReads, filter, logger);
final HopscotchUniqueMultiMap<SVKmer, Integer, KmerAndInterval> kmerMultiMap =
new HopscotchUniqueMultiMap<>(kmerIntervalsAndDispositions._2());
log("Discovered " + kmerMultiMap.size() + " kmers.", logger);
return new Tuple2<>(kmerIntervalsAndDispositions._1(), kmerMultiMap);
}
public JavaSparkContext getJavaSparkContext() {
if (sparkInterpreter == null) {
return null;
} else {
return new JavaSparkContext(sparkInterpreter.getSparkContext());
}
}
public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src,
int blen, int numParts, boolean inclEmpty) {
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
List<Tuple2<MatrixIndexes,MatrixBlock>> list = null;
if( src.getNumRows() <= blen && src.getNumColumns() <= blen ) {
list = Arrays.asList(new Tuple2<>(new MatrixIndexes(1,1), src));
}
else {
MatrixCharacteristics mc = new MatrixCharacteristics(
src.getNumRows(), src.getNumColumns(), blen, src.getNonZeros());
list = LongStream.range(0, mc.getNumBlocks()).parallel()
.mapToObj(i -> createIndexedMatrixBlock(src, mc, i))
.filter(kv -> inclEmpty || !kv._2.isEmptyBlock(false))
.collect(Collectors.toList());
}
JavaPairRDD<MatrixIndexes,MatrixBlock> result = (numParts > 1) ?
sc.parallelizePairs(list, numParts) : sc.parallelizePairs(list);
if (DMLScript.STATISTICS) {
Statistics.accSparkParallelizeTime(System.nanoTime() - t0);
Statistics.incSparkParallelizeCount(1);
}
return result;
}
public BulkInsertPreppedDeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecordRdd = preppedInputRecordRdd;
this.bulkInsertPartitioner = bulkInsertPartitioner;
}
@SuppressWarnings("serial")
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetExample").setMaster("local[2]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> textFile = jsc.textFile("hdfs://localhost/user/cloudera/data.txt");
JavaPairRDD<String, Integer> pairs = textFile.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s.substring(0, s.indexOf("|")), 1); }
});
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
System.out.println ("We have generaged " + counts.count() + " users");
jsc.close();
}
/**
* Restore a {@code JavaPairRDD<Long,List<Writable>>} previously saved with {@link #saveMapFile(String, JavaRDD)}<br>
* Note that if the keys are not required, simply use {@code restoreMapFile(...).values()}
*
* @param path Path of the MapFile
* @param sc Spark context
* @return The restored RDD, with their unique indices as the key
*/
public static JavaPairRDD<Long, List<Writable>> restoreMapFile(String path, JavaSparkContext sc) {
Configuration c = new Configuration();
c.set(FileInputFormat.INPUT_DIR, FilenameUtils.normalize(path, true));
JavaPairRDD<LongWritable, RecordWritable> pairRDD =
sc.newAPIHadoopRDD(c, SequenceFileInputFormat.class, LongWritable.class, RecordWritable.class);
return pairRDD.mapToPair(new RecordLoadPairFunction());
}
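A brief usage sketch, assuming the MapFile was written earlier with saveMapFile (the path below is illustrative):
// Restore the records together with their unique long indices.
JavaPairRDD<Long, List<Writable>> indexed = restoreMapFile("hdfs:///data/sequences/mapfile", sc);
// Keep only the record values when the indices are not needed, as the javadoc notes.
JavaRDD<List<Writable>> records = indexed.values();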
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("JavaTC")
.getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
Integer slices = (args.length > 0) ? Integer.parseInt(args[0]): 2;
JavaPairRDD<Integer, Integer> tc = jsc.parallelizePairs(generateGraph(), slices).cache();
// Linear transitive closure: each round grows paths by one edge,
// by joining the graph's edges with the already-discovered paths.
// e.g. join the path (y, z) from the TC with the edge (x, y) from
// the graph to obtain the path (x, z).
// Because join() joins on keys, the edges are stored in reversed order.
JavaPairRDD<Integer, Integer> edges = tc.mapToPair(
new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
return new Tuple2<>(e._2(), e._1());
}
});
long oldCount;
long nextCount = tc.count();
do {
oldCount = nextCount;
// Perform the join, obtaining an RDD of (y, (z, x)) pairs,
// then project the result to obtain the new (x, z) paths.
tc = tc.union(tc.join(edges).mapToPair(ProjectFn.INSTANCE)).distinct().cache();
nextCount = tc.count();
} while (nextCount != oldCount);
System.out.println("TC has " + tc.count() + " edges.");
spark.stop();
}
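generateGraph() is not shown in this snippet; in the upstream Spark example it builds a random edge list. A minimal sketch of such a helper (vertex and edge counts here are arbitrary):
// Illustrative helper: a small random set of (from, to) edges with no self-loops.
static List<Tuple2<Integer, Integer>> generateGraph() {
    final int numVertices = 100;
    final int numEdges = 200;
    Random rand = new Random(42);
    Set<Tuple2<Integer, Integer>> edges = new HashSet<>(numEdges);
    while (edges.size() < numEdges) {
        int from = rand.nextInt(numVertices);
        int to = rand.nextInt(numVertices);
        if (from != to) {
            edges.add(new Tuple2<>(from, to));
        }
    }
    return new ArrayList<>(edges);
}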
/**
* Test checks how the cache candidates map is populated by the runner when evaluating the
* pipeline.
*/
@Test
public void cacheCandidatesUpdaterTest() {
SparkPipelineOptions options = createOptions();
Pipeline pipeline = Pipeline.create(options);
PCollection<String> pCollection = pipeline.apply(Create.of("foo", "bar"));
// First use of pCollection.
pCollection.apply(Count.globally());
// Second use of pCollection.
PCollectionView<List<String>> view = pCollection.apply(View.asList());
// Internally View.asList() creates a PCollection that underlies the PCollectionView, that
// PCollection should not be cached as the SparkRunner does not access that PCollection to
// access the PCollectionView.
pipeline
.apply(Create.of("foo", "baz"))
.apply(
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext processContext) {
if (processContext.sideInput(view).contains(processContext.element())) {
processContext.output(processContext.element());
}
}
})
.withSideInputs(view));
JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options);
SparkRunner.CacheVisitor cacheVisitor =
new SparkRunner.CacheVisitor(new TransformTranslator.Translator(), ctxt);
pipeline.traverseTopologically(cacheVisitor);
assertEquals(2L, (long) ctxt.getCacheCandidates().get(pCollection));
assertEquals(1L, ctxt.getCacheCandidates().values().stream().filter(l -> l > 1).count());
}
@Override
public void run()
throws Exception {
if (!_enableParallelPush) {
super.run();
} else {
List<Path> segmentPathsToPush = getDataFilePaths(_segmentPattern);
retainRecentFiles(segmentPathsToPush, _lookBackPeriod);
List<String> segmentsToPush = new ArrayList<>();
segmentPathsToPush.forEach(path -> {
segmentsToPush.add(path.toString());
});
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
if (_pushJobParallelism == -1) {
_pushJobParallelism = segmentsToPush.size();
}
JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, _pushJobParallelism);
pathRDD.foreach(segmentTarPath -> {
try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
FileSystem fileSystem = FileSystem.get(new Path(segmentTarPath).toUri(), new Configuration());
// TODO: Deal with invalid prefixes in the future
List<String> currentSegments = controllerRestApi.getAllSegments("OFFLINE");
controllerRestApi.pushSegments(fileSystem, Arrays.asList(new Path(segmentTarPath)));
if (_deleteExtraSegments) {
controllerRestApi
.deleteSegmentUris(getSegmentsToDelete(currentSegments, Arrays.asList(new Path(segmentTarPath))));
}
}
});
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testStoreDataToIgnite() throws Exception {
JavaSparkContext sc = createContext();
JavaIgniteContext<String, String> ic = null;
try {
ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false);
ic.fromCache(PARTITIONED_CACHE_NAME)
.savePairs(sc.parallelize(F.range(0, KEYS_CNT), GRID_CNT).mapToPair(TO_PAIR_F), true, false);
Ignite ignite = ic.ignite();
IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME);
for (int i = 0; i < KEYS_CNT; i++) {
String val = cache.get(String.valueOf(i));
assertNotNull("Value was not put to cache for key: " + i, val);
assertEquals("Invalid value stored for key: " + i, "val" + i, val);
}
}
finally {
if (ic != null)
ic.close(true);
sc.stop();
}
}
private static void runJsonDatasetExample(SparkSession spark) {
// $example on:json_dataset$
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json(Constant.LOCAL_FILE_PREX + "/data/resources/people.json");
// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD =
new JavaSparkContext(spark.sparkContext()).parallelize(jsonData);
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD);
anotherPeople.show();
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
// $example off:json_dataset$
}
@Test(dataProvider = "mapPairs", groups = "spark")
public void testMapGroupedReadsToTax(final int readLength, final List<Integer> NM1, final List<Integer> NM2,
final List<Integer> clip1, final List<Integer> clip2,
final List<Integer> insert1, final List<Integer> insert2,
final List<Integer> delete1, final List<Integer> delete2,
final List<String> contig1, final List<String> contig2,
final List<Integer> truthTax) {
final JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
final Broadcast<PSTaxonomyDatabase> taxonomyDatabaseBroadcast = ctx.broadcast(taxonomyDatabase);
//Test with alternate alignments assigned to the XA tag
final List<Iterable<GATKRead>> readListXA = new ArrayList<>();
readListXA.add(generateReadPair(readLength, NM1, NM2, clip1, clip2, insert1, insert2, delete1, delete2, contig1, contig2, "XA"));
final JavaRDD<Iterable<GATKRead>> pairsXA = ctx.parallelize(readListXA);
final JavaRDD<Tuple2<Iterable<GATKRead>, PSPathogenAlignmentHit>> resultXA = PSScorer.mapGroupedReadsToTax(pairsXA,
MIN_IDENT, IDENT_MARGIN, taxonomyDatabaseBroadcast);
final PSPathogenAlignmentHit infoXA = resultXA.first()._2;
Assert.assertNotNull(infoXA);
Assert.assertEquals(infoXA.taxIDs.size(), truthTax.size());
Assert.assertTrue(infoXA.taxIDs.containsAll(truthTax));
Assert.assertEquals(infoXA.numMates, 2);
//Test SA tag
final List<Iterable<GATKRead>> readListSA = new ArrayList<>();
readListSA.add(generateReadPair(readLength, NM1, NM2, clip1, clip2, insert1, insert2, delete1, delete2, contig1, contig2, "SA"));
final JavaRDD<Iterable<GATKRead>> pairsSA = ctx.parallelize(readListSA);
final JavaRDD<Tuple2<Iterable<GATKRead>, PSPathogenAlignmentHit>> resultSA = PSScorer.mapGroupedReadsToTax(pairsSA,
MIN_IDENT, IDENT_MARGIN, taxonomyDatabaseBroadcast);
final PSPathogenAlignmentHit infoSA = resultSA.first()._2;
Assert.assertNotNull(infoSA);
Assert.assertEquals(infoSA.taxIDs.size(), truthTax.size());
Assert.assertTrue(infoSA.taxIDs.containsAll(truthTax));
Assert.assertEquals(infoSA.numMates, 2);
}
public static void main(String[] args) {
// SparkConf conf = new SparkConf().setAppName("JDBCDataSource").setMaster("local");
JavaSparkContext sc = SparkUtils.getRemoteSparkContext(JDBCDataSource.class);
SQLContext sqlContext = new SQLContext(sc);
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:mysql://192.168.2.129:3306/hive");
options.put("dbtable", "t_user");
options.put("user", "root");
options.put("password", "666666");
// Load the JDBC options; this does not open a database connection yet
Dataset<Row> dataset1 = sqlContext.read().format("jdbc").options(options).load();
// options.put("dbtable", "tb_item");
// DataFrame dataFrame2 = sqlContext.read().format("jdbc").options(options).load();
// Read the JDBC table data
dataset1.javaRDD().foreach(new VoidFunction<Row>() {
@Override
public void call(Row row) throws Exception {
System.out.println(row);
}
});
// Save the RDD data back to MySQL
saveToMysql( sqlContext, options);
sc.close();
}
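saveToMysql(...) is not shown in the snippet; a minimal sketch of what such a helper might do with Spark's JDBC writer (the target table name and save mode are assumptions):
// Hypothetical sketch: write rows back to MySQL through the JDBC data source.
private static void saveToMysql(SQLContext sqlContext, Map<String, String> options) {
    Dataset<Row> df = sqlContext.read().format("jdbc").options(options).load();
    Properties props = new Properties();
    props.setProperty("user", options.get("user"));
    props.setProperty("password", options.get("password"));
    // Append into an assumed target table; adjust the table name for a real job.
    df.write().mode(SaveMode.Append).jdbc(options.get("url"), "t_user_copy", props);
}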
/**
* @param sparkContext active Spark Context
* @param trainData training data on which to build a model
* @param hyperParameters ordered list of hyper parameter values to use in building model
* @param candidatePath directory where additional model files can be written
* @return a {@link PMML} representation of a model trained on the given data
*/
@Override
public PMML buildModel(JavaSparkContext sparkContext,
JavaRDD<String> trainData,
List<?> hyperParameters,
Path candidatePath) {
int numClusters = (Integer) hyperParameters.get(0);
Preconditions.checkArgument(numClusters > 1);
log.info("Building KMeans Model with {} clusters", numClusters);
JavaRDD<Vector> trainingData = parsedToVectorRDD(trainData.map(MLFunctions.PARSE_FN));
KMeansModel kMeansModel = KMeans.train(trainingData.rdd(), numClusters, maxIterations, initializationStrategy);
return kMeansModelToPMML(kMeansModel, fetchClusterCountsFromModel(trainingData, kMeansModel));
}
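A hedged call-site sketch (the variable names and the single hyper parameter value, the cluster count, are illustrative):
// Illustrative invocation: one hyper parameter, the number of clusters.
PMML pmml = buildModel(sparkContext, trainData, Collections.singletonList(10), candidatePath);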
public JavaRDD<String> generateRDD() {
Random values = new Random();
values.setSeed(System.currentTimeMillis());
List<String> list = Lists.newLinkedList();
for (int i = 0; i < batchSize; i++) {
list.add(String.valueOf(values.nextLong()));
}
SparkContext sc = Contexts.getSparkSession().sparkContext();
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
return jsc.parallelize(list, this.partitions);
}