下面列出了 org.joda.time.Duration 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that {@code CachingResolver.getTuples()} returns the very collection instance with which
 * the mocked asynchronous resolver completed its future.
 */
@Test
public void getTuplesTest() throws Exception {
    final List<ResolverTuple> expected = new ArrayList<>();
    final CompletableFuture<Collection<ResolverTuple>> pending = new CompletableFuture<>();
    when(mockedAsyncResolver.getTuples()).thenReturn((CompletableFuture) pending);

    final Collection<ResolverTuple> observed;
    try (CachingResolver resolver =
            new CachingResolver(
                mockedAsyncResolver, Duration.standardMinutes(10), Duration.standardMinutes(60))) {
        pending.complete(expected);
        // Racy test, but this should work in most cases to allow callbacks to propagate.
        Thread.sleep(5000);
        observed = resolver.getTuples();
    }

    assertSame(expected, observed);
}
/**
 * Records the given name and parameter map and derives the SSH and per-command timeout settings
 * from the parameters, falling back to built-in defaults via {@code NumbersUtil.parseInt}.
 *
 * @param name   name stored in {@code _name}
 * @param params configuration map; the entries read here are cast to {@code String}
 * @return always {@code true} when no exception is thrown
 * @throws ConfigurationException if {@code _vrDeployer} has not been set
 */
public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException {
    _name = name;
    _params = params;

    // The parsed "ssh.sleep" value is multiplied by 1000 (presumably seconds -> milliseconds; confirm).
    final String sleepSetting = (String)params.get("ssh.sleep");
    _sleep = NumbersUtil.parseInt(sleepSetting, 10) * 1000;

    final String retrySetting = (String)params.get("ssh.retry");
    _retry = NumbersUtil.parseInt(retrySetting, 36);

    final String portSetting = (String)params.get("ssh.port");
    _port = NumbersUtil.parseInt(portSetting, 3922);

    final String eachTimeoutSetting = (String)params.get("router.aggregation.command.each.timeout");
    final int fallbackSeconds = (int)VRScripts.VR_SCRIPT_EXEC_TIMEOUT.getStandardSeconds();
    _eachTimeout = Duration.standardSeconds(NumbersUtil.parseInt(eachTimeoutSetting, fallbackSeconds));
    if (s_logger.isDebugEnabled()) {
        s_logger.debug("The router.aggregation.command.each.timeout in seconds is set to " + _eachTimeout.getStandardSeconds());
    }

    if (_vrDeployer == null) {
        throw new ConfigurationException("Unable to find the resource for VirtualRouterDeployer!");
    }

    _vrAggregateCommandsSet = new HashMap<>();
    return true;
}
/**
 * Polls once per second until at least one subscription is listed for the topic, or until
 * {@code timeoutDuration} has elapsed since the call started.
 *
 * @param project GCP project identifier.
 * @param timeoutDuration Joda duration that sets a period of time before checking times out; must
 *     be strictly positive.
 * @throws IllegalArgumentException if {@code timeoutDuration} is zero or negative.
 * @throws InterruptedException if the thread is interrupted while sleeping between polls.
 * @throws IOException declared by the signature (presumably raised while listing subscriptions).
 * @throws TimeoutException if no subscription was listed before the timeout elapsed.
 */
public void checkIfAnySubscriptionExists(String project, Duration timeoutDuration)
    throws InterruptedException, IllegalArgumentException, IOException, TimeoutException {
  if (timeoutDuration.getMillis() <= 0) {
    throw new IllegalArgumentException("timeoutDuration should be greater than 0");
  }

  DateTime startTime = new DateTime();
  int sizeOfSubscriptionList = 0;
  // Elapsed time must be measured from startTime to "now". Measuring it the other way round gives
  // a non-positive value that never reaches the timeout, so the loop would never terminate when
  // no subscription shows up. Comparing millisecond durations also keeps sub-second timeouts from
  // being truncated to zero seconds.
  while (sizeOfSubscriptionList == 0
      && new Duration(startTime, new DateTime()).isShorterThan(timeoutDuration)) {
    // Sleep 1 sec
    Thread.sleep(1000);
    sizeOfSubscriptionList =
        listSubscriptions(projectPathFromPath(String.format("projects/%s", project)), topicPath())
            .size();
  }

  if (sizeOfSubscriptionList == 0) {
    throw new TimeoutException("Timed out when checking if topics exist for " + topicPath());
  }
}
/**
 * Applies {@code Sample.any(0)} to six timestamped integers split into 3-second fixed windows and
 * checks both windows (starting at 0 ms and 3000 ms) with {@code VerifyCorrectSample<>(0, EMPTY)}.
 */
@Test
@Category(NeedsRunner.class)
public void testSampleAnyZero() {
    final Duration windowSize = Duration.standardSeconds(3);

    PCollection<Integer> timestamped =
        pipeline.apply(
            Create.timestamped(ImmutableList.of(tv(0), tv(1), tv(2), tv(3), tv(4), tv(5)))
                .withCoder(BigEndianIntegerCoder.of()));
    PCollection<Integer> sampled =
        timestamped.apply(Window.into(FixedWindows.of(windowSize))).apply(Sample.any(0));

    // Same assertion for each of the two windows, in start-time order.
    for (long windowStartMillis : new long[] {0L, 3000L}) {
        PAssert.that(sampled)
            .inWindow(new IntervalWindow(new Instant(windowStartMillis), windowSize))
            .satisfies(new VerifyCorrectSample<>(0, EMPTY));
    }

    pipeline.run();
}
/**
 * Invalidates the whole context when more than {@code period} has passed since {@code lastSync}
 * and the context reports stale entries.
 *
 * <p>The {@code lock} flag is taken with a compare-and-set; a caller that does not win it returns
 * immediately. The flag is released before the context is consulted.
 */
private void invalidateAllIfStale(CachingContext context) {
    final boolean acquired = lock.compareAndSet(false, true);
    if (!acquired) {
        return;
    }

    final boolean periodElapsed;
    try {
        periodElapsed = new Duration(lastSync, DateTime.now()).isLongerThan(period);
        if (periodElapsed) {
            lastSync = DateTime.now();
        }
    } finally {
        lock.set(false);
    }

    if (periodElapsed && context.checkIfAnyStale()) {
        context.invalidateAll(true);
    }
}
/**
 * Wait for a success signal for {@code duration}.
 *
 * <p>Creates a randomly named result subscription on {@code resultTopicPath}, polls it for the
 * given duration and throws an {@link AssertionError} carrying the polled result unless that
 * result equals {@code RESULT_SUCCESS_MESSAGE}.
 */
public void waitForSuccess(Duration duration) throws IOException {
    SubscriptionPath subscription =
        PubsubClient.subscriptionPathFromName(
            pipelineOptions.getProject(),
            "result-subscription-" + ThreadLocalRandom.current().nextLong());

    int durationSeconds = (int) duration.getStandardSeconds();
    pubsub.createSubscription(resultTopicPath, subscription, durationSeconds);

    String outcome = pollForResultForDuration(subscription, duration);
    if (RESULT_SUCCESS_MESSAGE.equals(outcome)) {
        return;
    }
    throw new AssertionError(outcome);
}
/**
 * Builds a cache through a {@code ConfigurableFactoryHelper} chosen by {@code cacheType}.
 *
 * <p>The helper receives an options map with the keys {@code cacheName}, {@code cacheExpiryType}
 * (only when {@code expiryType} is non-null) and {@code cacheExpiryDuration}.
 *
 * @throws IllegalArgumentException for a cache type other than MEMORY or PERSISTENT
 * @throws ConfigurationException wrapping any {@code TechnicalConnectorException}
 */
public static <K, V> Cache<K, V> newInstance(CacheFactory.CacheType cacheType, String cacheName, CacheInformation.ExpiryType expiryType, Duration expiryDuration) {
    try {
        final ConfigurableFactoryHelper factoryHelper;
        switch (cacheType) {
            case MEMORY:
                factoryHelper = new ConfigurableFactoryHelper("be.ehealth.technicalconnector.cache.memory.impl", DEFAULT_CACHE_MEMORY_IMPL);
                break;
            case PERSISTENT:
                factoryHelper = new ConfigurableFactoryHelper("be.ehealth.technicalconnector.cache.persistent.impl", DEFAULT_CACHE_PERSISTENT_IMPL);
                break;
            default:
                throw new IllegalArgumentException("Unsupported cache type [" + cacheType + "]");
        }

        final Map<String, Object> settings = new HashMap<>();
        settings.put("cacheName", cacheName);
        if (expiryType != null) {
            // Translate between the two ExpiryType enums by constant name.
            settings.put("cacheExpiryType", CacheFactory.ExpiryType.valueOf(expiryType.name()));
        }
        settings.put("cacheExpiryDuration", expiryDuration);

        return (Cache)factoryHelper.getImplementation(settings);
    } catch (TechnicalConnectorException e) {
        throw new ConfigurationException(e);
    }
}
/**
 * Builds a named {@code CountByKey} with windowing, trigger, accumulation mode and allowed
 * lateness, then checks that the produced operator exposes each of those settings.
 */
@Test
public void testBuild() {
    final PCollection<String> input = TestUtils.createMockDataset(TypeDescriptors.strings());
    final FixedWindows hourlyWindows = FixedWindows.of(org.joda.time.Duration.standardHours(1));
    final DefaultTrigger defaultTrigger = DefaultTrigger.of();
    final Duration allowedLateness = Duration.millis(1000);

    final PCollection<KV<String, Long>> counts =
        CountByKey.named("CountByKey1")
            .of(input)
            .keyBy(s -> s)
            .windowBy(hourlyWindows)
            .triggeredBy(defaultTrigger)
            .discardingFiredPanes()
            .withAllowedLateness(allowedLateness)
            .output();

    final CountByKey operator = (CountByKey) TestUtils.getProducer(counts);

    // Name and key extractor.
    assertTrue(operator.getName().isPresent());
    assertEquals("CountByKey1", operator.getName().get());
    assertNotNull(operator.getKeyExtractor());

    // Windowing description.
    assertTrue(operator.getWindow().isPresent());
    final WindowDesc<?> windowDesc = WindowDesc.of((Window<?>) operator.getWindow().get());
    assertEquals(hourlyWindows, windowDesc.getWindowFn());
    assertEquals(defaultTrigger, windowDesc.getTrigger());
    assertEquals(AccumulationMode.DISCARDING_FIRED_PANES, windowDesc.getAccumulationMode());
    assertEquals(Duration.millis(1000), windowDesc.getAllowedLateness());
}
// Feeds four timestamped key/value tuples (two for key "a", two for key "b") through a keyed
// windowed operator configured with 1000 ms time windows, then checks what ended up in
// keyedDataStorage for the window starting at BASE.
@Test
public void testKeyedAccumulation()
{
KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(false);
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
windowedOperator.setup(testMeta.operatorContext);
windowedOperator.beginWindow(1);
// All four timestamps lie within 300 ms after BASE.
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, new KeyValPair<>("a", 2L)));
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, new KeyValPair<>("a", 3L)));
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, new KeyValPair<>("b", 4L)));
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 150L, new KeyValPair<>("b", 5L)));
windowedOperator.endWindow();
// size() is expected to be 1 (presumably the number of windows held -- confirm against the storage class).
Assert.assertEquals(1, keyedDataStorage.size());
// Expected values are consistent with a per-key sum: 2 + 3 = 5 for "a", 4 + 5 = 9 for "b".
Assert.assertEquals(5L, keyedDataStorage.get(new Window.TimeWindow(BASE, 1000), "a").longValue());
Assert.assertEquals(9L, keyedDataStorage.get(new Window.TimeWindow(BASE, 1000), "b").longValue());
windowedOperator.teardown();
}
/**
 * Emits a periodic sequence of instants described by {@code startTimestamp},
 * {@code stopTimestamp} and {@code fireInterval}; when {@code applyWindowing} is set, the result
 * is additionally placed into fixed windows whose size equals {@code fireInterval}.
 */
@Override
public PCollection<Instant> expand(PBegin input) {
    PCollection<Instant> sequence =
        input
            .apply(
                Create.<PeriodicSequence.SequenceDefinition>of(
                    new PeriodicSequence.SequenceDefinition(
                        startTimestamp, stopTimestamp, fireInterval)))
            .apply(PeriodicSequence.create());

    if (!this.applyWindowing) {
        return sequence;
    }

    return sequence.apply(
        Window.<Instant>into(FixedWindows.of(Duration.millis(fireInterval.getMillis()))));
}
/**
 * Blocks until the runner finishes, or until {@code duration} elapses when one is given.
 *
 * <p>Afterwards the listener (if any) is notified when the state is DONE or FAILED, and the
 * recorded error is rethrown when the state is FAILED.
 *
 * @param duration maximum time to wait, or {@code null} to wait without a limit
 * @return the state read from {@code getStateInfo()} after waiting
 */
@Override
public State waitUntilFinish(@Nullable Duration duration) {
    try {
        if (duration != null) {
            // The runner takes a java.time duration; convert from Joda via milliseconds.
            runner.waitForFinish(java.time.Duration.ofMillis(duration.getMillis()));
        } else {
            runner.waitForFinish();
        }
    } catch (Exception e) {
        throw new Pipeline.PipelineExecutionException(e);
    }

    final StateInfo info = getStateInfo();
    final boolean terminal = info.state == State.DONE || info.state == State.FAILED;
    if (terminal && listener != null) {
        listener.onFinish();
    }
    if (info.state == State.FAILED) {
        throw info.error;
    }

    LOG.info("Pipeline finished. Final state: {}", info.state);
    return info.state;
}
/**
 * Builds a {@code Distinct} with one-hour fixed windows and the default trigger, then checks that
 * the produced operator reports an equal window function and trigger.
 */
@Test
public void testBuild_Windowing() {
    final org.joda.time.Duration oneHour = org.joda.time.Duration.standardHours(1);
    final PCollection<String> input = TestUtils.createMockDataset(TypeDescriptors.strings());

    final PCollection<String> deduplicated =
        Distinct.of(input)
            .windowBy(FixedWindows.of(oneHour))
            .triggeredBy(DefaultTrigger.of())
            .accumulationMode(AccumulationMode.DISCARDING_FIRED_PANES)
            .output();

    final Distinct operator = (Distinct) TestUtils.getProducer(deduplicated);
    assertTrue(operator.getWindow().isPresent());

    @SuppressWarnings("unchecked")
    final WindowDesc<?> description = WindowDesc.of((Window) operator.getWindow().get());
    // Fresh instances are built for the expectations, so these compare by equals().
    assertEquals(FixedWindows.of(oneHour), description.getWindowFn());
    assertEquals(DefaultTrigger.of(), description.getTrigger());
}
/**
 * Runs {@code WindowedCount} over three timestamped strings using sliding windows of size 10 ms
 * that start every 5 ms, and checks the outputs listed below.
 */
@Test
@Category(ValidatesRunner.class)
public void testNonPartitioningWindowing() {
    final SlidingWindows overlappingWindows =
        SlidingWindows.of(new Duration(10)).every(new Duration(5));

    PCollection<String> timestamped =
        p.apply(
            Create.timestamped(
                TimestampedValue.of("a", new Instant(1)),
                TimestampedValue.of("a", new Instant(7)),
                TimestampedValue.of("b", new Instant(8))));

    PCollection<String> counted = timestamped.apply(new WindowedCount(overlappingWindows));

    PAssert.that(counted)
        .containsInAnyOrder(
            output("a", 1, 1, -5, 5),
            output("a", 2, 5, 0, 10),
            output("a", 1, 10, 5, 15),
            output("b", 1, 8, 0, 10),
            output("b", 1, 10, 5, 15));

    p.run();
}
/**
 * Entry point: reads two newline-delimited socket streams from localhost (ports 9999 and 9998),
 * windows both with the same fixed window function, joins them via {@code joinEvents} and writes
 * the formatted results to {@code ./outputJoin.txt}.
 *
 * @param args command-line arguments parsed into {@code Options}
 */
public static void main(String[] args) throws Exception {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
// Streaming mode with checkpointing/retry settings, executed on the Flink runner.
options.setStreaming(true);
options.setCheckpointingInterval(1000L);
options.setNumberOfExecutionRetries(5);
options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkPipelineRunner.class);
PTransform<? super PBegin, PCollection<String>> readSourceA =
Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream");
PTransform<? super PBegin, PCollection<String>> readSourceB =
Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream");
// Fixed windows whose length in seconds comes from options.getWindowSize().
WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
Pipeline p = Pipeline.create(options);
// the following two 'applys' create multiple inputs to our pipeline, one for each
// of our two input sources.
// Both streams use the same windowing: fire past the end of the window, zero allowed
// lateness, discarding fired panes.
PCollection<String> streamA = p.apply(readSourceA)
.apply(Window.<String>into(windowFn)
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
PCollection<String> streamB = p.apply(readSourceB)
.apply(Window.<String>into(windowFn)
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
PCollection<String> formattedResults = joinEvents(streamA, streamB);
formattedResults.apply(TextIO.Write.to("./outputJoin.txt"));
p.run();
}
/**
 * Reads 1000 elements from {@code GenerateSequence} in unbounded mode and checks that the
 * attempted {@code ELEMENTS_READ} counter for the read step reports that many elements.
 */
@Test
@Category({NeedsRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
public void testUnboundedSourceMetrics() {
    long numElements = 1000;

    // Use withMaxReadTime to force unbounded mode.
    pipeline.apply(
        GenerateSequence.from(0).to(numElements).withMaxReadTime(Duration.standardDays(1)));

    PipelineResult pipelineResult = pipeline.run();

    MetricsFilter elementsReadFilter =
        MetricsFilter.builder()
            .addNameFilter(
                MetricNameFilter.named(ELEMENTS_READ.getNamespace(), ELEMENTS_READ.getName()))
            .build();
    MetricQueryResults queried = pipelineResult.metrics().queryMetrics(elementsReadFilter);

    assertThat(
        queried.getCounters(),
        hasItem(
            attemptedMetricsResult(
                ELEMENTS_READ.getNamespace(),
                ELEMENTS_READ.getName(),
                "Read(UnboundedCountingSource)",
                numElements)));
}
/**
 * Tests that {@link UpdateTeamScoreFn} {@link org.apache.beam.sdk.transforms.DoFn} outputs
 * correctly for multiple teams.
 *
 * <p>Two red-team events (50 + 50) and three blue-team events (70 + 80 + 50) are fed in; with
 * the function constructed with 100, the expected outputs are 100 for red and 150 then 200 for
 * blue.
 */
@Test
public void testScoreUpdatesPerTeam() {
    final String redTeam = TestUser.RED_ONE.getTeam();
    final String blueTeam = TestUser.BLUE_ONE.getTeam();

    TestStream<KV<String, GameActionInfo>> gameEvents =
        TestStream.create(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(GameActionInfo.class)))
            .advanceWatermarkTo(baseTime)
            .addElements(
                event(TestUser.RED_ONE, 50, Duration.standardSeconds(10)),
                event(TestUser.RED_TWO, 50, Duration.standardSeconds(20)),
                event(TestUser.BLUE_ONE, 70, Duration.standardSeconds(30)),
                event(TestUser.BLUE_TWO, 80, Duration.standardSeconds(40)),
                event(TestUser.BLUE_TWO, 50, Duration.standardSeconds(50)))
            .advanceWatermarkToInfinity();

    PCollection<KV<String, Integer>> scoreUpdates =
        p.apply(gameEvents).apply(ParDo.of(new UpdateTeamScoreFn(100)));

    PAssert.that(scoreUpdates)
        .inWindow(GlobalWindow.INSTANCE)
        .containsInAnyOrder(KV.of(redTeam, 100), KV.of(blueTeam, 150), KV.of(blueTeam, 200));

    p.run().waitUntilFinish();
}
/**
 * Tests that if the first trigger rewinds to be non-finished in the merged window, then it
 * becomes the currently active trigger again, with real triggers.
 *
 * <p>The trigger under test is an in-order sequence of (a) "at least 5 elements, or finally past
 * the end of the window" followed by (b) "repeatedly, at least 1 element", evaluated over session
 * windows with a 10 ms gap.
 */
@Test
public void testShouldFireAfterMerge() throws Exception {
tester =
TriggerStateMachineTester.forTrigger(
AfterEachStateMachine.inOrder(
AfterPaneStateMachine.elementCountAtLeast(5)
.orFinally(AfterWatermarkStateMachine.pastEndOfWindow()),
RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(1))),
Sessions.withGapDuration(Duration.millis(10)));
// Finished the orFinally in the first window
// A single element at timestamp 1 yields the session window [1, 11); it is expected to fire
// only once the input watermark has advanced to 11.
tester.injectElements(1);
IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
assertFalse(tester.shouldFire(firstWindow));
tester.advanceInputWatermark(new Instant(11));
assertTrue(tester.shouldFire(firstWindow));
tester.fireIfShouldFire(firstWindow);
// Set up second window where it is not done
// A single element at timestamp 5 yields the session window [5, 15).
tester.injectElements(5);
IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
assertFalse(tester.shouldFire(secondWindow));
// Merge them, if the merged window were on the second trigger, it would be ready
tester.mergeWindows();
IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
assertFalse(tester.shouldFire(mergedWindow));
// Inject five more elements (timestamps 1 through 5) and merge again; the merged window is
// then expected to be ready to fire.
tester.injectElements(1, 2, 3, 4, 5);
tester.mergeWindows();
assertTrue(tester.shouldFire(mergedWindow));
}
/**
 * Creates a backoff configuration holding exactly the given values; no validation or defaulting
 * happens here.
 *
 * @param exponent stored as {@code this.exponent}
 * @param initialBackoff stored as {@code this.initialBackoff}
 * @param maxBackoff stored as {@code this.maxBackoff}
 * @param maxCumulativeBackoff stored as {@code this.maxCumulativeBackoff}
 * @param maxRetries stored as {@code this.maxRetries}
 */
private FluentBackoff(
    double exponent,
    Duration initialBackoff,
    Duration maxBackoff,
    Duration maxCumulativeBackoff,
    int maxRetries) {
    // Assigned in parameter order.
    this.exponent = exponent;
    this.initialBackoff = initialBackoff;
    this.maxBackoff = maxBackoff;
    this.maxCumulativeBackoff = maxCumulativeBackoff;
    this.maxRetries = maxRetries;
}
/**
 * Writes two batches of feature rows separated by a 10-second watermark advance and checks that
 * the job service saw two jobs carrying two distinct job ids.
 */
@Test
public void uniqueJobIdPerWindow() {
    TestStream<FeatureRow> rows =
        TestStream.create(ProtoCoder.of(FeatureRow.class))
            .advanceWatermarkTo(Instant.now())
            .addElements(generateRow("myproject/fs"))
            .addElements(generateRow("myproject/fs"))
            .advanceWatermarkTo(Instant.now().plus(Duration.standardSeconds(10)))
            .addElements(generateRow("myproject/fs"))
            .addElements(generateRow("myproject/fs"))
            .advanceWatermarkToInfinity();

    FeatureSink sink =
        makeSink(
            ValueProvider.StaticValueProvider.of(bigQuery),
            p.apply(
                "StaticSpecs",
                Create.of(
                    ImmutableMap.of(
                        FeatureSetReference.of(spec.getProject(), spec.getName(), 1), spec))));

    p.apply(rows).apply(sink.writer());
    p.run();

    // Two jobs in total ...
    assertThat(jobService.getAllJobs().size(), is(2));

    // ... and their ids must not repeat.
    long distinctJobIds =
        jobService.getAllJobs().stream()
            .map(job -> job.getJobReference().getJobId())
            .distinct()
            .count();
    assertThat(distinctJobIds, is(2L));
}
/**
 * Sends one inbound and one outbound message through the channel, one fake-clock millisecond
 * apart, and verifies that a 1 ms response latency is the only interaction with the metrics.
 */
@Test
public void testSuccess_oneRequest_oneResponse() {
    final Object inboundMessage = new Object();
    final Object outboundMessage = new Object();

    // Inbound message passed to the next handler.
    assertThat(channel.writeInbound(inboundMessage)).isTrue();
    assertThat((Object) channel.readInbound()).isEqualTo(inboundMessage);

    fakeClock.advanceOneMilli();

    // Outbound message passed to the next handler.
    assertThat(channel.writeOutbound(outboundMessage)).isTrue();
    assertThat((Object) channel.readOutbound()).isEqualTo(outboundMessage);

    // Verify that latency is recorded.
    verify(metrics).responseSent(PROTOCOL_NAME, CLIENT_CERT_HASH, Duration.millis(1));
    verifyNoMoreInteractions(metrics);
}
/**
 * Starts a {@code SelfInitiatedResumeFn} element and advances processing time in 3-second steps,
 * checking that output appears only on every second step.
 */
@Test
public void testResumeSetsTimer() throws Exception {
    final Duration threeSeconds = Duration.standardSeconds(3);

    DoFn<Integer, String> resumingFn = new SelfInitiatedResumeFn();
    Instant startTime = Instant.now();
    dateTimeProvider.setDateTimeFixed(startTime.getMillis());
    ProcessFnTester<Integer, String, SomeRestriction, Void, Void> tester =
        new ProcessFnTester<>(
            startTime,
            resumingFn,
            BigEndianIntegerCoder.of(),
            SerializableCoder.of(SomeRestriction.class),
            VoidCoder.of(),
            MAX_OUTPUTS_PER_BUNDLE,
            MAX_BUNDLE_DURATION);

    tester.startElement(42, new SomeRestriction());
    assertThat(tester.takeOutputElements(), contains("42"));

    // Should resume after 5 seconds: advancing by 3 seconds should have no effect.
    assertFalse(tester.advanceProcessingTimeBy(threeSeconds));
    assertTrue(tester.takeOutputElements().isEmpty());

    // 6 seconds in total should be enough to invoke the fn again.
    assertTrue(tester.advanceProcessingTimeBy(threeSeconds));
    assertThat(tester.takeOutputElements(), contains("42"));

    // Should again resume after 5 seconds: advancing by 3 seconds should again have no effect.
    assertFalse(tester.advanceProcessingTimeBy(threeSeconds));
    assertTrue(tester.takeOutputElements().isEmpty());

    // 6 seconds should again be enough.
    assertTrue(tester.advanceProcessingTimeBy(threeSeconds));
    assertThat(tester.takeOutputElements(), contains("42"));
}
/**
 * Convenience overload: unpacks the query and temporary table from {@code request} and delegates
 * to the {@code runQuery} overload that takes them individually.
 *
 * @param request source of the query and tmp table passed on to the delegate
 * @param user    passed through unchanged
 * @param schema  passed through unchanged
 * @param timeout passed through unchanged
 * @return whatever the delegate overload returns
 */
public UUID runQuery(final ExecutionRequest request,
final AirpalUser user,
final String schema,
final Duration timeout)
{
return runQuery(request.getQuery(), request.getTmpTable(), user, schema, timeout);
}
/**
 * With an expiry horizon of one hour and a bucket span of thirty minutes, the assigner is
 * expected to report two buckets after setup.
 */
@Test
public void testNumBuckets()
{
    final Duration expireBefore = Duration.standardHours(1);
    final Duration bucketSpan = Duration.standardMinutes(30);

    testMeta.timeBucketAssigner.setExpireBefore(expireBefore);
    testMeta.timeBucketAssigner.setBucketSpan(bucketSpan);
    testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);

    Assert.assertEquals("num buckets", 2, testMeta.timeBucketAssigner.getNumBuckets());

    testMeta.timeBucketAssigner.teardown();
}
/**
 * Makes the socket's input stream throw {@link SocketTimeoutException} on read and checks that
 * {@code send} propagates it and that the socket timeout was set to the configured value in
 * milliseconds.
 */
@Test
public void testTimeoutReceivingResponse() throws Exception {
    final Duration timeout = Duration.standardSeconds(1);

    InputStream failingInput = mock(InputStream.class);
    when(failingInput.read()).thenThrow(new SocketTimeoutException("testing"));
    when(mockSocket.getInputStream()).thenReturn(failingInput);
    when(mockSocket.getOutputStream()).thenReturn(new ByteArrayOutputStream());

    DnsMessageTransport transport = new DnsMessageTransport(mockFactory, UPDATE_HOST, timeout);
    Message query = new Message();

    assertThrows(SocketTimeoutException.class, () -> transport.send(query));
    verify(mockSocket).setSoTimeout((int) timeout.getMillis());
}
/**
 * Re-reads the per-command aggregation timeout from {@code params}, but only when the previously
 * stored {@code _params} map contains a value for that key.
 *
 * <p>NOTE(review): the guard inspects {@code _params} while the value is taken from
 * {@code params}; confirm this asymmetry is intended.
 *
 * @param params host parameters; the timeout value falls back to 600 seconds when it cannot be
 *               parsed
 * @return always {@code true}
 */
public boolean configureHostParams(final Map<String, String> params) {
    final String timeoutKey = "router.aggregation.command.each.timeout";
    if (_params.get(timeoutKey) == null) {
        return true;
    }

    final String timeoutSetting = params.get(timeoutKey);
    _eachTimeout = Duration.standardSeconds(NumbersUtil.parseLong(timeoutSetting, 600));
    if (s_logger.isDebugEnabled()) {
        s_logger.debug("The router.aggregation.command.each.timeout in seconds is set to " + _eachTimeout.getStandardSeconds());
    }
    return true;
}
// Checks that the processing-time watermark policy reports, on each call, the value that
// Instant.now() returns at that moment.
@Test
public void shouldAdvanceWatermarkToNowWithProcessingTimePolicy() {
WatermarkPolicy policy =
WatermarkPolicyFactory.withProcessingTimePolicy().createWatermarkPolicy();
// Static methods of Instant are mocked from here on (presumably via PowerMock -- confirm), so
// Instant.now() can be stubbed below.
mockStatic(Instant.class);
Instant time1 = NOW.minus(Duration.standardSeconds(5));
Instant time2 = NOW.minus(Duration.standardSeconds(4));
// First call to Instant.now() yields time1, the second yields time2.
when(Instant.now()).thenReturn(time1).thenReturn(time2);
assertThat(policy.getWatermark()).isEqualTo(time1);
assertThat(policy.getWatermark()).isEqualTo(time2);
}
/**
 * Feeds nine keys spaced 10 seconds apart into 30-second fixed windows and checks that
 * {@code Deduplicate.values()} yields the expected three keys in each of the three windows; keys
 * k1 to k3 occur in both the first and the second window.
 */
@Test
@Category({NeedsRunner.class, UsesTestStream.class})
public void testInDifferentWindows() {
    final Instant start = new Instant(0);
    final Duration windowSize = Duration.standardSeconds(30);
    // Window boundaries at 30 s, 60 s and 90 s after the start.
    final Instant after30s = start.plus(windowSize);
    final Instant after60s = start.plus(Duration.standardSeconds(60));
    final Instant after90s = start.plus(Duration.standardSeconds(90));

    TestStream<String> keys =
        TestStream.create(StringUtf8Coder.of())
            .advanceWatermarkTo(start)
            .addElements(
                TimestampedValue.of("k1", start),
                TimestampedValue.of("k2", start.plus(Duration.standardSeconds(10))),
                TimestampedValue.of("k3", start.plus(Duration.standardSeconds(20))),
                TimestampedValue.of("k1", start.plus(Duration.standardSeconds(30))),
                TimestampedValue.of("k2", start.plus(Duration.standardSeconds(40))),
                TimestampedValue.of("k3", start.plus(Duration.standardSeconds(50))),
                TimestampedValue.of("k4", start.plus(Duration.standardSeconds(60))),
                TimestampedValue.of("k5", start.plus(Duration.standardSeconds(70))),
                TimestampedValue.of("k6", start.plus(Duration.standardSeconds(80))))
            .advanceWatermarkToInfinity();

    PCollection<String> deduplicated =
        p.apply(keys)
            .apply(Window.into(FixedWindows.of(windowSize)))
            .apply(Deduplicate.values());

    PAssert.that(deduplicated)
        .inWindow(new IntervalWindow(start, after30s))
        .containsInAnyOrder("k1", "k2", "k3");
    PAssert.that(deduplicated)
        .inWindow(new IntervalWindow(after30s, after60s))
        .containsInAnyOrder("k1", "k2", "k3");
    PAssert.that(deduplicated)
        .inWindow(new IntervalWindow(after60s, after90s))
        .containsInAnyOrder("k4", "k5", "k6");

    p.run();
}
/**
 * Persists a host whose creation time is the current clock time and checks that loading it at a
 * point one millisecond earlier yields null.
 */
@Test
public void testLoadAtPointInTime_beforeCreated_returnsNull() {
    clock.advanceOneMilli();

    // Don't save a commit log, we shouldn't need one.
    HostResource unsaved =
        newHostResource("ns1.cat.tld").asBuilder()
            .setCreationTimeForTest(clock.nowUtc())
            .build();
    HostResource host = persistResource(unsaved);

    assertThat(loadAtPointInTime(host, clock.nowUtc().minus(Duration.millis(1))).now()).isNull();
}
/**
 * Makes the callable always throw a 503 {@code AmazonServiceException} and checks that the
 * retryer, given a 10-second budget, invokes it between 2 and 15 times before a
 * {@link RuntimeException} escapes.
 */
@Test(expected = RuntimeException.class)
public void testRetryableASEException2() throws Exception {
    AmazonServiceException serviceFailure = new AmazonServiceException("Test");
    serviceFailure.setStatusCode(503);
    serviceFailure.setErrorCode("ArbitRetryableException");
    when(call.call()).thenThrow(serviceFailure);

    DynamoDBFibonacciRetryer retryer = new DynamoDBFibonacciRetryer(Duration.standardSeconds(10));
    try {
        retryer.runWithRetry(call, null, null);
    } finally {
        // Runs even though runWithRetry is expected to throw.
        verify(call, atLeast(2)).call();
        verify(call, atMost(15)).call();
    }
}
/**
 * Delegates to {@code history.evaluate} and logs the request and the returned stream at
 * {@code FINE} level.
 *
 * @param query    expressions to evaluate, passed through unchanged
 * @param begin    passed through unchanged
 * @param stepSize passed through unchanged
 * @return the stream returned by the underlying history
 */
@Override
public Stream<Collection<CollectHistory.NamedEvaluation>> evaluate(Map<String, ? extends TimeSeriesMetricExpression> query, DateTime begin, Duration stepSize) {
    final Object[] requestArgs = {begin, stepSize};
    LOG.log(Level.FINE, "request received({0}, {1})", requestArgs);

    final Stream<Collection<CollectHistory.NamedEvaluation>> evaluated = history.evaluate(query, begin, stepSize);

    final Object[] responseArgs = {begin, stepSize, evaluated};
    LOG.log(Level.FINE, "returning({0}, {1}) => {2}", responseArgs);
    return evaluated;
}