Source code examples for org.apache.beam.sdk.transforms.View

The following examples show how the org.apache.beam.sdk.transforms.View API is used in real projects; follow the project links to GitHub to read the full source.
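
Before the project snippets, here is a minimal, self-contained sketch of the core pattern most of them share: materialize a PCollection as a PCollectionView (here with View.asSingleton()) and read it back as a side input inside a ParDo. The class name ViewSideInputExample and the threshold logic are illustrative placeholders, not code from any of the projects below.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class ViewSideInputExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Materialize a one-element PCollection as a singleton side input.
    PCollectionView<Integer> threshold =
        p.apply("CreateThreshold", Create.of(10)).apply(View.asSingleton());

    // Read the side input from within a ParDo via c.sideInput(view).
    PCollection<Integer> kept =
        p.apply("CreateData", Create.of(3, 12, 25))
            .apply(
                "FilterByThreshold",
                ParDo.of(
                        new DoFn<Integer, Integer>() {
                          @ProcessElement
                          public void processElement(ProcessContext c) {
                            if (c.element() >= c.sideInput(threshold)) {
                              c.output(c.element());
                            }
                          }
                        })
                    .withSideInputs(threshold));

    p.run().waitUntilFinish();
  }
}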

Example 1 (project: beam, file: DataflowPipelineTranslator.java)
private <ElemT, ViewT> void translateTyped(
    View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
  StepTranslationContext stepContext =
      context.addStep(transform, "CollectionToSingleton");
  PCollection<ElemT> input = context.getInput(transform);
  stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
  WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
  stepContext.addInput(
      PropertyNames.WINDOWING_STRATEGY,
      byteArrayToJsonString(
          serializeWindowingStrategy(windowingStrategy, context.getPipelineOptions())));
  stepContext.addInput(
      PropertyNames.IS_MERGING_WINDOW_FN,
      !windowingStrategy.getWindowFn().isNonMerging());
  stepContext.addCollectionToSingletonOutput(
      input, PropertyNames.OUTPUT, transform.getView());
}
 
Example 2 (project: beam, file: SideInputContainerTest.java)
@Test
public void writeForElementInMultipleWindowsSucceeds() throws Exception {
  ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
  for (Object materializedValue :
      materializeValuesFor(singletonView.getPipeline().getOptions(), View.asSingleton(), 2.875)) {
    valuesBuilder.add(
        WindowedValue.of(
            materializedValue,
            FIRST_WINDOW.maxTimestamp().minus(200L),
            ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
            PaneInfo.ON_TIME_AND_ONLY_FIRING));
  }
  container.write(singletonView, valuesBuilder.build());
  assertThat(
      container
          .createReaderForViews(ImmutableList.of(singletonView))
          .get(singletonView, FIRST_WINDOW),
      equalTo(2.875));
  assertThat(
      container
          .createReaderForViews(ImmutableList.of(singletonView))
          .get(singletonView, SECOND_WINDOW),
      equalTo(2.875));
}
 
Example 3
/**
 * @param readContent the input content read from the source
 * @param pipeline the pipeline used to read the set of already processed URLs from BigQuery
 * @param options pipeline options used to build the processed-URLs query
 * @return the subset of readContent whose URLs have not yet been processed
 */
private static PCollection<InputContent> filterAlreadyProcessedUrls(
    PCollection<InputContent> readContent, Pipeline pipeline,
    IndexerPipelineOptions options) {
  String query = IndexerPipelineUtils.buildBigQueryProcessedUrlsQuery(options);
  PCollection<KV<String, Long>> alreadyProcessedUrls =
      pipeline
          .apply("Get processed URLs", BigQueryIO.read().fromQuery(query))
          .apply(ParDo.of(new GetUrlFn()));

  final PCollectionView<Map<String, Long>> alreadyProcessedUrlsSideInput =
      alreadyProcessedUrls.apply(View.<String, Long>asMap());

  PCollection<InputContent> contentToProcess =
      readContent.apply(
          ParDo.of(new FilterProcessedUrls(alreadyProcessedUrlsSideInput))
              .withSideInputs(alreadyProcessedUrlsSideInput));
  return contentToProcess;
}
 
Example 4 (project: beam, file: DataflowPTransformMatchersTest.java)
/** Creates a simple pipeline with a {@link Combine.GroupedValues} with side inputs. */
private static TestPipeline createCombineGroupedValuesWithSideInputsPipeline() {
  TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
  PCollection<KV<String, Integer>> input =
      pipeline
          .apply(Create.of(KV.of("key", 1)))
          .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
  PCollection<String> sideInput = pipeline.apply(Create.of("side input"));
  PCollectionView<String> sideInputView = sideInput.apply(View.asSingleton());

  input
      .apply(GroupByKey.create())
      .apply(
          Combine.<String, Integer, Integer>groupedValues(new SumCombineFnWithContext())
              .withSideInputs(sideInputView));

  return pipeline;
}
 
Example 5 (project: components, file: Write.java)
@Override
public PCollectionView<Integer> expand(PCollection<T> input) {
    return input
            .getPipeline()
            .apply(Create.of(0))
            .apply(
                    "FixedNumShards",
                    ParDo.of(
                            new DoFn<Integer, Integer>() {
                                @ProcessElement
                                public void outputNumShards(ProcessContext ctxt) {
                                    checkArgument(
                                            numShards.isAccessible(),
                                            "NumShards must be accessible at runtime to use constant sharding");
                                    ctxt.output(numShards.get());
                                }
                            }))
            .apply(View.<Integer>asSingleton());
}
 
Example 6 (project: beam, file: DLPReidentifyTextTest.java)
@Test
public void throwsExceptionWhenDelimiterIsNullAndHeadersAreSet() {
  PCollectionView<List<String>> header =
      testPipeline.apply(Create.of("header")).apply(View.asList());
  assertThrows(
      "Column delimiter should be set if headers are present.",
      IllegalArgumentException.class,
      () ->
          DLPReidentifyText.newBuilder()
              .setProjectId(PROJECT_ID)
              .setBatchSizeBytes(BATCH_SIZE_SMALL)
              .setReidentifyTemplateName(TEMPLATE_NAME)
              .setHeaderColumns(header)
              .build());
  testPipeline.run().waitUntilFinish();
}
 
Example 7 (project: beam, file: BigQueryIOReadTest.java)
@Test
public void testPassThroughThenCleanupExecuted() throws Exception {

  p.apply(Create.empty(VarIntCoder.of()))
      .apply(
          new PassThroughThenCleanup<>(
              new PassThroughThenCleanup.CleanupOperation() {
                @Override
                void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
                  throw new RuntimeException("cleanup executed");
                }
              },
              p.apply("Create1", Create.of("")).apply(View.asSingleton())));

  thrown.expect(RuntimeException.class);
  thrown.expectMessage("cleanup executed");

  p.run();
}
 
Example 8 (project: beam, file: DirectGraphVisitorTest.java)
@Test
public void getViewsReturnsViews() {
  PCollectionView<List<String>> listView =
      p.apply("listCreate", Create.of("foo", "bar"))
          .apply(
              ParDo.of(
                  new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(DoFn<String, String>.ProcessContext c)
                        throws Exception {
                      c.output(Integer.toString(c.element().length()));
                    }
                  }))
          .apply(View.asList());
  PCollectionView<Object> singletonView =
      p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.asSingleton());
  p.replaceAll(
      DirectRunner.fromOptions(TestPipeline.testingPipelineOptions())
          .defaultTransformOverrides());
  p.traverseTopologically(visitor);
  assertThat(visitor.getGraph().getViews(), Matchers.containsInAnyOrder(listView, singletonView));
}
 
Example 9 (project: beam, file: HadoopFormatIO.java)
/**
 * Creates {@link PCollectionView} with one {@link Configuration} based on the set source of the
 * configuration.
 *
 * @param input input data
 * @return PCollectionView with single {@link Configuration}
 * @see Builder#withConfiguration(Configuration)
 * @see Builder#withConfigurationTransform(PTransform)
 */
private PCollectionView<Configuration> createConfigurationView(
    PCollection<KV<KeyT, ValueT>> input) {

  PCollectionView<Configuration> config;
  if (configuration != null) {
    config =
        input
            .getPipeline()
            .apply("CreateOutputConfig", Create.<Configuration>of(configuration))
            .apply(View.<Configuration>asSingleton().withDefaultValue(configuration));
  } else {
    config = input.apply("TransformDataIntoConfig", configTransform);
  }

  return config;
}
 
Example 10 (project: incubator-nemo, file: Broadcast.java)
/**
 * Main function for the BEAM program.
 *
 * @param args arguments.
 */
public static void main(final String[] args) {
  final String inputFilePath = args[0];
  final String outputFilePath = args[1];
  final PipelineOptions options = NemoPipelineOptionsFactory.create();

  final Pipeline p = Pipeline.create(options);
  final PCollection<String> elemCollection = GenericSourceSink.read(p, inputFilePath);
  final PCollectionView<Iterable<String>> allCollection = elemCollection.apply(View.<String>asIterable());

  final PCollection<String> result = elemCollection.apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(final ProcessContext c) {
        final String line = c.element();
        final Iterable<String> all = c.sideInput(allCollection);
        final Optional<String> appended = StreamSupport.stream(all.spliterator(), false)
          .reduce((l, r) -> l + '\n' + r);
        if (appended.isPresent()) {
          c.output("line: " + line + "\n" + appended.get());
        } else {
          c.output("error");
        }
      }
    }).withSideInputs(allCollection)
  );

  GenericSourceSink.write(result, outputFilePath);
  p.run().waitUntilFinish();
}
 
Example 11 (project: incubator-nemo, file: WindowedBroadcast.java)
/**
 * Main function for the MR BEAM program.
 *
 * @param args arguments.
 */
public static void main(final String[] args) {
  final String outputFilePath = args[0];

  final Window<Long> windowFn = Window
    .<Long>into(SlidingWindows.of(Duration.standardSeconds(2))
      .every(Duration.standardSeconds(1)));

  final PipelineOptions options = NemoPipelineOptionsFactory.create();
  options.setJobName("WindowedBroadcast");

  final Pipeline p = Pipeline.create(options);

  final PCollection<Long> windowedElements = getSource(p).apply(windowFn);
  final PCollectionView<List<Long>> windowedView = windowedElements.apply(View.asList());

  windowedElements.apply(ParDo.of(new DoFn<Long, String>() {
      @ProcessElement
      public void processElement(final ProcessContext c) {
        final Long anElementInTheWindow = c.element();
        final List<Long> allElementsInTheWindow = c.sideInput(windowedView);
        System.out.println(anElementInTheWindow + " / " + allElementsInTheWindow);
        if (!allElementsInTheWindow.contains(anElementInTheWindow)) {
          throw new RuntimeException(anElementInTheWindow + " not in " + allElementsInTheWindow.toString());
        } else {
          c.output(anElementInTheWindow + " is in " + allElementsInTheWindow);
        }
      }
    }).withSideInputs(windowedView)
  ).apply(new WriteOneFilePerWindow(outputFilePath, 1));

  p.run().waitUntilFinish();
}
 
Example 12 (project: beam, file: PTransformMatchersTest.java)
@Test
public void createViewWithViewFn() {
  PCollection<Integer> input = p.apply(Create.of(1));
  PCollectionView<Iterable<Integer>> view = input.apply(View.asIterable());
  ViewFn<?, ?> viewFn = view.getViewFn();
  CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);

  PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
  assertThat(matcher.matches(getAppliedTransform(createView)), is(true));
}
 
Example 13 (project: DataflowTemplates, file: LocalSpannerIO.java)
@Override
public PCollectionView<Transaction> expand(PBegin input) {
  getSpannerConfig().validate();

  return input
      .apply(Create.of(1))
      .apply("Create transaction", ParDo.of(new LocalCreateTransactionFn(this)))
      .apply("As PCollectionView", View.asSingleton());
}
 
Example 14
@Override
public PCollection<Mutation> expand(PCollection<KV<String, String>> filesToTables) {

  // Map<filename,tablename>
  PCollectionView<Map<String, String>> filenamesToTableNamesMapView =
      filesToTables.apply("asView", View.asMap());

  return filesToTables
      .apply("Get Filenames", Keys.create())
      // PCollection<String>
      .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW))
      // PCollection<Match.Metadata>
      .apply(FileIO.readMatches())
      // Pcollection<FileIO.ReadableFile>
      .apply(
          "Split into ranges",
          ParDo.of(
                  new SplitIntoRangesFn(
                      SplitIntoRangesFn.DEFAULT_BUNDLE_SIZE, filenamesToTableNamesMapView))
              .withSideInputs(filenamesToTableNamesMapView))
      .setCoder(FileShard.Coder.of())
      // PCollection<FileShard>
      .apply("Reshuffle", Reshuffle.viaRandomKey())
      // PCollection<FileShard>

      .apply("Read ranges", ParDo.of(new ReadFileRangesFn(ddlView)).withSideInputs(ddlView));
}
 
Example 15 (project: DataflowTemplates, file: DynamicJdbcIO.java)
@Override
public PCollection<T> expand(PCollection<T> input) {
  // See https://issues.apache.org/jira/browse/BEAM-2803
  // We use a combined approach to "break fusion" here:
  // (see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion)
  // 1) force the data to be materialized by passing it as a side input to an identity fn,
  // then 2) reshuffle it with a random key. Initial materialization provides some parallelism
  // and ensures that data to be shuffled can be generated in parallel, while reshuffling
  // provides perfect parallelism.
  // In most cases where a "fusion break" is needed, a simple reshuffle would be sufficient.
  // The current approach is necessary only to support the particular case of JdbcIO where
  // a single query may produce many gigabytes of query results.
  PCollectionView<Iterable<T>> empty =
      input
          .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
          .apply(View.asIterable());
  PCollection<T> materialized =
      input.apply(
          "Identity",
          ParDo.of(
                  new DoFn<T, T>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                      c.output(c.element());
                    }
                  })
              .withSideInputs(empty));
  return materialized.apply(Reshuffle.viaRandomKey());
}
 
Example 16 (project: beam, file: DataflowPipelineTranslatorTest.java)
@Test
public void testToIterableTranslationWithFnApiSideInput() throws Exception {
  // A "change detector" test that makes sure the translation
  // of getting a PCollectionView<Iterable<T>> does not change
  // in bad ways during refactor

  DataflowPipelineOptions options = buildPipelineOptions();
  options.setExperiments(Arrays.asList("beam_fn_api"));
  DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);

  Pipeline pipeline = Pipeline.create(options);
  pipeline.apply(Create.of(1, 2, 3)).apply(View.asIterable());

  DataflowRunner runner = DataflowRunner.fromOptions(options);
  runner.replaceTransforms(pipeline);
  SdkComponents sdkComponents = createSdkComponents(options);
  RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
  Job job =
      translator
          .translate(pipeline, pipelineProto, sdkComponents, runner, Collections.emptyList())
          .getJob();
  assertAllStepOutputsHaveUniqueIds(job);

  List<Step> steps = job.getSteps();
  assertEquals(5, steps.size());

  @SuppressWarnings("unchecked")
  List<Map<String, Object>> ctsOutputs =
      (List<Map<String, Object>>)
          steps.get(steps.size() - 1).getProperties().get(PropertyNames.OUTPUT_INFO);
  assertTrue(Structs.getBoolean(Iterables.getOnlyElement(ctsOutputs), "use_indexed_format"));
  Step collectionToSingletonStep = steps.get(steps.size() - 1);
  assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
 
Example 17 (project: beam, file: SideInputHandlerTest.java)
@Test
public void testIsReady() {
  SideInputHandler sideInputHandler =
      new SideInputHandler(
          ImmutableList.of(view1, view2), InMemoryStateInternals.<Void>forKey(null));

  IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_1));

  IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_2));

  // side input should not yet be ready
  assertFalse(sideInputHandler.isReady(view1, firstWindow));

  // add a value for view1
  sideInputHandler.addSideInputValue(
      view1,
      valuesInWindow(
          materializeValuesFor(view1.getPipeline().getOptions(), View.asIterable(), "Hello"),
          new Instant(0),
          firstWindow));

  // now side input should be ready
  assertTrue(sideInputHandler.isReady(view1, firstWindow));

  // second window input should still not be ready
  assertFalse(sideInputHandler.isReady(view1, secondWindow));
}
 
Example 18
@Test
public void readImportManifestUtfWithBOM() throws Exception {
  Path f11 = Files.createTempFile("table1-file", "1");
  String tempDir = f11.getParent().toString();

  Path manifestFile = Files.createTempFile("import-manifest", ".json");
  Charset charset = Charset.forName("UTF-8");
  try (BufferedWriter writer = Files.newBufferedWriter(manifestFile, charset)) {
    String jsonString =
        String.format(
            "\uFEFF{\"tables\": ["
                + "{\"table_name\": \"table1\","
                + "\"file_patterns\":[\"%s\"]}"
                + "]}",
            f11.toString());
    writer.write(jsonString, 0, jsonString.length());
  } catch (IOException e) {
    e.printStackTrace();
  }

  ValueProvider<String> importManifest =
      ValueProvider.StaticValueProvider.of(manifestFile.toString());
  PCollectionView<Ddl> ddlView =
      pipeline.apply("ddl", Create.of(getTestDdl())).apply(View.asSingleton());

  PCollection<KV<String, String>> tableAndFiles =
      pipeline
          .apply("Read manifest file", new ReadImportManifest(importManifest))
          .apply("Resolve data files", new ResolveDataFiles(importManifest, ddlView));

  PAssert.that(tableAndFiles)
      .containsInAnyOrder(
          KV.of("table1", f11.toString()));

  pipeline.run();
}
 
Example 19
@Test(expected = PipelineExecutionException.class)
public void readImportManifestInvalidTable() throws Exception {
  Path f11 = Files.createTempFile("table1-file", "1");

  Path manifestFile = Files.createTempFile("import-manifest", ".json");
  Charset charset = Charset.forName("UTF-8");
  try (BufferedWriter writer = Files.newBufferedWriter(manifestFile, charset)) {
    // An invalid json string (missing the ending close "}").
    String jsonString =
        String.format(
            "{\"tables\": ["
                + "{\"table_name\": \"NON_EXIST_TABLE\","
                + "\"file_patterns\":[\"%s\"]}"
                + "]",
            f11.toString());
    writer.write(jsonString, 0, jsonString.length());
  } catch (IOException e) {
    e.printStackTrace();
  }

  ValueProvider<String> importManifest =
      ValueProvider.StaticValueProvider.of(manifestFile.toString());
  PCollectionView<Ddl> ddlView =
      pipeline.apply("ddl", Create.of(getTestDdl())).apply(View.asSingleton());

  PCollection<KV<String, String>> tableAndFiles =
      pipeline
          .apply("Read manifest file", new ReadImportManifest(importManifest))
          .apply("Resolve data files", new ResolveDataFiles(importManifest, ddlView));

  pipeline.run();
}
 
Example 20 (project: beam, file: SideInputHandlerTest.java)
@Test
public void testMultipleSideInputs() {
  SideInputHandler sideInputHandler =
      new SideInputHandler(
          ImmutableList.of(view1, view2), InMemoryStateInternals.<Void>forKey(null));

  // two windows that we'll later use for adding elements/retrieving side input
  IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_1));

  // add value for view1 in the first window
  sideInputHandler.addSideInputValue(
      view1,
      valuesInWindow(
          materializeValuesFor(view1.getPipeline().getOptions(), View.asIterable(), "Hello"),
          new Instant(0),
          firstWindow));

  assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));

  // view2 should not have any data
  assertFalse(sideInputHandler.isReady(view2, firstWindow));

  // also add some data for view2
  sideInputHandler.addSideInputValue(
      view2,
      valuesInWindow(
          materializeValuesFor(view2.getPipeline().getOptions(), View.asIterable(), "Salut"),
          new Instant(0),
          firstWindow));

  assertTrue(sideInputHandler.isReady(view2, firstWindow));
  assertThat(sideInputHandler.get(view2, firstWindow), contains("Salut"));

  // view1 should not be affected by that
  assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
}
 
Example 21 (project: DataflowTemplates, file: TextRowToMutationTest.java)
@Test(expected = PipelineExecutionException.class)
public void parseRowToMutationTooManyColumns() throws Exception {
  PCollectionView<Ddl> ddlView =
      pipeline.apply("ddl", Create.of(getTestDdl())).apply(View.asSingleton());
  PCollectionView<Map<String, List<TableManifest.Column>>> tableColumnsMapView =
      pipeline
          .apply(
              "tableColumnsMap",
              Create.<Map<String, List<TableManifest.Column>>>of(getEmptyTableColumnsMap())
                  .withCoder(
                      MapCoder.of(
                          StringUtf8Coder.of(),
                          ListCoder.of(ProtoCoder.of(TableManifest.Column.class)))))
          .apply("Map as view", View.asSingleton());

  PCollection<KV<String, String>> input =
      pipeline.apply(
          "input",
          Create.of(KV.of(testTableName, "123,a string,yet another string,1.23,True,,,,,,,")));
  PCollection<Mutation> mutations =
      input.apply(
          ParDo.of(
                  new TextRowToMutation(
                      ddlView,
                      tableColumnsMapView,
                      columnDelimiter,
                      StaticValueProvider.of('"'),
                      trailingDelimiter,
                      escape,
                      nullString,
                      dateFormat,
                      timestampFormat))
              .withSideInputs(ddlView, tableColumnsMapView));

  pipeline.run();
}
 
Example 22 (project: beam, file: DataflowPipelineTranslatorTest.java)
@Test
public void testToSingletonTranslationWithIsmSideInput() throws Exception {
  // A "change detector" test that makes sure the translation
  // of getting a PCollectionView<T> does not change
  // in bad ways during refactor

  DataflowPipelineOptions options = buildPipelineOptions();
  DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);

  Pipeline pipeline = Pipeline.create(options);
  pipeline.apply(Create.of(1)).apply(View.asSingleton());
  DataflowRunner runner = DataflowRunner.fromOptions(options);
  runner.replaceTransforms(pipeline);
  SdkComponents sdkComponents = createSdkComponents(options);
  RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
  Job job =
      translator
          .translate(pipeline, pipelineProto, sdkComponents, runner, Collections.emptyList())
          .getJob();
  assertAllStepOutputsHaveUniqueIds(job);

  List<Step> steps = job.getSteps();
  assertEquals(9, steps.size());

  @SuppressWarnings("unchecked")
  List<Map<String, Object>> toIsmRecordOutputs =
      (List<Map<String, Object>>)
          steps.get(steps.size() - 2).getProperties().get(PropertyNames.OUTPUT_INFO);
  assertTrue(
      Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));

  Step collectionToSingletonStep = steps.get(steps.size() - 1);
  assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
 
Example 23
/**
 * @param contentToIndexNotSkipped content that is still a candidate for indexing
 * @param contentNotToIndexSkipped content already excluded from indexing by earlier filters
 * @param pipeline the pipeline used to read the set of already processed documents from BigQuery
 * @param options pipeline options used to build the processed-documents query
 * @return the content partitioned into items to index and items not to index
 */
private static ContentToIndexOrNot filterAlreadyProcessedDocuments(
    PCollection<InputContent> contentToIndexNotSkipped,
    PCollection<InputContent> contentNotToIndexSkipped,
    Pipeline pipeline, IndexerPipelineOptions options) {
  PCollection<KV<String, Long>> alreadyProcessedDocs;

  if (!options.getWriteTruncate()) {
    String query = IndexerPipelineUtils.buildBigQueryProcessedDocsQuery(options);
    alreadyProcessedDocs =
        pipeline
            .apply("Get already processed Documents", BigQueryIO.read().fromQuery(query))
            .apply(ParDo.of(new GetDocumentHashFn()));
  } else {
    Map<String, Long> map = new HashMap<>();
    alreadyProcessedDocs =
        pipeline.apply(
            "Create empty side input of Docs",
            Create.of(map).withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));
  }

  final PCollectionView<Map<String, Long>> alreadyProcessedDocsSideInput =
      alreadyProcessedDocs.apply(View.<String, Long>asMap());

  PCollectionTuple indexOrNotBasedOnExactDupes =
      contentToIndexNotSkipped
          .apply("Extract DocumentHash key", ParDo.of(new GetInputContentDocumentHashFn()))
          .apply("Group by DocumentHash key", GroupByKey.<String, InputContent>create())
          .apply(
              "Eliminate InputContent Dupes",
              ParDo.of(new EliminateInputContentDupes(alreadyProcessedDocsSideInput))
                  .withSideInputs(alreadyProcessedDocsSideInput)
                  .withOutputTags(
                      PipelineTags.contentToIndexNotExactDupesTag, // main output collection
                      TupleTagList.of(PipelineTags.contentNotToIndexExactDupesTag))); // side output collection

  PCollection<InputContent> contentToIndexNotExactDupes =
      indexOrNotBasedOnExactDupes.get(PipelineTags.contentToIndexNotExactDupesTag);
  PCollection<InputContent> contentNotToIndexExactDupes =
      indexOrNotBasedOnExactDupes.get(PipelineTags.contentNotToIndexExactDupesTag);

  // Merge the sets of items that are dupes or skipped
  PCollectionList<InputContent> contentNotToIndexList =
      PCollectionList.of(contentNotToIndexExactDupes).and(contentNotToIndexSkipped);

  return new ContentToIndexOrNot(
      contentToIndexNotExactDupes,
      contentNotToIndexList.apply(Flatten.<InputContent>pCollections()));
}
 
Example 24 (project: nemo, file: Broadcast.java)
/**
 * Main function for the BEAM program.
 * @param args arguments.
 */
public static void main(final String[] args) {
  final String inputFilePath = args[0];
  final String outputFilePath = args[1];
  final PipelineOptions options = PipelineOptionsFactory.create();
  options.setRunner(NemoPipelineRunner.class);

  final Pipeline p = Pipeline.create(options);
  final PCollection<String> elemCollection = GenericSourceSink.read(p, inputFilePath);
  final PCollectionView<Iterable<String>> allCollection = elemCollection.apply(View.<String>asIterable());

  final PCollection<String> result = elemCollection.apply(ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(final ProcessContext c) {
          final String line = c.element();
          final Iterable<String> all = c.sideInput(allCollection);
          final Optional<String> appended = StreamSupport.stream(all.spliterator(), false)
              .reduce((l, r) -> l + '\n' + r);
          if (appended.isPresent()) {
            c.output("line: " + line + "\n" + appended.get());
          } else {
            c.output("error");
          }
        }
      }).withSideInputs(allCollection)
  );

  GenericSourceSink.write(result, outputFilePath);
  p.run();
}
 
Example 25 (project: beam, file: DataflowPipelineTranslatorTest.java)
@Test
public void testToSingletonTranslationWithFnApiSideInput() throws Exception {
  // A "change detector" test that makes sure the translation
  // of getting a PCollectionView<T> does not change
  // in bad ways during refactor

  DataflowPipelineOptions options = buildPipelineOptions();
  options.setExperiments(Arrays.asList("beam_fn_api"));
  DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);

  Pipeline pipeline = Pipeline.create(options);
  pipeline.apply(Create.of(1)).apply(View.asSingleton());
  DataflowRunner runner = DataflowRunner.fromOptions(options);
  runner.replaceTransforms(pipeline);
  SdkComponents sdkComponents = createSdkComponents(options);
  RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
  Job job =
      translator
          .translate(pipeline, pipelineProto, sdkComponents, runner, Collections.emptyList())
          .getJob();
  assertAllStepOutputsHaveUniqueIds(job);

  List<Step> steps = job.getSteps();
  assertEquals(9, steps.size());

  Step collectionToSingletonStep = steps.get(steps.size() - 1);
  assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());

  @SuppressWarnings("unchecked")
  List<Map<String, Object>> ctsOutputs =
      (List<Map<String, Object>>)
          steps.get(steps.size() - 1).getProperties().get(PropertyNames.OUTPUT_INFO);
  assertTrue(Structs.getBoolean(Iterables.getOnlyElement(ctsOutputs), "use_indexed_format"));
}
 
Example 26 (project: beam, file: TransformTranslator.java)
private static <ReadT, WriteT>
    TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>> createPCollView() {
  return new TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>() {
    @Override
    public void evaluate(
        View.CreatePCollectionView<ReadT, WriteT> transform, EvaluationContext context) {
      Iterable<? extends WindowedValue<?>> iter =
          context.getWindowedValues(context.getInput(transform));
      PCollectionView<WriteT> output = transform.getView();
      Coder<Iterable<WindowedValue<?>>> coderInternal =
          (Coder)
              IterableCoder.of(
                  WindowedValue.getFullCoder(
                      output.getCoderInternal(),
                      output.getWindowingStrategyInternal().getWindowFn().windowCoder()));

      @SuppressWarnings("unchecked")
      Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter;

      context.putPView(output, iterCast, coderInternal);
    }

    @Override
    public String toNativeString() {
      return "<createPCollectionView>";
    }
  };
}
 
Example 27 (project: beam, file: EvaluationContextTest.java)
@Before
public void setup() {
  DirectRunner runner = DirectRunner.fromOptions(PipelineOptionsFactory.create());

  created = p.apply(Create.of(1, 2, 3));
  downstream = created.apply(WithKeys.of("foo"));
  view = created.apply(View.asIterable());
  unbounded = p.apply(GenerateSequence.from(0));

  p.replaceAll(runner.defaultTransformOverrides());

  KeyedPValueTrackingVisitor keyedPValueTrackingVisitor = KeyedPValueTrackingVisitor.create();
  p.traverseTopologically(keyedPValueTrackingVisitor);

  BundleFactory bundleFactory = ImmutableListBundleFactory.create();
  DirectGraphs.performDirectOverrides(p);
  graph = DirectGraphs.getGraph(p);
  context =
      EvaluationContext.create(
          NanosOffsetClock.create(),
          bundleFactory,
          graph,
          keyedPValueTrackingVisitor.getKeyedPValues(),
          Executors.newSingleThreadExecutor());

  createdProducer = graph.getProducer(created);
  downstreamProducer = graph.getProducer(downstream);
  viewProducer = graph.getProducer(view);
  unboundedProducer = graph.getProducer(unbounded);
}
 
Example 28 (project: beam, file: PTransformMatchersTest.java)
@Test
public void createViewWithViewFnDifferentViewFn() {
  PCollection<Integer> input = p.apply(Create.of(1));
  PCollectionView<Iterable<Integer>> view = input.apply(View.asIterable());

  // Purposely create a subclass to get a different class then what was expected.
  IterableViewFn<Integer> viewFn =
      new PCollectionViews.IterableViewFn<Integer>(() -> TypeDescriptors.integers()) {};
  CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);

  PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
  assertThat(matcher.matches(getAppliedTransform(createView)), is(false));
}
 
Example 29 (project: beam, file: BoundedSideInputJoin.java)
@Override
public PCollection<Bid> expand(PCollection<Event> events) {

  checkState(getSideInput() != null, "Configuration error: side input is null");

  final PCollectionView<Map<Long, String>> sideInputMap = getSideInput().apply(View.asMap());

  return events
      // Only want the bid events; easier to fake some side input data
      .apply(NexmarkQueryUtil.JUST_BIDS)

      // Map the conversion function over all bids.
      .apply(
          name + ".JoinToFiles",
          ParDo.of(
                  new DoFn<Bid, Bid>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                      Bid bid = c.element();
                      c.output(
                          new Bid(
                              bid.auction,
                              bid.bidder,
                              bid.price,
                              bid.dateTime,
                              c.sideInput(sideInputMap)
                                  .get(bid.bidder % configuration.sideInputRowCount)));
                    }
                  })
              .withSideInputs(sideInputMap));
}
 
Example 30 (project: beam, file: SideInputLoadTest.java)
private void performTestWithList(
    PCollection<KV<byte[], byte[]>> input, Optional<SyntheticStep> syntheticStep) {
  applyStepIfPresent(input, "Synthetic step", syntheticStep);
  PCollectionView<List<KV<byte[], byte[]>>> sideInput =
      applyWindowingIfPresent(input).apply(View.asList());
  input
      .apply(ParDo.of(new SideInputTestWithList(sideInput)).withSideInputs(sideInput))
      .apply("Collect end time metrics", ParDo.of(runtimeMonitor));
}
 