org.apache.spark.SparkConf Source Code Examples

Listed below are example usages of org.apache.spark.SparkConf collected from open-source projects. Follow each project's link to view the full source on GitHub.
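
Before the project-specific examples, here is a minimal, self-contained sketch of the typical SparkConf lifecycle: build the configuration, pass it to a context, then read settings back. The class name, the local[2] master URL, and the Kryo serializer setting are illustrative assumptions, not taken from any of the projects below.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkConfBasics {
    public static void main(String[] args) {
        // Build the configuration: app name, master URL, and arbitrary key/value settings.
        SparkConf conf = new SparkConf()
                .setAppName("SparkConfBasics")
                .setMaster("local[2]")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        // The configuration is cloned when the context starts; later changes to this
        // SparkConf object do not affect the running context.
        JavaSparkContext sc = new JavaSparkContext(conf);
        System.out.println("spark.app.name = " + sc.getConf().get("spark.app.name"));
        sc.stop();
    }
}

Most of the examples below follow the same pattern, substituting JavaStreamingContext or SparkSession for JavaSparkContext as needed.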

Example 1   Project: sparkResearch   File: ReduceByKeyAndWindow.java
public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("reduceByKeyAndWindow").setMaster("local[2]");
    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));
    // Set up checkpointing
    streamingContext.checkpoint("hdfs://localhost:9300");
    // Data source
    JavaDStream<String> dStream = streamingContext.socketTextStream("localhost", 8080);

    JavaPairDStream<String, Long> ipPairDstream = dStream.mapToPair(new GetIp());

    JavaPairDStream<String, Long> result = ipPairDstream.reduceByKeyAndWindow(new AddLongs(),
            new SubtractLongs(), Durations.seconds(30), Durations.seconds(10));

    try {
        streamingContext.start();
        streamingContext.awaitTermination();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
private void start() {
  // Create a local StreamingContext with two worker threads and a
  // batch interval of 5 seconds
  SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(
      "Streaming Ingestion File System Text File to Dataframe");
  JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations
      .seconds(5));

  JavaDStream<String> msgDataStream = jssc.textFileStream(StreamingUtils
      .getInputDirectory());

  msgDataStream.print();
  // Create JavaRDD<Row>
  msgDataStream.foreachRDD(new RowProcessor());

  jssc.start();
  try {
    jssc.awaitTermination();
  } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
  }
}
 
Example 3   Project: component-runtime   File: SparkExtensionTest.java
public static void main(final String[] args) {
    final SparkConf conf =
            new SparkConf().setAppName(SparkClusterRuleTest.SubmittableMain.class.getName()).setMaster(args[0]);
    final JavaSparkContext context = new JavaSparkContext(conf);

    context
            .parallelize(singletonList("a b"))
            .flatMap((FlatMapFunction<String, String>) text -> asList(text.split(" ")).iterator())
            .mapToPair(word -> new Tuple2<>(word, 1))
            .reduceByKey((a, b) -> a + b)
            .foreach(result -> {
                try (final FileWriter writer = new FileWriter(args[1], true)) {
                    writer.write(result._1 + " -> " + result._2 + '\n');
                }
            });
}
 
Example 4   Project: kylin-on-parquet-v2   File: SparkApplication.java
private void autoSetSparkConf(SparkConf sparkConf) throws Exception {
    logger.info("Start set spark conf automatically.");
    SparkConfHelper helper = new SparkConfHelper();
    helper.setFetcher(KylinBuildEnv.get().clusterInfoFetcher());
    Path shareDir = config.getJobTmpShareDir(project, jobId);
    String contentSize = chooseContentSize(shareDir);

    // add content size with unit
    helper.setOption(SparkConfHelper.SOURCE_TABLE_SIZE, contentSize);
    helper.setOption(SparkConfHelper.LAYOUT_SIZE, Integer.toString(layoutSize));
    Map<String, String> configOverride = config.getSparkConfigOverride();
    helper.setConf(SparkConfHelper.DEFAULT_QUEUE, configOverride.get(SparkConfHelper.DEFAULT_QUEUE));
    helper.setOption(SparkConfHelper.REQUIRED_CORES, calculateRequiredCores());
    helper.setConf(SparkConfHelper.COUNT_DISTICT, hasCountDistinct().toString());
    helper.generateSparkConf();
    helper.applySparkConf(sparkConf);
}
 
Example 5   Project: deeplearning4j   File: SparkSequenceVectorsTest.java
@Before
public void setUp() throws Exception {
    if (sequencesCyclic == null) {
        sequencesCyclic = new ArrayList<>();

        // 10 sequences in total
        for (int c = 0; c < 10; c++) {

            Sequence<VocabWord> sequence = new Sequence<>();

            for (int e = 0; e < 10; e++) {
                // we will have 9 equal elements, with total frequency of 10
                sequence.addElement(new VocabWord(1.0, "" + e, (long) e));
            }

            // and 1 element with frequency of 20
            sequence.addElement(new VocabWord(1.0, "0", 0L));
            sequencesCyclic.add(sequence);
        }
    }

    SparkConf sparkConf = new SparkConf().setMaster("local[8]")
            .set("spark.driver.host", "localhost")
            .setAppName("SeqVecTests");
    sc = new JavaSparkContext(sparkConf);
}
 
public static void main(String[] args) {

    SparkConf conf =
      new SparkConf().setAppName("JavaHypothesisTestingKolmogorovSmirnovTestExample");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // $example on$
    JavaDoubleRDD data = jsc.parallelizeDoubles(Arrays.asList(0.1, 0.15, 0.2, 0.3, 0.25));
    KolmogorovSmirnovTestResult testResult =
      Statistics.kolmogorovSmirnovTest(data, "norm", 0.0, 1.0);
    // summary of the test including the p-value, test statistic, and null hypothesis
    // if our p-value indicates significance, we can reject the null hypothesis
    System.out.println(testResult);
    // $example off$

    jsc.stop();
  }
 
public static void wordCountJava8( String filename )
{
    // Define a configuration to use to interact with Spark
    SparkConf conf = new SparkConf().setMaster("local").setAppName("Work Count App");

    // Create a Java version of the Spark Context from the configuration
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Load the input data, which is a text file read from the command line
    JavaRDD<String> input = sc.textFile( filename );

    // Java 8 with lambdas: split the input string into words
   // TODO here a change has happened 
    JavaRDD<String> words = input.flatMap( s -> Arrays.asList( s.split( " " ) ).iterator() );

    // Java 8 with lambdas: transform the collection of words into pairs (word and 1) and then count them
    JavaPairRDD<String, Integer> counts = words.mapToPair( t -> new Tuple2<>( t, 1 ) ).reduceByKey( (x, y) -> x + y );

    // Save the word count back out to a text file, causing evaluation.
    counts.saveAsTextFile( "output" );
}
 
Example 8   Project: tutorials   File: WordCount.java
public static void main(String[] args) throws Exception {
    if (args.length < 1) {
        System.err.println("Usage: JavaWordCount <file>");
        System.exit(1);
    }
    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount")
        .setMaster("local");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    JavaRDD<String> lines = ctx.textFile(args[0], 1);

    JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
    JavaPairRDD<String, Integer> wordAsTuple = words.mapToPair(word -> new Tuple2<>(word, 1));
    JavaPairRDD<String, Integer> wordWithCount = wordAsTuple.reduceByKey((Integer i1, Integer i2)->i1 + i2);
    List<Tuple2<String, Integer>> output = wordWithCount.collect();
    for (Tuple2<?, ?> tuple : output) {
         System.out.println(tuple._1() + ": " + tuple._2());
    }
    ctx.stop();
}
 
Example 9   Project: mmtf-spark   File: ContainsDProteinChainTest.java
@Before
public void setUp() throws Exception {
	SparkConf conf = new SparkConf().setMaster("local[*]").setAppName(ContainsDProteinChainTest.class.getSimpleName());
    sc = new JavaSparkContext(conf);
    
    // 2ONX: only L-protein chain
    // 1JLP: single L-protein chains with non-polymer capping group (NH2)
    // 5X6H: L-protein and DNA chain
    // 5L2G: DNA chain
    // 2MK1: D-saccharide
    // 2V5W: Chain C: GLY-GLY-GLY matches both D-protein and L-protein
    // 5XDP: L-protein and D-protein (modified)
    // 5GOD: 2 L-protein + 2 D-protein
    List<String> pdbIds = Arrays.asList("2ONX","1JLP","5X6H","5L2G","2MK1","2V5W","5XDP","5GOD");
    pdb = MmtfReader.downloadReducedMmtfFiles(pdbIds, sc);
}
 
Example 10   Project: deeplearning4j   File: Word2VecPerformerVoid.java
public void setup(SparkConf conf) {
    useAdaGrad = conf.getBoolean(ADAGRAD, false);
    negative = conf.getDouble(NEGATIVE, 5);
    numWords = conf.getInt(NUM_WORDS, 1);
    window = conf.getInt(WINDOW, 5);
    alpha = conf.getDouble(ALPHA, 0.025f);
    minAlpha = conf.getDouble(MIN_ALPHA, 1e-2f);
    totalWords = conf.getInt(NUM_WORDS, 1);
    iterations = conf.getInt(ITERATIONS, 5);
    vectorLength = conf.getInt(VECTOR_LENGTH, 100);

    initExpTable();

    if (negative > 0 && conf.contains(TABLE)) {
        ByteArrayInputStream bis = new ByteArrayInputStream(conf.get(TABLE).getBytes());
        DataInputStream dis = new DataInputStream(bis);
        table = Nd4j.read(dis);
    }
}
 
Example 11   Project: mmtf-spark   File: NotFilterExample.java
public static void main(String[] args) throws FileNotFoundException {

		String path = MmtfReader.getMmtfReducedPath();
	    
	    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName(NotFilterExample.class.getSimpleName());
	    JavaSparkContext sc = new JavaSparkContext(conf);
	    
	    long count = MmtfReader
	    		.readSequenceFile(path, sc) // read MMTF hadoop sequence file
	    		.filter(new ContainsLProteinChain()) // retain pdb entries that exclusively contain L-peptide chains
	    		// a NotFilter can be used to reverse a filter
	    		.filter(new NotFilter(new ContainsDnaChain())) // should not contain any DNA chains
	    		.count();
	    
	    System.out.println("# PDB entries with L-protein and without DNA chains: " + count);
	    sc.close();
	}
 
Example 12   Project: DataGenerator   File: SparkDistributor.java
@Override
public void distribute(final List<Frontier> frontierList) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("dg-spark").setMaster(masterURL));

    generatedMaps = sc
            .parallelize(frontierList)
            .flatMap(new FlatMapFunction<Frontier, Map<String, String>>() {
                @Override
                public Iterable<Map<String, String>> call(Frontier frontier) {
                    LinkedList<Map<String, String>> storage = new LinkedList<>();
                    frontier.searchForScenarios(new CatchAndStoreProcessing(storage), searchExitFlag);

                    return storage;
                }
            })
            .flatMap(new FlatMapFunction<Map<String, String>, Map<String, String>>() {
                @Override
                public Iterable<Map<String, String>> call(Map<String, String> initialVars) {
                    return SparkDistributor.dataConsumer.transformAndReturn(initialVars);
                }
            });
}
 
Example 13   Project: iceberg   File: TestManifestFileSerialization.java
@Test
public void testManifestFileKryoSerialization() throws IOException {
  File data = temp.newFile();
  Assert.assertTrue(data.delete());

  Kryo kryo = new KryoSerializer(new SparkConf()).newKryo();

  ManifestFile manifest = writeManifest(FILE_A);

  try (Output out = new Output(new FileOutputStream(data))) {
    kryo.writeClassAndObject(out, manifest);
    kryo.writeClassAndObject(out, manifest.copy());
    kryo.writeClassAndObject(out, GenericManifestFile.copyOf(manifest).build());
  }

  try (Input in = new Input(new FileInputStream(data))) {
    for (int i = 0; i < 3; i += 1) {
      Object obj = kryo.readClassAndObject(in);
      Assert.assertTrue("Should be a ManifestFile", obj instanceof ManifestFile);
      checkManifestFile(manifest, (ManifestFile) obj);
    }
  }
}
 
Example 14   Project: ytk-learn   File: SparkTrainWorker.java
public SparkTrainWorker(
                        SparkConf conf,
                        String modelName,
                        String configPath,
                        String configFile,
                        String pyTransformScript,
                        boolean needPyTransform,
                        String loginName,
                        String hostName,
                        int hostPort,
                        int slaveNum,
                        int threadNum) throws Exception {
    super(modelName, configPath, configFile, pyTransformScript, needPyTransform,
            loginName, hostName, hostPort, threadNum);
    this.slaveNum = slaveNum;

    conf.set("spark.files.fetchTimeout", "3200");
    conf.set("spark.network.timeout", "3200");
    conf.set("spark.dynamicAllocation.executorIdleTimeout", "3200");
    conf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "300");
    conf.set("spark.core.connection.auth.wait.timeout", "3200");
    conf.set("spark.memory.fraction", "0.01");
}
 
public static void main(String[] args) {

    SparkConf conf = new SparkConf().setAppName("JavaKernelDensityEstimationExample");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // $example on$
    // an RDD of sample data
    JavaRDD<Double> data = jsc.parallelize(
      Arrays.asList(1.0, 1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 5.0, 6.0, 7.0, 8.0, 9.0, 9.0));

    // Construct the density estimator with the sample data
    // and a standard deviation for the Gaussian kernels
    KernelDensity kd = new KernelDensity().setSample(data).setBandwidth(3.0);

    // Find density estimates for the given values
    double[] densities = kd.estimate(new double[]{-1.0, 2.0, 5.0});

    System.out.println(Arrays.toString(densities));
    // $example off$

    jsc.stop();
  }
 
Example 16   Project: mmtf-spark   File: PolymerCompositionTest.java
@Before
public void setUp() throws Exception {
	SparkConf conf = new SparkConf().setMaster("local[*]").setAppName(PolymerCompositionTest.class.getSimpleName());
    sc = new JavaSparkContext(conf);
    
    // 2ONX: only L-protein chain
    // 1JLP: single L-protein chains with non-polymer capping group (NH2)
    // 5X6H: L-protein and DNA chain (with std. nucleotides)
    // 5L2G: DNA chain (with non-std. nucleotide)
    // 2MK1: D-saccharide
    // 5UZT: RNA chain (with std. nucleotides)
    // 1AA6: contains SEC, selenocysteine (21st amino acid)
    // 1NTH: contains PYL, pyrrolysine (22nd amino acid)
    List<String> pdbIds = Arrays.asList("2ONX","1JLP","5X6H","5L2G","2MK1","5UZT","1AA6","1NTH");
    pdb = MmtfReader.downloadReducedMmtfFiles(pdbIds, sc);
}
 
Example 17   Project: geowave   File: GeoWaveSparkConf.java
public static SparkSession createSessionFromParams(
    final String appName,
    String master,
    final String host,
    final String jars) {
  // Grab default config for GeoWave
  SparkConf defaultConfig = GeoWaveSparkConf.getDefaultConfig();
  // Apply master from default
  if (master == null) {
    master = "yarn";
  }

  // Apply user options if set, correctly handling host for yarn.
  if (appName != null) {
    defaultConfig = defaultConfig.setAppName(appName);
  }
  defaultConfig = defaultConfig.setMaster(master);
  if (host != null) {
    if (!"yarn".equals(master)) {
      defaultConfig = defaultConfig.set("spark.driver.host", host);
    } else {
      LOGGER.warn(
          "Attempting to set spark driver host for yarn master. Normally this is handled via hadoop configuration. Remove host or set another master designation and try again.");
    }
  }

  if (jars != null) {
    defaultConfig = defaultConfig.set("spark.jars", jars);
  }

  // Finally return the session from builder
  return GeoWaveSparkConf.internalCreateSession(defaultConfig, null);
}
 
Example 18   Project: render   File: BoxClient.java
public void run(final SparkConf sparkConf)
        throws IOException {

    final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

    LogUtilities.logSparkClusterInfo(sparkContext);

    setupForRun();

    boolean foundBoxesRenderedForPriorRun = false;
    if (parameters.cleanUpPriorRun) {
        foundBoxesRenderedForPriorRun = cleanUpPriorRun(sparkContext);
    }

    final JavaRDD<BoxData> distributedBoxDataRdd = partitionBoxes(sparkContext,
                                                                  foundBoxesRenderedForPriorRun);

    final Broadcast<BoxGenerator> broadcastBoxGenerator = sparkContext.broadcast(boxGenerator);

    if (parameters.validateLabelsOnly) {
        validateLabelBoxes(sparkContext, distributedBoxDataRdd);
    } else {
        for (int level = 0; level <= parameters.box.maxLevel; level++) {
            renderBoxesForLevel(level, distributedBoxDataRdd, broadcastBoxGenerator);
        }
    }

    if (parameters.box.isOverviewNeeded() && (! parameters.explainPlan) && (! parameters.validateLabelsOnly)) {
        renderOverviewImages(sparkContext,
                             broadcastBoxGenerator);
    }

    LogUtilities.logSparkClusterInfo(sparkContext); // log cluster info again here to add run stats to driver log

    sparkContext.stop();
}
 
Example 19   Project: articles   File: SpringSparkDemoApplication.java
@Bean
public SparkSession spark(SparkConf sparkConf)
{
    SparkSession sparkSession = SparkSession.builder()
        .sparkContext(javaSparkContext(sparkConf).sc())
        .config(sparkConf)
        .getOrCreate();

    logger.info("Using Spark Version {}", sparkSession.version());

    return sparkSession;
}
 
Example 20   Project: gatk   File: MarkDuplicatesSparkUtilsUnitTest.java
@Test
public void testChangingContigsOnHeaderlessSAMRecord() {
    final SparkConf conf = new SparkConf().set("spark.kryo.registrator",
            "org.broadinstitute.hellbender.tools.spark.transforms.markduplicates.MarkDuplicatesSparkUtilsUnitTest$TestGATKRegistrator");
    final SAMRecord read = ((SAMRecordToGATKReadAdapter) ArtificialReadUtils.createHeaderlessSamBackedRead("read1", "1", 100, 50)).getEncapsulatedSamRecord();
    final OpticalDuplicateFinder finder = new OpticalDuplicateFinder(OpticalDuplicateFinder.DEFAULT_READ_NAME_REGEX,2500, null);

    final OpticalDuplicateFinder roundTrippedRead = SparkTestUtils.roundTripInKryo(finder, OpticalDuplicateFinder.class, conf);
    Assert.assertEquals(roundTrippedRead.opticalDuplicatePixelDistance, finder.opticalDuplicatePixelDistance);
}
 
Example 21   Project: flink-perf   File: Grep.java
public static void main(String[] args) {
	String master = args[0];
	String inFile = args[1];
	String outFile = args[2];

	String[] patterns = new String[args.length - 3];
	System.arraycopy(args,3,patterns,0,args.length-3);
	System.err.println("Starting spark with master="+master+" in="+inFile);
	System.err.println("Using patterns: "+ Arrays.toString(patterns));

	SparkConf conf = new SparkConf().setAppName("Grep job").setMaster(master).set("spark.hadoop.validateOutputSpecs", "false");
	JavaSparkContext sc = new JavaSparkContext(conf);

	JavaRDD<String> file = sc.textFile(inFile);
	for(int p = 0; p < patterns.length; p++) {
		final String pattern = patterns[p];
		JavaRDD<String> res = file.filter(new Function<String, Boolean>() {
			private static final long serialVersionUID = 1L;
			Pattern p = Pattern.compile(pattern);

			@Override
			public Boolean call(String value) throws Exception {
				if (value == null || value.length() == 0) {
					return false;
				}
				final Matcher m = p.matcher(value);
				if (m.find()) {
					return true;
				}
				return false;
			}
		});
		res.saveAsTextFile(outFile+"_"+pattern);
	}
}
 
Example 22   Project: sylph   File: SparkStreamingSqlEngine.java
private static Serializable compile(String jobId, SqlFlow sqlFlow, ConnectorStore connectorStore, SparkJobConfig sparkJobConfig, URLClassLoader jobClassLoader)
        throws JVMException
{
    int batchDuration = sparkJobConfig.getSparkStreamingBatchDuration();
    final AtomicBoolean isCompile = new AtomicBoolean(true);
    final Supplier<StreamingContext> appGetter = (Supplier<StreamingContext> & Serializable) () -> {
        logger.info("========create spark StreamingContext mode isCompile = " + isCompile.get() + "============");
        SparkConf sparkConf = isCompile.get() ?
                new SparkConf().setMaster("local[*]").setAppName("sparkCompile")
                : new SparkConf();
        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
        StreamingContext ssc = new StreamingContext(sparkSession.sparkContext(), Duration.apply(batchDuration));

        //build sql
        SqlAnalyse analyse = new SparkStreamingSqlAnalyse(ssc, connectorStore, isCompile.get());
        try {
            buildSql(analyse, jobId, sqlFlow);
        }
        catch (Exception e) {
            throwsException(e);
        }
        return ssc;
    };

    JVMLauncher<Boolean> launcher = JVMLaunchers.<Boolean>newJvm()
            .setConsole((line) -> System.out.println(new Ansi().fg(YELLOW).a("[" + jobId + "] ").fg(GREEN).a(line).reset()))
            .setCallable(() -> {
                System.out.println("************ job start ***************");
                appGetter.get();
                return true;
            })
            .addUserURLClassLoader(jobClassLoader)
            .setClassLoader(jobClassLoader)
            .notDepThisJvmClassPath()
            .build();

    launcher.startAndGet();
    isCompile.set(false);
    return (Serializable) appGetter;
}
 
Example 23   Project: envelope   File: TestKerberosUtils.java
@Test
public void testGetKerberosPrincFromSpark() {
  SparkConf sparkConf = new SparkConf(false);
  sparkConf.set("spark.yarn.principal", "boom-oo-ya-ta-ta-ta");
  KerberosUtils.setSparkConf(sparkConf);
  Map<String, Object> configMap = new HashMap<>();
  Config config = ConfigFactory.parseMap(configMap);

  String principal = KerberosUtils.getKerberosPrincipal(config);

  assertEquals("boom-oo-ya-ta-ta-ta", principal);
}
 
Example 24   Project: hui-bigdata-spark   File: TransformationRDD.java
/**
 * Element transformation performed inside each partition.
 * Purpose of the demo: compute squares. (The first parameter is the partition index.)
 *
 * @since hui_project 1.0.0
 */
public void testMapPartitionsWithIndex() {
    SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("test");
    JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
    JavaRDD<Integer> parallelize = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3);
    JavaRDD<Tuple2<Integer, Integer>> rdd = parallelize.mapPartitionsWithIndex((x, y) -> getSquareWithIndex(x, y), false);
    checkResult(rdd.collect());
}
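
The helpers getSquareWithIndex and checkResult are not shown in this excerpt. Below is a hypothetical sketch of getSquareWithIndex, consistent with the Javadoc above (the first parameter is the partition index and the output pairs it with each element's square); it assumes the usual java.util and scala.Tuple2 imports, and the real project may implement it differently.

private static Iterator<Tuple2<Integer, Integer>> getSquareWithIndex(Integer partitionIndex, Iterator<Integer> elements) {
    // Pair the partition index with the square of each element in that partition.
    List<Tuple2<Integer, Integer>> result = new ArrayList<>();
    while (elements.hasNext()) {
        int value = elements.next();
        result.add(new Tuple2<>(partitionIndex, value * value));
    }
    return result.iterator();
}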
 
public static SparkSession getInstance(SparkConf sparkConf) {
    if (instance == null) {
        synchronized (JavaSparkSessionSingleton.class) {
            if (instance == null) {
                instance = SparkSession.builder().config(sparkConf).getOrCreate();
            }
        }
    }

    return instance;
}
 
Example 26   Project: envelope   File: TestKerberosUtils.java
@Test
public void testGetKerberosPrincFromConfig() {
  SparkConf sparkConf = new SparkConf(false);
  sparkConf.set("spark.yarn.principal", "boom-oo-ya-ta-ta-ta");
  KerberosUtils.setSparkConf(sparkConf);
  Map<String, Object> configMap = new HashMap<>();
  configMap.put(USER_PRINC_CONFIG, "foo");
  Config config = ConfigFactory.parseMap(configMap);

  String principal = KerberosUtils.getKerberosPrincipal(config);

  assertEquals("foo", principal);
}
 
Example 27   Project: components   File: SparkRunnerTestUtils.java
public Pipeline createPipeline() {
    SparkContextOptions sparkOpts = options.as(SparkContextOptions.class);
    sparkOpts.setFilesToStage(emptyList());

    SparkConf conf = new SparkConf();
    conf.setAppName(appName);
    conf.setMaster("local[2]");
    conf.set("spark.driver.allowMultipleContexts", "true");
    JavaSparkContext jsc = new JavaSparkContext(new SparkContext(conf));
    sparkOpts.setProvidedSparkContext(jsc);
    sparkOpts.setUsesProvidedSparkContext(true);
    sparkOpts.setRunner(SparkRunner.class);

    return Pipeline.create(sparkOpts);
}
 
protected static JavaStreamingContext createContext(String ip, int port, String checkpointDirectory) {
	SparkConf sparkConf = new SparkConf().setAppName("WordCountRecoverableEx").setMaster("local[*]");
	JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
	streamingContext.checkpoint(checkpointDirectory);
	// Initial state RDD input to mapWithState
	@SuppressWarnings("unchecked")
	List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
	JavaPairRDD<String, Integer> initialRDD = streamingContext.sparkContext().parallelizePairs(tuples);

	JavaReceiverInputDStream<String> StreamingLines = streamingContext.socketTextStream(ip,port, StorageLevels.MEMORY_AND_DISK_SER);

	JavaDStream<String> words = StreamingLines.flatMap(str -> Arrays.asList(str.split(" ")).iterator());

	JavaPairDStream<String, Integer> wordCounts = words.mapToPair(str -> new Tuple2<>(str, 1))
			.reduceByKey((count1, count2) -> count1 + count2);

	// Update the cumulative count function
	Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc = new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
		@Override
		public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
			int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
			Tuple2<String, Integer> output = new Tuple2<>(word, sum);
			state.update(sum);
			return output;
		}
	};

	// DStream made of get cumulative counts that get updated in every batch
	JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream = wordCounts
			.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));

	stateDstream.print();
	return streamingContext;
}
 
Example 29   Project: Explorer   File: SparkConnectorCreatorTest.java
@Before
public void setUp(){
    keysToInspect = new ArrayList<>();
    keysToInspect.add(AttributteNames.CT_MASTER);
    properties = new Properties();
    creator = new ConnectorCreator<SparkConf>(new SparkConfComparator(), " Property spark master is not filled ");
}
 
Example 30   Project: envelope   File: TestKerberosUtils.java
@Test
public void testGetKerberosKeytabFromSpark() {
  SparkConf sparkConf = new SparkConf(false);
  sparkConf.set("spark.yarn.keytab", "boom-oo-ya-ta-ta-ta.kt");
  KerberosUtils.setSparkConf(sparkConf);
  Map<String, Object> configMap = new HashMap<>();
  Config config = ConfigFactory.parseMap(configMap);

  String keytab = KerberosUtils.getKerberosKeytab(config);

  assertEquals("boom-oo-ya-ta-ta-ta.kt", keytab);
}