Code examples for class org.apache.spark.sql.streaming.DataStreamWriter

The following examples show how the org.apache.spark.sql.streaming.DataStreamWriter API is used in real projects; the full source of each example can be found in the corresponding project on GitHub.
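
Before the project examples, here is a minimal, self-contained sketch of the basic lifecycle: obtain a DataStreamWriter from Dataset.writeStream(), configure it, and start the query. It assumes only a local Spark installation and uses the built-in rate source and console sink; all names and paths are illustrative.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class DataStreamWriterQuickStart {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("DataStreamWriterQuickStart")
                .getOrCreate();

        // The built-in "rate" source emits (timestamp, value) rows, useful for smoke tests.
        Dataset<Row> input = spark.readStream()
                .format("rate")
                .option("rowsPerSecond", "1")
                .load();

        // writeStream() returns the DataStreamWriter that all examples below configure.
        DataStreamWriter<Row> writer = input.writeStream()
                .format("console")
                .outputMode(OutputMode.Append())
                .trigger(Trigger.ProcessingTime("5 seconds"))
                .option("checkpointLocation", "/tmp/quickstart-checkpoint");

        StreamingQuery query = writer.start(); // launches the streaming query
        query.awaitTermination(30_000);        // run for up to 30 seconds, then exit
        spark.stop();
    }
}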

Example 1  Project: sylph  File: StructuredStreamingSqlAnalyse.java
public void createSinkTable(SinkContext sinkContext, StructType tableSparkType)
{
    final String driverClass = (String) sinkContext.withConfig().get("type");
    IocFactory iocFactory = IocFactory.create(sparkBean, binder -> binder.bind(SinkContext.class, sinkContext));
    StructuredNodeLoader loader = new StructuredNodeLoader(connectorStore, iocFactory);

    // Sink factory: validate the query schema against the sink table, build the
    // connector's DataStreamWriter, and start it unless we are only compiling the plan.
    UnaryOperator<Dataset<Row>> outputStream = dataSet -> {
        checkQueryAndTableSinkSchema(dataSet.schema(), tableSparkType, sinkContext.getSinkTable());
        DataStreamWriter<Row> writer = loader.loadSinkWithComplic(driverClass, sinkContext.withConfig()).apply(dataSet);
        if (!isCompile) {
            //UnsupportedOperationChecker.checkForContinuous();
            writer = writer.option("checkpointLocation", checkpointLocation);
            writer.start();
        }
        return null;
    };
    sinks.put(sinkContext.getSinkTable(), outputStream);
}
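
checkQueryAndTableSinkSchema above is sylph's own helper, but the same guard can be written directly against Spark's StructType. A minimal sketch, with a hypothetical class and method name:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

final class SchemaGuard {
    // Hypothetical stand-in for checkQueryAndTableSinkSchema: fail fast on mismatch.
    static void requireMatchingSchema(Dataset<Row> query, StructType sinkType, String table) {
        StructType actual = query.schema();
        if (!actual.equals(sinkType)) {
            throw new IllegalStateException("Schema mismatch for sink table " + table
                    + ": query produces " + actual.simpleString()
                    + " but the table expects " + sinkType.simpleString());
        }
    }
}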
 
Example 2  Project: hudi  File: HoodieJavaStreamingApp.java
/**
 * Setup configs for syncing to hive.
 *
 * @param writer the stream writer to add Hive sync options to
 * @return the writer with Hive sync options applied
 */
private DataStreamWriter<Row> updateHiveSyncConfig(DataStreamWriter<Row> writer) {
  if (enableHiveSync) {
    LOG.info("Enabling Hive sync to " + hiveJdbcUrl);
    writer = writer.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), hiveTable)
        .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), hiveDB)
        .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUrl)
        .option(DataSourceWriteOptions.HIVE_USER_OPT_KEY(), hiveUser)
        .option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY(), hivePass)
        .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true");
    if (useMultiPartitionKeys) {
      writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "year,month,day").option(
          DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
          MultiPartKeysValueExtractor.class.getCanonicalName());
    } else {
      writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "dateStr");
    }
  }
  return writer;
}
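
DataStreamWriter is a fluent builder: option() configures the writer in place and returns the same instance, so the reassignments above are harmless but not required. A hedged sketch of the same conditional-options pattern with generic keys (the helper and its option map are illustrative, not Hudi's):

import java.util.Map;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;

final class WriterOptions {
    // Apply a set of options only when a feature flag is enabled; returns the writer for chaining.
    static DataStreamWriter<Row> applyIf(boolean enabled,
                                         DataStreamWriter<Row> writer,
                                         Map<String, String> options) {
        if (enabled) {
            for (Map.Entry<String, String> entry : options.entrySet()) {
                writer = writer.option(entry.getKey(), entry.getValue());
            }
        }
        return writer;
    }
}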
 
Example 3  Project: sylph  File: StructuredNodeLoader.java
private static Sink<DataStreamWriter<Row>> loadRealTimeSink(RealTimeSink realTimeSink)
{
    // Bridge Spark's ForeachWriter callbacks to the connector's RealTimeSink lifecycle.
    return stream -> stream.foreach(new ForeachWriter<Row>()
    {
        @Override
        public void process(Row value)
        {
            realTimeSink.process(SparkRecord.make(value));
        }

        @Override
        public void close(Throwable errorOrNull)
        {
            realTimeSink.close(errorOrNull);
        }

        @Override
        public boolean open(long partitionId, long version)
        {
            try {
                return realTimeSink.open(partitionId, version);
            }
            catch (Exception e) {
                throw throwsException(e);
            }
        }
    });
}
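
For comparison, the same ForeachWriter contract can be wired straight into a DataStreamWriter without the loader indirection. A runnable sketch assuming a local Spark session; the stdout write stands in for a real external system:

import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ForeachSinkExample {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("ForeachSinkExample")
                .getOrCreate();

        spark.readStream().format("rate").load()
                .writeStream()
                .foreach(new ForeachWriter<Row>() {
                    @Override
                    public boolean open(long partitionId, long epochId) {
                        // Acquire per-partition resources here; return false to skip the partition.
                        return true;
                    }

                    @Override
                    public void process(Row value) {
                        System.out.println(value); // stand-in for a write to an external system
                    }

                    @Override
                    public void close(Throwable errorOrNull) {
                        // Release resources; errorOrNull is non-null if the task failed.
                    }
                })
                .start()
                .awaitTermination(10_000); // run briefly, then let main() end
        spark.stop();
    }
}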
 
Example 4  Project: sylph  File: StructuredStreamingSqlAnalyse.java
@Override
public void selectQuery(SelectQuery statement)
        throws Exception
{
    Dataset<Row> df = sparkSession.sql(statement.toString());
    DataStreamWriter<Row> writer = df.writeStream()
            .foreach(new ConsoleWriter())
            .trigger(Trigger.Continuous("90 seconds"))
            //.option("checkpointLocation", checkpointLocation)
            .outputMode(OutputMode.Append());
    if (!isCompile) {
        writer.start();
    }
}
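
Trigger.Continuous above turns on Spark's experimental continuous processing mode, where "90 seconds" is the checkpoint interval rather than a batch interval. A short sketch of the three common trigger choices (the helper class is illustrative):

import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.Trigger;

final class Triggers {
    // Default micro-batch mode with a fixed interval between batches.
    static DataStreamWriter<Row> microBatch(DataStreamWriter<Row> writer) {
        return writer.trigger(Trigger.ProcessingTime("10 seconds"));
    }

    // Process exactly one micro-batch, then stop; handy for scheduled jobs.
    static DataStreamWriter<Row> runOnce(DataStreamWriter<Row> writer) {
        return writer.trigger(Trigger.Once());
    }

    // Experimental continuous mode; the argument is the checkpoint interval, and only
    // certain sources, sinks, and stateless operations are supported.
    static DataStreamWriter<Row> continuous(DataStreamWriter<Row> writer) {
        return writer.trigger(Trigger.Continuous("90 seconds"));
    }
}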
 
Example 5  Project: hudi  File: HoodieJavaStreamingApp.java
/**
 * Hoodie spark streaming job.
 *
 * @param streamingInput the streaming Dataset to write out through Hudi
 * @throws Exception if the streaming query fails or is interrupted
 */
public void stream(Dataset<Row> streamingInput) throws Exception {

  DataStreamWriter<Row> writer = streamingInput.writeStream().format("org.apache.hudi")
      .option("hoodie.insert.shuffle.parallelism", "2").option("hoodie.upsert.shuffle.parallelism", "2")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
      .option(HoodieWriteConfig.TABLE_NAME, tableName).option("checkpointLocation", streamingCheckpointingPath)
      .outputMode(OutputMode.Append());

  writer = updateHiveSyncConfig(writer); // option() returns the same instance, but keep the data flow explicit
  writer.trigger(new ProcessingTime(500)).start(tablePath).awaitTermination(streamingDurationInMs);
}
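
The timed awaitTermination(long) used above returns a boolean indicating whether the query finished before the deadline (the no-argument overload blocks indefinitely). A small sketch of a deadline-bounded run, with a hypothetical helper name:

import org.apache.spark.sql.streaming.StreamingQuery;

final class QueryRunner {
    // Hypothetical helper: let a query run for at most maxMillis, then stop it cleanly.
    static void runFor(StreamingQuery query, long maxMillis) throws Exception {
        boolean finished = query.awaitTermination(maxMillis); // true if the query ended on its own
        if (!finished) {
            query.stop(); // deadline reached; shut down the still-running query
        }
    }
}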
 
Example 6  Project: beam  File: TranslationContext.java
/** Starts the pipeline. */
public void startPipeline() {
  try {
    SparkStructuredStreamingPipelineOptions options =
        serializablePipelineOptions.get().as(SparkStructuredStreamingPipelineOptions.class);
    int datasetIndex = 0;
    for (Dataset<?> dataset : leaves) {
      if (options.isStreaming()) {
        // TODO: deal with Beam Discarding, Accumulating and Accumulating & Retracting
        // output modes with DataStreamWriter.outputMode
        DataStreamWriter<?> dataStreamWriter = dataset.writeStream();
        // spark sets a default checkpoint dir if not set.
        if (options.getCheckpointDir() != null) {
          dataStreamWriter =
              dataStreamWriter.option("checkpointLocation", options.getCheckpointDir());
        }
        // TODO: Do not await termination here.
        dataStreamWriter.foreach(new NoOpForeachWriter<>()).start().awaitTermination();
      } else {
        if (options.getTestMode()) {
          LOG.debug("**** dataset {} catalyst execution plans ****", ++datasetIndex);
          dataset.explain(true);
        }
        // apply a dummy fn just to apply foreach action that will trigger the pipeline run in
        // spark
        dataset.foreach((ForeachFunction) t -> {});
      }
    }
  } catch (StreamingQueryException e) {
    throw new RuntimeException("Pipeline execution failed", e);
  }
}
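
The TODO above flags that awaiting termination inside the loop serializes the leaf datasets: each query must finish before the next one starts. Spark's StreamingQueryManager supports the usual fix of starting every query first and then blocking on all of them. A hedged sketch, assuming a SparkSession and a list of prepared writers (names are illustrative):

import java.util.List;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;

final class ParallelStart {
    // Start every writer before blocking, so all queries run concurrently.
    static void startAll(SparkSession spark, List<DataStreamWriter<?>> writers) throws Exception {
        for (DataStreamWriter<?> writer : writers) {
            writer.start();
        }
        // Blocks until any query terminates, rethrowing if one fails.
        spark.streams().awaitAnyTermination();
    }
}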
 
public StreamingQuery start(final DataStreamWriter<?> writer, final String path) {
    Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
        @Override
        public StreamingQuery apply() {
            return writer.start(path);
        }
    };
    return harness.startTest(runFunction);
}
 
public StreamingQuery start(final DataStreamWriter<?> writer) {
    Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
        @Override
        public StreamingQuery apply() {
            return writer.start();
        }
    };
    return harness.startTest(runFunction);
}
 
public void run(final DataStreamWriter<?> writer, final String path) {
    Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
        @Override
        public StreamingQuery apply() {
            return writer.start(path);
        }
    };
    harness.runTest(runFunction);
}
 
public void run(final DataStreamWriter<?> writer) {
    Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
        @Override
        public StreamingQuery apply() {
            return writer.start();
        }
    };
    harness.runTest(runFunction);
}
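
These four overloads exist only to wrap writer.start() in a Scala Function0 so the test harness can control when the query launches. In plain Java the same deferral is usually expressed with a Supplier; a sketch under that assumption (the try/catch accounts for start() declaring a checked TimeoutException in Spark 3.x):

import java.util.function.Supplier;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;

final class DeferredStart {
    // Hand the harness a deferred start; the query only launches when get() is called.
    static Supplier<StreamingQuery> deferred(DataStreamWriter<?> writer, String path) {
        return () -> {
            try {
                return path != null ? writer.start(path) : writer.start();
            } catch (Exception e) { // start() declares a checked TimeoutException in Spark 3.x
                throw new RuntimeException(e);
            }
        };
    }
}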
 