下面列出了 org.apache.beam.sdk.transforms.windowing.FixedWindows 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Flattening two PCollections windowed by FixedWindows of different sizes (one minute and two
 * minutes) must be rejected with an IllegalStateException whose message names the incompatible
 * WindowFns.
 */
@Test
public void testIncompatibleWindowFnPropagationFailure() {
  // The pipeline is only constructed here, never run.
  p.enableAbandonedNodeEnforcement(false);

  PCollection<String> oneMinuteInput =
      p.apply("CreateInput1", Create.of("Input1"))
          .apply("Window1", Window.into(FixedWindows.of(Duration.standardMinutes(1))));
  PCollection<String> twoMinuteInput =
      p.apply("CreateInput2", Create.of("Input2"))
          .apply("Window2", Window.into(FixedWindows.of(Duration.standardMinutes(2))));

  try {
    PCollectionList<String> mismatched = PCollectionList.of(oneMinuteInput).and(twoMinuteInput);
    mismatched.apply(Flatten.pCollections());
    Assert.fail("Exception should have been thrown");
  } catch (IllegalStateException expected) {
    String message = expected.getMessage();
    Assert.assertTrue(message.startsWith("Inputs to Flatten had incompatible window windowFns"));
  }
}
/**
 * Writes five lines through WriteFiles after windowing them into 2 ms fixed windows wrapped in a
 * WindowAndReshuffle.
 */
@Test
@Category(NeedsRunner.class)
public void testWriteWindowed() throws IOException {
  List<String> lines =
      Arrays.asList(
          "Critical canary",
          "Apprehensive eagle",
          "Intimidating pigeon",
          "Pedantic gull",
          "Frisky finch");

  runWrite(
      lines,
      new WindowAndReshuffle<>(Window.into(FixedWindows.of(Duration.millis(2)))),
      getBaseOutputFilename(),
      WriteFiles.to(makeSimpleSink()));
}
/**
 * With an AfterFirst trigger over two mocked sub-triggers, the composite must fire and be marked
 * finished when only the first sub-trigger reports that it should fire.
 */
@Test
public void testOnlyT1ShouldFireFixedWindows() throws Exception {
  tester =
      TriggerStateMachineTester.forTrigger(
          AfterFirstStateMachine.of(mockTrigger1, mockTrigger2),
          FixedWindows.of(Duration.millis(10)));
  tester.injectElements(1);

  // NOTE(review): 10 ms FixedWindows would presumably place element 1 in [0, 10), yet the
  // window queried here is [1, 11) - confirm this is intended.
  IntervalWindow queriedWindow = new IntervalWindow(new Instant(1), new Instant(11));

  // Only the first sub-trigger is ready.
  when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
  when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);

  assertTrue(tester.shouldFire(queriedWindow));

  tester.fireIfShouldFire(queriedWindow);
  assertTrue(tester.isMarkedFinished(queriedWindow));
}
/**
 * Builds and runs a windowed word count on the Flink runner: reads string values from the Kafka
 * topic "beam", windows them into fixed windows of {@code options.getWindowSize()} minutes,
 * counts words and writes the formatted counts to {@code options.getOutput()}.
 *
 * @param args command-line pipeline options
 */
public static void main(String[] args) throws Exception {
  Options pipelineOptions =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  pipelineOptions.setRunner(FlinkRunner.class);

  Pipeline pipeline = Pipeline.create(pipelineOptions);

  // Consumer override: the "auto.offset.reset" property is set to "earliest".
  ImmutableMap<String, Object> consumerOverrides =
      ImmutableMap.of("auto.offset.reset", (Object) "earliest");

  // Broker address and topic are hard-coded in this example.
  KafkaIO.Read<byte[], String> kafkaSource =
      KafkaIO.read()
          .withBootstrapServers("192.168.99.100:32771")
          .withTopics(Arrays.asList("beam".split(",")))
          .updateConsumerProperties(consumerOverrides)
          .withValueCoder(StringUtf8Coder.of());

  pipeline
      .apply(kafkaSource.withoutMetadata())
      .apply(Values.<String>create())
      .apply(
          Window.<String>into(
              FixedWindows.of(Duration.standardMinutes(pipelineOptions.getWindowSize()))))
      .apply(new CountWords())
      .apply(MapElements.via(new FormatAsTextFn()))
      .apply("WriteCounts", TextIO.Write.to(pipelineOptions.getOutput()));

  pipeline.run();
}
/**
 * Windows the incoming game events into fixed windows of {@code teamWindowDuration} and then
 * extracts and sums team/score pairs.
 */
@Override
public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> infos) {
  // Early (speculative) panes fire FIVE_MINUTES of processing time after the first element in
  // a pane; late panes fire TEN_MINUTES after the first late element. Fired panes accumulate,
  // so later results are cumulative.
  Window<GameActionInfo> teamWindowing =
      Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
          .triggering(
              AfterWatermark.pastEndOfWindow()
                  .withEarlyFirings(
                      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES))
                  .withLateFirings(
                      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
          .withAllowedLateness(allowedLateness)
          .accumulatingFiredPanes();

  return infos
      .apply("LeaderboardTeamFixedWindows", teamWindowing)
      // Extract and sum teamname/score pairs from the event data.
      .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
}
/**
 * Groups two elements sharing a key (timestamps 0 and 10) inside a ten-minute fixed window that
 * is explicitly configured with {@link TimestampCombiner#EARLIEST}, and asserts that the grouped
 * output carries the earlier timestamp, 0.
 */
@Test
@Category(ValidatesRunner.class)
public void testTimestampCombinerEarliest() {
  Window<KV<Integer, String>> earliestTimestampWindowing =
      Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
          .withTimestampCombiner(TimestampCombiner.EARLIEST);

  p.apply(
          Create.timestamped(
              TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
              TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
      .apply(earliestTimestampWindowing)
      .apply(GroupByKey.create())
      .apply(ParDo.of(new AssertTimestamp(new Instant(0))));

  p.run();
}
/**
 * Builds a Create -> Window.into -> Sum.integersPerKey pipeline and checks the DOT text that
 * PortablePipelineDotRenderer produces for its portable proto representation.
 */
@Test
public void testCompositePipeline() {
  p.apply(Create.timestamped(TimestampedValue.of(KV.of(1, 1), new Instant(1))))
      .apply(Window.into(FixedWindows.of(Duration.millis(10))))
      .apply(Sum.integersPerKey());

  String expectedDot =
      "digraph {"
          + " rankdir=LR"
          + " 0 [label=\"Create.TimestampedValues\\n\"]"
          + " 1 [label=\"Window.Into()\\n\"]"
          + " 0 -> 1 [style=solid label=\"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps).output\"]"
          + " 2 [label=\"Combine.perKey(SumInteger)\\nbeam:transform:combine_per_key:v1\"]"
          + " 1 -> 2 [style=solid label=\"Window.Into()/Window.Assign.out\"]"
          + "}";

  // Line separators are stripped so the rendering is compared as one single-line string.
  String renderedDot =
      PortablePipelineDotRenderer.toDotString(PipelineTranslation.toProto(p))
          .replaceAll(System.lineSeparator(), "");

  assertEquals(expectedDot, renderedDot);
}
/**
 * Demonstrates the windowing options of the CountByKey builder: one-second fixed windows, the
 * default trigger, discarding fired panes, five seconds of allowed lateness,
 * FIRE_IF_NON_EMPTY on-time behavior and the EARLIEST timestamp combiner.
 */
@Test
public void windowingSection() {
  PCollection<Integer> numbers =
      pipeline.apply(Create.of(1, 2, 3, 4)).setTypeDescriptor(TypeDescriptors.integers());

  FixedWindows oneSecondWindows = FixedWindows.of(Duration.standardSeconds(1));

  // The result is not asserted on; the test only builds the operator and runs the pipeline.
  PCollection<KV<Integer, Long>> countsPerKey =
      CountByKey.of(numbers)
          .keyBy(e -> e)
          .windowBy(oneSecondWindows)
          .triggeredBy(DefaultTrigger.of())
          .discardingFiredPanes()
          .withAllowedLateness(Duration.standardSeconds(5))
          .withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY)
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .output();

  pipeline.run();
}
/**
 * Builds a ReduceByKey with one-hour fixed windows, the default trigger and discarding
 * accumulation mode, then verifies that the produced operator exposes exactly that windowing
 * and has no value comparator.
 */
@Test
public void testBuild_Windowing() {
  final PCollection<String> words = TestUtils.createMockDataset(TypeDescriptors.strings());
  final PCollection<KV<String, Long>> summed =
      ReduceByKey.of(words)
          .keyBy(s -> s)
          .valueBy(s -> 1L)
          .combineBy(Sums.ofLongs())
          .windowBy(FixedWindows.of(Duration.standardHours(1)))
          .triggeredBy(DefaultTrigger.of())
          .accumulationMode(AccumulationMode.DISCARDING_FIRED_PANES)
          .output();

  final ReduceByKey producer = (ReduceByKey) TestUtils.getProducer(summed);
  assertTrue(producer.getWindow().isPresent());

  @SuppressWarnings("unchecked")
  final Window<? extends BoundedWindow> window = (Window) producer.getWindow().get();
  final WindowDesc<?> windowDesc = WindowDesc.of(window);

  assertEquals(FixedWindows.of(org.joda.time.Duration.standardHours(1)), window.getWindowFn());
  assertEquals(DefaultTrigger.of(), windowDesc.getTrigger());
  assertSame(AccumulationMode.DISCARDING_FIRED_PANES, windowDesc.getAccumulationMode());
  assertFalse(producer.getValueComparator().isPresent());
}
/**
 * Builds a windowed ReduceByKey that requests sorted values via {@code withSortedValues} and
 * verifies that the produced operator reports a value comparator.
 */
@Test
public void testBuild_sortedValues() {
  final PCollection<String> words = TestUtils.createMockDataset(TypeDescriptors.strings());

  final PCollection<KV<String, List<Long>>> collected =
      ReduceByKey.of(words)
          .keyBy(s -> s)
          .valueBy(s -> 1L)
          .reduceBy(s -> s.collect(Collectors.toList()))
          .withSortedValues(Long::compare)
          .windowBy(FixedWindows.of(Duration.standardHours(1)))
          .triggeredBy(DefaultTrigger.of())
          .accumulationMode(AccumulationMode.DISCARDING_FIRED_PANES)
          .output();

  final ReduceByKey producer = (ReduceByKey) TestUtils.getProducer(collected);
  assertTrue(producer.getValueComparator().isPresent());
}
/**
 * Builds a SumByKey with one-hour fixed windows, the default trigger, discarding panes and
 * 1000 ms of allowed lateness, then verifies that the produced operator's window description
 * reports each of those settings.
 */
@Test
public void testBuild_Windowing() {
  final PCollection<String> words = TestUtils.createMockDataset(TypeDescriptors.strings());
  final Duration allowedLateness = Duration.millis(1000);

  final PCollection<KV<String, Long>> sums =
      SumByKey.of(words)
          .keyBy(s -> s)
          .valueBy(s -> 1L)
          .windowBy(FixedWindows.of(org.joda.time.Duration.standardHours(1)))
          .triggeredBy(DefaultTrigger.of())
          .discardingFiredPanes()
          .withAllowedLateness(allowedLateness)
          .output();

  final SumByKey producer = (SumByKey) TestUtils.getProducer(sums);
  assertTrue(producer.getWindow().isPresent());

  @SuppressWarnings("unchecked")
  final WindowDesc<?> described = WindowDesc.of((Window) producer.getWindow().get());

  assertEquals(
      FixedWindows.of(org.joda.time.Duration.standardHours(1)), described.getWindowFn());
  assertEquals(DefaultTrigger.of(), described.getTrigger());
  assertEquals(AccumulationMode.DISCARDING_FIRED_PANES, described.getAccumulationMode());
  assertEquals(allowedLateness, described.getAllowedLateness());
}
/**
 * When a window's max timestamp plus the allowed lateness lies beyond the global window's max
 * timestamp, {@code LateDataUtils.garbageCollectionTime} must return the global window's max
 * timestamp.
 */
@Test
public void garbageCollectionTimeAfterEndOfGlobalWindowWithLateness() {
  FixedWindows fiveMinuteWindows = FixedWindows.of(Duration.standardMinutes(5));
  Duration hugeLateness = Duration.millis(Long.MAX_VALUE);
  WindowingStrategy<?, ?> windowing =
      WindowingStrategy.globalDefault()
          .withWindowFn(fiveMinuteWindows)
          .withAllowedLateness(hugeLateness);

  IntervalWindow assigned = fiveMinuteWindows.assignWindow(new Instant(-100));

  // Precondition: the naive expiry time really does exceed the end of the global window.
  assertThat(
      assigned.maxTimestamp().plus(hugeLateness),
      Matchers.greaterThan(GlobalWindow.INSTANCE.maxTimestamp()));

  assertThat(
      LateDataUtils.garbageCollectionTime(assigned, windowing),
      equalTo(GlobalWindow.INSTANCE.maxTimestamp()));
}
/**
 * Moves the input watermark from 0 straight to 100 after injecting one element into a 10 ms
 * fixed window with 50 ms of allowed lateness, so end-of-window and window expiration are both
 * passed in a single update. The run must not crash and must emit a single pane for window
 * [0, 10) containing the element.
 */
@Test
public void testFixedWindowsEowAndGcTogether() throws Exception {
  ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> reduceFnTester =
      ReduceFnTester.nonCombining(
          FixedWindows.of(Duration.millis(10)),
          DefaultTriggerStateMachine.of(),
          AccumulationMode.ACCUMULATING_FIRED_PANES,
          Duration.millis(50),
          ClosingBehavior.FIRE_ALWAYS);
  reduceFnTester.setAutoAdvanceOutputWatermark(true);

  reduceFnTester.advanceInputWatermark(new Instant(0));
  injectElement(reduceFnTester, 1);
  // One jump past both the window end (10) and the window end plus allowed lateness (60).
  reduceFnTester.advanceInputWatermark(new Instant(100));

  assertThat(
      reduceFnTester.extractOutput(),
      contains(
          isSingleWindowedValue(
              contains(1), 1, 0, 10, PaneInfo.createPane(true, true, Timing.ON_TIME))));
}
/**
 * Applies Reshuffle to the output of a windowed GroupByKey and checks that both the contents
 * and the windowing strategy are preserved.
 *
 * <p>NOTE(review): the method name mentions sliding windows, but the pipeline below windows
 * with ten-minute FixedWindows - confirm which one is intended.
 */
@Test
@Category(ValidatesRunner.class)
public void testReshuffleAfterSlidingWindowsAndGroupByKey() {
  PCollection<KV<String, Iterable<Integer>>> grouped =
      pipeline
          .apply(
              Create.of(GBK_TESTABLE_KVS)
                  .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
          .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))))
          .apply(GroupByKey.create());

  PCollection<KV<String, Iterable<Integer>>> reshuffled = grouped.apply(Reshuffle.of());

  PAssert.that(reshuffled).satisfies(new AssertThatHasExpectedContents());
  assertEquals(grouped.getWindowingStrategy(), reshuffled.getWindowingStrategy());

  pipeline.run();
}
/**
 * Writes 100 generated lines, windowed into 1 ms fixed windows, through a WriteFiles configured
 * with at most two writers per bundle, windowed writes and no spilling.
 */
@Test
@Category(NeedsRunner.class)
public void testWriteNoSpilling() throws IOException {
  List<String> lines = Lists.newArrayList();
  for (int index = 0; index < 100; index++) {
    lines.add("mambo_number_" + index);
  }

  runWrite(
      lines,
      Window.into(FixedWindows.of(Duration.millis(1))),
      getBaseOutputFilename(),
      WriteFiles.to(makeSimpleSink())
          .withMaxNumWritersPerBundle(2)
          .withWindowedWrites()
          .withNoSpilling());
}
/**
 * Self-joins an unbounded orders collection, windowed into 50-second fixed windows with no
 * explicit trigger, via a SQL JOIN and checks the joined rows.
 */
@Test
public void testJoinsUnboundedWithinWindowsWithDefaultTrigger() throws Exception {
  String joinQuery =
      "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id "
          + "FROM ORDER_DETAILS1 o1"
          + " JOIN ORDER_DETAILS2 o2"
          + " on "
          + " o1.order_id=o2.site_id AND o2.price=o1.site_id";

  PCollection<Row> windowedOrders =
      ordersUnbounded()
          .apply("window", Window.into(FixedWindows.of(Duration.standardSeconds(50))));

  // The same windowed collection is registered under both table names.
  PCollectionTuple tables =
      tuple("ORDER_DETAILS1", windowedOrders, "ORDER_DETAILS2", windowedOrders);

  PCollection<Row> joined = tables.apply("sql", SqlTransform.query(joinQuery));

  PAssert.that(joined)
      .containsInAnyOrder(
          TestUtils.RowsBuilder.of(RESULT_ROW_TYPE)
              .addRows(1, 2, 2, 2, 2, 1, 1, 4, 3, 3, 3, 1)
              .getRows());

  pipeline.run();
}
/**
 * Exercises an end-of-window watermark trigger with a mocked early-firing trigger: the early
 * trigger is first checked through {@code testRunningAsTrigger}, then the watermark is advanced
 * to the end of window [0, 100) and the trigger must fire and finish.
 */
@Test
public void testEarlyAndAtWatermark() throws Exception {
  tester =
      TriggerStateMachineTester.forTrigger(
          AfterWatermarkStateMachine.pastEndOfWindow().withEarlyFirings(mockEarly),
          FixedWindows.of(Duration.millis(100)));

  injectElements(1);
  IntervalWindow firstHundredMillis = new IntervalWindow(new Instant(0), new Instant(100));

  testRunningAsTrigger(mockEarly, firstHundredMillis);

  // The early trigger stays quiet from here on, so the firing below is due to the watermark.
  when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
  tester.advanceInputWatermark(new Instant(100));

  assertTrue(tester.shouldFire(firstHundredMillis));
  tester.fireIfShouldFire(firstHundredMillis);
  assertTrue(tester.isMarkedFinished(firstHundredMillis));
}
/**
 * Applies GroupByKey to an empty collection windowed into one-minute fixed windows and checks
 * that the output's WindowFn is compatible with an equally configured FixedWindows.
 */
@Test
@Category(NeedsRunner.class)
public void testIdentityWindowFnPropagation() {
  List<KV<String, Integer>> noPairs = Arrays.asList();

  PCollection<KV<String, Integer>> windowedPairs =
      p.apply(
              Create.of(noPairs)
                  .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
          .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

  PCollection<KV<String, Iterable<Integer>>> grouped = windowedPairs.apply(GroupByKey.create());

  p.run();

  // Compared against a freshly built FixedWindows rather than the instance used above.
  Assert.assertTrue(
      grouped
          .getWindowingStrategy()
          .getWindowFn()
          .isCompatible(FixedWindows.of(Duration.standardMinutes(1))));
}
/**
 * Supplies the WindowFn instances the parameterized test runs against: fixed, global, session,
 * sliding and a custom WindowFn, in that order.
 */
@Parameters(name = "{index}: {0}")
public static Iterable<WindowFn<?, ?>> data() {
  return ImmutableList.<WindowFn<?, ?>>of(
      FixedWindows.of(Duration.standardMinutes(10L)),
      new GlobalWindows(),
      Sessions.withGapDuration(Duration.standardMinutes(15L)),
      SlidingWindows.of(Duration.standardMinutes(5L)).every(Duration.standardMinutes(1L)),
      new CustomWindows());
}
/**
 * Windows the key/value input into fixed windows of {@code TWO_MINUTES}, sums the integers per
 * key and formats each result as a string.
 */
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  PCollection<KV<String, Integer>> windowed =
      input.apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES)));
  PCollection<KV<String, Integer>> perKeySums = windowed.apply(Sum.integersPerKey());
  return perKeySums.apply(ParDo.of(new FormatAsStrings()));
}
/**
 * Checks that the supplied GroupAlsoByWindow implementation keeps elements of the overlapping
 * windows (0, 5) and (1, 5) in separate groups instead of merging them: "v1" and "v3" belong to
 * the first window, "v2" to the second.
 *
 * @param gabwFactory factory for the GroupAlsoByWindow DoFn under test
 */
public static void groupsIntoOverlappingNonmergingWindows(
    GroupAlsoByWindowDoFnFactory<String, String, Iterable<String>> gabwFactory) throws Exception {
  WindowingStrategy<?, IntervalWindow> strategy =
      WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));

  // The elements carry explicitly assigned windows.
  List<WindowedValue<KV<String, Iterable<String>>>> grouped =
      runGABW(
          gabwFactory,
          strategy,
          "key",
          WindowedValue.of("v1", new Instant(1), Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING),
          WindowedValue.of("v2", new Instant(4), Arrays.asList(window(1, 5)), PaneInfo.NO_FIRING),
          WindowedValue.of(
              "v3", new Instant(4), Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING));

  assertThat(grouped, hasSize(2));

  // NOTE(review): each group's timestamp is compared against the *other* window's max
  // timestamp; presumably harmless because both windows end at 5 - confirm.
  TimestampedValue<KV<String, Iterable<String>>> fromZero =
      getOnlyElementInWindow(grouped, window(0, 5));
  assertThat(fromZero.getValue().getValue(), containsInAnyOrder("v1", "v3"));
  assertThat(fromZero.getTimestamp(), equalTo(window(1, 5).maxTimestamp()));

  TimestampedValue<KV<String, Iterable<String>>> fromOne =
      getOnlyElementInWindow(grouped, window(1, 5));
  assertThat(fromOne.getValue().getValue(), contains("v2"));
  assertThat(fromOne.getTimestamp(), equalTo(window(0, 5).maxTimestamp()));
}
/**
 * Creates the two iterable views used by the tests, both derived from the same single-element
 * collection but windowed with different fixed window sizes.
 */
@Before
public void setUp() {
  PCollection<String> source = Pipeline.create().apply(Create.of("1"));

  // view1 is windowed by WINDOW_MSECS_1.
  view1 =
      source
          .apply(Window.into(FixedWindows.of(new Duration(WINDOW_MSECS_1))))
          .apply(View.asIterable());

  // view2 is windowed by WINDOW_MSECS_2.
  view2 =
      source
          .apply(Window.into(FixedWindows.of(new Duration(WINDOW_MSECS_2))))
          .apply(View.asIterable());
}
/**
 * Sums integers per key within fixed windows of {@code TWO_MINUTES} and renders every per-key
 * sum as a string.
 */
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  Window<KV<String, Integer>> twoMinuteWindows =
      Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES));

  return input
      .apply(twoMinuteWindows)
      .apply(Sum.integersPerKey())
      .apply(ParDo.of(new FormatAsStrings()));
}
/**
 * Selects, per fixed window of {@code configuration.windowSizeSec} seconds, every bid whose
 * price equals the maximum bid price of that window.
 */
@Override
public PCollection<Bid> expand(PCollection<Event> events) {
  // Keep only the bids and window them.
  PCollection<Bid> windowedBids =
      events
          .apply(NexmarkQueryUtil.JUST_BIDS)
          .apply(
              Window.into(
                  FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))));

  // Largest price among all bids, exposed as a singleton side input.
  // NOTE: A binary combiner accumulating the maximal-price bids (as done for Query5) would be
  // more efficient: this version needs an additional scan per window, with the associated cost
  // of snapshotted state and its I/O. It is kept because it illustrates the use of side inputs.
  final PCollectionView<Long> maxPriceView =
      windowedBids
          .apply("BidToPrice", NexmarkQueryUtil.BID_TO_PRICE)
          .apply(Max.longsGlobally().withFanout(configuration.fanout).asSingletonView());

  // Emit every bid carrying that maximum price (there may be more than one).
  return windowedBids.apply(
      name + ".Select",
      ParDo.of(
              new DoFn<Bid, Bid>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  long highestPrice = c.sideInput(maxPriceView);
                  Bid candidate = c.element();
                  if (candidate.price == highestPrice) {
                    c.output(candidate);
                  }
                }
              })
          .withSideInputs(maxPriceView));
}
/**
 * Feeds one element into each of two consecutive one-minute fixed windows and asserts that the
 * write emits a result in each window: {@code BigtableWriteResult.create(1)} in the first and
 * {@code BigtableWriteResult.create(2)} in the second.
 */
@Test
public void testWritingEmitsResultsWhenDoneInFixedWindow() throws Exception {
  final String table = "table";
  final String key = "key";
  final String value = "value";

  service.createTable(table);

  Instant firstWindowStart = Instant.parse("2019-06-10T00:00:00");
  Duration windowSize = Duration.standardMinutes(1);
  Instant secondWindowStart = firstWindowStart.plus(windowSize);

  // Element 1 arrives at the start of the first window, element 2 at the start of the second.
  TestStream<Integer> stream =
      TestStream.create(VarIntCoder.of())
          .advanceWatermarkTo(firstWindowStart)
          .addElements(1)
          .advanceWatermarkTo(secondWindowStart)
          .addElements(2)
          .advanceWatermarkToInfinity();

  BoundedWindow firstWindow = new IntervalWindow(firstWindowStart, windowSize);
  BoundedWindow secondWindow = new IntervalWindow(secondWindowStart, windowSize);

  PCollection<BigtableWriteResult> writeResults =
      p.apply("rows", stream)
          .apply("window", Window.into(FixedWindows.of(windowSize)))
          .apply("expand", ParDo.of(new WriteGeneratorDoFn()))
          .apply("write", defaultWrite.withTableId(table).withWriteResults());

  PAssert.that(writeResults)
      .inWindow(firstWindow)
      .containsInAnyOrder(BigtableWriteResult.create(1));
  PAssert.that(writeResults)
      .inWindow(secondWindow)
      .containsInAnyOrder(BigtableWriteResult.create(2));

  p.run();
}
/**
 * Applying {@code Count.globally()} to a collection in one-day fixed windows must fail with an
 * IllegalStateException carrying the combine function's incompatible-global-window message.
 */
@Test
public void testGlobalWindowErrorMessageShows() {
  PCollection<String> lines = p.apply(Create.of(NO_LINES).withCoder(StringUtf8Coder.of()));
  PCollection<String> dailyWindowed =
      lines.apply(Window.into(FixedWindows.of(Duration.standardDays(1))));

  String expectedMessage = Count.combineFn().getIncompatibleGlobalWindowErrorMessage();

  // Expectations are registered before the call that is supposed to throw.
  exceptionRule.expect(IllegalStateException.class);
  exceptionRule.expectMessage(expectedMessage);

  dailyWindowed.apply(Count.globally());
}
/**
 * Runs a mocked trigger over a combining (integer sum) ReduceFn in accumulating mode: the
 * first firing outputs 3, the second and final firing outputs 6, and an element injected after
 * the trigger finished does not show up in the output.
 */
@Test
public void testOnElementCombiningAccumulating() throws Exception {
  WindowingStrategy<?, IntervalWindow> accumulatingStrategy =
      WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
          .withAllowedLateness(Duration.millis(100));

  ReduceFnTester<Integer, Integer, IntervalWindow> summingTester =
      ReduceFnTester.combining(
          accumulatingStrategy, mockTriggerStateMachine, Sum.ofIntegers(), VarIntCoder.of());

  // No firing has been stubbed yet when the first element is injected.
  injectElement(summingTester, 1);

  // The trigger is told to fire when 2 is injected; the first expected output is 1 + 2 = 3.
  when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
  injectElement(summingTester, 2);

  // The trigger fires again and finishes when 3 is injected; panes accumulate, so the second
  // expected output is 1 + 2 + 3 = 6.
  when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
  triggerShouldFinish(mockTriggerStateMachine);
  injectElement(summingTester, 3);

  // This element shouldn't be seen, because the trigger has finished.
  injectElement(summingTester, 4);

  assertThat(
      summingTester.extractOutput(),
      contains(
          isSingleWindowedValue(equalTo(3), 1, 0, 10),
          isSingleWindowedValue(equalTo(6), 3, 0, 10)));
  assertTrue(summingTester.isMarkedFinished(firstWindow));
  summingTester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
}
/**
 * Entry point of the windowed word count BEAM program.
 *
 * <p>{@code args[0]} is the output file path and {@code args[1]} selects the windowing:
 * {@code "fixed"} gives 5-second fixed windows, anything else gives 10-second sliding windows
 * that advance every 5 seconds. The full argument array is also handed to {@code getSource}.
 *
 * @param args arguments.
 */
public static void main(final String[] args) {
  final String outputFilePath = args[0];
  final String windowType = args[1];

  final Window<KV<String, Long>> windowFn =
      windowType.equals("fixed")
          ? Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardSeconds(5)))
          : Window.<KV<String, Long>>into(
              SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5)));

  final PipelineOptions pipelineOptions = NemoPipelineOptionsFactory.create();
  pipelineOptions.setJobName("WindowedWordCount");
  final Pipeline pipeline = Pipeline.create(pipelineOptions);

  // Renders each per-key sum as "key: value".
  final SimpleFunction<KV<String, Long>, String> formatSum =
      new SimpleFunction<KV<String, Long>, String>() {
        @Override
        public String apply(final KV<String, Long> kv) {
          return kv.getKey() + ": " + kv.getValue();
        }
      };

  getSource(pipeline, args)
      .apply(windowFn)
      .apply(Sum.longsPerKey())
      .apply(MapElements.<KV<String, Long>, String>via(formatSum))
      .apply(new WriteOneFilePerWindow(outputFilePath, 1));

  pipeline.run().waitUntilFinish();
}
/**
 * Reads a side input that lives in the global window from a main input windowed into 10 ms
 * fixed windows. The side input is the sum 1 + 2 + 3, so every main element is expected to be
 * suffixed with 6 regardless of its own window.
 */
@Test
@Category(ValidatesRunner.class)
public void testWindowedSideInputFixedToGlobal() {
  final PCollectionView<Integer> globalSumView =
      pipeline
          .apply(
              "CreateSideInput",
              Create.timestamped(
                  TimestampedValue.of(1, new Instant(1)),
                  TimestampedValue.of(2, new Instant(11)),
                  TimestampedValue.of(3, new Instant(13))))
          .apply("WindowSideInput", Window.into(new GlobalWindows()))
          .apply(Sum.integersGlobally())
          .apply(View.asSingleton());

  PCollection<String> tagged =
      pipeline
          .apply(
              "CreateMainInput",
              Create.timestamped(
                  TimestampedValue.of("A", new Instant(4)),
                  TimestampedValue.of("B", new Instant(15)),
                  TimestampedValue.of("C", new Instant(7))))
          .apply("WindowMainInput", Window.into(FixedWindows.of(Duration.millis(10))))
          .apply(
              "OutputMainAndSideInputs",
              ParDo.of(
                      new DoFn<String, String>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                          // Element followed by the side-input value.
                          c.output(c.element() + c.sideInput(globalSumView));
                        }
                      })
                  .withSideInputs(globalSumView));

  PAssert.that(tagged).containsInAnyOrder("A6", "B6", "C6");

  pipeline.run();
}
/**
 * Windows the incoming messages into fixed windows of {@code windowDuration} and writes them
 * with a dynamic FileIO write whose destination is derived from each message's attributes and
 * the {@code outputPrefix} path template.
 *
 * @return a result holding {@code PDone} and an empty error collection
 */
@Override
public WithFailures.Result<PDone, PubsubMessage> expand(PCollection<PubsubMessage> input) {
  ValueProvider<DynamicPathTemplate> template =
      NestedValueProvider.of(outputPrefix, DynamicPathTemplate::new);
  ValueProvider<String> fixedPrefix =
      NestedValueProvider.of(template, resolved -> resolved.staticPrefix);

  FileIO.Write<List<String>, PubsubMessage> fileWrite =
      FileIO.<List<String>, PubsubMessage>writeDynamic()
          // The attribute map can't be passed to by() directly since MapCoder isn't
          // deterministic; instead, an ordered list of the needed placeholder values is
          // extracted. That list is later available to withNaming() to determine the output
          // location.
          .by(
              message ->
                  template
                      .get()
                      .extractValuesFrom(DerivedAttributesMap.of(message.getAttributeMap())))
          .withDestinationCoder(ListCoder.of(StringUtf8Coder.of()))
          .withCompression(compression)
          .via(Contextful.fn(format::encodeSingleMessage), TextIO.sink())
          .to(fixedPrefix)
          .withNaming(
              placeholderValues ->
                  NoColonFileNaming.defaultNaming(
                      template.get().replaceDynamicPart(placeholderValues), format.suffix()));

  if (inputType == InputType.pubsub) {
    // Passing a ValueProvider to withNumShards disables runner-determined sharding, so this is
    // passed only for streaming input (where runner-determined sharding is not an option).
    fileWrite = fileWrite.withNumShards(numShards);
  }

  // Lateness is allowed up to the maximum Cloud Pub/Sub retention of 7 days documented in
  // https://cloud.google.com/pubsub/docs/subscriber
  Window<PubsubMessage> windowing =
      Window.<PubsubMessage>into(FixedWindows.of(windowDuration))
          .withAllowedLateness(Duration.standardDays(7))
          .discardingFiredPanes();

  input.apply(windowing).apply(fileWrite);

  return WithFailures.Result.of(
      PDone.in(input.getPipeline()), EmptyErrors.in(input.getPipeline()));
}