下面列出了 org.joda.time.Instant#isBefore() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Polls every reader for its watermark, at most once per {@code watermarkInterval} milliseconds.
 *
 * <p>For each reader whose reported watermark is strictly after the value recorded for its
 * partition (or after {@code BoundedWindow.TIMESTAMP_MIN_VALUE} when nothing is recorded yet),
 * the recorded value is replaced and the reader's watermark is enqueued.
 */
private void updateWatermark() throws InterruptedException {
  final long now = System.currentTimeMillis();
  if (now - lastWatermarkTime <= watermarkInterval) {
    // The polling interval has not elapsed yet; nothing to do.
    return;
  }

  for (UnboundedReader reader : readers) {
    final SystemStreamPartition ssp = readerToSsp.get(reader);

    final Instant previous;
    if (currentWatermarks.containsKey(ssp)) {
      previous = currentWatermarks.get(ssp);
    } else {
      previous = BoundedWindow.TIMESTAMP_MIN_VALUE;
    }

    final Instant candidate = reader.getWatermark();
    if (previous.isBefore(candidate)) {
      // Only strictly advancing watermarks are recorded and propagated.
      currentWatermarks.put(ssp, candidate);
      enqueueWatermark(reader);
    }
  }
  lastWatermarkTime = now;
}
/**
 * Folds the contents of the underlying table (if any) into this table and then detaches from it.
 *
 * <p>When an underlying table is present, the current binder factory is replaced with a {@link
 * ReadThroughBinderFactory} over it and every existing value is read through, binding the state
 * values to this table. The old StateTable should be discarded after the call to {@link
 * #commit()}.
 *
 * <p>Once everything has been copied, the binder factory becomes an {@link
 * InMemoryStateBinderFactory} so that new values are constructed locally: all existing values are
 * now bound in this {@link StateTable table}, which represents the canonical state. The earliest
 * watermark hold recorded is the earlier of this table's own hold and the one reported by the
 * read-through.
 */
private void commit() {
  // Computed before reading through, so it reflects only this table's own holds.
  Instant earliest = getEarliestWatermarkHold();
  if (underlying.isPresent()) {
    ReadThroughBinderFactory readThrough = new ReadThroughBinderFactory<>(underlying.get());
    binderFactory = readThrough;
    Instant underlyingEarliest = readThrough.readThroughAndGetEarliestHold(this);
    earliest = underlyingEarliest.isBefore(earliest) ? underlyingEarliest : earliest;
  }
  earliestWatermarkHold = Optional.of(earliest);
  clearEmpty();
  binderFactory = new InMemoryStateBinderFactory();
  underlying = Optional.empty();
}
/**
 * Merges accumulators by keeping the latest one.
 *
 * <p>The running value starts at {@code new Instant(0L)}; every accumulator that is not before
 * the running value replaces it.
 *
 * @param accumulators the accumulators to merge
 * @return the latest accumulator seen, or the initial {@code Instant(0L)} if none replaced it
 */
@Override
public Instant mergeAccumulators(Iterable<Instant> accumulators) {
  Instant latest = new Instant(0L);
  for (Instant candidate : accumulators) {
    if (!candidate.isBefore(latest)) {
      latest = candidate;
    }
  }
  return latest;
}
/**
 * Returns the latest of the sub-triggers' guaranteed-firing watermarks for {@code window},
 * because this trigger fires only after the latest of its sub-triggers.
 *
 * @param window the window being asked about
 * @return the latest sub-trigger deadline, or {@code BoundedWindow.TIMESTAMP_MIN_VALUE} when no
 *     sub-trigger reports a later one
 */
@Internal
@Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
  Instant latest = BoundedWindow.TIMESTAMP_MIN_VALUE;
  for (Trigger subTrigger : subTriggers) {
    Instant candidate = subTrigger.getWatermarkThatGuaranteesFiring(window);
    latest = latest.isBefore(candidate) ? candidate : latest;
  }
  return latest;
}
/**
 * Advances the input watermark to {@code watermark}.
 *
 * @param watermark the new input watermark
 * @throws IllegalArgumentException if {@code watermark} is before the current input watermark
 */
public void setInputWatermark(Instant watermark) {
  final boolean movesBackwards = watermark.isBefore(inputWatermark);
  if (movesBackwards) {
    throw new IllegalArgumentException("New input watermark is before current watermark");
  }

  LOG.debug("Advancing input watermark from {} to {}.", inputWatermark, watermark);
  inputWatermark = watermark;
}
/**
 * Ensures that later sliding windows have an output time that is past the end of earlier windows.
 *
 * <p>If {@code inputTimestamp} lies after the start of the window's last segment (the window's
 * max timestamp minus {@code period}), it is returned unchanged. Otherwise the result is the
 * earliest time that doesn't overlap with earlier windows: one past the start of that segment.
 */
@Experimental(Kind.OUTPUT_TIME)
@Override
public Instant getOutputTime(Instant inputTimestamp, IntervalWindow window) {
  Instant lastSegmentStart = window.maxTimestamp().minus(period);
  if (lastSegmentStart.isBefore(inputTimestamp)) {
    return inputTimestamp;
  }
  return lastSegmentStart.plus(1);
}
/**
 * Convert a harness timestamp to a Windmill timestamp.
 *
 * <p>Timestamps at or beyond {@code BoundedWindow.TIMESTAMP_MAX_VALUE} map to {@code
 * Long.MAX_VALUE}; millisecond values below {@code Long.MIN_VALUE / 1000} map to {@code
 * Long.MIN_VALUE + 1}; everything else is the millisecond value multiplied by 1000.
 */
public static long harnessToWindmillTimestamp(Instant timestamp) {
  if (!timestamp.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
    // End of time.
    return Long.MAX_VALUE;
  }

  final long millis = timestamp.getMillis();
  if (millis < Long.MIN_VALUE / 1000) {
    // Scaling by 1000 would overflow; clamp just above the minimum long.
    return Long.MIN_VALUE + 1;
  }
  return millis * 1000;
}
/**
 * Appends {@code message} to {@code messages} while holding the write lock, lowering {@code
 * oldestMessageTimestamp} first if the message's JMS timestamp is earlier.
 *
 * @param message the message to record
 * @throws Exception if reading the JMS timestamp fails
 */
void add(Message message) throws Exception {
  lock.writeLock().lock();
  try {
    Instant messageTimestamp = new Instant(message.getJMSTimestamp());
    boolean isNewOldest = messageTimestamp.isBefore(oldestMessageTimestamp);
    if (isNewOldest) {
      oldestMessageTimestamp = messageTimestamp;
    }
    messages.add(message);
  } finally {
    // Always release, even when reading the timestamp throws.
    lock.writeLock().unlock();
  }
}
/**
 * Advances the output watermark, clipping the requested value to the current input watermark.
 *
 * @param watermark the requested output watermark
 * @throws IllegalArgumentException if the (possibly clipped) watermark is before the current
 *     output watermark
 */
public void setOutputWatermark(Instant watermark) {
  Instant effective = watermark;
  if (watermark.isAfter(inputWatermark)) {
    // The output watermark may never run ahead of the input watermark.
    LOG.debug("Clipping new output watermark from {} to {}.", watermark, inputWatermark);
    effective = inputWatermark;
  }

  if (effective.isBefore(outputWatermark)) {
    throw new IllegalArgumentException("New output watermark is before current watermark");
  }

  LOG.debug("Advancing output watermark from {} to {}.", outputWatermark, effective);
  outputWatermark = effective;
}
/**
 * Flushes buffered elements whose timestamp is not after {@code timestamp}, in timestamp order.
 *
 * <p>Elements with a later timestamp are written back to the sort buffer. The minimum timestamp
 * among them is stored, and a new flush timer and watermark hold are set up for it. If no
 * elements remain, {@code BoundedWindow.TIMESTAMP_MAX_VALUE} is stored and the watermark hold
 * is cleared.
 *
 * @param window the window whose sort buffer is flushed
 * @param timestamp elements at or before this instant are processed
 */
private void onSortFlushTimer(BoundedWindow window, Instant timestamp) {
  StateInternals stateInternals = stepContext.stateInternals();
  StateNamespace namespace = StateNamespaces.window(windowCoder, window);
  BagState<WindowedValue<InputT>> sortBuffer = stateInternals.state(namespace, sortBufferTag);
  ValueState<Instant> minStampState = stateInternals.state(namespace, sortBufferMinStampTag);

  List<WindowedValue<InputT>> retained = new ArrayList<>();
  List<WindowedValue<InputT>> ready = new ArrayList<>();
  Instant earliestRetained = BoundedWindow.TIMESTAMP_MAX_VALUE;

  // Partition the buffer, tracking the minimum timestamp among the retained elements.
  for (WindowedValue<InputT> element : sortBuffer.read()) {
    Instant stamp = element.getTimestamp();
    if (stamp.isAfter(timestamp)) {
      retained.add(element);
      if (stamp.isBefore(earliestRetained)) {
        earliestRetained = stamp;
      }
    } else {
      ready.add(element);
    }
  }

  // Emit the ready elements in timestamp order before touching the buffer.
  ready.sort(Comparator.comparing(WindowedValue::getTimestamp));
  for (WindowedValue<InputT> element : ready) {
    processElementUnordered(window, element);
  }

  sortBuffer.clear();
  for (WindowedValue<InputT> element : retained) {
    sortBuffer.add(element);
  }
  minStampState.write(earliestRetained);

  if (earliestRetained.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
    setupFlushTimerAndWatermarkHold(namespace, window, earliestRetained);
  } else {
    clearWatermarkHold(namespace);
  }
}
/**
 * Return the earliest output watermark hold in state, or null if none.
 *
 * <p>Only entries of {@code inMemoryState} that are {@link WatermarkHoldState} are consulted;
 * holds that read as {@code null} are ignored.
 */
public Instant earliestWatermarkHold() {
  Instant earliest = null;
  for (State state : inMemoryState.values()) {
    if (!(state instanceof WatermarkHoldState)) {
      continue;
    }
    Instant hold = ((WatermarkHoldState) state).read();
    if (hold == null) {
      // An unset hold can never become the minimum.
      continue;
    }
    if (earliest == null || hold.isBefore(earliest)) {
      earliest = hold;
    }
  }
  return earliest;
}
/**
 * Checkpoint the current reader, finalize the previous checkpoint, and return the current
 * checkpoint.
 *
 * <p>When {@code watermark} has reached {@code BoundedWindow.TIMESTAMP_MAX_VALUE}, finalization
 * of the new checkpoint is also scheduled to run after the output would be produced.
 *
 * @param reader the reader to checkpoint
 * @param watermark the reader's current watermark
 * @param shard the shard holding the previous checkpoint
 * @return the checkpoint mark just taken from {@code reader}
 * @throws IOException if finalizing the previous checkpoint fails
 */
private CheckpointMarkT finishRead(
    UnboundedReader<OutputT> reader,
    Instant watermark,
    UnboundedSourceShard<OutputT, CheckpointMarkT> shard)
    throws IOException {
  final CheckpointMark previousMark = shard.getCheckpoint();
  @SuppressWarnings("unchecked")
  final CheckpointMarkT currentMark = (CheckpointMarkT) reader.getCheckpointMark();
  if (previousMark != null) {
    previousMark.finalizeCheckpoint();
  }

  if (watermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
    // The source can still be invoked again; the next call finalizes this mark.
    return currentMark;
  }

  // If the watermark is the max value, this source may not be invoked again. Finalize after
  // committing the output.
  PCollection<OutputT> output =
      (PCollection<OutputT>) getOnlyElement(transform.getOutputs().values());
  evaluationContext.scheduleAfterOutputWouldBeProduced(
      output,
      GlobalWindow.INSTANCE,
      output.getWindowingStrategy(),
      () -> {
        try {
          currentMark.finalizeCheckpoint();
        } catch (IOException e) {
          throw new RuntimeException(
              "Couldn't finalize checkpoint after the end of the Global Window", e);
        }
      });
  return currentMark;
}
/**
 * Get the earliest watermark hold in this table. Ignores the contents of any underlying table.
 *
 * @return the earliest non-null hold among this table's {@link WatermarkHoldState} values, or
 *     {@code BoundedWindow.TIMESTAMP_MAX_VALUE} when there is none
 */
private Instant getEarliestWatermarkHold() {
  Instant result = BoundedWindow.TIMESTAMP_MAX_VALUE;
  for (State state : this.values()) {
    if (!(state instanceof WatermarkHoldState)) {
      continue;
    }
    Instant hold = ((WatermarkHoldState) state).read();
    if (hold != null && hold.isBefore(result)) {
      result = hold;
    }
  }
  return result;
}
/**
 * Returns the earliest hold read from the {@link WatermarkHoldState} entries of {@code
 * inMemoryState}, or {@code null} if no such entry yields a non-null hold.
 */
Instant earliestWatermarkHold() {
  Instant result = null;
  for (State candidate : inMemoryState.values()) {
    if (candidate instanceof WatermarkHoldState) {
      Instant hold = ((WatermarkHoldState) candidate).read();
      // Until a hold has been found, any read value (even null) is taken as-is.
      boolean nothingFoundYet = result == null;
      boolean strictlyEarlier = !nothingFoundYet && hold != null && hold.isBefore(result);
      if (nothingFoundYet || strictlyEarlier) {
        result = hold;
      }
    }
  }
  return result;
}
/**
 * Folds {@code input} into {@code accumulator} by keeping the later of the two.
 *
 * @return {@code input} if the accumulator is strictly before it, otherwise {@code accumulator}
 */
@Override
public Instant addInput(Instant accumulator, Instant input) {
  if (accumulator.isBefore(input)) {
    return input;
  }
  return accumulator;
}
/**
 * Aggregates the parent's per-partition read metadata for the batch at {@code validTime}.
 *
 * <p>When the parent produced an RDD, its metadata is collected and used to sum the record
 * counts, take the latest low and high watermarks across partitions, publish the largest read
 * duration to the read-duration gauge, and merge each partition's metrics into the accumulator.
 * The resulting {@link SparkWatermarks} is added to the {@link GlobalWatermarkHolder}. The batch
 * is reported via {@code report} whether or not the parent produced an RDD.
 *
 * @param validTime the batch time being computed
 * @return always an empty option
 */
@Override
public scala.Option<RDD<BoxedUnit>> compute(Time validTime) {
  // compute parent.
  scala.Option<RDD<Metadata>> parentRDDOpt = parent.getOrCompute(validTime);
  final MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance();
  long count = 0;
  SparkWatermarks sparkWatermark = null;
  Instant globalLowWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
  Instant globalHighWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
  long maxReadDuration = 0;
  if (parentRDDOpt.isDefined()) {
    JavaRDD<Metadata> parentRDD = parentRDDOpt.get().toJavaRDD();
    for (Metadata metadata : parentRDD.collect()) {
      count += metadata.getNumRecords();
      // compute the global input watermark - advance to latest of all partitions.
      Instant partitionLowWatermark = metadata.getLowWatermark();
      globalLowWatermarkForBatch =
          globalLowWatermarkForBatch.isBefore(partitionLowWatermark)
              ? partitionLowWatermark
              : globalLowWatermarkForBatch;
      Instant partitionHighWatermark = metadata.getHighWatermark();
      globalHighWatermarkForBatch =
          globalHighWatermarkForBatch.isBefore(partitionHighWatermark)
              ? partitionHighWatermark
              : globalHighWatermarkForBatch;
      // Update metrics reported in the read
      final Gauge gauge = Metrics.gauge(NAMESPACE, READ_DURATION_MILLIS);
      final MetricsContainer container = metadata.getMetricsContainers().getContainer(stepName);
      try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(container)) {
        final long readDurationMillis = metadata.getReadDurationMillis();
        if (readDurationMillis > maxReadDuration) {
          gauge.set(readDurationMillis);
          // Remember the running maximum; without this the guard compares against 0 forever
          // and a later, shorter read would overwrite the gauge.
          maxReadDuration = readDurationMillis;
        }
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      metricsAccum.value().updateAll(metadata.getMetricsContainers());
    }
    sparkWatermark =
        new SparkWatermarks(
            globalLowWatermarkForBatch,
            globalHighWatermarkForBatch,
            new Instant(validTime.milliseconds()));
    // add to watermark queue.
    GlobalWatermarkHolder.add(inputDStreamId, sparkWatermark);
  }
  // report - for RateEstimator and visibility.
  report(validTime, count, sparkWatermark);
  return scala.Option.empty();
}
/**
 * Appends {@code message} to {@code messages}, first lowering {@code oldestMessageTimestamp} when
 * {@code timestamp} is earlier than it.
 *
 * @param message the message to record
 * @param timestamp the timestamp associated with {@code message}
 */
public void add(Message message, Instant timestamp) {
  final boolean isNewOldest = timestamp.isBefore(oldestMessageTimestamp);
  if (isNewOldest) {
    oldestMessageTimestamp = timestamp;
  }

  messages.add(message);
}
/**
* Builds the {@link PaneInfo} for the next pane of a window, classifying it as EARLY, ON_TIME or
* LATE from the current input and output watermarks and from the previously emitted pane.
*
* <p>The pane index is 0 for the first pane, otherwise one more than the previous pane's index.
* The non-speculative index follows the same rule but is set to -1 for EARLY panes.
*
* @param key the key being processed; used only for tracing
* @param windowMaxTimestamp the maximum timestamp of the window
* @param previousPane the previously emitted pane, or {@code null} if this is the first pane
* @param isFinal whether this is the final pane for the window
* @return the description of the new pane
* @throws IllegalStateException (via {@code checkState}, presumably Guava's) if the timing
*     transition is not allowed or the previous pane was already marked last
*/
private <W> PaneInfo describePane(
Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) {
boolean isFirst = previousPane == null;
Timing previousTiming = isFirst ? null : previousPane.getTiming();
long index = isFirst ? 0 : previousPane.getIndex() + 1;
long nonSpeculativeIndex = isFirst ? 0 : previousPane.getNonSpeculativeIndex() + 1;
Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();
// True if it is not possible to assign the element representing this pane a timestamp
// which will make an ON_TIME pane for any following computation.
// I.e. true if the element's latest possible timestamp is before the current output watermark.
boolean isLateForOutput = outputWM != null && windowMaxTimestamp.isBefore(outputWM);
// True if all emitted panes (if any) were EARLY panes.
// Once the ON_TIME pane has fired, all following panes must be considered LATE even
// if the output watermark is behind the end of the window.
boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;
// True if the input watermark hasn't passed the window's max timestamp.
boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp);
Timing timing;
if (isLateForOutput || !onlyEarlyPanesSoFar) {
// The output watermark has already passed the end of this window, or we have already
// emitted a non-EARLY pane. Irrespective of how this pane was triggered we must
// consider this pane LATE.
timing = Timing.LATE;
} else if (isEarlyForInput) {
// This is an EARLY firing.
timing = Timing.EARLY;
// EARLY panes are speculative, so they carry no non-speculative index.
nonSpeculativeIndex = -1;
} else {
// This is the unique ON_TIME firing for the window.
timing = Timing.ON_TIME;
}
WindowTracing.debug(
"describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; "
+ "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}",
timing,
previousTiming,
key,
windowMaxTimestamp,
inputWM,
outputWM,
isLateForOutput);
if (previousPane != null) {
// Timing transitions should follow EARLY* ON_TIME? LATE*
switch (previousTiming) {
case EARLY:
checkState(
timing == Timing.EARLY || timing == Timing.ON_TIME || timing == Timing.LATE,
"EARLY cannot transition to %s",
timing);
break;
case ON_TIME:
checkState(timing == Timing.LATE, "ON_TIME cannot transition to %s", timing);
break;
case LATE:
checkState(timing == Timing.LATE, "LATE cannot transtion to %s", timing);
break;
case UNKNOWN:
// No constraint is checked when the previous timing is UNKNOWN.
break;
}
// A pane that claimed to be the last one must not be followed by another.
checkState(!previousPane.isLast(), "Last pane was not last after all.");
}
return PaneInfo.createPane(isFirst, isFinal, timing, index, nonSpeculativeIndex);
}
/**
 * Returns true if the step will not produce additional output.
 *
 * @param transform the step to check
 * @return {@code true} once the step's output watermark is no longer before {@code
 *     BoundedWindow.TIMESTAMP_MAX_VALUE}
 */
public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
  // The PTransform is done only if its watermark is at the max value.
  Instant watermark = watermarkManager.getWatermarks(transform).getOutputWatermark();
  boolean canStillAdvance = watermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE);
  return !canStillAdvance;
}
/**
* Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was added (ie
* the element timestamp plus any forward shift requested by the {@link
* WindowingStrategy#getTimestampCombiner}), or {@literal null} if no hold was added. The hold is
* only added if both:
*
* <ol>
* <li>The backend will be able to respect it. In other words the output watermark cannot be
* ahead of the proposed hold time.
* <li>A timer will be set (by {@link ReduceFnRunner}) to clear the hold by the end of the
* window. In other words the input watermark cannot be ahead of the end of the window.
* </ol>
*
* <p>The hold ensures the pane which incorporates the element will not be considered late by any
* downstream computation when it is eventually emitted.
*
* @param timestamp the element timestamp from which the hold is derived
* @param context the context supplying the window, the key and the state to add the hold to
* @return the hold that was added, or {@code null} if the element was too late for a hold
*/
@Nullable
private Instant addElementHold(Instant timestamp, ReduceFn<?, ?, ?, W>.Context context) {
// Give the window function a chance to move the hold timestamp forward to encourage progress.
// (A later hold implies less impediment to the output watermark making progress, which in
// turn encourages end-of-window triggers to fire earlier in following computations.)
Instant elementHold = shift(timestamp, context.window());
Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();
// Description of the outcome; used only in the trace message below.
String which;
boolean tooLate;
// TODO: These case labels could be tightened.
// See the case analysis in addHolds above for the motivation.
if (outputWM != null && elementHold.isBefore(outputWM)) {
which = "too late to effect output watermark";
tooLate = true;
} else if (context.window().maxTimestamp().isBefore(inputWM)) {
which = "too late for end-of-window timer";
tooLate = true;
} else {
which = "on time";
tooLate = false;
checkState(
!elementHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
"Element hold %s is beyond end-of-time",
elementHold);
context.state().access(elementHoldTag).add(elementHold);
}
WindowTracing.trace(
"WatermarkHold.addHolds: element hold at {} is {} for "
+ "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
elementHold,
which,
context.key(),
context.window(),
inputWM,
outputWM);
return tooLate ? null : elementHold;
}