下面列出了 org.joda.time.Instant#isAfter() 的实例代码，也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Returns the current watermark, advancing it first if a newer candidate is available.
 *
 * <p>If the last update is older than the configured idle duration, the candidate is
 * "now minus the idle threshold"; otherwise it is the latest event time. The stored watermark
 * only ever moves forward.
 */
@Override
public Instant getWatermark() {
  final Instant idleCutoff =
      Instant.now().minus(watermarkParameters.getWatermarkIdleDurationThreshold());
  final boolean isIdle = watermarkParameters.getLastUpdateTime().isBefore(idleCutoff);

  final Instant candidate;
  if (isIdle) {
    candidate = idleCutoff;
  } else {
    candidate = watermarkParameters.getEventTime();
  }

  final Instant current = watermarkParameters.getCurrentWatermark();
  if (!candidate.isAfter(current)) {
    // Never move the watermark backwards.
    return current;
  }
  watermarkParameters = watermarkParameters.toBuilder().setCurrentWatermark(candidate).build();
  return watermarkParameters.getCurrentWatermark();
}
/**
 * Deletes the event-time end-of-window timer for the context's window, and also the
 * garbage-collection timer when that timer lies strictly after the end of the window.
 */
private void cancelEndOfWindowAndGarbageCollectionTimers(
    ReduceFn<?, ?, ?, W>.Context directContext) {
  WindowTracing.debug(
      "ReduceFnRunner.cancelEndOfWindowAndGarbageCollectionTimers: Deleting timers for "
          + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
      key,
      directContext.window(),
      timerInternals.currentInputWatermarkTime(),
      timerInternals.currentOutputWatermarkTime());

  final Instant endOfWindow = directContext.window().maxTimestamp();
  directContext.timers().deleteTimer(endOfWindow, TimeDomain.EVENT_TIME);

  final Instant gcTime =
      LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
  // With zero allowed lateness the GC time can coincide with end-of-window; that timer was
  // already deleted above.
  if (!gcTime.isAfter(endOfWindow)) {
    return;
  }
  directContext.timers().deleteTimer(gcTime, TimeDomain.EVENT_TIME);
}
/**
 * Sets up the sort-flush timer for flush time {@code flush}, and a watermark hold at {@code
 * flush}.
 *
 * <p>The timer time is {@code flush} plus the allowed lateness, capped at the window's {@code
 * maxTimestamp()} plus the allowed lateness. {@code flush} itself is passed to {@code setTimer}
 * after the timer time (presumably as the output timestamp; confirm against the {@code
 * TimerInternals#setTimer} signature). Any existing hold in {@code namespace} is cleared before
 * the new hold at {@code flush} is added.
 *
 * <p>Note that this is equivalent to {@link org.apache.beam.sdk.state.Timer#withOutputTimestamp}
 * and should be reworked to use that feature once that is stable.
 *
 * @param namespace state namespace that owns both the timer and the watermark hold
 * @param window window whose end (plus allowed lateness) bounds the timer time
 * @param flush requested flush time; also the value of the watermark hold
 */
private void setupFlushTimerAndWatermarkHold(
    StateNamespace namespace, BoundedWindow window, Instant flush) {
  Instant flushWithLateness = flush.plus(windowingStrategy.getAllowedLateness());
  // NOTE(review): unlike LateDataUtils.garbageCollectionTime, this does not clamp to the end of
  // the global window, so a large allowed lateness may push the timer past the max timestamp.
  Instant windowGcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
  if (flushWithLateness.isAfter(windowGcTime)) {
    flushWithLateness = windowGcTime;
  }
  WatermarkHoldState watermark = stepContext.stateInternals().state(namespace, watermarkHold);
  stepContext
      .timerInternals()
      .setTimer(
          namespace,
          SORT_FLUSH_TIMER,
          SORT_FLUSH_TIMER,
          flushWithLateness,
          flush,
          TimeDomain.EVENT_TIME);
  // Replace (not merge with) any previous hold so the hold tracks the latest flush time.
  watermark.clear();
  watermark.add(flush);
}
/**
 * Computes the record's timestamp with {@code timestampFunction} and remembers the largest
 * timestamp seen so far in {@code maxEventTimestamp}.
 */
@Override
public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
  final Instant recordTimestamp = timestampFunction.apply(record);
  if (!recordTimestamp.isAfter(maxEventTimestamp)) {
    return recordTimestamp;
  }
  maxEventTimestamp = recordTimestamp;
  return recordTimestamp;
}
/**
 * Advances output watermark to the given value.
 *
 * <p>The value is clipped to the current input watermark. The resulting watermark must not be
 * earlier than the current output watermark.
 *
 * @throws NullPointerException if {@code newOutputWatermark} is null
 * @throws IllegalStateException if the (clipped) value would move the output watermark backwards
 */
public void advanceOutputWatermark(Instant newOutputWatermark) {
  checkNotNull(newOutputWatermark);
  final boolean exceedsInput = newOutputWatermark.isAfter(inputWatermarkTime);
  if (exceedsInput) {
    WindowTracing.trace(
        "{}.advanceOutputWatermark: clipping output watermark from {} to {}",
        getClass().getSimpleName(),
        newOutputWatermark,
        inputWatermarkTime);
  }
  final Instant adjustedOutputWatermark = exceedsInput ? inputWatermarkTime : newOutputWatermark;

  final boolean movesForward =
      outputWatermarkTime == null || !adjustedOutputWatermark.isBefore(outputWatermarkTime);
  checkState(
      movesForward,
      "Cannot move output watermark time backwards from %s to %s",
      outputWatermarkTime,
      adjustedOutputWatermark);

  WindowTracing.trace(
      "{}.advanceOutputWatermark: from {} to {}",
      getClass().getSimpleName(),
      outputWatermarkTime,
      adjustedOutputWatermark);
  outputWatermarkTime = adjustedOutputWatermark;
}
/**
 * Returns {@code min(target, window GC time)} for event-time timers, so that they never fire
 * after the current window's garbage-collection time. Timers in other domains get {@code target}
 * unchanged.
 */
private Instant minTargetAndGcTime(Instant target) {
  if (!TimeDomain.EVENT_TIME.equals(timeDomain)) {
    return target;
  }
  final Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
  return target.isAfter(windowExpiry) ? windowExpiry : target;
}
/**
 * Return job messages sorted in ascending order by timestamp.
 *
 * <p>Pages are fetched with {@code dataflowClient.listJobMessages} until there is no next page
 * token, or until a page (or its message list) comes back {@code null}. Messages whose time
 * cannot be converted ({@code fromCloudTime} returns {@code null}) are skipped. In every case,
 * the collected messages are sorted before being returned.
 *
 * @param jobId The id of the job to get the messages for.
 * @param startTimestampMs Return only those messages with a timestamp strictly greater than this
 *     value (milliseconds since the epoch).
 * @return collection of messages, sorted ascending by timestamp; never {@code null}
 * @throws IOException if listing the job messages fails
 */
public List<JobMessage> getJobMessages(String jobId, long startTimestampMs) throws IOException {
  // TODO: Allow filtering messages by importance
  Instant startTimestamp = new Instant(startTimestampMs);
  List<JobMessage> allMessages = new ArrayList<>();
  String pageToken = null;
  do {
    ListJobMessagesResponse response = dataflowClient.listJobMessages(jobId, pageToken);
    if (response == null || response.getJobMessages() == null) {
      // Stop paging, but still fall through to the sort so the ordering contract holds for
      // the messages already collected from earlier pages.
      break;
    }
    for (JobMessage m : response.getJobMessages()) {
      @Nullable Instant timestamp = fromCloudTime(m.getTime());
      if (timestamp != null && timestamp.isAfter(startTimestamp)) {
        allMessages.add(m);
      }
    }
    pageToken = response.getNextPageToken();
  } while (pageToken != null);
  allMessages.sort(new TimeStampComparator());
  return allMessages;
}
/**
 * Checks that {@code timestamp} lies within the inclusive range
 * [{@code TIMESTAMP_MIN_VALUE}, {@code TIMESTAMP_MAX_VALUE}].
 *
 * @param timestamp timestamp to validate
 * @throws IllegalArgumentException if the timestamp is outside that range
 */
public static void validateTimestampBounds(Instant timestamp) {
  final boolean withinBounds =
      !timestamp.isBefore(TIMESTAMP_MIN_VALUE) && !timestamp.isAfter(TIMESTAMP_MAX_VALUE);
  if (withinBounds) {
    return;
  }
  final String message =
      String.format(
          "Provided timestamp %s must be within bounds [%s, %s].",
          timestamp, TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE);
  throw new IllegalArgumentException(message);
}
/**
 * Clamps an event-time timer target to the window's garbage-collection time, so it returns
 * {@code min(target, GC time of window)}. Targets for other time domains are returned as-is.
 */
private Instant minTargetAndGcTime(Instant target) {
  if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
    final Instant gcTime = LateDataUtils.garbageCollectionTime(window, allowedLateness);
    return target.isAfter(gcTime) ? gcTime : target;
  }
  return target;
}
/**
 * Returns a processing-time-based watermark: the later of the current wall-clock time and the
 * previously returned watermark, so it never decreases.
 */
@Override
public Instant currentWatermark() {
  // Bounds checking is deliberately skipped: the runner is expected to validate watermark
  // bounds itself.
  final Instant now = Instant.now();
  if (now.isAfter(watermark)) {
    watermark = now;
  }
  return watermark;
}
/**
 * Clamps {@code timestamp} into the range
 * [{@code BoundedWindow.TIMESTAMP_MIN_VALUE}, {@code BoundedWindow.TIMESTAMP_MAX_VALUE}].
 */
private Instant ensureTimestampWithinBounds(Instant timestamp) {
  if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
    return BoundedWindow.TIMESTAMP_MIN_VALUE;
  }
  if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
    return BoundedWindow.TIMESTAMP_MAX_VALUE;
  }
  return timestamp;
}
/**
 * Deletes from the SQS queue every message in {@code messages} that is also contained in
 * {@code messagesToDelete}. For each deleted message, {@code oldestPendingTimestamp} is moved
 * forward to that message's timestamp if it is later.
 *
 * <p>NOTE(review): despite its name, {@code oldestPendingTimestamp} is advanced to the latest
 * timestamp among deleted messages; confirm this is the intended watermark semantics.
 */
void delete(final Collection<Message> messages) {
  for (Message candidate : messages) {
    if (!messagesToDelete.contains(candidate)) {
      continue;
    }
    source.getSqs().deleteMessage(source.getRead().queueUrl(), candidate.getReceiptHandle());
    final Instant messageTimestamp = getTimestamp(candidate);
    if (messageTimestamp.isAfter(oldestPendingTimestamp)) {
      oldestPendingTimestamp = messageTimestamp;
    }
  }
}
/**
 * Returns the timestamp produced by {@code timestampFunction} for {@code record}, and updates
 * {@code maxEventTimestamp} to track the maximum timestamp observed.
 */
@Override
public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
  final Instant eventTime = timestampFunction.apply(record);
  maxEventTimestamp = eventTime.isAfter(maxEventTimestamp) ? eventTime : maxEventTimestamp;
  return eventTime;
}
/**
 * Sets the output watermark, clipped to the current input watermark.
 *
 * @throws IllegalArgumentException if the (clipped) value is before the current output
 *     watermark
 */
public void setOutputWatermark(Instant watermark) {
  Instant clipped = watermark;
  if (clipped.isAfter(inputWatermark)) {
    LOG.debug("Clipping new output watermark from {} to {}.", watermark, inputWatermark);
    clipped = inputWatermark;
  }
  if (clipped.isBefore(outputWatermark)) {
    throw new IllegalArgumentException("New output watermark is before current watermark");
  }
  LOG.debug("Advancing output watermark from {} to {}.", outputWatermark, clipped);
  outputWatermark = clipped;
}
/**
 * Resets this context for a new work item and starts every step context with the given
 * watermarks and a processing time that is consistent with the work item's fired timers.
 */
public void start(
    @Nullable Object key,
    Windmill.WorkItem work,
    Instant inputDataWatermark,
    @Nullable Instant outputDataWatermark,
    @Nullable Instant synchronizedProcessingTime,
    WindmillStateReader stateReader,
    StateFetcher stateFetcher,
    Windmill.WorkItemCommitRequest.Builder outputBuilder) {
  this.key = key;
  this.work = work;
  this.stateFetcher = stateFetcher;
  this.outputBuilder = outputBuilder;
  this.sideInputCache.clear();
  clearSinkFullHint();

  // The processing time must be strictly later than every REALTIME timer in this work item.
  // Otherwise a trigger could ignore the timer and orphan the window.
  Instant effectiveProcessingTime = Instant.now();
  for (Windmill.Timer firedTimer : work.getTimers().getTimersList()) {
    if (firedTimer.getType() != Windmill.Timer.Type.REALTIME) {
      continue;
    }
    final Instant justAfterTimer =
        WindmillTimeUtils.windmillToHarnessTimestamp(firedTimer.getTimestamp())
            .plus(Duration.millis(1));
    if (justAfterTimer.isAfter(effectiveProcessingTime)) {
      effectiveProcessingTime = justAfterTimer;
    }
  }

  for (StepContext stepContext : getAllStepContexts()) {
    stepContext.start(
        stateReader,
        inputDataWatermark,
        effectiveProcessingTime,
        outputDataWatermark,
        synchronizedProcessingTime);
  }
}
/**
 * Return (a future for) the earliest hold for {@code context}. Clear all the holds after reading,
 * but add/restore an end-of-window or garbage collection hold if required.
 *
 * <p>The returned timestamp is the output timestamp according to the {@link TimestampCombiner}
 * from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late
 * elements in the current pane. If there is no such value the timestamp is the end of the window.
 *
 * @param context context whose element hold and extra hold states are extracted
 * @param isFinished when {@code true}, no replacement garbage-collection hold is added after the
 *     existing holds are cleared
 * @return a {@link ReadableState} whose {@code read()} performs the extraction. It computes the
 *     old hold, clears both hold states and, unless {@code isFinished}, adds a new
 *     garbage-collection hold. Calling {@code read()} therefore has side effects on state.
 */
public ReadableState<OldAndNewHolds> extractAndRelease(
    final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) {
  WindowTracing.debug(
      "WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; "
          + "outputWatermark:{}",
      context.key(),
      context.window(),
      timerInternals.currentInputWatermarkTime(),
      timerInternals.currentOutputWatermarkTime());
  final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag);
  final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG);
  return new ReadableState<OldAndNewHolds>() {
    // Prefetch hint: asks both hold states to read later; nothing is cleared here.
    @Override
    public ReadableState<OldAndNewHolds> readLater() {
      elementHoldState.readLater();
      extraHoldState.readLater();
      return this;
    }

    @Override
    public OldAndNewHolds read() {
      // Read both the element and extra holds.
      @Nullable Instant elementHold = elementHoldState.read();
      @Nullable Instant extraHold = extraHoldState.read();
      @Nullable Instant oldHold;
      // Find the minimum, accounting for null.
      if (elementHold == null) {
        oldHold = extraHold;
      } else if (extraHold == null) {
        oldHold = elementHold;
      } else if (elementHold.isBefore(extraHold)) {
        oldHold = elementHold;
      } else {
        oldHold = extraHold;
      }
      if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) {
        // If no hold (eg because all elements came in before the output watermark), or
        // the hold was for garbage collection, take the end of window as the result.
        WindowTracing.debug(
            "WatermarkHold.extractAndRelease.read: clipping from {} to end of window "
                + "for key:{}; window:{}",
            oldHold,
            context.key(),
            context.window());
        oldHold = context.window().maxTimestamp();
      }
      WindowTracing.debug(
          "WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}",
          context.key(),
          context.window());

      // Clear the underlying state to allow the output watermark to progress.
      elementHoldState.clear();
      extraHoldState.clear();

      // The state must be cleared before this point so that the new hold is the only one
      // left. The pane is reported as empty to addGarbageCollectionHold, presumably because
      // all element holds were just cleared (confirm against addGarbageCollectionHold).
      @Nullable Instant newHold = null;
      if (!isFinished) {
        newHold = addGarbageCollectionHold(context, true /*paneIsEmpty*/);
      }
      return new OldAndNewHolds(oldHold, newHold);
    }
  };
}
/**
 * Run the {@link ReduceFn#onTrigger} method and produce any necessary output.
 *
 * <p>As a side effect this extracts and clears the window's watermark holds through {@code
 * watermarkHold.extractAndRelease(...).read()}. If a pane is emitted and the trigger is not
 * finished, it also stores the pane info for the window. Output elements are stamped with the
 * extracted old hold.
 *
 * @param directContext context supplying the window; also used to compute and store pane info
 * @param renamedContext context whose state is used to extract the hold and to check whether any
 *     non-empty pane is buffered
 * @param isFinished whether the trigger is finished for this window. When true, no new hold may
 *     exist and the pane info is not stored.
 * @param isEndOfWindow whether this firing is the end-of-window (watermark) trigger. It is only
 *     used in a consistency check on the new hold.
 * @return output watermark hold added, or {@literal null} if none.
 * @throws Exception propagated from {@code reduceFn.onTrigger} or other calls in this method
 */
@Nullable
private Instant onTrigger(
    final ReduceFn<K, InputT, OutputT, W>.Context directContext,
    ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
    final boolean isFinished,
    boolean isEndOfWindow)
    throws Exception {
  // Extract the window hold, and as a side effect clear it.
  final WatermarkHold.OldAndNewHolds pair =
      watermarkHold.extractAndRelease(renamedContext, isFinished).read();
  // TODO: This isn't accurate if the elements are late. See BEAM-2262
  final Instant outputTimestamp = pair.oldHold;
  @Nullable Instant newHold = pair.newHold;
  final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read();
  // Nothing buffered, and neither the closing nor the on-time pane is configured to fire when
  // empty: skip output. The old holds were already cleared above, so only the replacement
  // hold (if any) is reported.
  if (isEmpty
      && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_IF_NON_EMPTY
      && windowingStrategy.getOnTimeBehavior() == Window.OnTimeBehavior.FIRE_IF_NON_EMPTY) {
    return newHold;
  }
  Instant inputWM = timerInternals.currentInputWatermarkTime();
  // Sanity checks on the replacement hold. It must be either exactly end-of-window or exactly
  // the garbage-collection time, and must not be behind the input watermark.
  if (newHold != null) {
    // We can't be finished yet.
    checkState(!isFinished, "new hold at %s but finished %s", newHold, directContext.window());
    // The hold cannot be behind the input watermark.
    checkState(
        !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
    if (newHold.isAfter(directContext.window().maxTimestamp())) {
      // The hold must be for garbage collection, which can't have happened yet.
      checkState(
          newHold.isEqual(
              LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy)),
          "new hold %s should be at garbage collection for window %s plus %s",
          newHold,
          directContext.window(),
          windowingStrategy.getAllowedLateness());
    } else {
      // The hold must be for the end-of-window, which can't have happened yet.
      checkState(
          newHold.isEqual(directContext.window().maxTimestamp()),
          "new hold %s should be at end of window %s",
          newHold,
          directContext.window());
      checkState(
          !isEndOfWindow,
          "new hold at %s for %s but this is the watermark trigger",
          newHold,
          directContext.window());
    }
  }
  // Calculate the pane info.
  final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();
  // Only emit a pane if it has data or empty panes are observable.
  if (needToEmit(isEmpty, isFinished, pane.getTiming())) {
    // Run reduceFn.onTrigger method.
    final List<W> windows = Collections.singletonList(directContext.window());
    ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
        contextFactory.forTrigger(
            directContext.window(),
            pane,
            StateStyle.RENAMED,
            toOutput -> {
              // We're going to output panes, so commit the (now used) PaneInfo.
              // This is unnecessary if the trigger isFinished since the saved
              // state will be immediately deleted.
              if (!isFinished) {
                paneInfoTracker.storeCurrentPaneInfo(directContext, pane);
              }
              // Output the actual value, timestamped with the extracted old hold.
              outputter.outputWindowedValue(KV.of(key, toOutput), outputTimestamp, windows, pane);
            });
    reduceFn.onTrigger(renamedTriggerContext);
  }
  return newHold;
}
/**
 * Returns whether the watermark has strictly passed {@code fireAfter}, or has reached
 * {@code BoundedWindow.TIMESTAMP_MAX_VALUE}, which always fires.
 */
public boolean shouldFire(Instant currentWatermark) {
  if (currentWatermark.isAfter(fireAfter)) {
    return true;
  }
  // The end-of-time watermark fires even when it does not strictly pass fireAfter.
  return currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE);
}
/**
 * Retrieves the timers that are eligible for processing by {@link
 * org.apache.beam.runners.core.ReduceFnRunner}.
 *
 * <p>An {@link TimeDomain#EVENT_TIME} timer is eligible only once {@code inputWatermark} is
 * strictly after the timer's timestamp. Timers in any other domain (e.g. {@link
 * TimeDomain#PROCESSING_TIME}) are always eligible.
 *
 * @param timers candidate timers
 * @param inputWatermark current input watermark
 * @return the eligible timers, as a set
 */
private Collection<TimerInternals.TimerData> filterTimersEligibleForProcessing(
    final Collection<TimerInternals.TimerData> timers, final Instant inputWatermark) {
  final Predicate<TimerInternals.TimerData> eligibleForProcessing =
      timer -> {
        if (timer.getDomain().equals(TimeDomain.EVENT_TIME)) {
          return inputWatermark.isAfter(timer.getTimestamp());
        }
        return true;
      };
  return FluentIterable.from(timers).filter(eligibleForProcessing).toSet();
}
/**
 * Moves the watermark forward to {@code time}. Does nothing if {@code time} is not strictly
 * after the latest recorded timestamp, so the watermark never moves backwards.
 *
 * @param time The time to advance the watermark to
 */
public void advanceWatermark(Instant time) {
  if (!time.isAfter(latestTimestamp)) {
    return;
  }
  latestTimestamp = time;
}