下面列出了org.joda.time.format.ISOPeriodFormat#org.joda.time.Instant 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public FlinkWatermarkHoldState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
MapStateDescriptor<String, Instant> watermarkHoldStateDescriptor,
String stateId,
StateNamespace namespace,
TimestampCombiner timestampCombiner) {
this.timestampCombiner = timestampCombiner;
// Combines StateNamespace and stateId to generate a unique namespace for
// watermarkHoldsState. We do not want to use Flink's namespacing to be
// able to recover watermark holds efficiently during recovery.
this.namespaceString = namespace.stringKey() + stateId;
try {
this.watermarkHoldsState =
flinkStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
watermarkHoldStateDescriptor);
} catch (Exception e) {
throw new RuntimeException("Could not access state for watermark partition view");
}
}
public DetectionConfigDTO maintain(DetectionConfigDTO config, Instant timestamp) {
Preconditions.checkArgument(!Objects.isNull(config.getComponents()) && !config.getComponents().isEmpty(), "Components not initialized");
if (isTunable(config)) {
// if the pipeline is tunable, get the model evaluators
Collection<? extends ModelEvaluator<? extends AbstractSpec>> modelEvaluators = getModelEvaluators(config);
// check the status for model evaluators
for (ModelEvaluator<? extends AbstractSpec> modelEvaluator : modelEvaluators) {
// if returns bad model status, trigger model tuning
if (modelEvaluator.evaluateModel(timestamp).getStatus().equals(ModelStatus.BAD)) {
LOG.info("Status for detection pipeline {} is {}, re-tuning", config.getId(), ModelStatus.BAD.toString());
detectionRetuneCounter.inc();
DetectionConfigTuner detectionConfigTuner = new DetectionConfigTuner(config, provider);
config = detectionConfigTuner.tune(timestamp.toDateTime().minusDays(DEFAULT_TUNING_WINDOW_DAYS).getMillis(),
timestamp.getMillis());
config.setLastTuningTimestamp(timestamp.getMillis());
break;
}
}
}
return config;
}
@Test
public void testAfterWatermarkProgram() throws Exception {
WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy;
long initialTime = 0L;
OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
createTestingOperatorAndState(strategy, initialTime);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1));
expectedOutput.add(new Watermark(initialTime + 10000));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 10000));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
expectedOutput.add(new Watermark(initialTime + 20000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
testHarness.close();
}
private boolean flushTimers(long watermark) {
if (timerInternals.currentInputWatermarkTime().isBefore(watermark)) {
try {
Instant watermarkInstant = new Instant(watermark);
timerInternals.advanceInputWatermark(watermarkInstant);
if (watermarkInstant.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
timerInternals.advanceProcessingTime(watermarkInstant);
timerInternals.advanceSynchronizedProcessingTime(watermarkInstant);
}
fireEligibleTimers(timerInternals);
} catch (Exception e) {
throw new RuntimeException("Failed advancing processing time", e);
}
}
return outputManager.tryFlush();
}
@Test
public void processElementSideInputReadyAllWindows() {
when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
.thenReturn(true);
ImmutableList<PCollectionView<?>> views = ImmutableList.of(singletonView);
SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);
WindowedValue<Integer> multiWindow =
WindowedValue.of(
2,
new Instant(-2),
ImmutableList.of(
new IntervalWindow(new Instant(-500L), new Instant(0L)),
new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
GlobalWindow.INSTANCE),
PaneInfo.ON_TIME_AND_ONLY_FIRING);
Iterable<WindowedValue<Integer>> multiWindowPushback =
runner.processElementInReadyWindows(multiWindow);
assertThat(multiWindowPushback, emptyIterable());
assertThat(
underlying.inputElems,
containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
}
@Test
public void testDefaultWindowMappingFn() {
PartitioningWindowFn<?, ?> windowFn = FixedWindows.of(Duration.standardMinutes(20L));
WindowMappingFn<?> mapping = windowFn.getDefaultWindowMappingFn();
assertThat(
mapping.getSideInputWindow(
new BoundedWindow() {
@Override
public Instant maxTimestamp() {
return new Instant(100L);
}
}),
equalTo(
new IntervalWindow(
new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(20L)))));
assertThat(mapping.maximumLookback(), equalTo(Duration.ZERO));
}
@Test
public void finalPane() {
SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor =
PaneExtractors.finalPane();
Iterable<ValueInSingleWindow<Integer>> onlyOnTime =
ImmutableList.of(
ValueInSingleWindow.of(
8,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(false, true, Timing.LATE, 2L, 1L)),
ValueInSingleWindow.of(
4,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)),
ValueInSingleWindow.of(
1,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(true, false, Timing.EARLY)));
assertThat(extractor.apply(onlyOnTime), containsInAnyOrder(8));
}
@Test
public void peekValuesInWindow() throws Exception {
try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
tester.startBundle();
tester.processElement(1L);
tester.processElement(2L);
tester.finishBundle();
assertThat(
tester.peekOutputElementsInWindow(GlobalWindow.INSTANCE),
containsInAnyOrder(
TimestampedValue.of("1", new Instant(1000L)),
TimestampedValue.of("2", new Instant(2000L))));
assertThat(
tester.peekOutputElementsInWindow(new IntervalWindow(new Instant(0L), new Instant(10L))),
Matchers.emptyIterable());
}
}
@Override
public Instant decode(InputStream inStream) throws CoderException, IOException {
long shiftedMillis;
try {
shiftedMillis = new DataInputStream(inStream).readLong();
} catch (EOFException | UTFDataFormatException exn) {
// These exceptions correspond to decoding problems, so change
// what kind of exception they're branded as.
throw new CoderException(exn);
}
// Produces an {@link Instant} from a {@code long} representing its millis-since-epoch,
// but shifted so that the byte representation of negative values are lexicographically
// ordered before the byte representation of positive values.
//
// This deliberately utilizes the well-defined overflow for {@code long} values.
// See http://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.18.2
return new Instant(shiftedMillis + Long.MIN_VALUE);
}
private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredDomainTimers(
TimeDomain domain, Instant firingTime) {
Map<StructuralKey<?>, List<TimerData>> firedTimers;
switch (domain) {
case PROCESSING_TIME:
firedTimers = extractFiredTimers(firingTime, processingTimers);
break;
case SYNCHRONIZED_PROCESSING_TIME:
firedTimers =
extractFiredTimers(
INSTANT_ORDERING.min(firingTime, earliestHold.get()),
synchronizedProcessingTimers);
break;
default:
throw new IllegalArgumentException(
"Called getFiredTimers on a Synchronized Processing Time watermark"
+ " and gave a non-processing time domain "
+ domain);
}
for (Map.Entry<StructuralKey<?>, ? extends Collection<TimerData>> firedTimer :
firedTimers.entrySet()) {
pendingTimers.addAll(firedTimer.getValue());
}
return firedTimers;
}
@Test
public void nonLatePanesSingleEarly() {
SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor =
PaneExtractors.nonLatePanes();
Iterable<ValueInSingleWindow<Integer>> onlyOnTime =
ImmutableList.of(
ValueInSingleWindow.of(
8,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(true, false, Timing.EARLY)),
ValueInSingleWindow.of(
4,
new Instant(0L),
GlobalWindow.INSTANCE,
PaneInfo.createPane(true, false, Timing.EARLY)));
assertThat(extractor.apply(onlyOnTime), containsInAnyOrder(4, 8));
}
@Override
public void emit(final WindowedValue<KV<K, Iterable<InputT>>> output) {
// The watermark advances only in ON_TIME
if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) {
final K key = output.getValue().getKey();
final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
inMemoryTimerInternalsFactory.timerInternalsForKey(key);
keyAndWatermarkHoldMap.put(key,
// adds the output timestamp to the watermark hold of each key
// +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
new Watermark(output.getTimestamp().getMillis() + 1));
timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1));
}
outputCollector.emit(output);
}
@ProcessElement
public void processElement(ProcessContext c) {
WebresourceSocialCount sc = c.element();
Instant countTime = new Instant(sc.countTime);
TableRow row = new TableRow()
.set("WebResourceHash", sc.webResourceHash)
.set("WrPublicationDateId", sc.wrPublicationDateId)
.set("CountTime", countTime.toString())
.set("DocumentCollectionId", sc.documentCollectionId)
.set("CollectionItemId", sc.collectionItemId)
.set("FbCount", sc.fbCount)
.set("TwCount", sc.twCount);
c.output(row);
}
@Override
public void set(Instant absoluteTime) {
// Verifies that the time domain of this timer is acceptable for absolute timers.
if (!TimeDomain.EVENT_TIME.equals(timeDomain)) {
throw new IllegalArgumentException(
"Can only set relative timers in processing time domain. Use #setRelative()");
}
// Ensures that the target time is reasonable. For event time timers this means that the time
// should be prior to window GC time.
if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
checkArgument(
!absoluteTime.isAfter(windowExpiry),
"Attempted to set event time timer for %s but that is after"
+ " the expiration of window %s",
absoluteTime,
windowExpiry);
}
output(absoluteTime);
}
@Override
public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
Instant ts = timestampFunction.apply(record);
if (ts.isAfter(maxEventTimestamp)) {
maxEventTimestamp = ts;
}
return ts;
}
@Test
public void testValues() {
Instant now = Instant.now();
TimestampedValue<String> tsv = TimestampedValue.of("foobar", now);
assertEquals(now, tsv.getTimestamp());
assertEquals("foobar", tsv.getValue());
}
@SchemaCreate
static SimpleAutoValueWithStaticFactory create(
String str,
byte aByte,
short aShort,
int anInt,
long aLong,
boolean aBoolean,
DateTime dateTime,
byte[] bytes,
ByteBuffer byteBuffer,
Instant instant,
BigDecimal bigDecimal,
StringBuilder stringBuilder) {
return new AutoValue_AutoValueSchemaTest_SimpleAutoValueWithStaticFactory(
str,
aByte,
aShort,
anInt,
aLong,
aBoolean,
dateTime,
bytes,
byteBuffer,
instant,
bigDecimal,
stringBuilder);
}
public TimeStampGenerator() {
this.eventTime = new Instant(0);
this.lastGap = new Instant(0);
this.outOfOrderProbability = 0;
this.minLateness = 0;
this.maxLateness = 0;
this.sessionPeriod = 0;
this.minGap = 0;
this.maxGap = 0;
}
@Override
public void onElement(OnElementContext c) throws Exception {
GroupingState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
Instant oldDelayUntil = delayUntilState.read();
// Since processing time can only advance, resulting in target wake-up times we would
// ignore anyhow, we don't bother with it if it is already set.
if (oldDelayUntil != null) {
return;
}
Instant targetTimestamp = getTargetTimestamp(c);
delayUntilState.add(targetTimestamp);
c.setTimer(targetTimestamp, timeDomain);
}
@Test
public void testIsReady() {
SideInputHandler sideInputHandler =
new SideInputHandler(
ImmutableList.of(view1, view2), InMemoryStateInternals.<Void>forKey(null));
IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_1));
IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(WINDOW_MSECS_2));
// side input should not yet be ready
assertFalse(sideInputHandler.isReady(view1, firstWindow));
// add a value for view1
sideInputHandler.addSideInputValue(
view1,
valuesInWindow(
materializeValuesFor(view1.getPipeline().getOptions(), View.asIterable(), "Hello"),
new Instant(0),
firstWindow));
// now side input should be ready
assertTrue(sideInputHandler.isReady(view1, firstWindow));
// second window input should still not be ready
assertFalse(sideInputHandler.isReady(view1, secondWindow));
}
@Test
public void testFlattenInMemoryEvaluatorWithEmptyPCollectionList() throws Exception {
PCollectionList<Integer> list = PCollectionList.empty(p);
PCollection<Integer> flattened = list.apply(Flatten.pCollections());
flattened.setCoder(VarIntCoder.of());
EvaluationContext evaluationContext = mock(EvaluationContext.class);
when(evaluationContext.createBundle(flattened))
.thenReturn(bundleFactory.createBundle(flattened));
FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(evaluationContext);
AppliedPTransform<?, ?, ?> flattendProducer = DirectGraphs.getProducer(flattened);
TransformEvaluator<Integer> emptyEvaluator =
factory.forApplication(
flattendProducer,
bundleFactory.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
TransformResult<Integer> leftSideResult = emptyEvaluator.finishBundle();
CommittedBundle<?> outputBundle =
Iterables.getOnlyElement(leftSideResult.getOutputBundles()).commit(Instant.now());
assertThat(outputBundle.getElements(), emptyIterable());
assertThat(
leftSideResult.getTransform(),
Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattendProducer));
}
@Test
public void testStringFormatting() throws IOException {
final Instant now = Instant.now();
final Duration oneHour = Duration.standardHours(1);
HasDisplayData component =
new HasDisplayData() {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder
.add(DisplayData.item("string", "foobar"))
.add(DisplayData.item("integer", 123))
.add(DisplayData.item("float", 2.34))
.add(DisplayData.item("boolean", true))
.add(DisplayData.item("java_class", DisplayDataTest.class))
.add(DisplayData.item("timestamp", now))
.add(DisplayData.item("duration", oneHour));
}
};
DisplayData data = DisplayData.from(component);
assertThat(data, hasDisplayItem("string", "foobar"));
assertThat(data, hasDisplayItem("integer", 123));
assertThat(data, hasDisplayItem("float", 2.34));
assertThat(data, hasDisplayItem("boolean", true));
assertThat(data, hasDisplayItem("java_class", DisplayDataTest.class));
assertThat(data, hasDisplayItem("timestamp", now));
assertThat(data, hasDisplayItem("duration", oneHour));
}
@Test
public void testFixedOffsetWindow() throws Exception {
Map<IntervalWindow, Set<String>> expected = new HashMap<>();
expected.put(new IntervalWindow(new Instant(-5), new Instant(5)), set(1, 2));
expected.put(new IntervalWindow(new Instant(5), new Instant(15)), set(5, 9, 10, 11));
expected.put(new IntervalWindow(new Instant(95), new Instant(105)), set(100));
assertEquals(
expected,
runWindowFn(
FixedWindows.of(new Duration(10)).withOffset(new Duration(5)),
Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L, 100L)));
}
@Test
public void testOnTimerWithWindow() throws Exception {
final String timerId = "my-timer-id";
final IntervalWindow testWindow = new IntervalWindow(new Instant(0), new Instant(15));
when(mockArgumentProvider.window()).thenReturn(testWindow);
class SimpleTimerDoFn extends DoFn<String, String> {
public IntervalWindow window = null;
@TimerId(timerId)
private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
@ProcessElement
public void process(ProcessContext c) {}
@OnTimer(timerId)
public void onMyTimer(IntervalWindow w) {
window = w;
}
}
SimpleTimerDoFn fn = new SimpleTimerDoFn();
DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
invoker.invokeOnTimer(TimerDeclaration.PREFIX + timerId, "", mockArgumentProvider);
assertThat(fn.window, equalTo(testWindow));
}
/**
* Tests that when windows merge, if the trigger is waiting for "N millis after the first element"
* that it is relative to the earlier of the two merged windows.
*/
@Test
public void testClear() throws Exception {
SimpleTriggerStateMachineTester<IntervalWindow> tester =
TriggerStateMachineTester.forTrigger(
AfterProcessingTimeStateMachine.pastFirstElementInPane()
.plusDelayOf(Duration.millis(5)),
FixedWindows.of(Duration.millis(10)));
tester.injectElements(1, 2, 3);
IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
tester.clearState(window);
tester.assertCleared(window);
}
/**
* Returns the {@link WatermarkUpdate} based on the former and current {@link Instant
* timestamps}.
*/
public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTime) {
if (currentTime.isAfter(oldTime)) {
return ADVANCED;
}
return NO_CHANGE;
}
/**
* Tests that when a processing time timer comes in after a window is expired but in the same
* bundle it does not cause a spurious output.
*/
@Test
public void testCombiningAccumulatingProcessingTime() throws Exception {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.ZERO)
.withTrigger(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
tester.advanceProcessingTime(new Instant(5000));
injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
injectElement(tester, 5);
tester.advanceInputWatermarkNoTimers(new Instant(100));
tester.advanceProcessingTimeNoTimers(new Instant(5010));
// Fires the GC/EOW timer at the same time as the processing time timer.
tester.fireTimers(
new IntervalWindow(new Instant(0), new Instant(100)),
TimestampedValue.of(TimeDomain.EVENT_TIME, new Instant(100)),
TimestampedValue.of(TimeDomain.PROCESSING_TIME, new Instant(5010)));
assertThat(
tester.extractOutput(),
contains(
isSingleWindowedValue(
equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))));
}
@Override
public <AdditionalOutputT> void outputWindowedValue(
TupleTag<AdditionalOutputT> tag,
AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
throw new UnsupportedOperationException(
String.format(
"%s should not use tagged outputs", DirectGroupAlsoByWindow.class.getSimpleName()));
}
@Test
@Category(NeedsRunner.class)
public void testGloballyEventTimestamp() {
PCollection<String> output =
p.apply(
Create.timestamped(
TimestampedValue.of("foo", new Instant(100)),
TimestampedValue.of("bar", new Instant(300)),
TimestampedValue.of("baz", new Instant(200))))
.apply(Latest.globally());
PAssert.that(output).containsInAnyOrder("bar");
p.run();
}
@Test
public void testConsecutive() throws Exception {
Map<IntervalWindow, Set<String>> expected = new HashMap<>();
expected.put(new IntervalWindow(new Instant(1), new Instant(19)), set(1, 2, 5, 9));
expected.put(new IntervalWindow(new Instant(100), new Instant(111)), set(100, 101));
assertEquals(
expected,
runWindowFn(
Sessions.withGapDuration(new Duration(10)), Arrays.asList(1L, 2L, 5L, 9L, 100L, 101L)));
}