The following examples show how to use the org.apache.beam.sdk.transforms.Combine API class; you can also follow the links to GitHub to view the full source code.
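Before the individual examples, here is a minimal orientation sketch (not taken from the Beam sources; names and values are illustrative, and the usual org.apache.beam.sdk imports are omitted to match the style of the snippets below). It shows the two most common entry points, Combine.globally and Combine.perKey, with the built-in Sum combine functions.

// Minimal sketch (illustrative names): summing a PCollection globally and per key.
Pipeline p = Pipeline.create();

// Combine.globally folds every element of the (windowed) PCollection into one value per window.
PCollection<Integer> numbers = p.apply(Create.of(1, 2, 3, 4));
PCollection<Integer> total = numbers.apply(Combine.globally(Sum.ofIntegers()));

// Combine.perKey folds all values that share the same key.
PCollection<KV<String, Integer>> keyed =
    p.apply(Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)));
PCollection<KV<String, Integer>> totalsPerKey =
    keyed.apply(Combine.<String, Integer, Integer>perKey(Sum.ofIntegers()));

p.run().waitUntilFinish();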
public AggregatorCombiner(
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
WindowingStrategy<?, ?> windowingStrategy,
Coder<AccumT> accumulatorCoder,
Coder<OutputT> outputCoder) {
this.combineFn = combineFn;
this.windowingStrategy = (WindowingStrategy<InputT, W>) windowingStrategy;
this.timestampCombiner = windowingStrategy.getTimestampCombiner();
this.accumulatorCoder =
IterableCoder.of(
WindowedValue.FullWindowedValueCoder.of(
accumulatorCoder, windowingStrategy.getWindowFn().windowCoder()));
this.outputCoder =
IterableCoder.of(
WindowedValue.FullWindowedValueCoder.of(
outputCoder, windowingStrategy.getWindowFn().windowCoder()));
}
@SuppressWarnings("unchecked")
private static <K, InputT, OutputT>
SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> getSystemReduceFn(
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
Pipeline pipeline,
KvCoder<K, InputT> kvInputCoder) {
if (transform instanceof GroupByKey) {
return (SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow>)
SystemReduceFn.buffering(kvInputCoder.getValueCoder());
} else if (transform instanceof Combine.PerKey) {
final CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> combineFn =
((Combine.PerKey) transform).getFn();
return SystemReduceFn.combining(
kvInputCoder.getKeyCoder(),
AppliedCombineFn.withInputCoder(combineFn, pipeline.getCoderRegistry(), kvInputCoder));
} else {
throw new RuntimeException("Transform " + transform + " cannot be translated as GroupByKey.");
}
}
@Test
public void testBinaryCombineWithSlidingWindows() {
PCollection<Integer> input =
pipeline
.apply(
Create.timestamped(
TimestampedValue.of(1, new Instant(1)),
TimestampedValue.of(3, new Instant(2)),
TimestampedValue.of(5, new Instant(3))))
.apply(Window.into(SlidingWindows.of(Duration.millis(3)).every(Duration.millis(1))))
.apply(
Combine.globally(
Combine.BinaryCombineFn.of(
(SerializableBiFunction<Integer, Integer, Integer>)
(integer1, integer2) -> integer1 > integer2 ? integer1 : integer2))
.withoutDefaults());
PAssert.that(input).containsInAnyOrder(1, 3, 5, 5, 5);
pipeline.run();
}
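The lambda passed to Combine.BinaryCombineFn.of above simply computes a maximum; under the same windowing, the built-in Max.ofIntegers() expresses the same combine. A sketch (here "windowed" stands for the sliding-windowed PCollection built in the test):

// Equivalent maximum using the built-in combine fn (sketch; "windowed" is the
// sliding-windowed PCollection from the test above).
PCollection<Integer> maxes =
    windowed.apply(Combine.globally(Max.ofIntegers()).withoutDefaults());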
private static Combine.CombineFn<Integer, Long, Long> getSumFn() {
return new Combine.CombineFn<Integer, Long, Long>() {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long addInput(Long mutableAccumulator, Integer input) {
return mutableAccumulator + input;
}
@Override
public Long mergeAccumulators(Iterable<Long> accumulators) {
return StreamSupport.stream(accumulators.spliterator(), false).mapToLong(e -> e).sum();
}
@Override
public Long extractOutput(Long accumulator) {
return accumulator;
}
};
}
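The custom CombineFn returned by getSumFn() can be plugged into Combine.globally or Combine.perKey like any built-in fn. An illustrative usage (not part of the original test class):

// Illustrative usage of the custom sum CombineFn defined above.
PCollection<Long> sum =
    pipeline.apply(Create.of(1, 2, 3)).apply(Combine.globally(getSumFn()));
PAssert.thatSingleton(sum).isEqualTo(6L);
pipeline.run();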
@Before
public void createPipeline() throws Exception {
// Create pipeline with an input pCollection, combine, and output pCollection.
TestCombineFn combineFn = new TestCombineFn();
Combine.PerKey<String, String, Integer> combine = Combine.perKey(combineFn);
Pipeline p = Pipeline.create();
PCollection<KV<String, String>> inputPCollection = p.apply(Create.of(KV.of("unused", "0")));
inputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
PCollection<KV<String, Integer>> outputPCollection =
inputPCollection.apply(TEST_COMBINE_ID, combine);
outputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
// Create FnApi protos needed for the runner.
SdkComponents sdkComponents = SdkComponents.create(p.getOptions());
pProto = PipelineTranslation.toProto(p, sdkComponents);
inputPCollectionId = sdkComponents.registerPCollection(inputPCollection);
outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
pTransform = pProto.getComponents().getTransformsOrThrow(TEST_COMBINE_ID);
}
@Test
public void testCombineGloballyPreservesWindowing() {
PCollection<Integer> input =
pipeline
.apply(
Create.timestamped(
TimestampedValue.of(1, new Instant(1)),
TimestampedValue.of(2, new Instant(2)),
TimestampedValue.of(3, new Instant(11)),
TimestampedValue.of(4, new Instant(3)),
TimestampedValue.of(5, new Instant(11)),
TimestampedValue.of(6, new Instant(12))))
.apply(Window.into(FixedWindows.of(Duration.millis(10))))
.apply(Combine.globally(Sum.ofIntegers()).withoutDefaults());
PAssert.that(input).containsInAnyOrder(7, 14);
// Run the pipeline so the PAssert above is actually checked.
pipeline.run();
}
@Test
@Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
public void testMergingCustomWindows() {
Instant startInstant = new Instant(0L);
PCollection<String> inputCollection =
pipeline.apply(
Create.timestamped(
TimestampedValue.of("big", startInstant.plus(Duration.standardSeconds(10))),
TimestampedValue.of("small1", startInstant.plus(Duration.standardSeconds(20))),
// This one will be outside of bigWindow thus not merged
TimestampedValue.of("small2", startInstant.plus(Duration.standardSeconds(39)))));
PCollection<String> windowedCollection =
inputCollection.apply(Window.into(new CustomWindowFn<>()));
PCollection<Long> count =
windowedCollection.apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());
// "small1" and "big" elements merged into bigWindow "small2" not merged
// because timestamp is not in bigWindow
PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L);
pipeline.run();
}
@Test
@Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
public void testMergingCustomWindowsKeyedCollection() {
Instant startInstant = new Instant(0L);
PCollection<KV<Integer, String>> inputCollection =
pipeline.apply(
Create.timestamped(
TimestampedValue.of(
KV.of(0, "big"), startInstant.plus(Duration.standardSeconds(10))),
TimestampedValue.of(
KV.of(1, "small1"), startInstant.plus(Duration.standardSeconds(20))),
// This element is not contained within the bigWindow and not merged
TimestampedValue.of(
KV.of(2, "small2"), startInstant.plus(Duration.standardSeconds(39)))));
PCollection<KV<Integer, String>> windowedCollection =
inputCollection.apply(Window.into(new CustomWindowFn<>()));
PCollection<Long> count =
windowedCollection.apply(
Combine.globally(Count.<KV<Integer, String>>combineFn()).withoutDefaults());
// "small1" and "big" elements merged into bigWindow "small2" not merged
// because it is not contained in bigWindow
PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L);
pipeline.run();
}
/** Read the test dataset from HBase and validate its contents. */
private void runRead() {
PCollection<Result> tableRows =
pipelineRead.apply(HBaseIO.read().withConfiguration(conf).withTableId(TABLE_NAME));
PAssert.thatSingleton(tableRows.apply("Count All", Count.<Result>globally()))
.isEqualTo((long) numberOfRows);
PCollection<String> consolidatedHashcode =
tableRows
.apply(ParDo.of(new SelectNameFn()))
.apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
PAssert.that(consolidatedHashcode)
.containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
pipelineRead.run().waitUntilFinish();
}
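HashingFn, used here and in several of the IO tests below, is a test utility that combines per-row hashes into a single order-insensitive checksum. It is not shown in this excerpt; the following simplified sketch only illustrates the idea and is not the actual HashingFn implementation.

// Simplified, order-insensitive checksum combine in the spirit of HashingFn
// (illustrative only; the real utility lives in Beam's io-common test code).
static class XorHashFn extends Combine.CombineFn<String, Long, String> {
  @Override
  public Long createAccumulator() {
    return 0L;
  }

  @Override
  public Long addInput(Long accumulator, String input) {
    // XOR keeps the result independent of element order and of how bundles are split.
    return accumulator ^ (long) input.hashCode();
  }

  @Override
  public Long mergeAccumulators(Iterable<Long> accumulators) {
    long merged = 0L;
    for (long accumulator : accumulators) {
      merged ^= accumulator;
    }
    return merged;
  }

  @Override
  public String extractOutput(Long accumulator) {
    return Long.toHexString(accumulator);
  }
}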
@Override
boolean canTranslate(
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
FlinkStreamingTranslationContext context) {
// if we have a merging window strategy and side inputs we cannot
// translate as a proper combine. We have to group and then run the combine
// over the final grouped values.
PCollection<KV<K, InputT>> input = context.getInput(transform);
@SuppressWarnings("unchecked")
WindowingStrategy<?, BoundedWindow> windowingStrategy =
(WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
return windowingStrategy.getWindowFn().isNonMerging()
|| ((Combine.PerKey) transform).getSideInputs().isEmpty();
}
private void runRead() {
PCollection<Scientist> output =
pipelineRead.apply(
CassandraIO.<Scientist>read()
.withHosts(options.getCassandraHost())
.withPort(options.getCassandraPort())
.withMinNumberOfSplits(20)
.withKeyspace(KEYSPACE)
.withTable(TABLE)
.withEntity(Scientist.class)
.withCoder(SerializableCoder.of(Scientist.class)));
PCollection<String> consolidatedHashcode =
output
.apply(ParDo.of(new SelectNameFn()))
.apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
PAssert.thatSingleton(consolidatedHashcode)
.isEqualTo(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
pipelineRead.run().waitUntilFinish();
}
/**
* Test that reads data from an embedded Elasticsearch instance and verifies that the data is
* read successfully.
*/
@Test
public void testHifIOWithElastic() {
// The expected hashcode is computed once at insertion time and hardcoded here.
String expectedHashCode = "a62a85f5f081e3840baf1028d4d6c6bc";
Configuration conf = getConfiguration();
PCollection<KV<Text, LinkedMapWritable>> esData =
pipeline.apply(HadoopFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf));
PCollection<Long> count = esData.apply(Count.globally());
// Verify that the count of objects fetched using HIFInputFormat IO is correct.
PAssert.thatSingleton(count).isEqualTo((long) TEST_DATA_ROW_COUNT);
PCollection<LinkedMapWritable> values = esData.apply(Values.create());
PCollection<String> textValues = values.apply(transformFunc);
// Verify the output values using checksum comparison.
PCollection<String> consolidatedHashcode =
textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
pipeline.run().waitUntilFinish();
}
/** This test reads data from the Cassandra instance and verifies that the data is read successfully. */
@Test
public void testHIFReadForCassandra() {
// The expected hashcode is computed once at insertion time and hardcoded here.
String expectedHashCode = "1a30ad400afe4ebf5fde75f5d2d95408";
Long expectedRecordsCount = 1000L;
Configuration conf = getConfiguration(options);
PCollection<KV<Long, String>> cassandraData =
pipeline.apply(
HadoopFormatIO.<Long, String>read()
.withConfiguration(conf)
.withValueTranslation(myValueTranslate));
PAssert.thatSingleton(cassandraData.apply("Count", Count.globally()))
.isEqualTo(expectedRecordsCount);
PCollection<String> textValues = cassandraData.apply(Values.create());
// Verify the output values using checksum comparison.
PCollection<String> consolidatedHashcode =
textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
pipeline.run().waitUntilFinish();
}
/**
* Test that reads data from an embedded Cassandra instance and verifies that the data is
* read successfully.
*/
@Test
public void testHIFReadForCassandra() {
// The expected hashcode is computed once at insertion time and hardcoded here.
String expectedHashCode = "1b9780833cce000138b9afa25ba63486";
Configuration conf = getConfiguration();
PCollection<KV<Long, String>> cassandraData =
p.apply(
HadoopFormatIO.<Long, String>read()
.withConfiguration(conf)
.withValueTranslation(myValueTranslate));
// Verify the count of data retrieved from Cassandra matches expected count.
PAssert.thatSingleton(cassandraData.apply("Count", Count.globally()))
.isEqualTo(TEST_DATA_ROW_COUNT);
PCollection<String> textValues = cassandraData.apply(Values.create());
// Verify the output values using checksum comparison.
PCollection<String> consolidatedHashcode =
textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
p.run().waitUntilFinish();
}
@Override
public FunctionSpec translate(
AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>> transform, SdkComponents components)
throws IOException {
if (transform.getTransform().getSideInputs().isEmpty()) {
GlobalCombineFn<?, ?, ?> combineFn = transform.getTransform().getFn();
Coder<?> accumulatorCoder =
extractAccumulatorCoder(combineFn, (AppliedPTransform) transform);
return FunctionSpec.newBuilder()
.setUrn(getUrn(transform.getTransform()))
.setPayload(combinePayload(combineFn, accumulatorCoder, components).toByteString())
.build();
} else {
// Combines with side inputs are translated as generic composites, which have a blank
// FunctionSpec.
return null;
}
}
@Override
public FunctionSpec translate(
AppliedPTransform<?, ?, Combine.Globally<?, ?>> transform, SdkComponents components)
throws IOException {
if (transform.getTransform().getSideInputs().isEmpty()) {
return FunctionSpec.newBuilder()
.setUrn(getUrn(transform.getTransform()))
.setPayload(
payloadForCombineGlobally((AppliedPTransform) transform, components).toByteString())
.build();
} else {
// Combines with side inputs are translated as generic composites, which have a blank
// FunctionSpec.
return null;
}
}
private static <K, InputT, AccumT> Coder<AccumT> extractAccumulatorCoder(
GlobalCombineFn<InputT, AccumT, ?> combineFn,
AppliedPTransform<
PCollection<KV<K, Iterable<InputT>>>, ?, Combine.GroupedValues<K, InputT, ?>>
transform)
throws IOException {
try {
@SuppressWarnings("unchecked")
PCollection<KV<K, Iterable<InputT>>> mainInput =
(PCollection<KV<K, Iterable<InputT>>>)
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(transform));
KvCoder<K, Iterable<InputT>> kvCoder = (KvCoder<K, Iterable<InputT>>) mainInput.getCoder();
IterableCoder<InputT> iterCoder = (IterableCoder<InputT>) kvCoder.getValueCoder();
return combineFn.getAccumulatorCoder(
transform.getPipeline().getCoderRegistry(), iterCoder.getElemCoder());
} catch (CannotProvideCoderException e) {
throw new IOException("Could not obtain a Coder for the accumulator", e);
}
}
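For comparison with the machinery above, a CombineFn can also be asked for its accumulator coder directly. A small sketch reusing getSumFn() from the earlier example (the call throws CannotProvideCoderException):

// Sketch: resolving the accumulator coder of a CombineFn directly
// (getSumFn() is the custom sum fn from the earlier example).
Coder<Long> accumulatorCoder =
    getSumFn().getAccumulatorCoder(pipeline.getCoderRegistry(), VarIntCoder.of());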
@Test
public void testCombineValuesFnExtract() throws Exception {
TestReceiver receiver = new TestReceiver();
MeanInts mean = new MeanInts();
Combine.CombineFn<Integer, CountSum, String> combiner = mean;
ParDoFn combineParDoFn =
createCombineValuesFn(
CombinePhase.EXTRACT,
combiner,
StringUtf8Coder.of(),
BigEndianIntegerCoder.of(),
new CountSumCoder(),
WindowingStrategy.globalDefault());
combineParDoFn.startBundle(receiver);
combineParDoFn.processElement(
WindowedValue.valueInGlobalWindow(KV.of("a", new CountSum(6, 27))));
combineParDoFn.processElement(
WindowedValue.valueInGlobalWindow(KV.of("b", new CountSum(3, 21))));
combineParDoFn.finishBundle();
assertArrayEquals(
new Object[] {
WindowedValue.valueInGlobalWindow(KV.of("a", String.format("%.1f", 4.5))),
WindowedValue.valueInGlobalWindow(KV.of("b", String.format("%.1f", 7.0)))
},
receiver.receivedElems.toArray());
}
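MeanInts and CountSum are not shown in this excerpt; a count-and-sum accumulator for a mean-style combine typically has roughly the following shape (field and method names here are assumptions, not the actual Dataflow worker classes).

// Illustrative count/sum accumulator for a mean-style CombineFn (names assumed).
static class CountSum implements Serializable {
  final long count;
  final double sum;

  CountSum(long count, double sum) {
    this.count = count;
    this.sum = sum;
  }

  double mean() {
    return count == 0 ? Double.NaN : sum / count;
  }
}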
@Test
public void buildDatabaseManifestFile() throws InvalidProtocolBufferException {
Map<String, String> tablesAndManifests =
ImmutableMap.of("table1", "table1 manifest", "table2", "table2 manifest");
Export.Builder builder = Export.newBuilder();
builder.addTablesBuilder().setName("table1").setManifestFile("table1-manifest.json");
builder.addTablesBuilder().setName("table2").setManifestFile("table2-manifest.json");
String expectedManifest = JsonFormat.printer().print(builder.build());
PCollection<String> databaseManifest =
pipeline
.apply(Create.of(tablesAndManifests))
.apply(Combine.globally(new CreateDatabaseManifest()));
// The output JSON may contain the tables in any order, so a string comparison is not
// sufficient. Have to convert the manifest string to a protobuf. Also for the checker function
// to be serializable, it has to be written as a lambda.
PAssert.thatSingleton(databaseManifest)
.satisfies( // Checker function.
(SerializableFunction<String, Void>)
input -> {
Builder builder1 = Export.newBuilder();
try {
JsonFormat.parser().merge(input, builder1);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
Export manifestProto = builder1.build();
assertThat(manifestProto.getTablesCount(), is(2));
String table1Name = manifestProto.getTables(0).getName();
assertThat(table1Name, startsWith("table"));
assertThat(
manifestProto.getTables(0).getManifestFile(),
is(table1Name + "-manifest.json"));
return null;
});
pipeline.run();
}
/** Creates a simple pipeline with a {@link Combine.GroupedValues}. */
private static TestPipeline createCombineGroupedValuesPipeline() {
TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
PCollection<KV<String, Integer>> input =
pipeline
.apply(Create.of(KV.of("key", 1)))
.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
input.apply(GroupByKey.create()).apply(Combine.groupedValues(new SumCombineFn()));
return pipeline;
}
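Combine.groupedValues applies the CombineFn to each KV<K, Iterable<V>> produced by GroupByKey, so for a pipeline like the one above it is roughly equivalent to applying Combine.perKey directly to the keyed input. A sketch (Sum.ofIntegers() stands in for the SumCombineFn used in the test, whose definition is not shown here):

// Sketch: GroupByKey followed by Combine.groupedValues is roughly equivalent to
// Combine.perKey applied directly to the keyed input ("input" is the PCollection above).
PCollection<KV<String, Integer>> summed =
    input.apply(Combine.<String, Integer, Integer>perKey(Sum.ofIntegers()));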
@Override
public PCollection<TableRow> expand(PCollection<KV<String, LaneInfo>> flowInfo) {
// stationId, LaneInfo => stationId + max lane flow info
PCollection<KV<String, LaneInfo>> flowMaxes = flowInfo.apply(Combine.perKey(new MaxFlow()));
// <stationId, max lane flow info>... => row...
PCollection<TableRow> results = flowMaxes.apply(ParDo.of(new FormatMaxesFn()));
return results;
}
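MaxFlow is not shown in this excerpt; a per-key maximum like this is typically written as a Combine.BinaryCombineFn. A sketch of its likely shape (getFlowCount() is a hypothetical accessor on LaneInfo, used here only for illustration):

// Illustrative shape of the MaxFlow combine fn (getFlowCount() is a hypothetical
// LaneInfo accessor, not a confirmed method).
static class MaxFlow extends Combine.BinaryCombineFn<LaneInfo> {
  @Override
  public LaneInfo apply(LaneInfo left, LaneInfo right) {
    // Keep whichever reading reports the larger lane flow.
    return left.getFlowCount() >= right.getFlowCount() ? left : right;
  }
}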
protected SamzaAccumulatorCombiningState(
StateNamespace namespace,
StateTag<? extends State> address,
Coder<AccumT> coder,
Combine.CombineFn<InT, AccumT, OutT> combineFn) {
super(namespace, address, coder);
this.combineFn = combineFn;
}
private static boolean canTranslate(String urn, PTransform<?, ?> transform) {
if (!TRANSLATORS.containsKey(urn)) {
return false;
} else if (urn.equals(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN)) {
// According to BEAM, Combines with side inputs are translated as generic composites
return ((Combine.PerKey) transform).getSideInputs().isEmpty();
} else {
return true;
}
}
static PCollection<Long> applyTransform(PCollection<String> events) {
return events
.apply(
Window.<String>into(FixedWindows.of(Duration.standardDays(1)))
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane()))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes())
.apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());
}
static PCollection<Long> applyTransform(PCollection<String> events) {
return events
.apply(
Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
.apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());
}
FlinkKeyedCombiningState(
OperatorStateBackend flinkStateBackend,
StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
FlinkBroadcastStateInternals<K2> flinkStateInternals) {
super(flinkStateBackend, address.getId(), namespace, accumCoder);
this.namespace = namespace;
this.address = address;
this.combineFn = combineFn;
this.flinkStateInternals = flinkStateInternals;
}
@Override
public PCollection<Sketch<InputT>> expand(PCollection<InputT> input) {
return input.apply(
"Compute Count-Min Sketch",
Combine.globally(
CountMinSketchFn.create(input.getCoder())
.withAccuracy(relativeError(), confidence())));
}
@Override
public PCollection<KV<K, Sketch<V>>> expand(PCollection<KV<K, V>> input) {
KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
return input.apply(
"Compute Count-Min Sketch perKey",
Combine.perKey(
CountMinSketchFn.create(inputCoder.getValueCoder())
.withAccuracy(relativeError(), confidence())));
}
@Override
public PCollection<Long> expand(PCollection<InputT> input) {
return input
.apply(
"Compute HyperLogLog Structure",
Combine.globally(
ApproximateDistinctFn.create(input.getCoder())
.withPrecision(this.precision())
.withSparseRepresentation(this.sparsePrecision())))
.apply("Retrieve Cardinality", ParDo.of(RetrieveCardinality.globally()));
}
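These two expand() methods come from Beam's sketching extension (Count-Min Sketch frequencies and HyperLogLog-based distinct counting). In application code they are normally reached through the extension's public transforms rather than by building the Combine directly; a usage sketch, assuming the ApproximateDistinct.globally() entry point of the extension and an illustrative precision value:

// Sketch: counting distinct elements with the sketching extension
// ("words" stands for any PCollection<String>; 15 is an illustrative precision).
PCollection<Long> distinctCount =
    words.apply(ApproximateDistinct.<String>globally().withPrecision(15));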
@Override
public PCollection<?> expand(PCollection<T> input) {
input = input.apply(Combine.globally(combineFn).withoutDefaults().withFanout(fanout));
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder =
(Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
return BatchViewAsSingleton.applyForSingleton(
runner,
input,
new IsmRecordForSingularValuePerWindowDoFn<>(windowCoder),
input.getCoder(),
view);
}
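The withFanout call above spreads a hot global combine across intermediate partial-combine steps before the final merge; the per-key analogue is withHotKeyFanout. A usage sketch (the input PCollections and the fanout of 16 are illustrative):

// Sketch: fanout distributes a hot combine over intermediate partial combines
// ("numbers" and "keyed" are illustrative inputs).
PCollection<Integer> total =
    numbers.apply(Combine.globally(Sum.ofIntegers()).withFanout(16));
PCollection<KV<String, Integer>> perKeyTotals =
    keyed.apply(
        Combine.<String, Integer, Integer>perKey(Sum.ofIntegers()).withHotKeyFanout(16));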