The following examples show how to use the org.apache.beam.sdk.transforms.View API class. You can also follow the links to GitHub to view the full source code.
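Before the collected examples, here is a minimal, self-contained sketch of the basic pattern most of them rely on: turning a PCollection into a PCollectionView with View.asSingleton() and reading it as a side input inside a ParDo. The pipeline structure, step names, and values below are illustrative assumptions rather than code taken from any of the linked projects; the standard Beam SDK imports (org.apache.beam.sdk.*) are assumed.
Pipeline p = Pipeline.create();
// Main input and a one-element PCollection that will become the side input.
PCollection<Integer> mainInput = p.apply("Main", Create.of(1, 2, 3));
PCollectionView<Integer> multiplierView =
    p.apply("Multiplier", Create.of(10)).apply(View.asSingleton());
// Read the singleton view inside a ParDo via c.sideInput(...).
PCollection<Integer> scaled =
    mainInput.apply(
        "Scale",
        ParDo.of(
                new DoFn<Integer, Integer>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(c.element() * c.sideInput(multiplierView));
                  }
                })
            .withSideInputs(multiplierView));
p.run().waitUntilFinish();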
private <ElemT, ViewT> void translateTyped(
View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
StepTranslationContext stepContext =
context.addStep(transform, "CollectionToSingleton");
PCollection<ElemT> input = context.getInput(transform);
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
stepContext.addInput(
PropertyNames.WINDOWING_STRATEGY,
byteArrayToJsonString(
serializeWindowingStrategy(windowingStrategy, context.getPipelineOptions())));
stepContext.addInput(
PropertyNames.IS_MERGING_WINDOW_FN,
!windowingStrategy.getWindowFn().isNonMerging());
stepContext.addCollectionToSingletonOutput(
input, PropertyNames.OUTPUT, transform.getView());
}
@Test
public void writeForElementInMultipleWindowsSucceeds() throws Exception {
ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
for (Object materializedValue :
materializeValuesFor(singletonView.getPipeline().getOptions(), View.asSingleton(), 2.875)) {
valuesBuilder.add(
WindowedValue.of(
materializedValue,
FIRST_WINDOW.maxTimestamp().minus(200L),
ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
PaneInfo.ON_TIME_AND_ONLY_FIRING));
}
container.write(singletonView, valuesBuilder.build());
assertThat(
container
.createReaderForViews(ImmutableList.of(singletonView))
.get(singletonView, FIRST_WINDOW),
equalTo(2.875));
assertThat(
container
.createReaderForViews(ImmutableList.of(singletonView))
.get(singletonView, SECOND_WINDOW),
equalTo(2.875));
}
/**
 * Filters out URLs that were already processed in previous pipeline runs, based on a
 * BigQuery lookup of processed URLs supplied as a side input.
 *
 * @param readContent content read from the input source
 * @param pipeline the pipeline used to read the processed URLs from BigQuery
 * @param options pipeline options used to build the processed-URLs query
 * @return the subset of readContent whose URLs have not yet been processed
 */
private static PCollection<InputContent> filterAlreadyProcessedUrls(
PCollection<InputContent> readContent, Pipeline pipeline,
IndexerPipelineOptions options) {
PCollection<InputContent> contentToProcess;
String query = IndexerPipelineUtils.buildBigQueryProcessedUrlsQuery(options);
PCollection<KV<String,Long>> alreadyProcessedUrls = pipeline
.apply("Get processed URLs",BigQueryIO.read().fromQuery(query))
.apply(ParDo.of(new GetUrlFn()));
final PCollectionView<Map<String,Long>> alreadyProcessedUrlsSideInput =
alreadyProcessedUrls.apply(View.<String,Long>asMap());
contentToProcess = readContent
.apply(ParDo.of(new FilterProcessedUrls(alreadyProcessedUrlsSideInput))
.withSideInputs(alreadyProcessedUrlsSideInput));
return contentToProcess;
}
/** Creates a simple pipeline with a {@link Combine.GroupedValues} with side inputs. */
private static TestPipeline createCombineGroupedValuesWithSideInputsPipeline() {
TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
PCollection<KV<String, Integer>> input =
pipeline
.apply(Create.of(KV.of("key", 1)))
.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
PCollection<String> sideInput = pipeline.apply(Create.of("side input"));
PCollectionView<String> sideInputView = sideInput.apply(View.asSingleton());
input
.apply(GroupByKey.create())
.apply(
Combine.<String, Integer, Integer>groupedValues(new SumCombineFnWithContext())
.withSideInputs(sideInputView));
return pipeline;
}
@Override
public PCollectionView<Integer> expand(PCollection<T> input) {
return input
.getPipeline()
.apply(Create.of(0))
.apply(
"FixedNumShards",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void outputNumShards(ProcessContext ctxt) {
checkArgument(
numShards.isAccessible(),
"NumShards must be accessible at runtime to use constant sharding");
ctxt.output(numShards.get());
}
}))
.apply(View.<Integer>asSingleton());
}
@Test
public void throwsExceptionWhenDelimiterIsNullAndHeadersAreSet() {
PCollectionView<List<String>> header =
testPipeline.apply(Create.of("header")).apply(View.asList());
assertThrows(
"Column delimiter should be set if headers are present.",
IllegalArgumentException.class,
() ->
DLPReidentifyText.newBuilder()
.setProjectId(PROJECT_ID)
.setBatchSizeBytes(BATCH_SIZE_SMALL)
.setReidentifyTemplateName(TEMPLATE_NAME)
.setHeaderColumns(header)
.build());
testPipeline.run().waitUntilFinish();
}
@Test
public void testPassThroughThenCleanupExecuted() throws Exception {
p.apply(Create.empty(VarIntCoder.of()))
.apply(
new PassThroughThenCleanup<>(
new PassThroughThenCleanup.CleanupOperation() {
@Override
void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
throw new RuntimeException("cleanup executed");
}
},
p.apply("Create1", Create.of("")).apply(View.asSingleton())));
thrown.expect(RuntimeException.class);
thrown.expectMessage("cleanup executed");
p.run();
}
@Test
public void getViewsReturnsViews() {
PCollectionView<List<String>> listView =
p.apply("listCreate", Create.of("foo", "bar"))
.apply(
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(DoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
}))
.apply(View.asList());
PCollectionView<Object> singletonView =
p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.asSingleton());
p.replaceAll(
DirectRunner.fromOptions(TestPipeline.testingPipelineOptions())
.defaultTransformOverrides());
p.traverseTopologically(visitor);
assertThat(visitor.getGraph().getViews(), Matchers.containsInAnyOrder(listView, singletonView));
}
/**
* Creates {@link PCollectionView} with one {@link Configuration} based on the set source of the
* configuration.
*
* @param input input data
* @return PCollectionView with single {@link Configuration}
* @see Builder#withConfiguration(Configuration)
* @see Builder#withConfigurationTransform(PTransform)
*/
private PCollectionView<Configuration> createConfigurationView(
PCollection<KV<KeyT, ValueT>> input) {
PCollectionView<Configuration> config;
if (configuration != null) {
config =
input
.getPipeline()
.apply("CreateOutputConfig", Create.<Configuration>of(configuration))
.apply(View.<Configuration>asSingleton().withDefaultValue(configuration));
} else {
config = input.apply("TransformDataIntoConfig", configTransform);
}
return config;
}
/**
* Main function for the BEAM program.
*
* @param args arguments.
*/
public static void main(final String[] args) {
final String inputFilePath = args[0];
final String outputFilePath = args[1];
final PipelineOptions options = NemoPipelineOptionsFactory.create();
final Pipeline p = Pipeline.create(options);
final PCollection<String> elemCollection = GenericSourceSink.read(p, inputFilePath);
final PCollectionView<Iterable<String>> allCollection = elemCollection.apply(View.<String>asIterable());
final PCollection<String> result = elemCollection.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(final ProcessContext c) {
final String line = c.element();
final Iterable<String> all = c.sideInput(allCollection);
final Optional<String> appended = StreamSupport.stream(all.spliterator(), false)
.reduce((l, r) -> l + '\n' + r);
if (appended.isPresent()) {
c.output("line: " + line + "\n" + appended.get());
} else {
c.output("error");
}
}
}).withSideInputs(allCollection)
);
GenericSourceSink.write(result, outputFilePath);
p.run().waitUntilFinish();
}
/**
* Main function for the MR BEAM program.
*
* @param args arguments.
*/
public static void main(final String[] args) {
final String outputFilePath = args[0];
final Window<Long> windowFn = Window
.<Long>into(SlidingWindows.of(Duration.standardSeconds(2))
.every(Duration.standardSeconds(1)));
final PipelineOptions options = NemoPipelineOptionsFactory.create();
options.setJobName("WindowedBroadcast");
final Pipeline p = Pipeline.create(options);
final PCollection<Long> windowedElements = getSource(p).apply(windowFn);
final PCollectionView<List<Long>> windowedView = windowedElements.apply(View.asList());
windowedElements.apply(ParDo.of(new DoFn<Long, String>() {
@ProcessElement
public void processElement(final ProcessContext c) {
final Long anElementInTheWindow = c.element();
final List<Long> allElementsInTheWindow = c.sideInput(windowedView);
System.out.println(anElementInTheWindow + " / " + allElementsInTheWindow);
if (!allElementsInTheWindow.contains(anElementInTheWindow)) {
throw new RuntimeException(anElementInTheWindow + " not in " + allElementsInTheWindow.toString());
} else {
c.output(anElementInTheWindow + " is in " + allElementsInTheWindow);
}
}
}).withSideInputs(windowedView)
).apply(new WriteOneFilePerWindow(outputFilePath, 1));
p.run().waitUntilFinish();
}
@Test
public void createViewWithViewFn() {
PCollection<Integer> input = p.apply(Create.of(1));
PCollectionView<Iterable<Integer>> view = input.apply(View.asIterable());
ViewFn<?, ?> viewFn = view.getViewFn();
CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);
PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
assertThat(matcher.matches(getAppliedTransform(createView)), is(true));
}
@Override
public PCollectionView<Transaction> expand(PBegin input) {
getSpannerConfig().validate();
return input
.apply(Create.of(1))
.apply("Create transaction", ParDo.of(new LocalCreateTransactionFn(this)))
.apply("As PCollectionView", View.asSingleton());
}
@Override
public PCollection<Mutation> expand(PCollection<KV<String, String>> filesToTables) {
// Map<filename,tablename>
PCollectionView<Map<String, String>> filenamesToTableNamesMapView =
filesToTables.apply("asView", View.asMap());
return filesToTables
.apply("Get Filenames", Keys.create())
// PCollection<String>
.apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW))
// PCollection<Match.Metadata>
.apply(FileIO.readMatches())
// PCollection<FileIO.ReadableFile>
.apply(
"Split into ranges",
ParDo.of(
new SplitIntoRangesFn(
SplitIntoRangesFn.DEFAULT_BUNDLE_SIZE, filenamesToTableNamesMapView))
.withSideInputs(filenamesToTableNamesMapView))
.setCoder(FileShard.Coder.of())
// PCollection<FileShard>
.apply("Reshuffle", Reshuffle.viaRandomKey())
// PCollection<FileShard>
.apply("Read ranges", ParDo.of(new ReadFileRangesFn(ddlView)).withSideInputs(ddlView));
}
@Override
public PCollection<T> expand(PCollection<T> input) {
// See https://issues.apache.org/jira/browse/BEAM-2803
// We use a combined approach to "break fusion" here:
// (see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion)
// 1) force the data to be materialized by passing it as a side input to an identity fn,
// then 2) reshuffle it with a random key. Initial materialization provides some parallelism
// and ensures that data to be shuffled can be generated in parallel, while reshuffling
// provides perfect parallelism.
// In most cases where a "fusion break" is needed, a simple reshuffle would be sufficient.
// The current approach is necessary only to support the particular case of JdbcIO where
// a single query may produce many gigabytes of query results.
PCollectionView<Iterable<T>> empty =
input
.apply("Consume", Filter.by(SerializableFunctions.constant(false)))
.apply(View.asIterable());
PCollection<T> materialized =
input.apply(
"Identity",
ParDo.of(
new DoFn<T, T>() {
@ProcessElement
public void process(ProcessContext c) {
c.output(c.element());
}
})
.withSideInputs(empty));
return materialized.apply(Reshuffle.viaRandomKey());
}
@Test
public void testToIterableTranslationWithFnApiSideInput() throws Exception {
// A "change detector" test that makes sure the translation
// of getting a PCollectionView<Iterable<T>> does not change
// in bad ways during refactor
DataflowPipelineOptions options = buildPipelineOptions();
options.setExperiments(Arrays.asList("beam_fn_api"));
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(Create.of(1, 2, 3)).apply(View.asIterable());
DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(pipeline);
SdkComponents sdkComponents = createSdkComponents(options);
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
Job job =
translator
.translate(pipeline, pipelineProto, sdkComponents, runner, Collections.emptyList())
.getJob();
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
assertEquals(5, steps.size());
@SuppressWarnings("unchecked")
List<Map<String, Object>> ctsOutputs =
(List<Map<String, Object>>)
steps.get(steps.size() - 1).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(Structs.getBoolean(Iterables.getOnlyElement(ctsOutputs), "use_indexed_format"));
Step collectionToSingletonStep = steps.get(steps.size() - 1);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
@Test
public void testIsReady() {
SideInputHandler sideInputHandler =
new SideInputHandler(
ImmutableList.of(view1, view2), InMemoryStateInternals.<Void>forKey(null));
IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_1));
IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_2));
// side input should not yet be ready
assertFalse(sideInputHandler.isReady(view1, firstWindow));
// add a value for view1
sideInputHandler.addSideInputValue(
view1,
valuesInWindow(
materializeValuesFor(view1.getPipeline().getOptions(), View.asIterable(), "Hello"),
new Instant(0),
firstWindow));
// now side input should be ready
assertTrue(sideInputHandler.isReady(view1, firstWindow));
// second window input should still not be ready
assertFalse(sideInputHandler.isReady(view1, secondWindow));
}
@Test
public void readImportManifestUtfWithBOM() throws Exception {
Path f11 = Files.createTempFile("table1-file", "1");
String tempDir = f11.getParent().toString();
Path manifestFile = Files.createTempFile("import-manifest", ".json");
Charset charset = Charset.forName("UTF-8");
try (BufferedWriter writer = Files.newBufferedWriter(manifestFile, charset)) {
String jsonString =
String.format(
"\uFEFF{\"tables\": ["
+ "{\"table_name\": \"table1\","
+ "\"file_patterns\":[\"%s\"]}"
+ "]}",
f11.toString());
writer.write(jsonString, 0, jsonString.length());
} catch (IOException e) {
e.printStackTrace();
}
ValueProvider<String> importManifest =
ValueProvider.StaticValueProvider.of(manifestFile.toString());
PCollectionView<Ddl> ddlView =
pipeline.apply("ddl", Create.of(getTestDdl())).apply(View.asSingleton());
PCollection<KV<String, String>> tableAndFiles =
pipeline
.apply("Read manifest file", new ReadImportManifest(importManifest))
.apply("Resolve data files", new ResolveDataFiles(importManifest, ddlView));
PAssert.that(tableAndFiles)
.containsInAnyOrder(
KV.of("table1", f11.toString()));
pipeline.run();
}
@Test(expected = PipelineExecutionException.class)
public void readImportManifestInvalidTable() throws Exception {
Path f11 = Files.createTempFile("table1-file", "1");
Path manifestFile = Files.createTempFile("import-manifest", ".json");
Charset charset = Charset.forName("UTF-8");
try (BufferedWriter writer = Files.newBufferedWriter(manifestFile, charset)) {
// An invalid JSON string (missing the closing "}").
String jsonString =
String.format(
"{\"tables\": ["
+ "{\"table_name\": \"NON_EXIST_TABLE\","
+ "\"file_patterns\":[\"%s\"]}"
+ "]",
f11.toString());
writer.write(jsonString, 0, jsonString.length());
} catch (IOException e) {
e.printStackTrace();
}
ValueProvider<String> importManifest =
ValueProvider.StaticValueProvider.of(manifestFile.toString());
PCollectionView<Ddl> ddlView =
pipeline.apply("ddl", Create.of(getTestDdl())).apply(View.asSingleton());
PCollection<KV<String, String>> tableAndFiles =
pipeline
.apply("Read manifest file", new ReadImportManifest(importManifest))
.apply("Resolve data files", new ResolveDataFiles(importManifest, ddlView));
pipeline.run();
}
@Test
public void testMultipleSideInputs() {
SideInputHandler sideInputHandler =
new SideInputHandler(
ImmutableList.of(view1, view2), InMemoryStateInternals.<Void>forKey(null));
// window that we'll later use for adding elements/retrieving side input
IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_1));
// add value for view1 in the first window
sideInputHandler.addSideInputValue(
view1,
valuesInWindow(
materializeValuesFor(view1.getPipeline().getOptions(), View.asIterable(), "Hello"),
new Instant(0),
firstWindow));
assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
// view2 should not have any data
assertFalse(sideInputHandler.isReady(view2, firstWindow));
// also add some data for view2
sideInputHandler.addSideInputValue(
view2,
valuesInWindow(
materializeValuesFor(view2.getPipeline().getOptions(), View.asIterable(), "Salut"),
new Instant(0),
firstWindow));
assertTrue(sideInputHandler.isReady(view2, firstWindow));
assertThat(sideInputHandler.get(view2, firstWindow), contains("Salut"));
// view1 should not be affected by that
assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
}
@Test(expected = PipelineExecutionException.class)
public void parseRowToMutationTooManyColumns() throws Exception {
PCollectionView<Ddl> ddlView =
pipeline.apply("ddl", Create.of(getTestDdl())).apply(View.asSingleton());
PCollectionView<Map<String, List<TableManifest.Column>>> tableColumnsMapView =
pipeline
.apply(
"tableColumnsMap",
Create.<Map<String, List<TableManifest.Column>>>of(getEmptyTableColumnsMap())
.withCoder(
MapCoder.of(
StringUtf8Coder.of(),
ListCoder.of(ProtoCoder.of(TableManifest.Column.class)))))
.apply("Map as view", View.asSingleton());
PCollection<KV<String, String>> input =
pipeline.apply(
"input",
Create.of(KV.of(testTableName, "123,a string,yet another string,1.23,True,,,,,,,")));
PCollection<Mutation> mutations =
input.apply(
ParDo.of(
new TextRowToMutation(
ddlView,
tableColumnsMapView,
columnDelimiter,
StaticValueProvider.of('"'),
trailingDelimiter,
escape,
nullString,
dateFormat,
timestampFormat))
.withSideInputs(ddlView, tableColumnsMapView));
pipeline.run();
}
@Test
public void testToSingletonTranslationWithIsmSideInput() throws Exception {
// A "change detector" test that makes sure the translation
// of getting a PCollectionView<T> does not change
// in bad ways during refactor
DataflowPipelineOptions options = buildPipelineOptions();
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(Create.of(1)).apply(View.asSingleton());
DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(pipeline);
SdkComponents sdkComponents = createSdkComponents(options);
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
Job job =
translator
.translate(pipeline, pipelineProto, sdkComponents, runner, Collections.emptyList())
.getJob();
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
assertEquals(9, steps.size());
@SuppressWarnings("unchecked")
List<Map<String, Object>> toIsmRecordOutputs =
(List<Map<String, Object>>)
steps.get(steps.size() - 2).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(
Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
Step collectionToSingletonStep = steps.get(steps.size() - 1);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
/**
 * Splits the candidate content into items to index and items not to index by eliminating
 * exact duplicates of documents that were already processed, looked up by document hash.
 *
 * @param contentToIndexNotSkipped candidate content that was not skipped upstream
 * @param contentNotToIndexSkipped content that was already skipped upstream
 * @param pipeline the pipeline used to read the already processed documents from BigQuery
 * @param options pipeline options that control the processed-documents lookup
 * @return the content partitioned into items to index and items not to index
 */
private static ContentToIndexOrNot filterAlreadyProcessedDocuments(
PCollection<InputContent> contentToIndexNotSkipped, PCollection<InputContent> contentNotToIndexSkipped,
Pipeline pipeline, IndexerPipelineOptions options) {
PCollection<KV<String,Long>> alreadyProcessedDocs = null;
if (!options.getWriteTruncate()) {
String query = IndexerPipelineUtils.buildBigQueryProcessedDocsQuery(options);
alreadyProcessedDocs = pipeline
.apply("Get already processed Documents",BigQueryIO.read().fromQuery(query))
.apply(ParDo.of(new GetDocumentHashFn()));
} else {
Map<String, Long> map = new HashMap<String,Long>();
alreadyProcessedDocs = pipeline
.apply("Create empty side input of Docs",
Create.of(map).withCoder(KvCoder.of(StringUtf8Coder.of(),VarLongCoder.of())));
}
final PCollectionView<Map<String,Long>> alreadyProcessedDocsSideInput =
alreadyProcessedDocs.apply(View.<String,Long>asMap());
PCollectionTuple indexOrNotBasedOnExactDupes = contentToIndexNotSkipped
.apply("Extract DocumentHash key", ParDo.of(new GetInputContentDocumentHashFn()))
.apply("Group by DocumentHash key", GroupByKey.<String, InputContent>create())
.apply("Eliminate InputContent Dupes", ParDo.of(new EliminateInputContentDupes(alreadyProcessedDocsSideInput))
.withSideInputs(alreadyProcessedDocsSideInput)
.withOutputTags(PipelineTags.contentToIndexNotExactDupesTag, // main output collection
TupleTagList.of(PipelineTags.contentNotToIndexExactDupesTag))); // side output collection
PCollection<InputContent> contentToIndexNotExactDupes = indexOrNotBasedOnExactDupes.get(PipelineTags.contentToIndexNotExactDupesTag);
PCollection<InputContent> contentNotToIndexExactDupes = indexOrNotBasedOnExactDupes.get(PipelineTags.contentNotToIndexExactDupesTag);
// Merge the sets of items that are dupes or skipped
PCollectionList<InputContent> contentNotToIndexList = PCollectionList.of(contentNotToIndexExactDupes).and(contentNotToIndexSkipped);
ContentToIndexOrNot content = new ContentToIndexOrNot(contentToIndexNotExactDupes, contentNotToIndexList.apply(Flatten.<InputContent>pCollections()));
return content;
}
/**
* Main function for the BEAM program.
* @param args arguments.
*/
public static void main(final String[] args) {
final String inputFilePath = args[0];
final String outputFilePath = args[1];
final PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(NemoPipelineRunner.class);
final Pipeline p = Pipeline.create(options);
final PCollection<String> elemCollection = GenericSourceSink.read(p, inputFilePath);
final PCollectionView<Iterable<String>> allCollection = elemCollection.apply(View.<String>asIterable());
final PCollection<String> result = elemCollection.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(final ProcessContext c) {
final String line = c.element();
final Iterable<String> all = c.sideInput(allCollection);
final Optional<String> appended = StreamSupport.stream(all.spliterator(), false)
.reduce((l, r) -> l + '\n' + r);
if (appended.isPresent()) {
c.output("line: " + line + "\n" + appended.get());
} else {
c.output("error");
}
}
}).withSideInputs(allCollection)
);
GenericSourceSink.write(result, outputFilePath);
p.run();
}
@Test
public void testToSingletonTranslationWithFnApiSideInput() throws Exception {
// A "change detector" test that makes sure the translation
// of getting a PCollectionView<T> does not change
// in bad ways during refactor
DataflowPipelineOptions options = buildPipelineOptions();
options.setExperiments(Arrays.asList("beam_fn_api"));
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(Create.of(1)).apply(View.asSingleton());
DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(pipeline);
SdkComponents sdkComponents = createSdkComponents(options);
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
Job job =
translator
.translate(pipeline, pipelineProto, sdkComponents, runner, Collections.emptyList())
.getJob();
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
assertEquals(9, steps.size());
Step collectionToSingletonStep = steps.get(steps.size() - 1);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
@SuppressWarnings("unchecked")
List<Map<String, Object>> ctsOutputs =
(List<Map<String, Object>>)
steps.get(steps.size() - 1).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(Structs.getBoolean(Iterables.getOnlyElement(ctsOutputs), "use_indexed_format"));
}
private static <ReadT, WriteT>
TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>> createPCollView() {
return new TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>() {
@Override
public void evaluate(
View.CreatePCollectionView<ReadT, WriteT> transform, EvaluationContext context) {
Iterable<? extends WindowedValue<?>> iter =
context.getWindowedValues(context.getInput(transform));
PCollectionView<WriteT> output = transform.getView();
Coder<Iterable<WindowedValue<?>>> coderInternal =
(Coder)
IterableCoder.of(
WindowedValue.getFullCoder(
output.getCoderInternal(),
output.getWindowingStrategyInternal().getWindowFn().windowCoder()));
@SuppressWarnings("unchecked")
Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter;
context.putPView(output, iterCast, coderInternal);
}
@Override
public String toNativeString() {
return "<createPCollectionView>";
}
};
}
@Before
public void setup() {
DirectRunner runner = DirectRunner.fromOptions(PipelineOptionsFactory.create());
created = p.apply(Create.of(1, 2, 3));
downstream = created.apply(WithKeys.of("foo"));
view = created.apply(View.asIterable());
unbounded = p.apply(GenerateSequence.from(0));
p.replaceAll(runner.defaultTransformOverrides());
KeyedPValueTrackingVisitor keyedPValueTrackingVisitor = KeyedPValueTrackingVisitor.create();
p.traverseTopologically(keyedPValueTrackingVisitor);
BundleFactory bundleFactory = ImmutableListBundleFactory.create();
DirectGraphs.performDirectOverrides(p);
graph = DirectGraphs.getGraph(p);
context =
EvaluationContext.create(
NanosOffsetClock.create(),
bundleFactory,
graph,
keyedPValueTrackingVisitor.getKeyedPValues(),
Executors.newSingleThreadExecutor());
createdProducer = graph.getProducer(created);
downstreamProducer = graph.getProducer(downstream);
viewProducer = graph.getProducer(view);
unboundedProducer = graph.getProducer(unbounded);
}
@Test
public void createViewWithViewFnDifferentViewFn() {
PCollection<Integer> input = p.apply(Create.of(1));
PCollectionView<Iterable<Integer>> view = input.apply(View.asIterable());
// Purposely create a subclass to get a different class then what was expected.
IterableViewFn<Integer> viewFn =
new PCollectionViews.IterableViewFn<Integer>(() -> TypeDescriptors.integers()) {};
CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);
PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
assertThat(matcher.matches(getAppliedTransform(createView)), is(false));
}
@Override
public PCollection<Bid> expand(PCollection<Event> events) {
checkState(getSideInput() != null, "Configuration error: side input is null");
final PCollectionView<Map<Long, String>> sideInputMap = getSideInput().apply(View.asMap());
return events
// Only want the bid events; easier to fake some side input data
.apply(NexmarkQueryUtil.JUST_BIDS)
// Map the conversion function over all bids.
.apply(
name + ".JoinToFiles",
ParDo.of(
new DoFn<Bid, Bid>() {
@ProcessElement
public void processElement(ProcessContext c) {
Bid bid = c.element();
c.output(
new Bid(
bid.auction,
bid.bidder,
bid.price,
bid.dateTime,
c.sideInput(sideInputMap)
.get(bid.bidder % configuration.sideInputRowCount)));
}
})
.withSideInputs(sideInputMap));
}
private void performTestWithList(
PCollection<KV<byte[], byte[]>> input, Optional<SyntheticStep> syntheticStep) {
applyStepIfPresent(input, "Synthetic step", syntheticStep);
PCollectionView<List<KV<byte[], byte[]>>> sideInput =
applyWindowingIfPresent(input).apply(View.asList());
input
.apply(ParDo.of(new SideInputTestWithList(sideInput)).withSideInputs(sideInput))
.apply("Collect end time metrics", ParDo.of(runtimeMonitor));
}