Listed below are example usages of org.apache.spark.api.java.JavaRDD; follow the links to view the full source code on GitHub.
private JavaRDD<ItemScore> validScores(JavaRDD<ItemScore> all, final Set<String> whitelist, final Set<String> blacklist, final Set<String> categories, final Map<String, Item> items, String userEntityId) {
final Set<String> seenItemEntityIds = seenItemEntityIds(userEntityId);
final Set<String> unavailableItemEntityIds = unavailableItemEntityIds();
return all.filter(new Function<ItemScore, Boolean>() {
@Override
public Boolean call(ItemScore itemScore) throws Exception {
Item item = items.get(itemScore.getItemEntityId());
return (item != null
&& passWhitelistCriteria(whitelist, item.getEntityId())
&& passBlacklistCriteria(blacklist, item.getEntityId())
&& passCategoryCriteria(categories, item)
&& passUnseenCriteria(seenItemEntityIds, item.getEntityId())
&& passAvailabilityCriteria(unavailableItemEntityIds, item.getEntityId()));
}
});
}
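The anonymous Function above can be collapsed into a Java 8 lambda. Below is a minimal, self-contained sketch of JavaRDD.filter; the class name, sample data, and threshold are invented for illustration only.
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class FilterSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "filter-sketch");
        // Build a small RDD of scores and keep only the ones above a threshold.
        JavaRDD<Double> scores = sc.parallelize(Arrays.asList(0.1, 0.7, 0.4, 0.9));
        JavaRDD<Double> highScores = scores.filter(score -> score > 0.5);
        System.out.println(highScores.collect()); // [0.7, 0.9]
        sc.stop();
    }
}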
/**
 * Loads reads into driver memory with a SamReaderFactory, then distributes them via ctx.parallelize.
 * @param ctx the Spark context used to parallelize the reads
 * @param bam file to load
 * @param referencePath reference path to use while reading (may be null)
 * @param validationStringency validation stringency to apply while reading
 * @return RDD of (SAMRecord-backed) GATKReads from the file.
 */
public JavaRDD<GATKRead> getSerialReads(final JavaSparkContext ctx, final String bam, final GATKPath referencePath, final ValidationStringency validationStringency) {
final SAMFileHeader readsHeader = new ReadsSparkSource(ctx, validationStringency).getHeader(new GATKPath(bam), referencePath);
final SamReaderFactory samReaderFactory;
if (referencePath != null) {
samReaderFactory = SamReaderFactory.makeDefault().validationStringency(validationStringency).referenceSequence(referencePath.toPath());
} else {
samReaderFactory = SamReaderFactory.makeDefault().validationStringency(validationStringency);
}
ReadsDataSource bam2 = new ReadsPathDataSource(IOUtils.getPath(bam), samReaderFactory);
List<GATKRead> records = Lists.newArrayList();
for ( GATKRead read : bam2 ) {
records.add(read);
}
return ctx.parallelize(records);
}
private static <T> void translateFlatten(
PTransformNode transformNode, RunnerApi.Pipeline pipeline, SparkTranslationContext context) {
Map<String, String> inputsMap = transformNode.getTransform().getInputsMap();
JavaRDD<WindowedValue<T>> unionRDD;
if (inputsMap.isEmpty()) {
unionRDD = context.getSparkContext().emptyRDD();
} else {
JavaRDD<WindowedValue<T>>[] rdds = new JavaRDD[inputsMap.size()];
int index = 0;
for (String inputId : inputsMap.values()) {
rdds[index] = ((BoundedDataset<T>) context.popDataset(inputId)).getRDD();
index++;
}
unionRDD = context.getSparkContext().union(rdds);
}
context.pushDataset(getOutputId(transformNode), new BoundedDataset<>(unionRDD));
}
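Outside the Beam runner, the same flatten is just an RDD union. A small sketch using JavaRDD.union directly; class name and inputs are invented.
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class UnionSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "union-sketch");
        JavaRDD<String> first = sc.parallelize(Arrays.asList("a", "b"));
        JavaRDD<String> second = sc.parallelize(Arrays.asList("c"));
        JavaRDD<String> third = sc.parallelize(Arrays.asList("d"));
        // union keeps duplicates and simply concatenates the partitions of its inputs.
        JavaRDD<String> all = first.union(second).union(third);
        System.out.println(all.collect()); // [a, b, c, d]
        sc.stop();
    }
}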
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("PCA Example");
SparkContext sc = new SparkContext(conf);
// $example on$
double[][] array = {{1.12, 2.05, 3.12}, {5.56, 6.28, 8.94}, {10.2, 8.0, 20.5}};
LinkedList<Vector> rowsList = new LinkedList<>();
for (int i = 0; i < array.length; i++) {
Vector currentRow = Vectors.dense(array[i]);
rowsList.add(currentRow);
}
JavaRDD<Vector> rows = JavaSparkContext.fromSparkContext(sc).parallelize(rowsList);
// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Compute the top 3 principal components.
Matrix pc = mat.computePrincipalComponents(3);
RowMatrix projected = mat.multiply(pc);
// $example off$
Vector[] collectPartitions = (Vector[])projected.rows().collect();
System.out.println("Projected vector of principal component:");
for (Vector vector : collectPartitions) {
System.out.println("\t" + vector);
}
}
private static JavaRDD<String[]> getOtherFormatHiveInput(JavaSparkContext sc, String hiveTable) {
SparkSession sparkSession = SparkSession.builder().sparkContext(HiveUtils.withHiveExternalCatalog(sc.sc()))
.config(sc.getConf()).enableHiveSupport().getOrCreate();
final Dataset<Row> intermediateTable = sparkSession.table(hiveTable);
return intermediateTable.javaRDD().map(new Function<Row, String[]>() {
@Override
public String[] call(Row row) throws Exception {
String[] result = new String[row.size()];
for (int i = 0; i < row.size(); i++) {
final Object o = row.get(i);
if (o != null) {
result[i] = o.toString();
} else {
result[i] = null;
}
}
return result;
}
});
}
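The Row-to-String[] conversion above works the same against any Dataset<Row>. A sketch with a tiny inline Dataset standing in for the Hive table; class name, columns, and data are made up.
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RowToStringsSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("row-to-strings").getOrCreate();
        // A two-column Dataset built from a SQL literal instead of a Hive table.
        Dataset<Row> table = spark.sql("SELECT 1 AS id, 'alice' AS name");
        JavaRDD<String[]> rows = table.javaRDD().map(row -> {
            String[] result = new String[row.size()];
            for (int i = 0; i < row.size(); i++) {
                Object o = row.get(i);
                result[i] = (o == null) ? null : o.toString();
            }
            return result;
        });
        rows.collect().forEach(r -> System.out.println(String.join(",", r))); // 1,alice
        spark.stop();
    }
}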
/**
* Build a new classifier by analyzing the training data available in the
* specified input file. The file must be in LibSvm data format.
*
* @param libSvmFile The input file containing the documents used as training data.
* @param labels0Based True if the label indexes specified in the input file are 0-based (i.e. the first label ID is 0), false if they
* are 1-based (i.e. the first label ID is 1).
* @param binaryProblem True if the input file contains data for a binary problem, false if the input file contains data for a multiclass multilabel
* problem.
* @return A new MP-Boost classifier.
*/
public BoostClassifier buildModel(String libSvmFile, boolean labels0Based, boolean binaryProblem) {
if (libSvmFile == null || libSvmFile.isEmpty())
throw new IllegalArgumentException("The input file is 'null' or empty");
int minNumPartitions = 8;
if (this.numDocumentsPartitions != -1)
minNumPartitions = this.numDocumentsPartitions;
JavaRDD<MultilabelPoint> docs = DataUtils.loadLibSvmFileFormatData(sc, libSvmFile, labels0Based, binaryProblem, minNumPartitions);
if (this.numDocumentsPartitions == -1)
this.numDocumentsPartitions = sc.defaultParallelism();
if (this.numFeaturesPartitions == -1)
this.numFeaturesPartitions = sc.defaultParallelism();
if (this.numLabelsPartitions == -1)
this.numLabelsPartitions = sc.defaultParallelism();
Logging.l().info("Docs partitions = " + this.numDocumentsPartitions + ", feats partitions = " + this.numFeaturesPartitions + ", labels partitions = " + this.getNumLabelsPartitions());
return buildModel(docs);
}
@Test
public void testDataFrameSumDMLVectorWithIDColumn() {
System.out.println("MLContextTest - DataFrame sum DML, vector with ID column");
List<Tuple2<Double, Vector>> list = new ArrayList<>();
list.add(new Tuple2<>(1.0, Vectors.dense(1.0, 2.0, 3.0)));
list.add(new Tuple2<>(2.0, Vectors.dense(4.0, 5.0, 6.0)));
list.add(new Tuple2<>(3.0, Vectors.dense(7.0, 8.0, 9.0)));
JavaRDD<Tuple2<Double, Vector>> javaRddTuple = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleVectorRow());
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", new VectorUDT(), true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR_WITH_INDEX);
Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame, mm);
setExpectedStdOut("sum: 45.0");
ml.execute(script);
}
private static void writeReadsADAM(
final JavaSparkContext ctx, final String outputFile, final JavaRDD<SAMRecord> reads,
final SAMFileHeader header) throws IOException {
final SequenceDictionary seqDict = SequenceDictionary.fromSAMSequenceDictionary(header.getSequenceDictionary());
final ReadGroupDictionary readGroups = ReadGroupDictionary.fromSAMHeader(header);
final JavaPairRDD<Void, AlignmentRecord> rddAlignmentRecords =
reads.map(read -> {
read.setHeaderStrict(header);
AlignmentRecord alignmentRecord = GATKReadToBDGAlignmentRecordConverter.convert(read, seqDict, readGroups);
read.setHeaderStrict(null); // Restore the header to its previous state so as not to surprise the caller
return alignmentRecord;
}).mapToPair(alignmentRecord -> new Tuple2<>(null, alignmentRecord));
// instantiating a Job is necessary here in order to set the Hadoop Configuration...
final Job job = Job.getInstance(ctx.hadoopConfiguration());
// ...here, which sets a config property that the AvroParquetOutputFormat needs when writing data. Specifically,
// we are writing the Avro schema to the Configuration as a JSON string. The AvroParquetOutputFormat class knows
// how to translate objects in the Avro data model to the Parquet primitives that get written.
AvroParquetOutputFormat.setSchema(job, AlignmentRecord.getClassSchema());
deleteHadoopFile(outputFile, ctx.hadoopConfiguration());
rddAlignmentRecords.saveAsNewAPIHadoopFile(
outputFile, Void.class, AlignmentRecord.class, AvroParquetOutputFormat.class, job.getConfiguration());
}
private static void distinct(JavaSparkContext sc) {
List<String> datas = Arrays.asList("张三", "李四", "tom", "张三");
/**
 * ===================================
 * | Deduplicate -- involves a shuffle operation |
 * ===================================
 */
JavaRDD<String> distinctRDD = sc.parallelize(datas).distinct();
distinctRDD.foreach(new VoidFunction<String>() {
@Override
public void call(String t) throws Exception {
System.out.println(t);
}
});
}
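Note that foreach runs on the executors, so in cluster mode the println output lands in executor logs rather than on the driver console. A sketch that collects to the driver instead; class name and sample data are invented.
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DistinctSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "distinct-sketch");
        JavaRDD<String> names = sc.parallelize(Arrays.asList("alice", "bob", "alice"));
        // distinct() shuffles the data to group identical elements before dropping duplicates.
        JavaRDD<String> unique = names.distinct();
        unique.collect().forEach(System.out::println); // alice, bob (order not guaranteed)
        sc.stop();
    }
}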
/**
* Returns the latest versions of a given set of value sets.
*
* @param uris a set of URIs for which to retrieve the latest versions, or null to load them all
* @param includeExperimental whether to include value sets marked as experimental
* @return a map of value set URIs to the latest versions for them.
*/
public Map<String,String> getLatestVersions(final Set<String> uris, boolean includeExperimental) {
// Reduce by the value set URI to return only the latest version
// per value set. Spark's provided max aggregation function
// only works on numeric types, so we jump into RDDs and perform
// the reduce by hand.
JavaRDD<UrlAndVersion> members = this.valueSets.select("url", "version", "experimental")
.toJavaRDD()
.filter(row -> (uris == null || uris.contains(row.getString(0)))
&& (includeExperimental || row.isNullAt(2) || !row.getBoolean(2)))
.mapToPair(row -> new Tuple2<>(row.getString(0), row.getString(1)))
.reduceByKey((leftVersion, rightVersion) ->
leftVersion.compareTo(rightVersion) > 0 ? leftVersion : rightVersion)
.map(tuple -> new UrlAndVersion(tuple._1, tuple._2));
return spark.createDataset(members.rdd(), URL_AND_VERSION_ENCODER)
.collectAsList()
.stream()
.collect(Collectors.toMap(UrlAndVersion::getUrl,
UrlAndVersion::getVersion));
}
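Stripped of the domain specifics, the by-hand reduce above is reduceByKey with a keep-the-larger function. A standalone sketch with invented URI/version pairs and a placeholder class name.
import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class LatestVersionSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "latest-version-sketch");
        JavaPairRDD<String, String> versions = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("http://example.org/vs/a", "1"),
                new Tuple2<>("http://example.org/vs/a", "2"),
                new Tuple2<>("http://example.org/vs/b", "1")));
        // Keep the lexicographically largest version string per URI.
        JavaPairRDD<String, String> latest = versions.reduceByKey(
                (left, right) -> left.compareTo(right) > 0 ? left : right);
        latest.collectAsMap().forEach((url, version) -> System.out.println(url + " -> " + version));
        sc.stop();
    }
}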
protected void executeTrainingDirect(SparkDl4jMultiLayer network, JavaRDD<DataSet> trainingData) {
if (collectTrainingStats)
stats.logFitStart();
//For "vanilla" parameter averaging training, we need to split the full data set into batches of size N, such that we can process the specified
// number of minibatches between averagings
//But to do that, we need to know: (a) the number of examples, and (b) the number of workers
if (storageLevel != null)
trainingData.persist(storageLevel);
long totalDataSetObjectCount = getTotalDataSetObjectCount(trainingData);
// since this is real distributed training, we don't need to split data
doIteration(network, trainingData, 1, 1);
if (collectTrainingStats)
stats.logFitEnd((int) totalDataSetObjectCount);
}
private static <L extends Locatable, I extends Locatable, T> JavaRDD<T> joinOverlapping(JavaSparkContext ctx, JavaRDD<L> locatables, Class<L> locatableClass,
SAMSequenceDictionary sequenceDictionary, JavaRDD<I> intervals,
int maxLocatableLength, MapFunction<Tuple2<I, Iterable<L>>, T> f) {
return joinOverlapping(ctx, locatables, locatableClass, sequenceDictionary, intervals, maxLocatableLength,
(FlatMapFunction2<Iterator<L>, Iterator<I>, T>) (locatablesIterator, shardsIterator) -> Iterators.transform(locatablesPerShard(locatablesIterator, shardsIterator, sequenceDictionary, maxLocatableLength), new Function<Tuple2<I,Iterable<L>>, T>() {
@Nullable
@Override
public T apply(@Nullable Tuple2<I, Iterable<L>> input) {
try {
return f.call(input);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}));
}
/**
* Renders CATMAID overview ('small') images for each layer.
*
* @param sparkContext context for current run.
* @param broadcastBoxGenerator box generator broadcast to all worker nodes.
*/
private void renderOverviewImages(final JavaSparkContext sparkContext,
final Broadcast<BoxGenerator> broadcastBoxGenerator) {
final JavaRDD<Double> zValuesRdd = sparkContext.parallelize(zValues);
final JavaRDD<Integer> renderedOverview = zValuesRdd.map((Function<Double, Integer>) z -> {
final BoxGenerator localBoxGenerator = broadcastBoxGenerator.getValue();
localBoxGenerator.renderOverview(z.intValue());
return 1;
});
final long renderedOverviewCount = renderedOverview.count();
LOG.info(""); // empty statement adds newline to lengthy unterminated stage progress lines in log
LOG.info("run: rendered {} overview images", renderedOverviewCount);
}
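Broadcast variables such as broadcastBoxGenerator are shipped to each executor once and read inside tasks via value(). A minimal sketch of the same pattern with a broadcast lookup set; class name and data are invented.
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "broadcast-sketch");
        Set<String> allowed = new HashSet<>(Arrays.asList("a", "b"));
        // Ship the lookup set to each executor once; tasks read it via value().
        Broadcast<Set<String>> allowedBroadcast = sc.broadcast(allowed);
        JavaRDD<String> kept = sc.parallelize(Arrays.asList("a", "c", "b"))
                .filter(s -> allowedBroadcast.value().contains(s));
        System.out.println(kept.collect()); // [a, b]
        sc.stop();
    }
}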
public static DataAnalysis analyze(Schema schema, JavaRDD<List<Writable>> data, int maxHistogramBuckets) {
data.cache();
/*
* TODO: Some care should be given to add histogramBuckets and histogramBucketCounts to this in the future
*/
List<ColumnType> columnTypes = schema.getColumnTypes();
List<AnalysisCounter> counters =
data.aggregate(null, new AnalysisAddFunction(schema), new AnalysisCombineFunction());
double[][] minsMaxes = new double[counters.size()][2];
List<ColumnAnalysis> list = DataVecAnalysisUtils.convertCounters(counters, minsMaxes, columnTypes);
List<HistogramCounter> histogramCounters =
data.aggregate(null, new HistogramAddFunction(maxHistogramBuckets, schema, minsMaxes),
new HistogramCombineFunction());
DataVecAnalysisUtils.mergeCounters(list, histogramCounters);
return new DataAnalysis(schema, list);
}
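JavaRDD.aggregate, used twice above, takes a zero value, a per-element sequence function, and a cross-partition combine function. A toy sketch that accumulates a (sum, count) pair; class name, types, and data are invented.
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class AggregateSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "aggregate-sketch");
        JavaRDD<Integer> values = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
        // Accumulate (sum, count) within each partition, then merge the partial results.
        Tuple2<Integer, Integer> sumAndCount = values.aggregate(
                new Tuple2<Integer, Integer>(0, 0),
                (acc, v) -> new Tuple2<Integer, Integer>(acc._1() + v, acc._2() + 1),
                (a, b) -> new Tuple2<Integer, Integer>(a._1() + b._1(), a._2() + b._2()));
        System.out.println("sum=" + sumAndCount._1() + ", count=" + sumAndCount._2()); // sum=10, count=4
        sc.stop();
    }
}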
/**
* Equivalent to {@link #balancedRandomSplit(int, int, JavaRDD)} with control over the RNG seed
*/
public static <T> JavaRDD<T>[] balancedRandomSplit(int totalObjectCount, int numObjectsPerSplit, JavaRDD<T> data,
long rngSeed) {
JavaRDD<T>[] splits;
if (totalObjectCount <= numObjectsPerSplit) {
splits = (JavaRDD<T>[]) Array.newInstance(JavaRDD.class, 1);
splits[0] = data;
} else {
int numSplits = totalObjectCount / numObjectsPerSplit; //Intentional round down
splits = (JavaRDD<T>[]) Array.newInstance(JavaRDD.class, numSplits);
for (int i = 0; i < numSplits; i++) {
splits[i] = data.mapPartitionsWithIndex(new SplitPartitionsFunction<T>(i, numSplits, rngSeed), true);
}
}
return splits;
}
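For comparison, Spark's built-in JavaRDD.randomSplit divides by weight fractions rather than guaranteeing an exact object count per split, which is one reason a helper like the one above does its own partition-indexed split. A minimal sketch of randomSplit; class name and data are invented.
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RandomSplitSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "random-split-sketch");
        JavaRDD<Integer> data = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        // Split roughly in half; sizes are approximate because membership is decided per element.
        JavaRDD<Integer>[] splits = data.randomSplit(new double[]{0.5, 0.5}, 42L);
        System.out.println(splits[0].count() + " + " + splits[1].count() + " = " + data.count());
        sc.stop();
    }
}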
/**
* Returns an RDD mapping each HoodieKey to the partitionPath/fileID pair that contains it, or Option.empty() if the
* key is not found.
*
* @param hoodieKeys keys to lookup
* @param jsc spark context
* @param hoodieTable hoodie table object
*/
@Override
public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
JavaPairRDD<String, String> partitionRecordKeyPairRDD =
hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));
// Lookup indexes for all the partition/recordkey pair
JavaPairRDD<HoodieKey, HoodieRecordLocation> recordKeyLocationRDD =
lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
JavaPairRDD<HoodieKey, String> keyHoodieKeyPairRDD = hoodieKeys.mapToPair(key -> new Tuple2<>(key, null));
return keyHoodieKeyPairRDD.leftOuterJoin(recordKeyLocationRDD).mapToPair(keyLoc -> {
Option<Pair<String, String>> partitionPathFileidPair;
if (keyLoc._2._2.isPresent()) {
partitionPathFileidPair = Option.of(Pair.of(keyLoc._1().getPartitionPath(), keyLoc._2._2.get().getFileId()));
} else {
partitionPathFileidPair = Option.empty();
}
return new Tuple2<>(keyLoc._1, partitionPathFileidPair);
});
}
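leftOuterJoin wraps the right-hand side in org.apache.spark.api.java.Optional, which is what the isPresent() check above inspects. A tiny standalone sketch; class name, keys, and values are invented.
import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

public class LeftOuterJoinSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "left-outer-join-sketch");
        JavaPairRDD<String, String> keys = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("k1", "payload1"), new Tuple2<>("k2", "payload2")));
        JavaPairRDD<String, String> locations = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("k1", "file-0001")));
        // Every left key survives; missing right-hand matches show up as Optional.empty().
        keys.leftOuterJoin(locations).collect().forEach(entry -> {
            Optional<String> location = entry._2()._2();
            System.out.println(entry._1() + " -> " + (location.isPresent() ? location.get() : "not found"));
        });
        sc.stop();
    }
}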
public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> deduplicateRecords(
JavaRDD<HoodieRecord<T>> records, HoodieIndex<T> index, int parallelism) {
boolean isIndexingGlobal = index.isGlobal();
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
return new Tuple2<>(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec1.getData().preCombine(rec2.getData());
// We cannot allow the user to change the key or partitionPath, since that would affect everything downstream,
// so pick them from one of the records.
return new HoodieRecord<T>(rec1.getKey(), reducedData);
}, parallelism).map(Tuple2::_2);
}
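The same keep-one-record-per-key idea in isolation, with plain strings standing in for HoodieRecords; class name and data are invented.
import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class DedupSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "dedup-sketch");
        JavaRDD<String> records = sc.parallelize(Arrays.asList("k1:v1", "k1:v2", "k2:v1"));
        // Key each record, keep one value per key, then drop the key again.
        JavaPairRDD<String, String> keyed = records.mapToPair(r -> new Tuple2<>(r.split(":")[0], r));
        JavaRDD<String> deduped = keyed
                .reduceByKey((left, right) -> left) // arbitrary winner; the Hudi code calls preCombine here
                .map(Tuple2::_2);
        System.out.println(deduped.collect());
        sc.stop();
    }
}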
@Override
public void processInstruction(ExecutionContext ec) {
SparkExecutionContext sec = (SparkExecutionContext)ec;
//get input
JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() );
//execute tsmm instruction (always produce exactly one output block)
//(this formulation with values() requires --conf spark.driver.maxResultSize=0)
JavaRDD<MatrixBlock> tmp = in.map(new RDDTSMMFunction(_type));
MatrixBlock out = RDDAggregateUtils.sumStable(tmp);
//put output block into symbol table (no lineage because single block)
//this also includes implicit maintenance of matrix characteristics
sec.setMatrixOutput(output.getName(), out);
}
public static JavaPairRDD<MatrixIndexes, MatrixBlock> sortByVal( JavaPairRDD<MatrixIndexes, MatrixBlock> in,
JavaPairRDD<MatrixIndexes, MatrixBlock> in2, long rlen, int blen )
{
//create value-index rdd from inputs
JavaRDD<DoublePair> dvals = in.join(in2).values()
.flatMap(new ExtractDoubleValuesFunction2());
//sort (creates sorted range per partition)
long hdfsBlocksize = InfrastructureAnalyzer.getHDFSBlockSize();
int numPartitions = (int)Math.ceil(((double)rlen*8)/hdfsBlocksize);
JavaRDD<DoublePair> sdvals = dvals
.sortBy(new CreateDoubleKeyFunction2(), true, numPartitions);
//create binary block output
JavaPairRDD<MatrixIndexes, MatrixBlock> ret = sdvals
.zipWithIndex()
.mapPartitionsToPair(new ConvertToBinaryBlockFunction2(rlen, blen));
ret = RDDAggregateUtils.mergeByKey(ret, false);
return ret;
}
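sortBy and zipWithIndex on their own, applied to plain doubles; class name and values are invented.
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SortBySketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "sort-by-sketch");
        JavaRDD<Double> values = sc.parallelize(Arrays.asList(3.0, 1.0, 2.0));
        // Sort ascending into 2 partitions, then attach a global position to each value.
        values.sortBy(v -> v, true, 2)
              .zipWithIndex()
              .collect()
              .forEach(t -> System.out.println(t._2() + ": " + t._1()));
        sc.stop();
    }
}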
@Override
protected void runTool(final JavaSparkContext ctx) {
final VariantsSparkSource vss = new VariantsSparkSource(ctx);
final JavaRDD<VariantContext> variants = vss.getParallelVariantContexts(input, getIntervals());
final long count = variants.count();
System.out.println(count);
if( out != null) {
try (final PrintStream ps = new PrintStream(BucketUtils.createFile(out))) {
ps.print(count);
}
}
}
public static void writeRasterToGeoWave(
final SparkContext sc,
final Index index,
final DataStorePluginOptions outputStoreOptions,
final RasterDataAdapter adapter,
final JavaRDD<GridCoverage> inputRDD) throws IOException {
// setup the configuration and the output format
final Configuration conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration());
GeoWaveOutputFormat.setStoreOptions(conf, outputStoreOptions);
GeoWaveOutputFormat.addIndex(conf, index);
GeoWaveOutputFormat.addDataAdapter(conf, adapter);
// create the job
final Job job = new Job(conf);
job.setOutputKeyClass(GeoWaveOutputKey.class);
job.setOutputValueClass(GridCoverage.class);
job.setOutputFormatClass(GeoWaveOutputFormat.class);
// broadcast string names
final ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
final Broadcast<String> typeName = sc.broadcast(adapter.getTypeName(), stringTag);
final Broadcast<String> indexName = sc.broadcast(index.getName(), stringTag);
// map to a pair containing the output key and the output value
inputRDD.mapToPair(
gridCoverage -> new Tuple2<>(
new GeoWaveOutputKey(typeName.value(), indexName.value()),
gridCoverage)).saveAsNewAPIHadoopDataset(job.getConfiguration());
}
@Test
public void testStringDataFrameToVectorDataFrame() {
List<String> list = new ArrayList<>();
list.add("((1.2, 4.3, 3.4))");
list.add("(1.2, 3.4, 2.2)");
list.add("[[1.2, 34.3, 1.2, 1.25]]");
list.add("[1.2, 3.4]");
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new StringToRow());
SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> inDF = sparkSession.createDataFrame(javaRddRow, schema);
Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sparkSession, inDF);
List<String> expectedResults = new ArrayList<>();
expectedResults.add("[[1.2,4.3,3.4]]");
expectedResults.add("[[1.2,3.4,2.2]]");
expectedResults.add("[[1.2,34.3,1.2,1.25]]");
expectedResults.add("[[1.2,3.4]]");
List<Row> outputList = outDF.collectAsList();
for (Row row : outputList) {
assertTrue("Expected results don't contain: " + row, expectedResults.contains(row.toString()));
}
}
public static JavaRDD<ExecRow> toSparkRows(JavaRDD<ExecRow> execRows) {
return execRows.map(new Function<ExecRow, ExecRow>() {
@Override
public ExecRow call(ExecRow execRow) throws Exception {
return execRow;
}
});
}
/**
* Test if expected Instances (with added super types) are aggregated from input Quads
*/
@Test
public void testCreateInstancesWithSuperTypes() {
InstanceAggregatorConfig config = new InstanceAggregatorConfig()
.setDefaultLanguage("en")
.setAddSuperTypes(true);
InstanceAggregator collector = new InstanceAggregator(config, jsc().broadcast(schema));
JavaRDD<Instance> result = collector.aggregateInstances(TestUtils.getQuadsRDD(jsc(), "aggregatorTest.nq")).cache();
result = checkErrorInstance(result);
assertRDDEquals("Aggregated instances with super types are equal to expected RDD.", getExpectedRDD(true), result);
}
public void testEsRDDWrite() throws Exception {
Map<String, ?> doc1 = ImmutableMap.of("one", 1, "two", 2);
Map<String, ?> doc2 = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");
String target = "spark-test-java-basic-write/data";
JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(ImmutableList.of(doc1, doc2));
// the JavaEsSpark prefix could be eliminated with a static import
JavaEsSpark.saveToEs(javaRDD, target);
JavaEsSpark.saveToEs(javaRDD, ImmutableMap.of(ES_RESOURCE, target + "1"));
assertEquals(2, JavaEsSpark.esRDD(sc, target).count());
assertTrue(RestUtils.exists(target));
String results = RestUtils.get(target + "/_search?");
assertThat(results, containsString("SFO"));
}
public static void main(String[] args) throws Exception {
Properties prop = PropertyFileReader.readPropertyFile("iot-spark.properties");
String file = prop.getProperty("com.iot.app.hdfs") + "iot-data-parque";
String[] jars = {prop.getProperty("com.iot.app.jar")};
JavaSparkContext sparkContext = getSparkContext(prop, jars);
SQLContext sqlContext = new SQLContext(sparkContext);
Dataset<Row> dataFrame = getDataFrame(sqlContext, file);
JavaRDD<IoTData> rdd = dataFrame.javaRDD().map(getRowIoTDataFunction());
BatchHeatMapProcessor processor = new BatchHeatMapProcessor();
processor.processHeatMap(rdd);
sparkContext.close();
sparkContext.stop();
}
@Test
public void testsHBasePutAccessParallelism() {
HoodieWriteConfig config = getConfig();
HBaseIndex index = new HBaseIndex(config);
final JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(
Arrays.asList(getSampleWriteStatus(1, 2), getSampleWriteStatus(0, 3), getSampleWriteStatus(10, 0)), 10);
final Tuple2<Long, Integer> tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
final int hbaseNumPuts = Integer.parseInt(tuple._1.toString());
assertEquals(10, writeStatusRDD.getNumPartitions());
assertEquals(2, hbasePutAccessParallelism);
assertEquals(11, hbaseNumPuts);
}
public BulkInsertPreppedDeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecordRdd = preppedInputRecordRdd;
this.bulkInsertPartitioner = bulkInsertPartitioner;
}
@Override
public void export(JavaRDD<ExportContainer<T>> rdd) {
if (codec == null)
rdd.saveAsTextFile(path);
else
rdd.saveAsTextFile(path, codec.getClass());
}
/**
* @throws Exception If failed.
*/
@Test
public void testReadDataFromIgnite() throws Exception {
JavaSparkContext sc = createContext();
JavaIgniteContext<String, Integer> ic = null;
try {
ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false);
Ignite ignite = ic.ignite();
IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME);
for (int i = 0; i < KEYS_CNT; i++)
cache.put(String.valueOf(i), i);
JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F);
int sum = values.fold(0, SUM_F);
int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT;
assertEquals(expSum, sum);
}
finally {
if (ic != null)
ic.close(true);
sc.stop();
}
}