下面列出了org.joda.time.Duration#ZERO 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Checks the collection pairs returned for a zero lookback and for a two-minute lookback,
 * after setting up with a look-back of five scrapes.
 */
@Test
public void previousPair_using_exactInterval() {
    final Duration noLookBack = Duration.ZERO;
    final Duration twoMinutesBack = Duration.standardMinutes(2);

    setup(ExpressionLookBack.fromScrapeCount(5));

    // Zero lookback is a special case and is therefore exercised separately:
    // the current collection is input[0] and previous collection k is input[k].
    assertEquals(input.get(0).getTimestamp(), impl.getPreviousCollectionPair(noLookBack).getCurrentCollection().getTimestamp());
    assertEquals(input.get(1).getTimestamp(), impl.getPreviousCollectionPair(noLookBack).getPreviousCollection().getTimestamp());
    assertEquals(input.get(1).getTimestamp(), impl.getPreviousCollectionPair(noLookBack).getPreviousCollection(1).get().getTimestamp());
    assertEquals(input.get(2).getTimestamp(), impl.getPreviousCollectionPair(noLookBack).getPreviousCollection(2).get().getTimestamp());
    assertEquals(input.get(5).getTimestamp(), impl.getPreviousCollectionPair(noLookBack).getPreviousCollection(5).get().getTimestamp());
    // Asking for one collection more than is available yields an empty Optional.
    assertEquals(Optional.empty(), impl.getPreviousCollectionPair(noLookBack).getPreviousCollection(6));

    // Two-minute lookback: the current collection is input[2] and previous collection k is input[2 + k].
    assertEquals(input.get(2).getTimestamp(), impl.getPreviousCollectionPair(twoMinutesBack).getCurrentCollection().getTimestamp());
    assertEquals(input.get(3).getTimestamp(), impl.getPreviousCollectionPair(twoMinutesBack).getPreviousCollection().getTimestamp());
    assertEquals(input.get(3).getTimestamp(), impl.getPreviousCollectionPair(twoMinutesBack).getPreviousCollection(1).get().getTimestamp());
    assertEquals(input.get(4).getTimestamp(), impl.getPreviousCollectionPair(twoMinutesBack).getPreviousCollection(2).get().getTimestamp());
    assertEquals(input.get(5).getTimestamp(), impl.getPreviousCollectionPair(twoMinutesBack).getPreviousCollection(3).get().getTimestamp());
    assertEquals(Optional.empty(), impl.getPreviousCollectionPair(twoMinutesBack).getPreviousCollection(4));
}
/**
 * Creates a {@link TriggerStateMachineTester} for the given trigger state machine and window
 * function, passing {@link Duration#ZERO} as the third constructor argument.
 *
 * @param stateMachine the state machine to wrap via {@link ExecutableTriggerStateMachine#create}
 * @param windowFn the window function handed to the tester
 * @return a new tester built from the executable state machine and {@code windowFn}
 * @throws Exception as declared by this factory's signature
 */
public static <InputT, W extends BoundedWindow>
        TriggerStateMachineTester<InputT, W> forAdvancedTrigger(
                TriggerStateMachine stateMachine, WindowFn<Object, W> windowFn) throws Exception {
    ExecutableTriggerStateMachine executableTriggerStateMachine =
            ExecutableTriggerStateMachine.create(stateMachine);
    // The tester constructor used here takes no accumulation mode, so none is derived from
    // windowFn; computing one into an unused local only misled readers.
    return new TriggerStateMachineTester<>(executableTriggerStateMachine, windowFn, Duration.ZERO);
}
/**
 * Converts this time interval to a {@link Duration}.
 * <p>
 * The duration is equal to the end millis minus the start millis. A length of exactly
 * zero milliseconds is returned as the {@link Duration#ZERO} constant instead of a
 * newly created instance.
 *
 * @return the duration of the time interval
 * @throws ArithmeticException if the duration exceeds the capacity of a long
 */
public Duration toDuration() {
    final long lengthMillis = toDurationMillis();
    return (lengthMillis == 0) ? Duration.ZERO : new Duration(lengthMillis);
}
/**
 * Sums the total durations of all associated lessons.
 *
 * @return the accumulated duration, or {@link Duration#ZERO} when there are no associated lessons
 */
public Duration getTotalDuration() {
    Duration total = Duration.ZERO;
    for (final Lesson each : getAssociatedLessonsSet()) {
        total = total.plus(each.getTotalDuration());
    }
    return total;
}
/**
 * Builds an alert with a present-and-true trigger value and a {@link Duration#ZERO} argument,
 * and expects it to report the FIRING state straight after construction.
 */
@Test
public void constructor_hairtrigger() {
    final Alert hairTriggerAlert =
            new Alert(t0, alert_name, () -> "", Optional.of(true), Duration.ZERO, "test", EMPTY_MAP);

    // Both the current and the start timestamps equal the construction timestamp.
    assertEquals(t0, hairTriggerAlert.getCur());
    assertEquals(t0, hairTriggerAlert.getStart());
    assertEquals(alert_name, hairTriggerAlert.getName());

    // The alert is firing immediately and still reports the trigger value it was given.
    assertEquals(AlertState.FIRING, hairTriggerAlert.getAlertState());
    assertEquals(true, hairTriggerAlert.isFiring());
    assertEquals(Optional.of(true), hairTriggerAlert.isTriggered());
}
/**
 * Checks whether the length of this duration is strictly longer than the duration passed in.
 *
 * @param duration  another duration to compare to, null means zero milliseconds
 * @return true if this duration is longer than the duration passed in
 */
public boolean isLongerThan(ReadableDuration duration) {
    // A null argument is treated as a zero-length duration.
    final ReadableDuration other = (duration != null) ? duration : Duration.ZERO;
    return compareTo(other) > 0;
}
/**
 * Prepares the query/response fixtures, stubs the socket factory for the update host, and
 * creates the transport under test with a {@link Duration#ZERO} argument.
 */
@Before
public void before() throws Exception {
    simpleQuery =
            Message.newQuery(Record.newRecord(Name.fromString("example.com."), Type.A, DClass.IN));
    expectedResponse = responseMessageWithCode(simpleQuery, Rcode.NOERROR);

    // Stub the factory so that a socket request for the update host returns the mock socket.
    final InetAddress updateHostAddress = InetAddress.getByName(UPDATE_HOST);
    when(mockFactory.createSocket(updateHostAddress, DnsMessageTransport.DNS_PORT))
            .thenReturn(mockSocket);

    resolver = new DnsMessageTransport(mockFactory, UPDATE_HOST, Duration.ZERO);
}
public OutOfBandManagementDriverCommand(final ImmutableMap<OutOfBandManagement.Option, String> options, final Long timeoutSeconds) {
this.options = options;
if (timeoutSeconds != null && timeoutSeconds > 0) {
this.timeout = new Duration(timeoutSeconds * 1000);
} else {
this.timeout = Duration.ZERO;
}
}
/**
 * Returns the length of this time interval as a {@link Duration}.
 * <p>
 * The duration is equal to the end millis minus the start millis. When that length is
 * zero, the {@link Duration#ZERO} constant is returned.
 *
 * @return the duration of the time interval
 * @throws ArithmeticException if the duration exceeds the capacity of a long
 */
public Duration toDuration() {
    final long millis = toDurationMillis();
    if (millis != 0) {
        return new Duration(millis);
    }
    return Duration.ZERO;
}
/**
 * Checks whether the length of this duration is strictly shorter than the duration passed in.
 *
 * @param duration  another duration to compare to, null means zero milliseconds
 * @return true if this duration is shorter than the duration passed in
 */
public boolean isShorterThan(ReadableDuration duration) {
    // A null argument is compared as a zero-length duration.
    return compareTo(duration == null ? Duration.ZERO : duration) < 0;
}
/**
 * Demonstrates that attempting to output an element before the timestamp of the current element
 * with zero {@link DoFn#getAllowedTimestampSkew() allowed timestamp skew} throws.
 */
@Test
public void testBackwardsInTimeNoSkew() {
    SkewingDoFn zeroSkewFn = new SkewingDoFn(Duration.ZERO);
    DoFnRunner<Duration, Duration> doFnRunner =
            new SimpleDoFnRunner<>(
                    null,
                    zeroSkewFn,
                    NullSideInputReader.empty(),
                    new ListOutputManager(),
                    new TupleTag<>(),
                    Collections.emptyList(),
                    mockStepContext,
                    null,
                    Collections.emptyMap(),
                    WindowingStrategy.of(new GlobalWindows()),
                    DoFnSchemaInformation.create(),
                    Collections.emptyMap());

    doFnRunner.startBundle();

    // An element output at the current timestamp is fine.
    doFnRunner.processElement(
            WindowedValue.timestampedValueInGlobalWindow(Duration.ZERO, new Instant(0)));

    // Message fragments the failure below is expected to contain.
    final String inputTimestampFragment =
            String.format("timestamp of the current input (%s)", new Instant(0).toString());
    final String allowedSkewFragment =
            String.format(
                    "the allowed skew (%s)", PeriodFormat.getDefault().print(Duration.ZERO.toPeriod()));

    thrown.expect(UserCodeException.class);
    thrown.expectCause(isA(IllegalArgumentException.class));
    thrown.expectMessage("must be no earlier");
    thrown.expectMessage(inputTimestampFragment);
    thrown.expectMessage(allowedSkewFragment);

    // An element output before (current time - skew) is forbidden
    doFnRunner.processElement(
            WindowedValue.timestampedValueInGlobalWindow(Duration.millis(1L), new Instant(0)));
}
/**
 * Builds a fresh {@code RdeStagingAction} for each test and wires its collaborators by hand:
 * the clock, the runner, the reducer factory, the pending-deposit checker, and empty request
 * parameters.
 */
@Before
public void setup() {
    inject.setStaticField(Ofy.class, "clock", clock);

    action = new RdeStagingAction();
    action.clock = clock;
    action.mrRunner = makeDefaultRunner();
    action.lenient = false;

    // Reducer factory: configured on a local first, then attached to the action.
    RdeStagingReducer.Factory stagingReducerFactory = new RdeStagingReducer.Factory();
    stagingReducerFactory.taskQueueUtils = new TaskQueueUtils(new Retrier(new SystemSleeper(), 1));
    stagingReducerFactory.lockHandler = new FakeLockHandler(true);
    stagingReducerFactory.gcsBufferSize = 0;
    stagingReducerFactory.bucket = "rde-bucket";
    stagingReducerFactory.lockTimeout = Duration.standardHours(1);
    stagingReducerFactory.stagingKeyBytes = PgpHelper.convertPublicKeyToBytes(encryptKey);
    action.reducerFactory = stagingReducerFactory;

    // Pending-deposit checker: BRDA on Tuesdays with a 7-day interval, RDE with a 1-day interval.
    PendingDepositChecker depositChecker = new PendingDepositChecker();
    depositChecker.brdaDayOfWeek = DateTimeConstants.TUESDAY;
    depositChecker.brdaInterval = Duration.standardDays(7);
    depositChecker.clock = clock;
    depositChecker.rdeInterval = Duration.standardDays(1);
    action.pendingDepositChecker = depositChecker;

    action.response = response;
    action.transactionCooldown = Duration.ZERO;

    // Request parameters all start out empty.
    action.directory = Optional.empty();
    action.modeStrings = ImmutableSet.of();
    action.tlds = ImmutableSet.of();
    action.watermarks = ImmutableSet.of();
    action.revision = Optional.empty();
}
/**
 * Returns the hint duration, which for this implementation is always {@link Duration#ZERO}.
 *
 * @return {@link Duration#ZERO}
 */
@Override
public Duration hintDuration() {
    return Duration.ZERO;
}
/**
 * Create a new {@link WindowMappingFn} with {@link Duration#ZERO zero} maximum lookback.
 *
 * <p>Delegates to the single-argument constructor, passing {@link Duration#ZERO}.
 */
protected WindowMappingFn() {
this(Duration.ZERO);
}
/**
 * Indicates that there is more work to be done for the current element.
 *
 * @return a continuation constructed with {@code true} and {@link Duration#ZERO}
 */
public static ProcessContinuation resume() {
    final ProcessContinuation continuation =
            new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO);
    return continuation;
}
/**
 * Create a new {@link UnboundedCountingSource} beginning at {@code start}, using a
 * {@link NowTimestampFn} as its timestamp function and {@link Duration#ZERO} as its duration
 * argument.
 *
 * @param start the first constructor argument of the source
 * @return the newly created source
 */
// package-private to return a typed UnboundedCountingSource rather than the UnboundedSource type.
static UnboundedCountingSource createUnboundedFrom(long start) {
    final NowTimestampFn timestampFn = new NowTimestampFn();
    return new UnboundedCountingSource(start, 1, 1L, Duration.ZERO, timestampFn);
}
/**
 * Assigns timestamps into half-open intervals of the form [N * period, N * period + size), where
 * 0 is the epoch.
 *
 * <p>If {@link SlidingWindows#every} is not called, the period defaults to the largest time unit
 * smaller than the given duration. For example, specifying a size of 5 seconds will result in a
 * default period of 1 second.
 *
 * @param size the size of each window
 * @return sliding windows of the given size, using the default period and {@link Duration#ZERO}
 *     as the third constructor argument
 */
public static SlidingWindows of(Duration size) {
    final Duration defaultPeriod = getDefaultPeriod(size);
    return new SlidingWindows(defaultPeriod, size, Duration.ZERO);
}
/**
 * For a {@link SerializableFunction} {@code fn} from {@code T} to {@link Instant}, outputs a
 * {@link PTransform} that takes an input {@link PCollection PCollection&lt;T&gt;} and outputs a
 * {@link PCollection PCollection&lt;T&gt;} containing every element {@code v} in the input where
 * each element is output with a timestamp obtained as the result of {@code fn.apply(v)}.
 *
 * <p>If the input {@link PCollection} elements have timestamps, the output timestamp for each
 * element must not be before the input element's timestamp minus the value of {@link
 * #getAllowedTimestampSkew()}. If an output timestamp is before this time, the transform will
 * throw an {@link IllegalArgumentException} when executed. Use {@link
 * #withAllowedTimestampSkew(Duration)} to update the allowed skew.
 *
 * <p>CAUTION: Use of {@link #withAllowedTimestampSkew(Duration)} permits elements to be emitted
 * behind the watermark. These elements are considered late, and if behind the {@link
 * Window#withAllowedLateness(Duration) allowed lateness} of a downstream {@link PCollection} may
 * be silently dropped. See https://issues.apache.org/jira/browse/BEAM-644 for details on a
 * replacement.
 *
 * <p>Each output element will be in the same windows as the input element. If a new window based
 * on the new output timestamp is desired, apply a new instance of {@link Window#into(WindowFn)}.
 *
 * <p>This transform will fail at execution time with a {@link NullPointerException} if for any
 * input element the result of {@code fn.apply(v)} is {@code null}.
 *
 * <p>Example of use in Java 8:
 *
 * <pre>{@code
 * PCollection<Record> timestampedRecords = records.apply(
 *     WithTimestamps.of((Record rec) -> rec.getInstant()));
 * }</pre>
 *
 * @param fn the function that supplies the timestamp for each element
 * @return a transform constructed from {@code fn} and {@link Duration#ZERO}
 */
public static <T> WithTimestamps<T> of(SerializableFunction<T, Instant> fn) {
    final WithTimestamps<T> transform = new WithTimestamps<>(fn, Duration.ZERO);
    return transform;
}
/**
 * Returns the allowed timestamp skew duration, which is the maximum duration that timestamps can
 * be shifted backward in {@link WindowedContext#outputWithTimestamp}.
 *
 * <p>The default value is {@code Duration.ZERO}, in which case timestamps can only be shifted
 * forward to future. For infinite skew, return {@code Duration.millis(Long.MAX_VALUE)}.
 *
 * @return the default skew, {@code Duration.ZERO}
 * @deprecated This method permits a {@link DoFn} to emit elements behind the watermark. These
 *     elements are considered late, and if behind the {@link Window#withAllowedLateness(Duration)
 *     allowed lateness} of a downstream {@link PCollection} may be silently dropped. See
 *     https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
 */
@Deprecated
public Duration getAllowedTimestampSkew() {
    final Duration defaultSkew = Duration.ZERO;
    return defaultSkew;
}
/**
 * Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to
 * {@link Long#MAX_VALUE}, with element timestamps supplied by the specified function.
 *
 * <p>After {@link Long#MAX_VALUE}, the source never produces more output. (In practice, this
 * limit should never be reached.)
 *
 * <p>Note that the timestamps produced by {@code timestampFn} may not decrease.
 *
 * @param timestampFn the function supplying the timestamp for each produced number
 * @return the newly created source
 * @deprecated use {@link GenerateSequence} and call {@link
 *     GenerateSequence#withTimestampFn(SerializableFunction)} instead
 */
@Deprecated
public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(
        SerializableFunction<Long, Instant> timestampFn) {
    final UnboundedCountingSource countingSource =
            new UnboundedCountingSource(0, 1, 1L, Duration.ZERO, timestampFn);
    return countingSource;
}