Code examples for the class org.apache.spark.sql.streaming.StreamingQuery

The examples below show how to use the org.apache.spark.sql.streaming.StreamingQuery API; follow each example's link to view the full source on GitHub.
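Before the individual examples, here is a minimal, self-contained sketch of the StreamingQuery lifecycle. It assumes only a local SparkSession and Spark's built-in "rate" test source; everything else in this page comes from the projects named in each example.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class StreamingQueryLifecycle {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("StreamingQueryLifecycle")
        .master("local[*]")
        .getOrCreate();

    // The built-in "rate" source emits (timestamp, value) rows for testing
    Dataset<Row> rate = spark.readStream()
        .format("rate")
        .option("rowsPerSecond", 1)
        .load();

    // start() returns immediately with a handle to the running query
    StreamingQuery query = rate.writeStream()
        .outputMode("append")
        .format("console")
        .start();

    System.out.println("id=" + query.id() + ", active=" + query.isActive());
    query.awaitTermination(10_000); // block for at most 10 seconds
    query.stop();                   // stop the query, then the session
    spark.stop();
  }
}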

protected void stop() {
  try {
    // TODO: await any outstanding queries on the session if this is streaming.
    if (isStreaming) {
      for (StreamingQuery query : sparkSession.streams().active()) {
        query.stop();
      }
    }
  } catch (Exception e) {
    throw beamExceptionFrom(e);
  } finally {
    sparkSession.stop();
    if (Objects.equals(state, State.RUNNING)) {
      this.state = State.STOPPED;
    }
  }
}
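If an immediate stop() is too abrupt, a hedged variant of the same shutdown loop can drain buffered input first. Here spark is assumed to be an existing SparkSession; note that processAllAvailable() is intended mainly for testing, since it blocks until all currently available input has been consumed.

for (StreamingQuery q : spark.streams().active()) {
  try {
    q.processAllAvailable(); // drain whatever input is already available
    q.stop();
  } catch (Exception e) {
    // log and keep stopping the remaining queries
  }
}
spark.stop();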
 
Example 2   Project: iceberg   File: TestForwardCompatibility.java
@Test
public void testSparkStreamingWriteFailsUnknownTransform() throws IOException {
  File parent = temp.newFolder("avro");
  File location = new File(parent, "test");
  File dataFolder = new File(location, "data");
  dataFolder.mkdirs();
  File checkpoint = new File(parent, "checkpoint");
  checkpoint.mkdirs();

  HadoopTables tables = new HadoopTables(CONF);
  tables.create(SCHEMA, UNKNOWN_SPEC, location.toString());

  MemoryStream<Integer> inputStream = newMemoryStream(1, spark.sqlContext(), Encoders.INT());
  StreamingQuery query = inputStream.toDF()
      .selectExpr("value AS id", "CAST (value AS STRING) AS data")
      .writeStream()
      .outputMode("append")
      .format("iceberg")
      .option("checkpointLocation", checkpoint.toString())
      .option("path", location.toString())
      .start();

  List<Integer> batch1 = Lists.newArrayList(1, 2);
  send(batch1, inputStream);

  AssertHelpers.assertThrows("Should reject streaming write with unsupported transform",
      StreamingQueryException.class, "Cannot write using unsupported transforms: zero",
      query::processAllAvailable);
}
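For reference, a happy-path continuation of the same test pattern would look like the sketch below (send and newMemoryStream are helpers defined in the Iceberg test class; this assumes a table whose partition transforms the writer supports). processAllAvailable() blocks until all buffered rows are written and rethrows any query failure, which is what the assertion above relies on.

List<Integer> batch2 = Lists.newArrayList(3, 4);
send(batch2, inputStream);
query.processAllAvailable(); // blocks until all buffered rows are written; rethrows failures
query.stop();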
 
Example 3   Project: sylph   File: StructuredNodeLoader.java
@Override
public UnaryOperator<Dataset<Row>> loadSink(String driverStr, Map<String, Object> config)
{
    return stream -> {
        // ------- start the job -------
        StreamingQuery streamingQuery = loadSinkWithComplic(driverStr, config).apply(stream).start();
        // streamingQuery.stop()
        return null;
    };
}
 
private void start() throws TimeoutException {
  log.debug("-> start()");

  SparkSession spark = SparkSession.builder()
      .appName("Read lines over a file stream").master("local")
      .getOrCreate();

  Dataset<Row> df = spark
      .readStream()
      .format("text")
      .load(StreamingUtils.getInputDirectory());

  StreamingQuery query = df
      .writeStream()
      .outputMode(OutputMode.Update())
      .format("console").start();

  try {
    query.awaitTermination();
  } catch (StreamingQueryException e) {
    log.error("Exception while waiting for query to end {}.", e
        .getMessage(),
        e);
  }

  // In this case everything read from the text source is a string; note these
  // lines are only reached after the query terminates, and show() is
  // unsupported on a streaming Dataset anyway
  df.show();
  df.printSchema();
}
 
private void start() throws TimeoutException {
  log.debug("-> start()");

  SparkSession spark = SparkSession.builder()
      .appName("Read lines over a file stream")
      .master("local")
      .getOrCreate();

  Dataset<Row> df = spark
      .readStream()
      .format("text")
      .load(StreamingUtils.getInputDirectory());

  StreamingQuery query = df
      .writeStream()
      .outputMode(OutputMode.Update())
      .format("console")
      .start();

  try {
    query.awaitTermination();
  } catch (StreamingQueryException e) {
    log.error(
        "Exception while waiting for query to end {}.",
        e.getMessage(),
        e);
  }

  // Never executed
  df.show();
  df.printSchema();
}
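Both versions above block indefinitely in awaitTermination(), which is why the trailing df.show()/df.printSchema() calls never run. A bounded variant, assuming query is the StreamingQuery started above, would be:

boolean terminated = query.awaitTermination(60_000); // wait up to one minute
if (!terminated) {
  query.stop(); // the query is still running, so stop it explicitly
}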
 
public StreamingQuery start(final DataStreamWriter<?> writer, final String path) {
    Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
        @Override
        public StreamingQuery apply() {
            return writer.start(path);
        }
    };
    return harness.startTest(runFunction);
}
 
public StreamingQuery start(final DataStreamWriter<?> writer) {
    Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
        @Override
        public StreamingQuery apply() {
            return writer.start();
        }
    };
    return harness.startTest(runFunction);
}
 
public void run(final DataStreamWriter<?> writer, final String path) {
    Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
        @Override
        public StreamingQuery apply() {
            return writer.start(path);
        }
    };
    harness.runTest(runFunction);
}
 
public void run(final DataStreamWriter<?> writer) {
    Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
        @Override
        public StreamingQuery apply() {
            return writer.start();
        }
    };
    harness.runTest(runFunction);
}
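These four overloads adapt Java callers to a Scala test harness: scala.Function0 is not usable as a plain Java lambda target here, so each wrapper subclasses AbstractFunction0 instead. A usage sketch follows; harness, start, and run are the names from the examples above, while df and both paths are hypothetical.

DataStreamWriter<Row> writer = df.writeStream()
    .format("parquet")
    .option("checkpointLocation", "/tmp/checkpoint"); // hypothetical path
StreamingQuery query = start(writer, "/tmp/output");  // the harness invokes writer.start(path)
query.processAllAvailable();
query.stop();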
 
public static void main(String[] args) throws InterruptedException, StreamingQueryException {

        System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);

        final SparkConf conf = new SparkConf()
                .setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
                .setAppName(APPLICATION_NAME)
                .set("spark.sql.caseSensitive", CASE_SENSITIVE);

        SparkSession sparkSession = SparkSession.builder()
                .config(conf)
                .getOrCreate();

        Dataset<Row> meetupDF = sparkSession.readStream()
                .format(STREAM_FORMAT)
                .option("kafka.bootstrap.servers", KAFKA_BROKERS)
                .option("subscribe", KAFKA_TOPIC)                
                .load();                              
        
        meetupDF.printSchema();

        Dataset<Row> rsvpAndTimestampDF = meetupDF
                .select(col("timestamp"),
                        from_json(col("value").cast("string"), RSVP_SCHEMA)
                                 .alias("rsvp"))
                .alias("meetup")
                .select("meetup.*");
        
        rsvpAndTimestampDF.printSchema();
       
        Dataset<Row> window = rsvpAndTimestampDF
                .withWatermark("timestamp", "1 minute")
                .groupBy(
                        window(col("timestamp"), "4 minutes", "2 minutes"),
                        col("rsvp.guests"))
                .count();

        StreamingQuery query = window.writeStream()
                .outputMode("complete")
                .format("console")                               
                .option("checkpointLocation", CHECKPOINT_LOCATION)
                .option("truncate", false)
                .start();

        query.awaitTermination();
    }
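Because the aggregation above declares a one-minute watermark, the same query could also run in "update" output mode, which emits only changed rows and lets Spark drop window state older than the watermark; "complete" mode must retain every window seen so far. A hedged variant reusing the names above (the checkpoint suffix is made up):

        StreamingQuery updateQuery = window.writeStream()
                .outputMode("update")
                .format("console")
                .option("checkpointLocation", CHECKPOINT_LOCATION + "-update") // hypothetical location
                .option("truncate", false)
                .start();

        updateQuery.awaitTermination();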
 
public static void main(String[] args) throws InterruptedException, StreamingQueryException {
 
      System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);

      // * the schema can be written to disk and read back from disk
      // * the schema does not have to be complete; it may contain only the needed fields
      StructType RSVP_SCHEMA = new StructType()                                
              .add("event",
                      new StructType()
                              .add("event_id", StringType, true)
                              .add("event_name", StringType, true)
                              .add("event_url", StringType, true)
                              .add("time", LongType, true))
              .add("group",
                      new StructType()
                              .add("group_city", StringType, true)
                              .add("group_country", StringType, true)
                              .add("group_id", LongType, true)
                              .add("group_lat", DoubleType, true)
                              .add("group_lon", DoubleType, true)
                              .add("group_name", StringType, true)
                              .add("group_state", StringType, true)
                              .add("group_topics", DataTypes.createArrayType(
                                      new StructType()
                                              .add("topicName", StringType, true)
                                              .add("urlkey", StringType, true)), true)
                              .add("group_urlname", StringType, true))
              .add("guests", LongType, true)
              .add("member",
                      new StructType()
                              .add("member_id", LongType, true)
                              .add("member_name", StringType, true)                                
                              .add("photo", StringType, true))
              .add("mtime", LongType, true)
              .add("response", StringType, true)
              .add("rsvp_id", LongType, true)
              .add("venue",
                      new StructType()
                              .add("lat", DoubleType, true)
                              .add("lon", DoubleType, true)
                              .add("venue_id", LongType, true)
                              .add("venue_name", StringType, true))
              .add("visibility", StringType, true);

      final SparkConf conf = new SparkConf()
              .setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
              .setAppName(APPLICATION_NAME)
              .set("spark.sql.caseSensitive", CASE_SENSITIVE);

      SparkSession spark = SparkSession
              .builder()
              .config(conf)
              .getOrCreate();

      PipelineModel pipelineModel = PipelineModel.load(MODEL_FOLDER_PATH);
     
      Dataset<Row> meetupStream = spark.readStream()
              .format(KAFKA_FORMAT)
              .option("kafka.bootstrap.servers", KAFKA_BROKERS)
              .option("subscribe", KAFKA_TOPIC)
              .load();

      Dataset<Row> gatheredDF = meetupStream.select(
              from_json(col("value").cast("string"), RSVP_SCHEMA)
                      .alias("rsvp"))
              .alias("meetup")
              .select("meetup.*");

      Dataset<Row> filteredDF = gatheredDF.filter(e -> !e.anyNull());

      Dataset<Row> preparedDF = filteredDF.select(
              col("rsvp.group.group_city"),
              col("rsvp.group.group_lat"),
              col("rsvp.group.group_lon"),
              col("rsvp.response"));

      preparedDF.printSchema();
   
      Dataset<Row> predictionDF = pipelineModel.transform(preparedDF);
      
      StreamingQuery query = predictionDF.writeStream()                
              .format(JSON_FORMAT)
              .option("path", RESULT_FOLDER_PATH)
              .option("checkpointLocation", CHECKPOINT_LOCATION)
              .trigger(Trigger.ProcessingTime(QUERY_INTERVAL_SECONDS))
              .option("truncate", false)
              .start();

      query.awaitTermination();
  }
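The Trigger.ProcessingTime call above controls micro-batch pacing; org.apache.spark.sql.streaming.Trigger also offers one-shot and continuous modes. A short sketch, where writer stands for any DataStreamWriter:

writer.trigger(Trigger.ProcessingTime("10 seconds")); // micro-batch every 10 seconds
writer.trigger(Trigger.Once());                       // process one batch, then stop
writer.trigger(Trigger.Continuous("1 second"));       // continuous processing, 1s checkpoints (experimental)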
 
public static void main(String[] args) throws Exception {
  // Read properties
  Properties prop = PropertyFileReader.readPropertyFile();

  // SparkSession
  SparkSession spark = SparkSession
      .builder()
      .appName("VideoStreamProcessor")
      .master(prop.getProperty("spark.master.url"))
      .getOrCreate();

  // Directory in which to save image files with motion detected
  final String processedImageDir = prop.getProperty("processed.output.dir");
  logger.warn("Output directory for saving processed images is set to " + processedImageDir
      + ". This is configured in the processed.output.dir key of the property file.");

  // Create the schema for the JSON messages
  StructType schema = DataTypes.createStructType(new StructField[] {
      DataTypes.createStructField("cameraId", DataTypes.StringType, true),
      DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
      DataTypes.createStructField("rows", DataTypes.IntegerType, true),
      DataTypes.createStructField("cols", DataTypes.IntegerType, true),
      DataTypes.createStructField("type", DataTypes.IntegerType, true),
      DataTypes.createStructField("data", DataTypes.StringType, true)
  });

  // Create a Dataset from the Kafka stream messages
  Dataset<VideoEventData> ds = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"))
      .option("subscribe", prop.getProperty("kafka.topic"))
      .option("kafka.max.partition.fetch.bytes", prop.getProperty("kafka.max.partition.fetch.bytes"))
      .option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records"))
      .load()
      .selectExpr("CAST(value AS STRING) as message")
      .select(functions.from_json(functions.col("message"), schema).as("json"))
      .select("json.*")
      .as(Encoders.bean(VideoEventData.class));

  // Key-value pairs of cameraId -> VideoEventData
  KeyValueGroupedDataset<String, VideoEventData> kvDataset = ds.groupByKey(
      new MapFunction<VideoEventData, String>() {
        @Override
        public String call(VideoEventData value) throws Exception {
          return value.getCameraId();
        }
      }, Encoders.STRING());

  // Process each camera's events, carrying state across micro-batches
  Dataset<VideoEventData> processedDataset = kvDataset.mapGroupsWithState(
      new MapGroupsWithStateFunction<String, VideoEventData, VideoEventData, VideoEventData>() {
        @Override
        public VideoEventData call(String key, Iterator<VideoEventData> values,
            GroupState<VideoEventData> state) throws Exception {
          logger.warn("CameraId=" + key + " PartitionId=" + TaskContext.getPartitionId());
          // Check the previous state
          VideoEventData existing = null;
          if (state.exists()) {
            existing = state.get();
          }
          // Classify the image
          VideoEventData processed = ImageProcessor.process(key, values, processedImageDir, existing);
          // Update the last processed event
          if (processed != null) {
            state.update(processed);
          }
          return processed;
        }
      }, Encoders.bean(VideoEventData.class), Encoders.bean(VideoEventData.class));

  // Start the query
  StreamingQuery query = processedDataset.writeStream()
      .outputMode("update")
      .format("console")
      .start();

  // Await termination
  query.awaitTermination();
}
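One caveat of this pattern: state for a camera that stops sending events is kept forever. A hedged sketch of the same call with a processing-time timeout follows; the ten-minute idle window is a made-up value, and org.apache.spark.sql.streaming.GroupStateTimeout must be imported.

Dataset<VideoEventData> processedWithTimeout = kvDataset.mapGroupsWithState(
    new MapGroupsWithStateFunction<String, VideoEventData, VideoEventData, VideoEventData>() {
      @Override
      public VideoEventData call(String key, Iterator<VideoEventData> values,
          GroupState<VideoEventData> state) throws Exception {
        if (state.hasTimedOut()) {
          VideoEventData last = state.get();
          state.remove(); // evict state for an idle camera
          return last;
        }
        VideoEventData existing = state.exists() ? state.get() : null;
        VideoEventData processed = ImageProcessor.process(key, values, processedImageDir, existing);
        if (processed != null) {
          state.update(processed);
          state.setTimeoutDuration("10 minutes"); // hypothetical idle window
        }
        return processed;
      }
    },
    Encoders.bean(VideoEventData.class),
    Encoders.bean(VideoEventData.class),
    GroupStateTimeout.ProcessingTimeTimeout());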
 
public static void main(String[] args) throws Exception {
  if (args.length < 3) {
    System.err.println("Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port>" +
      " <window duration in seconds> [<slide duration in seconds>]");
    System.exit(1);
  }

  String host = args[0];
  int port = Integer.parseInt(args[1]);
  int windowSize = Integer.parseInt(args[2]);
  int slideSize = (args.length == 3) ? windowSize : Integer.parseInt(args[3]);
  if (slideSize > windowSize) {
    System.err.println("<slide duration> must be less than or equal to <window duration>");
    System.exit(1);
  }
  String windowDuration = windowSize + " seconds";
  String slideDuration = slideSize + " seconds";

  SparkSession spark = SparkSession
    .builder()
    .appName("JavaStructuredNetworkWordCountWindowed")
    .getOrCreate();

  // Create DataFrame representing the stream of input lines from connection to host:port
  Dataset<Row> lines = spark
    .readStream()
    .format("socket")
    .option("host", host)
    .option("port", port)
    .option("includeTimestamp", true)
    .load();

  // Split the lines into words, retaining timestamps
  Dataset<Row> words = lines
    .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
    .flatMap(
      new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
        @Override
        public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
          List<Tuple2<String, Timestamp>> result = new ArrayList<Tuple2<String, Timestamp>>();
          for (String word : t._1.split(" ")) {
            result.add(new Tuple2<String, Timestamp>(word, t._2));
          }
          return result.iterator();
        }
      },
      Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
    ).toDF("word", "timestamp");

  // Group the data by window and word and compute the count of each group
  Dataset<Row> windowedCounts = words.groupBy(
    functions.window(words.col("timestamp"), windowDuration, slideDuration),
    words.col("word")
  ).count().orderBy("window");

  // Start running the query that prints the windowed word counts to the console
  StreamingQuery query = windowedCounts.writeStream()
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start();

  query.awaitTermination();
}
 
Example 14   Project: SparkDemo   File: JavaStructuredKafkaWordCount.java
public static void main(String[] args) throws Exception {
  if (args.length < 3) {
    System.err.println("Usage: JavaStructuredKafkaWordCount <bootstrap-servers> " +
      "<subscribe-type> <topics>");
    System.exit(1);
  }

  String bootstrapServers = args[0];
  String subscribeType = args[1];
  String topics = args[2];

  SparkSession spark = SparkSession
    .builder()
    .appName("JavaStructuredKafkaWordCount")
    .getOrCreate();

  // Create DataSet representing the stream of input lines from kafka
  Dataset<String> lines = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option(subscribeType, topics)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as(Encoders.STRING());

  // Generate running word count
  Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String x) {
      return Arrays.asList(x.split(" ")).iterator();
    }
  }, Encoders.STRING()).groupBy("value").count();

  // Start running the query that prints the running counts to the console
  StreamingQuery query = wordCounts.writeStream()
    .outputMode("complete")
    .format("console")
    .start();

  query.awaitTermination();
}
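The <subscribe-type> argument maps to one of the Kafka source's three mutually exclusive subscription options. A sketch of all three, with placeholder topic names and partitions:

// Fixed topic list
spark.readStream().format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", "topic1,topic2")
  .load();

// Regular expression over topic names
spark.readStream().format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribePattern", "topic.*")
  .load();

// Explicit topic partitions, as a JSON string
spark.readStream().format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("assign", "{\"topic1\":[0,1]}")
  .load();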
 
Example 15   Project: SparkDemo   File: JavaStructuredNetworkWordCount.java
public static void main(String[] args) throws Exception {
  if (args.length < 2) {
    System.err.println("Usage: JavaStructuredNetworkWordCount <hostname> <port>");
    System.exit(1);
  }

  String host = args[0];
  int port = Integer.parseInt(args[1]);

  SparkSession spark = SparkSession
    .builder()
    .appName("JavaStructuredNetworkWordCount")
    .getOrCreate();

  // Create DataFrame representing the stream of input lines from connection to host:port
  Dataset<Row> lines = spark
    .readStream()
    .format("socket")
    .option("host", host)
    .option("port", port)
    .load();

  // Split the lines into words
  Dataset<String> words = lines.as(Encoders.STRING())
    .flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
      }
  }, Encoders.STRING());

  // Generate running word count
  Dataset<Row> wordCounts = words.groupBy("value").count();

  // Start running the query that prints the running counts to the console
  StreamingQuery query = wordCounts.writeStream()
    .outputMode("complete")
    .format("console")
    .start();

  query.awaitTermination();
}
 
public static void main(String[] args) throws Exception {
  // Read properties
  Properties prop = PropertyFileReader.readPropertyFile();

  // SparkSession
  SparkSession spark = SparkSession
      .builder()
      .appName("VideoStreamProcessor")
      .master(prop.getProperty("spark.master.url"))
      .getOrCreate();

  // Directory in which to save image files with motion detected
  final String processedImageDir = prop.getProperty("processed.output.dir");
  logger.warn("Output directory for saving processed images is set to " + processedImageDir
      + ". This is configured in the processed.output.dir key of the property file.");

  // Create the schema for the JSON messages
  StructType schema = DataTypes.createStructType(new StructField[] {
      DataTypes.createStructField("cameraId", DataTypes.StringType, true),
      DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
      DataTypes.createStructField("rows", DataTypes.IntegerType, true),
      DataTypes.createStructField("cols", DataTypes.IntegerType, true),
      DataTypes.createStructField("type", DataTypes.IntegerType, true),
      DataTypes.createStructField("data", DataTypes.StringType, true)
  });

  // Create a Dataset from the Kafka stream messages
  Dataset<VideoEventData> ds = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"))
      .option("subscribe", prop.getProperty("kafka.topic"))
      .option("kafka.max.partition.fetch.bytes", prop.getProperty("kafka.max.partition.fetch.bytes"))
      .option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records"))
      .load()
      .selectExpr("CAST(value AS STRING) as message")
      .select(functions.from_json(functions.col("message"), schema).as("json"))
      .select("json.*")
      .as(Encoders.bean(VideoEventData.class));

  // Key-value pairs of cameraId -> VideoEventData
  KeyValueGroupedDataset<String, VideoEventData> kvDataset = ds.groupByKey(
      new MapFunction<VideoEventData, String>() {
        @Override
        public String call(VideoEventData value) throws Exception {
          return value.getCameraId();
        }
      }, Encoders.STRING());

  // Process each camera's events, carrying state across micro-batches
  Dataset<VideoEventData> processedDataset = kvDataset.mapGroupsWithState(
      new MapGroupsWithStateFunction<String, VideoEventData, VideoEventData, VideoEventData>() {
        @Override
        public VideoEventData call(String key, Iterator<VideoEventData> values,
            GroupState<VideoEventData> state) throws Exception {
          logger.warn("CameraId=" + key + " PartitionId=" + TaskContext.getPartitionId());
          // Check the previous state
          VideoEventData existing = null;
          if (state.exists()) {
            existing = state.get();
          }
          // Detect motion
          VideoEventData processed = VideoMotionDetector.detectMotion(key, values, processedImageDir, existing);
          // Update the last processed event
          if (processed != null) {
            state.update(processed);
          }
          return processed;
        }
      }, Encoders.bean(VideoEventData.class), Encoders.bean(VideoEventData.class));

  // Start the query
  StreamingQuery query = processedDataset.writeStream()
      .outputMode("update")
      .format("console")
      .start();

  // Await termination
  query.awaitTermination();
}
 
public static void main(String[] args) throws StreamingQueryException, TimeoutException {
    //set log4j programmatically
    LogManager.getLogger("org.apache.spark").setLevel(Level.WARN);
    LogManager.getLogger("org.apache.kafka").setLevel(Level.WARN);
    LogManager.getLogger("akka").setLevel(Level.ERROR);
    //on windows we may need to configure winutils if hadoop_home is not set
    //System.setProperty("hadoop.home.dir", "c:/app/hadoop");
    //configure Spark
    SparkConf conf = new SparkConf()
            .setAppName("kafka-structured")
            .set("spark.driver.bindAddress", "localhost")
            .setMaster("local[*]");

    //initialize spark session
    SparkSession sparkSession = SparkSession
            .builder()
            .config(conf)
            .getOrCreate();

    //reduce task number
    sparkSession.sqlContext().setConf("spark.sql.shuffle.partitions", "3");

    //data stream from kafka
    Dataset<Row> ds1 = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "mytopic")
            .option("startingOffsets", "earliest")
            .load();
    //print kafka schema
    ds1.printSchema();
    
    //start the streaming query
    Dataset<Row> ds2 = ds1
            .select(from_avro(col("value"), USER_SCHEMA).as("rows"))
            .select("rows.*");

    //print avro schema converted to dataframe :)
    ds2.printSchema();

    StreamingQuery query1 = ds2
            .groupBy("str1")
            .count()
            .writeStream()
            .queryName("Test query")
            .outputMode("complete")
            .format("console")
            .start();

    query1.awaitTermination();

}
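Going the other direction is symmetric: the spark-avro module also provides to_avro, so the aggregated counts could be written back to Kafka as Avro. A hedged sketch, where the topic and checkpoint path are hypothetical and to_avro (org.apache.spark.sql.avro.functions) and struct (org.apache.spark.sql.functions) are statically imported:

    StreamingQuery query2 = ds2
            .groupBy("str1")
            .count()
            .select(to_avro(struct(col("str1"), col("count"))).as("value")) // the Kafka sink expects a "value" column
            .writeStream()
            .outputMode("update")
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("topic", "mytopic-counts")              // hypothetical topic
            .option("checkpointLocation", "/tmp/cp-counts") // hypothetical path
            .start();

    query2.awaitTermination();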
 
public static void main(String[] args) throws StreamingQueryException {
	System.setProperty("hadoop.home.dir", "C:\\softwares\\Winutils");
	SparkSession sparkSession = SparkSession.builder()
			.master("local[*]")
			.appName("structured Streaming Example")
			.config("spark.sql.warehouse.dir", "file:////C:/Users/sgulati/spark-warehouse")
			.getOrCreate();

	Dataset<Row> inStream = sparkSession.readStream()
			.format("socket")
			.option("host", "10.204.136.223")
			.option("port", 9999)
			.load();

	Dataset<FlightDetails> dsFlightDetails = inStream.as(Encoders.STRING()).map(x -> {
		ObjectMapper mapper = new ObjectMapper();
		return mapper.readValue(x, FlightDetails.class);
	}, Encoders.bean(FlightDetails.class));

	dsFlightDetails.createOrReplaceTempView("flight_details");

	Dataset<Row> avgFlightDetails = sparkSession.sql(
			"select flightId, avg(temperature) from flight_details group by flightId");

	StreamingQuery query = avgFlightDetails.writeStream()
			.outputMode("complete")
			.format("console")
			.start();

	query.awaitTermination();
}
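For comparison, the same aggregation can be expressed with the DataFrame API instead of SQL. A sketch reusing dsFlightDetails from above, assuming static imports of col and avg from org.apache.spark.sql.functions:

	Dataset<Row> avgByFlight = dsFlightDetails
			.groupBy(col("flightId"))
			.agg(avg(col("temperature")).as("avg_temperature"));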
 