类 org.apache.spark.sql.KeyValueGroupedDataset 源码实例 Demo

下面列出了 org.apache.spark.sql.KeyValueGroupedDataset 这个 API 类的用法实例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: beam   文件: GroupByKeyTranslatorBatch.java
/**
 * Translates a group-by-key transform into Spark {@code Dataset} operations.
 *
 * <p>The input dataset is grouped by key, each group is fed through a
 * {@code GroupAlsoByWindowViaOutputBufferFn} built from the input's windowing strategy, and the
 * resulting dataset is registered on the context as the transform's output.
 *
 * @param transform the transform being translated (not read by this method)
 * @param context translation context that supplies the input collection and its dataset, and
 *     receives the output dataset
 */
@Override
public void translateTransform(
    PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> transform,
    TranslationContext context) {

  @SuppressWarnings("unchecked")
  final PCollection<KV<K, V>> source = (PCollection<KV<K, V>>) context.getInput();
  final Dataset<WindowedValue<KV<K, V>>> sourceDataset = context.getDataset(source);
  final WindowingStrategy<?, ?> strategy = source.getWindowingStrategy();

  // Split the KV coder into its value and key parts; both are needed further down.
  final KvCoder<K, V> pairCoder = (KvCoder<K, V>) source.getCoder();
  final Coder<V> elementCoder = pairCoder.getValueCoder();
  final Coder<K> groupingKeyCoder = pairCoder.getKeyCoder();

  // Step 1: group on the key alone.
  final KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> byKey =
      sourceDataset.groupByKey(
          KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(groupingKeyCoder));

  // Step 2: within each key, group by window as well.
  final KvCoder<K, Iterable<V>> groupedPairCoder =
      KvCoder.of(groupingKeyCoder, IterableCoder.of(elementCoder));
  final WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> resultCoder =
      WindowedValue.FullWindowedValueCoder.of(
          groupedPairCoder, strategy.getWindowFn().windowCoder());

  final Dataset<WindowedValue<KV<K, Iterable<V>>>> grouped =
      byKey.flatMapGroups(
          new GroupAlsoByWindowViaOutputBufferFn<>(
              strategy,
              new InMemoryStateInternalsFactory<>(),
              SystemReduceFn.buffering(elementCoder),
              context.getSerializableOptions()),
          EncoderHelpers.fromBeamCoder(resultCoder));

  context.putDataset(context.getOutput(), grouped);
}
 
/**
 * Starts a structured-streaming job that reads JSON video events from Kafka, groups them by
 * camera id, runs {@code ImageProcessor.process} on each group while carrying the last processed
 * event as per-key state, and writes the results to the console sink in update mode.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if reading the properties, starting the query, or awaiting it fails
 */
public static void main(String[] args) throws Exception {
  // Configuration comes from the property file.
  final Properties config = PropertyFileReader.readPropertyFile();

  // Spark session
  final SparkSession session =
      SparkSession.builder()
          .appName("VideoStreamProcessor")
          .master(config.getProperty("spark.master.url"))
          .getOrCreate();

  // Directory that processed image files are written to.
  final String processedImageDir = config.getProperty("processed.output.dir");
  logger.warn("Output directory for saving processed images is set to "+processedImageDir+". This is configured in processed.output.dir key of property file.");

  // Schema of the JSON message carried in the Kafka record value.
  final StructField[] messageFields = {
    DataTypes.createStructField("cameraId", DataTypes.StringType, true),
    DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
    DataTypes.createStructField("rows", DataTypes.IntegerType, true),
    DataTypes.createStructField("cols", DataTypes.IntegerType, true),
    DataTypes.createStructField("type", DataTypes.IntegerType, true),
    DataTypes.createStructField("data", DataTypes.StringType, true)
  };
  final StructType messageSchema = DataTypes.createStructType(messageFields);

  // Streaming dataset of events decoded from the Kafka topic.
  final Dataset<VideoEventData> events =
      session
          .readStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", config.getProperty("kafka.bootstrap.servers"))
          .option("subscribe", config.getProperty("kafka.topic"))
          .option("kafka.max.partition.fetch.bytes", config.getProperty("kafka.max.partition.fetch.bytes"))
          .option("kafka.max.poll.records", config.getProperty("kafka.max.poll.records"))
          .load()
          .selectExpr("CAST(value AS STRING) as message")
          .select(functions.from_json(functions.col("message"), messageSchema).as("json"))
          .select("json.*")
          .as(Encoders.bean(VideoEventData.class));

  // Key every event by its camera id.
  final KeyValueGroupedDataset<String, VideoEventData> eventsByCamera =
      events.groupByKey(
          (MapFunction<VideoEventData, String>) VideoEventData::getCameraId,
          Encoders.STRING());

  // Process each camera's batch, threading the previously processed event through as state.
  final Dataset<VideoEventData> results =
      eventsByCamera.mapGroupsWithState(
          (MapGroupsWithStateFunction<String, VideoEventData, VideoEventData, VideoEventData>)
              (key, values, state) -> {
                logger.warn("CameraId="+key+" PartitionId="+TaskContext.getPartitionId());

                // Previous state, or null when this key has none yet.
                final VideoEventData previous = state.exists() ? state.get() : null;

                // Classify image
                final VideoEventData latest =
                    ImageProcessor.process(key, values, processedImageDir, previous);

                // Only overwrite the stored state when processing produced a result.
                if (latest != null) {
                  state.update(latest);
                }
                return latest;
              },
          Encoders.bean(VideoEventData.class),
          Encoders.bean(VideoEventData.class));

  // Start the query, then block until it terminates.
  final StreamingQuery running =
      results.writeStream()
          .outputMode("update")
          .format("console")
          .start();

  running.awaitTermination();
}
 
/**
 * Launches the video stream job: JSON events are consumed from Kafka, grouped per camera id, and
 * each group is passed to {@code VideoMotionDetector.detectMotion} together with the last
 * processed event kept as group state. Results are written to the console sink in update mode.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if reading the properties, starting the query, or awaiting it fails
 */
public static void main(String[] args) throws Exception {
  // Read properties
  Properties settings = PropertyFileReader.readPropertyFile();

  // Spark session
  SparkSession sparkSession =
      SparkSession.builder()
          .appName("VideoStreamProcessor")
          .master(settings.getProperty("spark.master.url"))
          .getOrCreate();

  // Directory to save image files with motion detected
  final String processedImageDir = settings.getProperty("processed.output.dir");
  logger.warn("Output directory for saving processed images is set to "+processedImageDir+". This is configured in processed.output.dir key of property file.");

  // Schema for the JSON message
  StructType jsonSchema =
      DataTypes.createStructType(
          new StructField[] {
            DataTypes.createStructField("cameraId", DataTypes.StringType, true),
            DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
            DataTypes.createStructField("rows", DataTypes.IntegerType, true),
            DataTypes.createStructField("cols", DataTypes.IntegerType, true),
            DataTypes.createStructField("type", DataTypes.IntegerType, true),
            DataTypes.createStructField("data", DataTypes.StringType, true)
          });

  // Build the streaming dataset in stages: Kafka source -> JSON column -> typed bean.
  Dataset<VideoEventData> videoEvents =
      sparkSession
          .readStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", settings.getProperty("kafka.bootstrap.servers"))
          .option("subscribe", settings.getProperty("kafka.topic"))
          .option("kafka.max.partition.fetch.bytes", settings.getProperty("kafka.max.partition.fetch.bytes"))
          .option("kafka.max.poll.records", settings.getProperty("kafka.max.poll.records"))
          .load()
          .selectExpr("CAST(value AS STRING) as message")
          .select(functions.from_json(functions.col("message"), jsonSchema).as("json"))
          .select("json.*")
          .as(Encoders.bean(VideoEventData.class));

  // Key selector: the camera id of each event.
  MapFunction<VideoEventData, String> cameraIdSelector =
      new MapFunction<VideoEventData, String>() {
        @Override
        public String call(VideoEventData event) throws Exception {
          return event.getCameraId();
        }
      };
  KeyValueGroupedDataset<String, VideoEventData> groupedByCamera =
      videoEvents.groupByKey(cameraIdSelector, Encoders.STRING());

  // Stateful per-camera function: detect motion and remember the last processed event.
  MapGroupsWithStateFunction<String, VideoEventData, VideoEventData, VideoEventData> motionFn =
      new MapGroupsWithStateFunction<String, VideoEventData, VideoEventData, VideoEventData>() {
        @Override
        public VideoEventData call(
            String cameraId, Iterator<VideoEventData> batch, GroupState<VideoEventData> lastSeen)
            throws Exception {
          logger.warn("CameraId="+cameraId+" PartitionId="+TaskContext.getPartitionId());

          // Start with no prior event; pick up the stored one when the state exists.
          VideoEventData priorEvent = null;
          if (lastSeen.exists()) {
            priorEvent = lastSeen.get();
          }

          // Detect motion
          VideoEventData detected =
              VideoMotionDetector.detectMotion(cameraId, batch, processedImageDir, priorEvent);

          // A null result leaves the stored state untouched.
          if (detected == null) {
            return null;
          }
          lastSeen.update(detected);
          return detected;
        }
      };
  Dataset<VideoEventData> motionEvents =
      groupedByCamera.mapGroupsWithState(
          motionFn, Encoders.bean(VideoEventData.class), Encoders.bean(VideoEventData.class));

  // Start
  StreamingQuery streamingQuery =
      motionEvents.writeStream()
          .outputMode("update")
          .format("console")
          .start();

  // Await
  streamingQuery.awaitTermination();
}
 
源代码4 项目: beam   文件: CombinePerKeyTranslatorBatch.java
/**
 * Translates a {@code Combine.PerKey} transform into Spark {@code Dataset} operations.
 *
 * <p>The input dataset is grouped by key and aggregated with an {@code AggregatorCombiner} built
 * from the transform's combine function. Each aggregated row holds a key and an iterable of
 * windowed outputs; that row is expanded into one windowed {@code KV} per output and the
 * resulting dataset is registered for the transform's output collection.
 *
 * @param transform the transform being translated; cast to {@code Combine.PerKey} to obtain the
 *     combine function
 * @param context translation context that supplies the input/output collections and the input
 *     dataset, and receives the output dataset
 * @throws RuntimeException wrapping a {@code CannotProvideCoderException} when the combine
 *     function cannot supply an accumulator coder
 */
@Override
public void translateTransform(
    PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
    TranslationContext context) {

  final Combine.PerKey perKey = (Combine.PerKey) transform;
  @SuppressWarnings("unchecked")
  final PCollection<KV<K, InputT>> source = (PCollection<KV<K, InputT>>) context.getInput();
  @SuppressWarnings("unchecked")
  final PCollection<KV<K, OutputT>> sink = (PCollection<KV<K, OutputT>>) context.getOutput();
  @SuppressWarnings("unchecked")
  final Combine.CombineFn<InputT, AccumT, OutputT> fn =
      (Combine.CombineFn<InputT, AccumT, OutputT>) perKey.getFn();
  final WindowingStrategy<?, ?> strategy = source.getWindowingStrategy();

  final Dataset<WindowedValue<KV<K, InputT>>> sourceDataset = context.getDataset(source);

  // Coders for the input pairs, their key, and the combined output value.
  final KvCoder<K, InputT> sourcePairCoder = (KvCoder<K, InputT>) source.getCoder();
  final Coder<K> groupingKeyCoder = sourcePairCoder.getKeyCoder();
  final KvCoder<K, OutputT> sinkPairCoder = (KvCoder<K, OutputT>) sink.getCoder();
  final Coder<OutputT> combinedValueCoder = sinkPairCoder.getValueCoder();

  final KeyValueGroupedDataset<K, WindowedValue<KV<K, InputT>>> byKey =
      sourceDataset.groupByKey(
          KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(groupingKeyCoder));

  // The catch branch always throws, so the coder is definitely assigned afterwards.
  final Coder<AccumT> accumCoder;
  try {
    accumCoder =
        fn.getAccumulatorCoder(
            source.getPipeline().getCoderRegistry(), sourcePairCoder.getValueCoder());
  } catch (CannotProvideCoderException e) {
    throw new RuntimeException(e);
  }

  final Dataset<Tuple2<K, Iterable<WindowedValue<OutputT>>>> aggregated =
      byKey.agg(
          new AggregatorCombiner<K, InputT, AccumT, OutputT, BoundedWindow>(
                  fn, strategy, accumCoder, combinedValueCoder)
              .toColumn());

  // Flatten each (key, outputs) row into one windowed KV per output, re-attaching the key
  // while keeping the timestamp, windows and pane of each combined value.
  final WindowedValue.WindowedValueCoder<KV<K, OutputT>> windowedPairCoder =
      WindowedValue.FullWindowedValueCoder.of(
          sinkPairCoder, source.getWindowingStrategy().getWindowFn().windowCoder());

  final FlatMapFunction<
          Tuple2<K, Iterable<WindowedValue<OutputT>>>, WindowedValue<KV<K, OutputT>>>
      reattachKey =
          row -> {
            final K groupKey = row._1();
            final Iterable<WindowedValue<OutputT>> combinedValues = row._2();
            final List<WindowedValue<KV<K, OutputT>>> expanded = new ArrayList<>();
            for (WindowedValue<OutputT> combined : combinedValues) {
              expanded.add(
                  WindowedValue.of(
                      KV.of(groupKey, combined.getValue()),
                      combined.getTimestamp(),
                      combined.getWindows(),
                      combined.getPane()));
            }
            return expanded.iterator();
          };

  final Dataset<WindowedValue<KV<K, OutputT>>> sinkDataset =
      aggregated.flatMap(reattachKey, EncoderHelpers.fromBeamCoder(windowedPairCoder));
  context.putDataset(sink, sinkDataset);
}
 
 类所在包
 类方法
 同包方法