The following examples show how to use the org.apache.spark.sql.streaming.DataStreamWriter API class, or click through to GitHub to view the original source code.
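Before the project examples, here is a minimal, self-contained sketch of the typical DataStreamWriter call chain. It is not taken from any of the projects below; the rate source, console sink, trigger interval, and checkpoint path are all illustrative assumptions.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class DataStreamWriterSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate();
        // Built-in "rate" source generates rows continuously; used here only for illustration.
        Dataset<Row> input = spark.readStream().format("rate").load();

        DataStreamWriter<Row> writer = input.writeStream()
                .format("console")                                      // sink format
                .outputMode(OutputMode.Append())                        // how result rows are emitted
                .trigger(Trigger.ProcessingTime("5 seconds"))           // micro-batch interval
                .option("checkpointLocation", "/tmp/checkpoints/demo"); // assumed path

        StreamingQuery query = writer.start(); // start() returns a handle to the running query
        query.awaitTermination();
    }
}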
public void createSinkTable(SinkContext sinkContext, StructType tableSparkType)
{
    final String driverClass = (String) sinkContext.withConfig().get("type");
    IocFactory iocFactory = IocFactory.create(sparkBean, binder -> binder.bind(SinkContext.class, sinkContext));
    StructuredNodeLoader loader = new StructuredNodeLoader(connectorStore, iocFactory);

    UnaryOperator<Dataset<Row>> outputStream = dataSet -> {
        checkQueryAndTableSinkSchema(dataSet.schema(), tableSparkType, sinkContext.getSinkTable());
        DataStreamWriter<Row> writer = loader.loadSinkWithComplic(driverClass, sinkContext.withConfig()).apply(dataSet);
        if (!isCompile) {
            //UnsupportedOperationChecker.checkForContinuous();
            writer = writer.option("checkpointLocation", checkpointLocation);
            writer.start();
        }
        return null;
    };
    sinks.put(sinkContext.getSinkTable(), outputStream);
}
/**
 * Set up configs for syncing to Hive.
 *
 * @param writer the stream writer to configure
 * @return the writer with the Hive sync options applied
 */
private DataStreamWriter<Row> updateHiveSyncConfig(DataStreamWriter<Row> writer) {
    if (enableHiveSync) {
        LOG.info("Enabling Hive sync to " + hiveJdbcUrl);
        writer = writer.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), hiveTable)
                .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), hiveDB)
                .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUrl)
                .option(DataSourceWriteOptions.HIVE_USER_OPT_KEY(), hiveUser)
                .option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY(), hivePass)
                .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true");
        if (useMultiPartitionKeys) {
            writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "year,month,day").option(
                    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
                    MultiPartKeysValueExtractor.class.getCanonicalName());
        } else {
            writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "dateStr");
        }
    }
    return writer;
}
private static Sink<DataStreamWriter<Row>> loadRealTimeSink(RealTimeSink realTimeSink)
{
    return stream -> stream.foreach(new ForeachWriter<Row>()
    {
        @Override
        public void process(Row value)
        {
            realTimeSink.process(SparkRecord.make(value));
        }

        @Override
        public void close(Throwable errorOrNull)
        {
            realTimeSink.close(errorOrNull);
        }

        @Override
        public boolean open(long partitionId, long version)
        {
            try {
                return realTimeSink.open(partitionId, version);
            }
            catch (Exception e) {
                throw throwsException(e);
            }
        }
    });
}
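For context, the ForeachWriter lifecycle used above (open per partition/epoch, process per row, close at the end) can also be attached directly to a DataStreamWriter without the Sink wrapper. The sketch below is an illustration only; the streaming Dataset<Row> named df is an assumption, and the writer simply prints each row.

// Sketch only: df is an assumed streaming Dataset<Row>.
DataStreamWriter<Row> directWriter = df.writeStream().foreach(new ForeachWriter<Row>()
{
    @Override
    public boolean open(long partitionId, long version)
    {
        return true; // returning true means this partition/epoch should be processed
    }

    @Override
    public void process(Row value)
    {
        System.out.println(value); // invoked once per row
    }

    @Override
    public void close(Throwable errorOrNull)
    {
        // release any per-partition resources here
    }
});
directWriter.start();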
@Override
public void selectQuery(SelectQuery statement)
        throws Exception
{
    Dataset<Row> df = sparkSession.sql(statement.toString());
    DataStreamWriter<Row> writer = df.writeStream()
            .foreach(new ConsoleWriter())
            .trigger(Trigger.Continuous("90 seconds"))
            //.option("checkpointLocation", checkpointLocation)
            .outputMode(OutputMode.Append());
    if (!isCompile) {
        writer.start();
    }
}
/**
 * Hoodie spark streaming job.
 *
 * @param streamingInput the streaming input dataset to write
 * @throws Exception if the streaming query fails
 */
public void stream(Dataset<Row> streamingInput) throws Exception {
    DataStreamWriter<Row> writer = streamingInput.writeStream().format("org.apache.hudi")
            .option("hoodie.insert.shuffle.parallelism", "2").option("hoodie.upsert.shuffle.parallelism", "2")
            .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType)
            .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
            .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
            .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
            .option(HoodieWriteConfig.TABLE_NAME, tableName).option("checkpointLocation", streamingCheckpointingPath)
            .outputMode(OutputMode.Append());
    updateHiveSyncConfig(writer);
    writer.trigger(new ProcessingTime(500)).start(tablePath).awaitTermination(streamingDurationInMs);
}
/** Starts the pipeline. */
public void startPipeline() {
    try {
        SparkStructuredStreamingPipelineOptions options =
            serializablePipelineOptions.get().as(SparkStructuredStreamingPipelineOptions.class);
        int datasetIndex = 0;
        for (Dataset<?> dataset : leaves) {
            if (options.isStreaming()) {
                // TODO: deal with Beam Discarding, Accumulating and Accumulating & Retracting outputmodes
                // with DatastreamWriter.outputMode
                DataStreamWriter<?> dataStreamWriter = dataset.writeStream();
                // spark sets a default checkpoint dir if not set.
                if (options.getCheckpointDir() != null) {
                    dataStreamWriter =
                        dataStreamWriter.option("checkpointLocation", options.getCheckpointDir());
                }
                // TODO: Do not await termination here.
                dataStreamWriter.foreach(new NoOpForeachWriter<>()).start().awaitTermination();
            } else {
                if (options.getTestMode()) {
                    LOG.debug("**** dataset {} catalyst execution plans ****", ++datasetIndex);
                    dataset.explain(true);
                }
                // apply a dummy fn just to apply foreach action that will trigger the pipeline run in
                // spark
                dataset.foreach((ForeachFunction) t -> {});
            }
        }
    } catch (StreamingQueryException e) {
        throw new RuntimeException("Pipeline execution failed: " + e);
    }
}
public StreamingQuery start(final DataStreamWriter<?> writer, final String path) {
    Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
        @Override
        public StreamingQuery apply() {
            return writer.start(path);
        }
    };
    return harness.startTest(runFunction);
}

public StreamingQuery start(final DataStreamWriter<?> writer) {
    Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
        @Override
        public StreamingQuery apply() {
            return writer.start();
        }
    };
    return harness.startTest(runFunction);
}

public void run(final DataStreamWriter<?> writer, final String path) {
    Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
        @Override
        public StreamingQuery apply() {
            return writer.start(path);
        }
    };
    harness.runTest(runFunction);
}

public void run(final DataStreamWriter<?> writer) {
    Function0<StreamingQuery> runFunction = new AbstractFunction0<StreamingQuery>() {
        @Override
        public StreamingQuery apply() {
            return writer.start();
        }
    };
    harness.runTest(runFunction);
}
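The start(...) helpers above hand writer.start() to a test harness and return the resulting StreamingQuery. As a hedged sketch of what a caller typically does with that handle (inside a test method declared throws Exception; the writer variable is an assumption, not part of the harness code):

// Sketch only: 'writer' is an assumed DataStreamWriter<?> prepared elsewhere.
StreamingQuery query = start(writer);     // call to the helper defined above
query.processAllAvailable();              // block until all currently available data has been processed
System.out.println(query.status());       // current status of the query
System.out.println(query.lastProgress()); // most recent progress report, or null if none yet
query.stop();                             // stop the streaming query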