The examples below show how to use the org.apache.spark.sql.streaming.StreamingQuery API class; you can also follow the links to GitHub to view the original source code.
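Before the project-specific examples, here is a minimal, self-contained sketch of the basic StreamingQuery lifecycle: start a query, inspect its status, wait for a bounded time, and stop it. This is only an illustration under assumed settings; the application name, rate-source speed, trigger interval, and 10-second wait are arbitrary placeholders, and the built-in rate source is used so the snippet needs no external services.
import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

public class StreamingQueryLifecycle {
  public static void main(String[] args) throws TimeoutException, StreamingQueryException {
    SparkSession spark = SparkSession.builder()
        .appName("StreamingQueryLifecycle") // placeholder app name
        .master("local[*]")
        .getOrCreate();

    // The built-in "rate" source emits (timestamp, value) rows; no external system is needed.
    Dataset<Row> rate = spark.readStream()
        .format("rate")
        .option("rowsPerSecond", 5)
        .load();

    // start() returns the StreamingQuery handle that the examples on this page revolve around.
    StreamingQuery query = rate.writeStream()
        .queryName("rate-to-console")
        .outputMode("append")
        .format("console")
        .trigger(Trigger.ProcessingTime("2 seconds"))
        .start();

    // Inspect the running query.
    System.out.println("id=" + query.id() + ", active=" + query.isActive());
    System.out.println(query.status());

    // Block for at most 10 seconds instead of forever, then shut down cleanly.
    query.awaitTermination(10_000);
    if (query.isActive()) {
      query.stop();
    }
    spark.stop();
  }
}
Most of the snippets below follow the same pattern but block indefinitely with awaitTermination(), so any code placed after that call only runs once the query terminates.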
protected void stop() {
try {
// TODO: await any outstanding queries on the session if this is streaming.
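// Stop every active streaming query so the session can shut down cleanly.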
if (isStreaming) {
for (StreamingQuery query : sparkSession.streams().active()) {
query.stop();
}
}
} catch (Exception e) {
throw beamExceptionFrom(e);
} finally {
sparkSession.stop();
if (Objects.equals(state, State.RUNNING)) {
this.state = State.STOPPED;
}
}
}
@Test
public void testSparkStreamingWriteFailsUnknownTransform() throws IOException {
File parent = temp.newFolder("avro");
File location = new File(parent, "test");
File dataFolder = new File(location, "data");
dataFolder.mkdirs();
File checkpoint = new File(parent, "checkpoint");
checkpoint.mkdirs();
HadoopTables tables = new HadoopTables(CONF);
tables.create(SCHEMA, UNKNOWN_SPEC, location.toString());
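// Stream into the table created above; its spec uses an unsupported partition transform, so the write is expected to fail.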
MemoryStream<Integer> inputStream = newMemoryStream(1, spark.sqlContext(), Encoders.INT());
StreamingQuery query = inputStream.toDF()
.selectExpr("value AS id", "CAST (value AS STRING) AS data")
.writeStream()
.outputMode("append")
.format("iceberg")
.option("checkpointLocation", checkpoint.toString())
.option("path", location.toString())
.start();
List<Integer> batch1 = Lists.newArrayList(1, 2);
send(batch1, inputStream);
AssertHelpers.assertThrows("Should reject streaming write with unsupported transform",
StreamingQueryException.class, "Cannot write using unsupported transforms: zero",
query::processAllAvailable);
}
@Override
public UnaryOperator<Dataset<Row>> loadSink(String driverStr, Map<String, Object> config)
{
return stream -> {
// ------- start the streaming job -------
StreamingQuery streamingQuery = loadSinkWithComplic(driverStr, config).apply(stream).start();
// streamingQuery.stop()
return null;
};
}
private void start() throws TimeoutException {
log.debug("-> start()");
SparkSession spark = SparkSession.builder()
.appName("Read lines over a file stream").master("local")
.getOrCreate();
Dataset<Row> df = spark
.readStream()
.format("text")
.load(StreamingUtils.getInputDirectory());
StreamingQuery query = df
.writeStream()
.outputMode(OutputMode.Update())
.format("console").start();
try {
query.awaitTermination();
} catch (StreamingQueryException e) {
log.error("Exception while waiting for query to end {}.", e
.getMessage(),
e);
}
// In this case everything is a string
df.show();
df.printSchema();
}
private void start() throws TimeoutException {
log.debug("-> start()");
SparkSession spark = SparkSession.builder()
.appName("Read lines over a file stream")
.master("local")
.getOrCreate();
Dataset<Row> df = spark
.readStream()
.format("text")
.load(StreamingUtils.getInputDirectory());
StreamingQuery query = df
.writeStream()
.outputMode(OutputMode.Update())
.format("console")
.start();
try {
query.awaitTermination();
} catch (StreamingQueryException e) {
log.error(
"Exception while waiting for query to end {}.",
e.getMessage(),
e);
}
// Never executed
df.show();
df.printSchema();
}
public StreamingQuery start(final DataStreamWriter<?> writer, final String path) {
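// Wrap writer.start(path) in a Scala Function0 so the test harness can launch the query and manage its lifecycle.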
Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
@Override
public StreamingQuery apply() {
return writer.start(path);
}
};
return harness.startTest(runFunction);
}
public StreamingQuery start(final DataStreamWriter<?> writer) {
Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
@Override
public StreamingQuery apply() {
return writer.start();
}
};
return harness.startTest(runFunction);
}
public void run(final DataStreamWriter<?> writer, final String path) {
Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
@Override
public StreamingQuery apply() {
return writer.start(path);
}
};
harness.runTest(runFunction);
}
public void run(final DataStreamWriter<?> writer) {
Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
@Override
public StreamingQuery apply() {
return writer.start();
}
};
harness.runTest(runFunction);
}
public static void main(String[] args) throws InterruptedException, StreamingQueryException {
System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);
final SparkConf conf = new SparkConf()
.setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
.setAppName(APPLICATION_NAME)
.set("spark.sql.caseSensitive", CASE_SENSITIVE);
SparkSession sparkSession = SparkSession.builder()
.config(conf)
.getOrCreate();
Dataset<Row> meetupDF = sparkSession.readStream()
.format(STREAM_FORMAT)
.option("kafka.bootstrap.servers", KAFKA_BROKERS)
.option("subscribe", KAFKA_TOPIC)
.load();
meetupDF.printSchema();
Dataset<Row> rsvpAndTimestampDF = meetupDF
.select(col("timestamp"),
from_json(col("value").cast("string"), RSVP_SCHEMA)
.alias("rsvp"))
.alias("meetup")
.select("meetup.*");
rsvpAndTimestampDF.printSchema();
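// Count RSVPs per guest count over a 4-minute window sliding every 2 minutes, with a 1-minute watermark on the event timestamp.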
Dataset<Row> window = rsvpAndTimestampDF
.withWatermark("timestamp", "1 minute")
.groupBy(
window(col("timestamp"), "4 minutes", "2 minutes"),
col("rsvp.guests"))
.count();
StreamingQuery query = window.writeStream()
.outputMode("complete")
.format("console")
.option("checkpointLocation", CHECKPOINT_LOCATION)
.option("truncate", false)
.start();
query.awaitTermination();
}
public static void main(String[] args) throws InterruptedException, StreamingQueryException {
System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);
// * the schema can be written on disk, and read from disk
// * the schema is not mandatory to be complete, it can contain only the needed fields
StructType RSVP_SCHEMA = new StructType()
.add("event",
new StructType()
.add("event_id", StringType, true)
.add("event_name", StringType, true)
.add("event_url", StringType, true)
.add("time", LongType, true))
.add("group",
new StructType()
.add("group_city", StringType, true)
.add("group_country", StringType, true)
.add("group_id", LongType, true)
.add("group_lat", DoubleType, true)
.add("group_lon", DoubleType, true)
.add("group_name", StringType, true)
.add("group_state", StringType, true)
.add("group_topics", DataTypes.createArrayType(
new StructType()
.add("topicName", StringType, true)
.add("urlkey", StringType, true)), true)
.add("group_urlname", StringType, true))
.add("guests", LongType, true)
.add("member",
new StructType()
.add("member_id", LongType, true)
.add("member_name", StringType, true)
.add("photo", StringType, true))
.add("mtime", LongType, true)
.add("response", StringType, true)
.add("rsvp_id", LongType, true)
.add("venue",
new StructType()
.add("lat", DoubleType, true)
.add("lon", DoubleType, true)
.add("venue_id", LongType, true)
.add("venue_name", StringType, true))
.add("visibility", StringType, true);
final SparkConf conf = new SparkConf()
.setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
.setAppName(APPLICATION_NAME)
.set("spark.sql.caseSensitive", CASE_SENSITIVE);
SparkSession spark = SparkSession
.builder()
.config(conf)
.getOrCreate();
PipelineModel pipelineModel = PipelineModel.load(MODEL_FOLDER_PATH);
Dataset<Row> meetupStream = spark.readStream()
.format(KAFKA_FORMAT)
.option("kafka.bootstrap.servers", KAFKA_BROKERS)
.option("subscribe", KAFKA_TOPIC)
.load();
Dataset<Row> gatheredDF = meetupStream.select(
(from_json(col("value").cast("string"), RSVP_SCHEMA))
.alias("rsvp"))
.alias("meetup")
.select("meetup.*");
Dataset<Row> filteredDF = gatheredDF.filter(e -> !e.anyNull());
Dataset<Row> preparedDF = filteredDF.select(
col("rsvp.group.group_city"),
col("rsvp.group.group_lat"), col("rsvp.group.group_lon"),
col("rsvp.response")
);
preparedDF.printSchema();
Dataset<Row> predictionDF = pipelineModel.transform(preparedDF);
StreamingQuery query = predictionDF.writeStream()
.format(JSON_FORMAT)
.option("path", RESULT_FOLDER_PATH)
.option("checkpointLocation", CHECKPOINT_LOCATION)
.trigger(Trigger.ProcessingTime(QUERY_INTERVAL_SECONDS))
.option("truncate", false)
.start();
query.awaitTermination();
}
public static void main(String[] args) throws Exception {
//Read properties
Properties prop = PropertyFileReader.readPropertyFile();
//SparkSession
SparkSession spark = SparkSession
.builder()
.appName("VideoStreamProcessor")
.master(prop.getProperty("spark.master.url"))
.getOrCreate();
//directory to save image files with motion detected
final String processedImageDir = prop.getProperty("processed.output.dir");
logger.warn("Output directory for saving processed images is set to "+processedImageDir+". This is configured in processed.output.dir key of property file.");
//create schema for json message
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("cameraId", DataTypes.StringType, true),
DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
DataTypes.createStructField("rows", DataTypes.IntegerType, true),
DataTypes.createStructField("cols", DataTypes.IntegerType, true),
DataTypes.createStructField("type", DataTypes.IntegerType, true),
DataTypes.createStructField("data", DataTypes.StringType, true)
});
//Create DataSet from stream messages from kafka
Dataset<VideoEventData> ds = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"))
.option("subscribe", prop.getProperty("kafka.topic"))
.option("kafka.max.partition.fetch.bytes", prop.getProperty("kafka.max.partition.fetch.bytes"))
.option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records"))
.load()
.selectExpr("CAST(value AS STRING) as message")
.select(functions.from_json(functions.col("message"),schema).as("json"))
.select("json.*")
.as(Encoders.bean(VideoEventData.class));
//key-value pair of cameraId-VideoEventData
KeyValueGroupedDataset<String, VideoEventData> kvDataset = ds.groupByKey(new MapFunction<VideoEventData, String>() {
@Override
public String call(VideoEventData value) throws Exception {
return value.getCameraId();
}
}, Encoders.STRING());
//process
Dataset<VideoEventData> processedDataset = kvDataset.mapGroupsWithState(new MapGroupsWithStateFunction<String, VideoEventData, VideoEventData,VideoEventData>(){
@Override
public VideoEventData call(String key, Iterator<VideoEventData> values, GroupState<VideoEventData> state) throws Exception {
logger.warn("CameraId="+key+" PartitionId="+TaskContext.getPartitionId());
VideoEventData existing = null;
//check previous state
if (state.exists()) {
existing = state.get();
}
//classify image
VideoEventData processed = ImageProcessor.process(key,values,processedImageDir,existing);
//update last processed
if(processed != null){
state.update(processed);
}
return processed;
}}, Encoders.bean(VideoEventData.class), Encoders.bean(VideoEventData.class));
//start
StreamingQuery query = processedDataset.writeStream()
.outputMode("update")
.format("console")
.start();
//await
query.awaitTermination();
}
public static void main(String[] args) throws Exception {
if (args.length < 3) {
System.err.println("Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port>" +
" <window duration in seconds> [<slide duration in seconds>]");
System.exit(1);
}
String host = args[0];
int port = Integer.parseInt(args[1]);
int windowSize = Integer.parseInt(args[2]);
int slideSize = (args.length == 3) ? windowSize : Integer.parseInt(args[3]);
if (slideSize > windowSize) {
System.err.println("<slide duration> must be less than or equal to <window duration>");
System.exit(1);
}
String windowDuration = windowSize + " seconds";
String slideDuration = slideSize + " seconds";
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCountWindowed")
.getOrCreate();
// Create DataFrame representing the stream of input lines from connection to host:port
Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", host)
.option("port", port)
.option("includeTimestamp", true)
.load();
// Split the lines into words, retaining timestamps
Dataset<Row> words = lines
.as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
.flatMap(
new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
@Override
public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
List<Tuple2<String, Timestamp>> result = new ArrayList<Tuple2<String, Timestamp>>();
for (String word : t._1.split(" ")) {
result.add(new Tuple2<String, Timestamp>(word, t._2));
}
return result.iterator();
}
},
Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
).toDF("word", "timestamp");
// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
functions.window(words.col("timestamp"), windowDuration, slideDuration),
words.col("word")
).count().orderBy("window");
// Start running the query that prints the windowed word counts to the console
StreamingQuery query = windowedCounts.writeStream()
.outputMode("complete")
.format("console")
.option("truncate", "false")
.start();
query.awaitTermination();
}
public static void main(String[] args) throws Exception {
if (args.length < 3) {
System.err.println("Usage: JavaStructuredKafkaWordCount <bootstrap-servers> " +
"<subscribe-type> <topics>");
System.exit(1);
}
String bootstrapServers = args[0];
String subscribeType = args[1];
String topics = args[2];
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredKafkaWordCount")
.getOrCreate();
// Create DataSet representing the stream of input lines from kafka
Dataset<String> lines = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING());
// Generate running word count
Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(x.split(" ")).iterator();
}
}, Encoders.STRING()).groupBy("value").count();
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
}
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaStructuredNetworkWordCount <hostname> <port>");
System.exit(1);
}
String host = args[0];
int port = Integer.parseInt(args[1]);
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCount")
.getOrCreate();
// Create DataFrame representing the stream of input lines from connection to host:port
Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", host)
.option("port", port)
.load();
// Split the lines into words
Dataset<String> words = lines.as(Encoders.STRING())
.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(x.split(" ")).iterator();
}
}, Encoders.STRING());
// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
}
public static void main(String[] args) throws Exception {
//Read properties
Properties prop = PropertyFileReader.readPropertyFile();
//SparkSession
SparkSession spark = SparkSession
.builder()
.appName("VideoStreamProcessor")
.master(prop.getProperty("spark.master.url"))
.getOrCreate();
//directory to save image files with motion detected
final String processedImageDir = prop.getProperty("processed.output.dir");
logger.warn("Output directory for saving processed images is set to "+processedImageDir+". This is configured in processed.output.dir key of property file.");
//create schema for json message
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("cameraId", DataTypes.StringType, true),
DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
DataTypes.createStructField("rows", DataTypes.IntegerType, true),
DataTypes.createStructField("cols", DataTypes.IntegerType, true),
DataTypes.createStructField("type", DataTypes.IntegerType, true),
DataTypes.createStructField("data", DataTypes.StringType, true)
});
//Create DataSet from stream messages from kafka
Dataset<VideoEventData> ds = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"))
.option("subscribe", prop.getProperty("kafka.topic"))
.option("kafka.max.partition.fetch.bytes", prop.getProperty("kafka.max.partition.fetch.bytes"))
.option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records"))
.load()
.selectExpr("CAST(value AS STRING) as message")
.select(functions.from_json(functions.col("message"),schema).as("json"))
.select("json.*")
.as(Encoders.bean(VideoEventData.class));
//key-value pair of cameraId-VideoEventData
KeyValueGroupedDataset<String, VideoEventData> kvDataset = ds.groupByKey(new MapFunction<VideoEventData, String>() {
@Override
public String call(VideoEventData value) throws Exception {
return value.getCameraId();
}
}, Encoders.STRING());
//process
Dataset<VideoEventData> processedDataset = kvDataset.mapGroupsWithState(new MapGroupsWithStateFunction<String, VideoEventData, VideoEventData,VideoEventData>(){
@Override
public VideoEventData call(String key, Iterator<VideoEventData> values, GroupState<VideoEventData> state) throws Exception {
logger.warn("CameraId="+key+" PartitionId="+TaskContext.getPartitionId());
VideoEventData existing = null;
//check previous state
if (state.exists()) {
existing = state.get();
}
//detect motion
VideoEventData processed = VideoMotionDetector.detectMotion(key,values,processedImageDir,existing);
//update last processed
if(processed != null){
state.update(processed);
}
return processed;
}}, Encoders.bean(VideoEventData.class), Encoders.bean(VideoEventData.class));
//start
StreamingQuery query = processedDataset.writeStream()
.outputMode("update")
.format("console")
.start();
//await
query.awaitTermination();
}
public static void main(String[] args) throws StreamingQueryException, TimeoutException {
//set log4j programmatically
LogManager.getLogger("org.apache.spark").setLevel(Level.WARN);
LogManager.getLogger("org.apache.kafka").setLevel(Level.WARN);
LogManager.getLogger("akka").setLevel(Level.ERROR);
//on windows we may need to configure winutils if hadoop_home is not set
//System.setProperty("hadoop.home.dir", "c:/app/hadoop");
//configure Spark
SparkConf conf = new SparkConf()
.setAppName("kafka-structured")
.set("spark.driver.bindAddress", "localhost")
.setMaster("local[*]");
//initialize spark session
SparkSession sparkSession = SparkSession
.builder()
.config(conf)
.getOrCreate();
//reduce task number
sparkSession.sqlContext().setConf("spark.sql.shuffle.partitions", "3");
//data stream from kafka
Dataset<Row> ds1 = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "mytopic")
.option("startingOffsets", "earliest")
.load();
//print kafka schema
ds1.printSchema();
//start the streaming query
Dataset<Row> ds2 = ds1
.select(from_avro(col("value"), USER_SCHEMA).as("rows"))
.select("rows.*");
//print avro schema converted to dataframe :)
ds2.printSchema();
StreamingQuery query1 = ds2
.groupBy("str1")
.count()
.writeStream()
.queryName("Test query")
.outputMode("complete")
.format("console")
.start();
query1.awaitTermination();
}
public static void main(String[] args) throws StreamingQueryException {
System.setProperty("hadoop.home.dir", "C:\\softwares\\Winutils");
SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("structured Streaming Example")
.config("spark.sql.warehouse.dir", "file:////C:/Users/sgulati/spark-warehouse").getOrCreate();
Dataset<Row> inStream = sparkSession.readStream().format("socket").option("host", "10.204.136.223")
.option("port", 9999).load();
Dataset<FlightDetails> dsFlightDetails = inStream.as(Encoders.STRING()).map(x -> {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(x, FlightDetails.class);
}, Encoders.bean(FlightDetails.class));
dsFlightDetails.createOrReplaceTempView("flight_details");
Dataset<Row> avdFlightDetails = sparkSession.sql("select flightId, avg(temperature) from flight_details group by flightId");
StreamingQuery query = avdFlightDetails.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
}