下面列出了 org.apache.commons.lang3.mutable.MutableBoolean 类的 API 实例代码及常见用法；也可以点击链接跳转到 GitHub 查看完整源代码。
@Test
public void testValidationFailureForMissingRequestContextAttribute() {
    // Records whether the default handler saw the expected validation failure.
    MutableBoolean exceptionHandled = new MutableBoolean(false);
    LogEventFactory.setDefaultHandler((message, ex) -> {
        assertThat(ex, instanceOf(ConstraintValidationException.class));
        exceptionHandled.setTrue();
    });
    Transfer transfer = LogEventFactory.getEvent(Transfer.class);
    // Populate only part of the request context; presumably a required
    // attribute (e.g. userId/accountNumber) is deliberately omitted so
    // validation fails — TODO confirm against the RequestContext definition.
    ThreadContext.put("companyId", "12345");
    ThreadContext.put("ipAddress", "127.0.0.1");
    ThreadContext.put("environment", "dev");
    ThreadContext.put("product", "TestProduct");
    ThreadContext.put("timeZone", "America/Phoenix");
    ThreadContext.put("loginId", "TestUser");
    transfer.setToAccount(123456);
    transfer.setFromAccount(111111);
    // BUG FIX: new BigDecimal(111.55) captures the inexact binary double value
    // (111.5500000000000...); the String constructor preserves the intended
    // decimal amount exactly.
    transfer.setAmount(new BigDecimal("111.55"));
    transfer.logEvent();
    assertTrue("Should have thrown a ConstraintValidationException", exceptionHandled.isTrue());
}
@Test
public void testValidationFailureForMissingEventAttribute() {
    // Flag flipped by the default handler when it observes the expected exception.
    final MutableBoolean handlerInvoked = new MutableBoolean(false);
    LogEventFactory.setDefaultHandler((message, ex) -> {
        assertThat(ex, instanceOf(ConstraintValidationException.class));
        handlerInvoked.setTrue();
    });
    Transfer transfer = LogEventFactory.getEvent(Transfer.class);
    // Fully populate the request context so the failure can only come from the
    // event itself (the amount is never set below).
    ThreadContext.put("accountNumber", "12345");
    ThreadContext.put("companyId", "12345");
    ThreadContext.put("userId", "JohnDoe");
    ThreadContext.put("ipAddress", "127.0.0.1");
    ThreadContext.put("environment", "dev");
    ThreadContext.put("product", "TestProduct");
    ThreadContext.put("timeZone", "America/Phoenix");
    ThreadContext.put("loginId", "TestUser");
    transfer.setToAccount(123456);
    transfer.setFromAccount(111111);
    transfer.logEvent();
    assertTrue("Should have thrown a ConstraintValidationException", handlerInvoked.isTrue());
}
@Test
public void testCustomExceptionHandlerIsPassedToEvent() {
    // Install an appender that always fails so logging raises a LoggingException.
    AbstractConfiguration config = setUpFailingAppender();
    final MutableBoolean handled = new MutableBoolean(false);
    LogEventFactory.setDefaultHandler((message, ex) -> {
        assertThat(ex, instanceOf(LoggingException.class));
        handled.setTrue();
    });
    Transfer transfer = setUpMinimumEvent();
    transfer.logEvent();
    assertTrue("Exception was not handled through the custom handler", handled.isTrue());
    // Clean up so the failing appender does not leak into other tests.
    config.removeAppender(failingAppenderName);
}
@Test
public void firstDoneTest()
{
    // A query enqueued with an already-true ("done") context must never be
    // handed out by dequeue, in this window or the next.
    SimpleDoneQueueManager<Query, Void> sdqqm = new SimpleDoneQueueManager<Query, Void>();
    sdqqm.setup(null);
    sdqqm.beginWindow(0);
    Query query = new MockQuery("1");
    sdqqm.enqueue(query, null, new MutableBoolean(true));
    QueryBundle<Query, Void, MutableBoolean> qb = sdqqm.dequeue();
    // IDIOM: assertNull states the intent directly instead of assertEquals(null, ...).
    Assert.assertNull("Should return back null.", qb);
    sdqqm.endWindow();
    sdqqm.beginWindow(1);
    qb = sdqqm.dequeue();
    Assert.assertNull("Should return back null.", qb);
    qb = sdqqm.dequeue();
    Assert.assertNull("Should return back null.", qb);
    sdqqm.endWindow();
    sdqqm.teardown();
}
@Test
public void simpleEnqueueDequeueBlock()
{
    SimpleDoneQueueManager<Query, Void> manager = new SimpleDoneQueueManager<Query, Void>();
    manager.setup(null);
    manager.beginWindow(0);
    Query enqueued = new MockQuery("1");
    manager.enqueue(enqueued, null, new MutableBoolean(false));
    // One bundle waiting; the permit count must track the queue depth.
    Assert.assertEquals(1, manager.getNumLeft());
    Assert.assertEquals(manager.getNumPermits(), manager.getNumLeft());
    QueryBundle<Query, Void, MutableBoolean> bundle = manager.dequeueBlock();
    // Queue drained; permits must drop with it.
    Assert.assertEquals(0, manager.getNumLeft());
    Assert.assertEquals(manager.getNumPermits(), manager.getNumLeft());
    Assert.assertEquals("Should return same query.", enqueued, bundle.getQuery());
    manager.endWindow();
    manager.teardown();
}
@Test
public void simpleEnqueueDequeueThenBlock() throws Exception
{
    SimpleDoneQueueManager<Query, Void> manager = new SimpleDoneQueueManager<Query, Void>();
    manager.setup(null);
    manager.beginWindow(0);
    Query enqueued = new MockQuery("1");
    manager.enqueue(enqueued, null, new MutableBoolean(false));
    // Enqueue makes one bundle (and one permit) available.
    Assert.assertEquals(1, manager.getNumLeft());
    Assert.assertEquals(manager.getNumPermits(), manager.getNumLeft());
    QueryBundle<Query, Void, MutableBoolean> bundle = manager.dequeueBlock();
    // Nothing left after the dequeue; a further dequeueBlock must block.
    Assert.assertEquals(0, manager.getNumLeft());
    Assert.assertEquals(manager.getNumPermits(), manager.getNumLeft());
    testBlocking(manager);
    manager.endWindow();
    manager.teardown();
}
private String toRangeClause(SqlgGraph sqlgGraph, MutableBoolean mutableOrderBy) {
    // Nothing to emit unless a range/skip is present and meant to run on the db.
    if (this.sqlgRangeHolder == null || !this.sqlgRangeHolder.isApplyOnDb()) {
        return "";
    }
    if (!this.sqlgRangeHolder.hasRange()) {
        Preconditions.checkState(this.sqlgRangeHolder.hasSkip(), "If not a range query then it must be a skip.");
        return sqlgGraph.getSqlDialect().getSkipClause(this.sqlgRangeHolder.getSkip());
    }
    StringBuilder clause = new StringBuilder();
    //This is MssqlServer, ugly but what to do???
    if (mutableOrderBy.isFalse() && sqlgGraph.getSqlDialect().isMssqlServer() && this.getDbComparators().isEmpty()) {
        clause.append("\n\tORDER BY 1\n\t");
    }
    clause.append("\n").append(sqlgGraph.getSqlDialect().getRangeClause(this.sqlgRangeHolder.getRange()));
    return clause.toString();
}
@Test
public void clearFilters() {
    final List<Person> listOfPersons = getListOfPersons(100);
    FilterableListContainer<Person> container =
            new FilterableListContainer<>(listOfPersons);
    // A matching filter shrinks the container below the backing list size.
    container.addContainerFilter(new SimpleStringFilter("firstName",
            "First1", true, true));
    Assert.assertNotSame(listOfPersons.size(), container.size());
    container.removeAllContainerFilters();
    Assert.assertEquals(listOfPersons.size(), container.size());
    // A filter matching nothing empties the container entirely.
    container.addContainerFilter(new SimpleStringFilter("firstName",
            "foobar", true, true));
    Assert.assertEquals(0, container.size());
    // Removing filters must fire an item-set-change event.
    final MutableBoolean changeEventFired = new MutableBoolean(false);
    container.addListener(new Container.ItemSetChangeListener() {
        @Override
        public void containerItemSetChange(
                Container.ItemSetChangeEvent event) {
            changeEventFired.setTrue();
        }
    });
    container.removeAllContainerFilters();
    Assert.assertTrue(changeEventFired.booleanValue());
    Assert.assertEquals(listOfPersons.size(), container.size());
}
/**
 * Used to fetch the first data holding batch from either the build or probe side.
 * @param outcome The current upstream outcome for either the build or probe side.
 * @param prefetched A flag indicating if we have already done a prefetch of the first data holding batch for the probe or build side.
 * @param isEmpty A flag indicating if the probe or build side is empty.
 * @param index The upstream index of the probe or build batch.
 * @param batch The probe or build batch itself.
 * @param memoryManagerUpdate A lambda function to execute the memory manager update for the probe or build batch.
 * @return The current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}.
 */
private IterOutcome prefetchFirstBatch(IterOutcome outcome,
        final MutableBoolean prefetched,
        final MutableBoolean isEmpty,
        final int index,
        final RecordBatch batch,
        final Runnable memoryManagerUpdate) {
    if (prefetched.booleanValue()) {
        // We have already prefetched the first data holding batch; nothing to do.
        return outcome;
    }
    // If we didn't retrieve our first data holding batch, we need to do it now.
    prefetched.setValue(true);
    if (outcome != IterOutcome.NONE) {
        // We can only get data if there is data available
        outcome = sniffNonEmptyBatch(outcome, index, batch);
    }
    isEmpty.setValue(outcome == IterOutcome.NONE); // If we received NONE there is no data.
    if (outcome == IterOutcome.OUT_OF_MEMORY) {
        // We reached a termination state
        state = BatchState.OUT_OF_MEMORY;
    } else if (outcome == IterOutcome.STOP) {
        // We reached a termination state
        state = BatchState.STOP;
    } else {
        // Got our first batch(es)
        if (spilledState.isFirstCycle()) {
            // Only collect stats for the first cycle
            memoryManagerUpdate.run();
        }
        state = BatchState.FIRST;
    }
    return outcome;
}
private boolean isExpectedElementsQuantity(SearchAttributes locator, Matcher<Integer> elementsMatcher,
        MutableBoolean firstIteration)
{
    // After the first call the check is a plain match so the caller's loop can
    // terminate quietly; the first call is recorded as a soft assertion.
    if (firstIteration.isFalse())
    {
        return elementsMatcher.matches(getElementsNumber(locator));
    }
    firstIteration.setValue(false);
    return softAssert.assertThat("Elements number", getElementsNumber(locator), elementsMatcher);
}
/**
 * Converts each raw rule result into an {@code XccdfRuleResult} attached to the
 * given test result, truncating identifier strings to 255 characters.
 */
private static void processRuleResult(XccdfTestResult testResult,
        List<TestResultRuleResult> ruleResults,
        String label,
        MutableBoolean truncated) {
    for (TestResultRuleResult source : ruleResults) {
        XccdfRuleResult target = new XccdfRuleResult();
        target.setTestResult(testResult);
        testResult.getResults().add(target);
        // An unknown label is a hard error.
        XccdfRuleResultType resultType = ScapFactory.lookupRuleResultType(label)
                .orElseThrow(() ->
                        new RuntimeException("no xccdf result type found for label=" +
                                label));
        target.setResultType(resultType);
        target.getIdents().add(
                getOrCreateIdent("#IDREF#", truncate(source.getId(), 255, truncated)));
        if (source.getIdents() == null) {
            continue;
        }
        for (TestResultRuleResultIdent sourceIdent : source.getIdents()) {
            String text = truncate(sourceIdent.getText(), 255, truncated);
            // Skip idents with no usable text.
            if (!StringUtils.isEmpty(text)) {
                target.getIdents().add(
                        getOrCreateIdent(sourceIdent.getSystem(), text));
            }
        }
    }
}
/**
 * Resolves the db-side profile for the given report profile; the title is
 * truncated to 120 characters before lookup. Absence in the db is a hard error.
 */
private static XccdfProfile getOrCreateProfile(Profile profile,
        MutableBoolean truncated) {
    String truncatedTitle = truncate(profile.getTitle(), 120, truncated);
    long profileId = lookupProfile(profile.getId(), truncatedTitle);
    return ScapFactory.lookupProfileById(profileId)
            .orElseThrow(() -> new RuntimeException(
                    "Xccdf benchmark not found in db identifier="
                            + profile.getId()
                            + ", version="
                            + profile.getTitle()));
}
/**
 * Resolves the db-side benchmark for the given resume; id and version are
 * truncated to 120/80 characters before lookup. Absence in the db is a hard error.
 */
private static XccdfBenchmark getOrCreateBenchmark(BenchmarkResume resume,
        MutableBoolean truncated) {
    String truncatedId = truncate(resume.getId(), 120, truncated);
    String truncatedVersion = truncate(resume.getVersion(), 80, truncated);
    long benchId = lookupBenchmark(truncatedId, truncatedVersion);
    return ScapFactory.lookupBenchmarkById(benchId)
            .orElseThrow(() -> new RuntimeException(
                    "Xccdf benchmark not found in db identifier="
                            + resume.getId()
                            + ", version="
                            + resume.getVersion()));
}
/**
 * Truncates {@code string} to at most {@code maxLen} characters, replacing the
 * removed tail with {@code "..."} and flagging the truncation.
 *
 * @param string the input, may be {@code null}
 * @param maxLen the maximum allowed length
 * @param truncated set to {@code true} when truncation happened; untouched otherwise
 * @return the original string when it fits (or is {@code null}); otherwise a
 *         string of at most {@code maxLen} characters ending in {@code "..."}
 */
private static String truncate(String string, int maxLen, MutableBoolean truncated) {
    if (string != null && string.length() > maxLen) {
        truncated.setValue(true);
        // ROBUSTNESS: Math.max guards maxLen < 3, where (maxLen - 3) would be a
        // negative substring end index and throw StringIndexOutOfBoundsException.
        return string.substring(0, Math.max(0, maxLen - 3)) + "...";
    }
    return string;
}
@Test
public void testValidationFailureForInvalidRequestContextAttribute() {
    // Flipped by the default handler once it sees the expected exception.
    final MutableBoolean handlerCalled = new MutableBoolean(false);
    LogEventFactory.setDefaultHandler((message, ex) -> {
        assertThat(ex, instanceOf(ConstraintValidationException.class));
        handlerCalled.setTrue();
    });
    // No ThreadContext attributes are populated here, so validation is expected
    // to fail by the time the assertion below runs.
    Transfer transfer = LogEventFactory.getEvent(Transfer.class);
    transfer.setToAccount(0);
    assertTrue("Should have thrown a ConstraintValidationException", handlerCalled.isTrue());
}
@Test
public void simpleEnqueueDequeue()
{
    // A non-done query is returned once per window; further dequeues in the
    // same window yield null, and the query is replayed in the next window.
    SimpleDoneQueueManager<Query, Void> sdqqm = new SimpleDoneQueueManager<Query, Void>();
    sdqqm.setup(null);
    sdqqm.beginWindow(0);
    Query query = new MockQuery("1");
    sdqqm.enqueue(query, null, new MutableBoolean(false));
    QueryBundle<Query, Void, MutableBoolean> qb = sdqqm.dequeue();
    Assert.assertEquals("Should return same query.", query, qb.getQuery());
    qb = sdqqm.dequeue();
    // IDIOM: assertNull instead of assertEquals(null, ...).
    Assert.assertNull("Should return back null.", qb);
    sdqqm.endWindow();
    sdqqm.beginWindow(1);
    qb = sdqqm.dequeue();
    Assert.assertEquals("Should return same query.", query, qb.getQuery());
    qb = sdqqm.dequeue();
    Assert.assertNull("Should return back null.", qb);
    sdqqm.endWindow();
    sdqqm.teardown();
}
@Test
public void simpleExpire1()
{
    // Once the bundle's queue context is flipped to true the query is expired
    // and must not be returned again, in this window or the next.
    SimpleDoneQueueManager<Query, Void> sdqqm = new SimpleDoneQueueManager<Query, Void>();
    sdqqm.setup(null);
    sdqqm.beginWindow(0);
    Query query = new MockQuery("1");
    sdqqm.enqueue(query, null, new MutableBoolean(false));
    QueryBundle<Query, Void, MutableBoolean> qb = sdqqm.dequeue();
    Assert.assertEquals("Should return same query.", query, qb.getQuery());
    // Mark the query done so it expires.
    qb.getQueueContext().setValue(true);
    qb = sdqqm.dequeue();
    // IDIOM: assertNull instead of assertEquals(null, ...).
    Assert.assertNull("Should return back null.", qb);
    sdqqm.endWindow();
    sdqqm.beginWindow(1);
    qb = sdqqm.dequeue();
    Assert.assertNull("Should return back null.", qb);
    sdqqm.endWindow();
    sdqqm.teardown();
}
@Test
public void expiredTestBlocking() throws Exception
{
    SimpleDoneQueueManager<Query, Void> manager = new SimpleDoneQueueManager<Query, Void>();
    manager.setup(null);
    manager.beginWindow(0);
    Query enqueued = new MockQuery("1");
    MutableBoolean doneFlag = new MutableBoolean(false);
    manager.enqueue(enqueued, null, doneFlag);
    // One bundle waiting; permits track queue depth.
    Assert.assertEquals(1, manager.getNumLeft());
    Assert.assertEquals(manager.getNumPermits(), manager.getNumLeft());
    QueryBundle<Query, Void, MutableBoolean> bundle = manager.dequeueBlock();
    Assert.assertEquals(0, manager.getNumLeft());
    Assert.assertEquals(manager.getNumPermits(), manager.getNumLeft());
    manager.endWindow();
    // The query is replayed into the next window...
    manager.beginWindow(1);
    Assert.assertEquals(1, manager.getNumLeft());
    Assert.assertEquals(manager.getNumPermits(), manager.getNumLeft());
    // ...but expiring it makes a blocking dequeue stay blocked.
    doneFlag.setValue(true);
    testBlocking(manager);
    Assert.assertEquals(0, manager.getNumLeft());
    Assert.assertEquals(manager.getNumPermits(), manager.getNumLeft());
    manager.endWindow();
    manager.teardown();
}
@Test
public void simpleExpire1ThenBlock()
{
    // Same scenario as simpleExpire1: an expired query must not be replayed.
    SimpleDoneQueueManager<Query, Void> sdqqm = new SimpleDoneQueueManager<Query, Void>();
    sdqqm.setup(null);
    sdqqm.beginWindow(0);
    Query query = new MockQuery("1");
    sdqqm.enqueue(query, null, new MutableBoolean(false));
    QueryBundle<Query, Void, MutableBoolean> qb = sdqqm.dequeue();
    Assert.assertEquals("Should return same query.", query, qb.getQuery());
    // Mark the query done so it expires.
    qb.getQueueContext().setValue(true);
    qb = sdqqm.dequeue();
    // IDIOM: assertNull instead of assertEquals(null, ...).
    Assert.assertNull("Should return back null.", qb);
    sdqqm.endWindow();
    sdqqm.beginWindow(1);
    qb = sdqqm.dequeue();
    Assert.assertNull("Should return back null.", qb);
    sdqqm.endWindow();
    sdqqm.teardown();
}
@Test
public void simpleExpireBlockThenUnblock() throws Exception
{
    // Verifies that a dequeue blocked on an expired query wakes up cleanly when
    // a fresh (non-expired) query is enqueued afterwards.
    SimpleDoneQueueManager<Query, Void> sdqqm = new SimpleDoneQueueManager<Query, Void>();
    sdqqm.setup(null);
    sdqqm.beginWindow(0);
    Query query = new MockQuery("1");
    MutableBoolean expire = new MutableBoolean(false);
    sdqqm.enqueue(query, null, expire);
    sdqqm.endWindow();
    sdqqm.beginWindow(1);
    //Expire
    expire.setValue(true);
    // Collects any throwable raised on the background dequeue thread.
    ExceptionSaverExceptionHandler eseh = new ExceptionSaverExceptionHandler();
    testBlockingNoStop(sdqqm, eseh);
    query = new MockQuery("2");
    sdqqm.enqueue(query, null, new MutableBoolean(false));
    // NOTE(review): fixed 1 s sleep presumably gives the blocked thread time to
    // observe the enqueue — timing-dependent; a latch would be more robust.
    Thread.sleep(1000);
    // The background thread must not have failed.
    Assert.assertNull(eseh.getCaughtThrowable());
    sdqqm.endWindow();
    sdqqm.teardown();
}
// Starts a helper thread that blocks on the queue manager, waits briefly, and
// asserts the thread parked in WAITING state. The deprecated Thread.stop() is
// used to tear the helper down (acknowledged by the suppression below).
@SuppressWarnings({"deprecation", "CallToThreadStopSuspendOrResumeManager"})
private void testBlocking(SimpleDoneQueueManager<Query, Void> sdqqm) throws InterruptedException
{
    Thread thread = new Thread(new BlockedThread<Query, Void, MutableBoolean>(sdqqm));
    //thread.setUncaughtExceptionHandler(new RethrowExceptionHandler(Thread.currentThread()));
    thread.start();
    // NOTE(review): the 100 ms sleep assumes the thread reaches its wait point
    // in time — timing-sensitive on slow machines.
    Thread.sleep(100);
    Assert.assertEquals(Thread.State.WAITING, thread.getState());
    thread.stop();
}
/**
 * Handles the multi-response for the actions sent to one region server:
 * records stats, dispatches per-action completion for regions that returned a
 * result, and collects failed actions for resubmission.
 *
 * NOTE(review): regions present in the request but missing from the response
 * (neither result nor exception) are failed with a generic "Invalid response".
 */
private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
        ServerName serverName, MultiResponse resp) {
    ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
            serverName, resp);
    List<Action> failedActions = new ArrayList<>();
    // Set when any error requests an immediate retry; checked before setting to
    // avoid redundant writes inside the per-region loop.
    MutableBoolean retryImmediately = new MutableBoolean(false);
    actionsByRegion.forEach((rn, regionReq) -> {
        RegionResult regionResult = resp.getResults().get(rn);
        Throwable regionException = resp.getException(rn);
        if (regionResult != null) {
            // Region-level success or mixed result: complete each action individually.
            regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
                    regionResult, failedActions, regionException, retryImmediately));
        } else {
            Throwable error;
            if (regionException == null) {
                LOG.error("Server sent us neither results nor exceptions for {}",
                        Bytes.toStringBinary(rn));
                error = new RuntimeException("Invalid response");
            } else {
                error = translateException(regionException);
            }
            logException(tries, () -> Stream.of(regionReq), error, serverName);
            conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
            if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
                // Unretriable error or attempts exhausted: fail the whole region request.
                failAll(regionReq.actions.stream(), tries, error, serverName);
                return;
            }
            if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
                retryImmediately.setTrue();
            }
            addError(regionReq.actions, error, serverName);
            failedActions.addAll(regionReq.actions);
        }
    });
    if (!failedActions.isEmpty()) {
        tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false);
    }
}
/**
 * Wraps a one-shot {@link Iterator} as an {@link Iterable} that may be
 * iterated exactly once.
 *
 * @param seq the iterator to expose; returned as-is on the first call
 * @param <T> element type
 * @return a single-use {@code Iterable} view of {@code seq}; its
 *         {@code iterator()} throws {@link IllegalStateException} on any call
 *         after the first
 */
public static <T> Iterable<T> iterable(Iterator<T> seq) {
    // IMPROVEMENT: AtomicBoolean.compareAndSet makes the single-use guard
    // race-free, unlike the unsynchronized check-then-set it replaces, and
    // drops the commons-lang3 dependency in favor of the JDK.
    final java.util.concurrent.atomic.AtomicBoolean used =
            new java.util.concurrent.atomic.AtomicBoolean(false);
    return new Iterable<T>() {
        @Override
        public Iterator<T> iterator() {
            if (used.compareAndSet(false, true)) {
                return seq;
            }
            throw new IllegalStateException("only allowed to iterate this iterable once");
        }
    };
}
/**
 * Step designed to perform sub-steps while the specified condition persists;
 * <b>if</b> the condition changes, the cycle ends.
 * Actions performed by the step:
 * <ul>
 * <li>1. Searches for elements using the locator</li>
 * <li>2. Checks that the elements quantity matches the comparison rule and the expected number</li>
 * <li>3. Performs the sub-steps</li>
 * <li>4. Restores the previously set context</li>
 * <li>Repeat 1-4 until the iteration limit is reached or the elements quantity changes</li>
 * </ul>
 * To avoid infinite loops an iteration limit is used; if the limit is reached the step fails.
 * <br> Usage example (comparison operator presumably lost in formatting — originally "&gt;= 1"):
 * <code>
 * <br>When I find &gt;= 1 elements By.xpath(.//*[contains(@class,'fancybox-wrap')]) and while elements
 * number persists do up to 5 iteration of
 * <br>|step|
 * <br>|When I compare against baseline with name 'test_composit1_step'|
 * <br>|When I click on all elements by xpath './/a[@title='Close']'|
 * </code>
 * @param comparisonRule used to check the elements quantity
 * @param number the number of elements to find
 * @param locator the locator used to search for elements
 * @param iterationLimit the maximum number of iterations to perform
 * @param stepsToExecute an examples table with the steps to execute for each iteration
 */
@When(value = "I find $comparisonRule `$number` elements `$locator` and while they exist do up "
        + "to $iterationLimit iteration of$stepsToExecute", priority = 5)
@Alias("I find $comparisonRule '$number' elements $locator and while they exist do up "
        + "to $iterationLimit iteration of$stepsToExecute")
public void performAllStepsWhileElementsExist(ComparisonRule comparisonRule, int number, SearchAttributes locator,
        int iterationLimit, SubSteps stepsToExecute)
{
    int iterationsCounter = iterationLimit;
    Matcher<Integer> elementNumberMatcher = comparisonRule.getComparisonRule(number);
    // True so the very first quantity check is recorded as a soft assertion.
    MutableBoolean firstIteration = new MutableBoolean(true);
    while (iterationsCounter > 0 && isExpectedElementsQuantity(locator, elementNumberMatcher, firstIteration))
    {
        // Run the sub-steps and then restore the search context for the next pass.
        runStepsWithContextReset(() -> stepsToExecute.execute(Optional.empty()));
        iterationsCounter--;
    }
    if (iterationsCounter == 0)
    {
        softAssert.recordFailedAssertion(
                String.format("Elements number %s was not changed after %d iteration(s)",
                        elementNumberMatcher.toString(), iterationLimit));
    }
}
@Test
public void testPeriodicWatermark() {
    // Drives the fetcher with a controllable clock and captures watermarks and
    // idleness notifications through a stubbed source context.
    final MutableLong clock = new MutableLong();
    final MutableBoolean isTemporaryIdle = new MutableBoolean();
    final List<Watermark> watermarks = new ArrayList<>();
    String fakeStream1 = "fakeStream1";
    StreamShardHandle shardHandle =
            new StreamShardHandle(
                    fakeStream1,
                    new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)));
    TestSourceContext<String> sourceContext =
            new TestSourceContext<String>() {
                @Override
                public void emitWatermark(Watermark mark) {
                    watermarks.add(mark);
                }
                @Override
                public void markAsTemporarilyIdle() {
                    isTemporaryIdle.setTrue();
                }
            };
    HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest = new HashMap<>();
    // Fetcher whose notion of "now" is the mutable clock above.
    final KinesisDataFetcher<String> fetcher =
            new TestableKinesisDataFetcher<String>(
                    Collections.singletonList(fakeStream1),
                    sourceContext,
                    new java.util.Properties(),
                    new KinesisDeserializationSchemaWrapper<>(new org.apache.flink.streaming.util.serialization.SimpleStringSchema()),
                    1,
                    1,
                    new AtomicReference<>(),
                    new LinkedList<>(),
                    subscribedStreamsToLastSeenShardIdsUnderTest,
                    FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(new HashMap<>())) {
                @Override
                protected long getCurrentTimeMillis() {
                    return clock.getValue();
                }
            };
    Whitebox.setInternalState(fetcher, "periodicWatermarkAssigner", watermarkAssigner);
    SequenceNumber seq = new SequenceNumber("fakeSequenceNumber");
    // register shards to subsequently emit records
    int shardIndex =
            fetcher.registerNewSubscribedShardState(
                    new KinesisStreamShardState(
                            KinesisDataFetcher.convertToStreamShardMetadata(shardHandle), shardHandle, seq));
    StreamRecord<String> record1 =
            new StreamRecord<>(String.valueOf(Long.MIN_VALUE), Long.MIN_VALUE);
    fetcher.emitRecordAndUpdateState(record1.getValue(), record1.getTimestamp(), shardIndex, seq);
    Assert.assertEquals(record1, sourceContext.getCollectedOutputs().poll());
    fetcher.emitWatermark();
    Assert.assertTrue("potential watermark equals previous watermark", watermarks.isEmpty());
    StreamRecord<String> record2 = new StreamRecord<>(String.valueOf(1), 1);
    fetcher.emitRecordAndUpdateState(record2.getValue(), record2.getTimestamp(), shardIndex, seq);
    Assert.assertEquals(record2, sourceContext.getCollectedOutputs().poll());
    fetcher.emitWatermark();
    Assert.assertFalse("watermark advanced", watermarks.isEmpty());
    Assert.assertEquals(new Watermark(record2.getTimestamp()), watermarks.remove(0));
    Assert.assertFalse("not idle", isTemporaryIdle.booleanValue());
    // test idle timeout
    long idleTimeout = 10;
    // advance clock idleTimeout
    clock.add(idleTimeout + 1);
    fetcher.emitWatermark();
    // Idle detection stays off until shardIdleIntervalMillis is set below.
    Assert.assertFalse("not idle", isTemporaryIdle.booleanValue());
    Assert.assertTrue("not idle, no new watermark", watermarks.isEmpty());
    // activate idle timeout
    Whitebox.setInternalState(fetcher, "shardIdleIntervalMillis", idleTimeout);
    fetcher.emitWatermark();
    Assert.assertTrue("idle", isTemporaryIdle.booleanValue());
    Assert.assertTrue("idle, no watermark", watermarks.isEmpty());
}
// NOTE(review): this method is byte-for-byte identical to another
// testPeriodicWatermark in this snippet collection; two same-signature methods
// cannot coexist in one class — presumably copies from different source files.
@Test
public void testPeriodicWatermark() {
    // Drives the fetcher with a controllable clock and captures watermarks and
    // idleness notifications through a stubbed source context.
    final MutableLong clock = new MutableLong();
    final MutableBoolean isTemporaryIdle = new MutableBoolean();
    final List<Watermark> watermarks = new ArrayList<>();
    String fakeStream1 = "fakeStream1";
    StreamShardHandle shardHandle =
            new StreamShardHandle(
                    fakeStream1,
                    new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)));
    TestSourceContext<String> sourceContext =
            new TestSourceContext<String>() {
                @Override
                public void emitWatermark(Watermark mark) {
                    watermarks.add(mark);
                }
                @Override
                public void markAsTemporarilyIdle() {
                    isTemporaryIdle.setTrue();
                }
            };
    HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest = new HashMap<>();
    // Fetcher whose notion of "now" is the mutable clock above.
    final KinesisDataFetcher<String> fetcher =
            new TestableKinesisDataFetcher<String>(
                    Collections.singletonList(fakeStream1),
                    sourceContext,
                    new java.util.Properties(),
                    new KinesisDeserializationSchemaWrapper<>(new org.apache.flink.streaming.util.serialization.SimpleStringSchema()),
                    1,
                    1,
                    new AtomicReference<>(),
                    new LinkedList<>(),
                    subscribedStreamsToLastSeenShardIdsUnderTest,
                    FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(new HashMap<>())) {
                @Override
                protected long getCurrentTimeMillis() {
                    return clock.getValue();
                }
            };
    Whitebox.setInternalState(fetcher, "periodicWatermarkAssigner", watermarkAssigner);
    SequenceNumber seq = new SequenceNumber("fakeSequenceNumber");
    // register shards to subsequently emit records
    int shardIndex =
            fetcher.registerNewSubscribedShardState(
                    new KinesisStreamShardState(
                            KinesisDataFetcher.convertToStreamShardMetadata(shardHandle), shardHandle, seq));
    StreamRecord<String> record1 =
            new StreamRecord<>(String.valueOf(Long.MIN_VALUE), Long.MIN_VALUE);
    fetcher.emitRecordAndUpdateState(record1.getValue(), record1.getTimestamp(), shardIndex, seq);
    Assert.assertEquals(record1, sourceContext.getCollectedOutputs().poll());
    fetcher.emitWatermark();
    Assert.assertTrue("potential watermark equals previous watermark", watermarks.isEmpty());
    StreamRecord<String> record2 = new StreamRecord<>(String.valueOf(1), 1);
    fetcher.emitRecordAndUpdateState(record2.getValue(), record2.getTimestamp(), shardIndex, seq);
    Assert.assertEquals(record2, sourceContext.getCollectedOutputs().poll());
    fetcher.emitWatermark();
    Assert.assertFalse("watermark advanced", watermarks.isEmpty());
    Assert.assertEquals(new Watermark(record2.getTimestamp()), watermarks.remove(0));
    Assert.assertFalse("not idle", isTemporaryIdle.booleanValue());
    // test idle timeout
    long idleTimeout = 10;
    // advance clock idleTimeout
    clock.add(idleTimeout + 1);
    fetcher.emitWatermark();
    // Idle detection stays off until shardIdleIntervalMillis is set below.
    Assert.assertFalse("not idle", isTemporaryIdle.booleanValue());
    Assert.assertTrue("not idle, no new watermark", watermarks.isEmpty());
    // activate idle timeout
    Whitebox.setInternalState(fetcher, "shardIdleIntervalMillis", idleTimeout);
    fetcher.emitWatermark();
    Assert.assertTrue("idle", isTemporaryIdle.booleanValue());
    Assert.assertTrue("idle, no watermark", watermarks.isEmpty());
}
/**
 * Evaluate the SCAP results report and store the results in the db.
 * @param server the server
 * @param action the action
 * @param returnCode openscap return code
 * @param errors openscap errors output
 * @param resumeXml the SCAP report in intermediary XML format
 * @return the {@link XccdfTestResult} that was saved in the db
 */
public static XccdfTestResult xccdfEvalResume(Server server, ScapAction action,
        int returnCode, String errors,
        InputStream resumeXml) {
    ScapFactory.clearTestResult(server.getId(), action.getId());
    try {
        BenchmarkResume resume = createXmlPersister()
                .read(BenchmarkResume.class, resumeXml);
        // Fall back to a placeholder profile when the report carries none.
        Profile profile = Optional.ofNullable(resume.getProfile()).orElse(
                new Profile("None",
                        "No profile selected. Using defaults.",
                        ""));
        TestResult testResults = resume.getTestResult();
        if (testResults == null) {
            log.error("Scap report misses profile or testresult element");
            throw new RuntimeException(
                    "Scap report misses profile or testresult element");
        }
        // Records whether any string had to be truncated to fit a db column.
        MutableBoolean truncated = new MutableBoolean();
        XccdfTestResult result = new XccdfTestResult();
        result.setServer(server);
        result.setIdentifier(testResults.getId());
        result.setScapActionDetails(action.getScapActionDetails());
        XccdfBenchmark xccdfBenchmark =
                getOrCreateBenchmark(resume, truncated);
        result.setBenchmark(xccdfBenchmark);
        XccdfProfile xccdfProfile =
                getOrCreateProfile(profile, truncated);
        result.setProfile(xccdfProfile);
        result.setStartTime(testResults.getStartTime());
        result.setEndTime(testResults.getEndTime());
        // Persist the rule results for every xccdf outcome label.
        processRuleResult(result, testResults.getPass(), "pass", truncated);
        processRuleResult(result, testResults.getFail(), "fail", truncated);
        processRuleResult(result, testResults.getError(), "error", truncated);
        processRuleResult(result, testResults.getUnknown(), "unknown", truncated);
        processRuleResult(result, testResults.getNotapplicable(),
                "notapplicable", truncated);
        processRuleResult(result, testResults.getNotchecked(),
                "notchecked", truncated);
        processRuleResult(result, testResults.getNotselected(),
                "notselected", truncated);
        processRuleResult(result, testResults.getInformational(),
                "informational", truncated);
        processRuleResult(result, testResults.getFixed(), "fixed", truncated);
        String errs = errors;
        if (returnCode != 0) {
            errs += String.format("xccdf_eval: oscap tool returned %d\n", returnCode);
        }
        if (truncated.isTrue()) {
            // BUG FIX: append to errs instead of rebuilding from the original
            // errors string, which silently discarded the oscap return-code
            // message added above.
            errs += "\nSome text strings were truncated when saving to the database.";
        }
        result.setErrors(HibernateFactory.stringToByteArray(errs));
        ScapFactory.save(result);
        return result;
    }
    catch (Exception e) {
        log.error("Scap xccdf eval failed", e);
        throw new RuntimeException("Scap xccdf eval failed", e);
    }
}
/**
 * Attempt to shed some bundles off every broker which is overloaded.
 *
 * @param loadData
 *            The load data used to make the unloading decision.
 * @param conf
 *            The service configuration.
 * @return A map from bundles to unload to the brokers on which they are loaded.
 */
@Override
public Multimap<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) {
    selectedBundlesCache.clear();
    final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
    final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
    // Check every broker and select
    loadData.getBrokerData().forEach((broker, brokerData) -> {
        final LocalBrokerData localData = brokerData.getLocalData();
        final double currentUsage = localData.getMaxResourceUsage();
        if (currentUsage < overloadThreshold) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Broker is not overloaded, ignoring at this point ({})", broker,
                        localData.printResourceUsage());
            }
            return;
        }
        // We want to offload enough traffic such that this broker will go below the overload threshold
        // Also, add a small margin so that this broker won't be very close to the threshold edge.
        double percentOfTrafficToOffload = currentUsage - overloadThreshold + ADDITIONAL_THRESHOLD_PERCENT_MARGIN;
        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
        double minimumThroughputToOffload = brokerCurrentThroughput * percentOfTrafficToOffload;
        log.info(
                "Attempting to shed load on {}, which has resource usage {}% above threshold {}% -- Offloading at least {} MByte/s of traffic ({})",
                broker, 100 * currentUsage, 100 * overloadThreshold, minimumThroughputToOffload / 1024 / 1024,
                localData.printResourceUsage());
        MutableDouble trafficMarkedToOffload = new MutableDouble(0);
        MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
        if (localData.getBundles().size() > 1) {
            // Sort bundles by throughput, then pick the biggest N which combined
            // make up for at least the minimum throughput to offload.
            // CLEANUP: a second, redundant membership filter on
            // localData.getBundles() was removed — the stream is already
            // restricted to this broker's bundles by the first filter below.
            loadData.getBundleData().entrySet().stream()
                    .filter(e -> localData.getBundles().contains(e.getKey()))
                    .map((e) -> {
                        // Map to throughput value
                        // Consider short-term byte rate to address system resource burden
                        String bundle = e.getKey();
                        BundleData bundleData = e.getValue();
                        TimeAverageMessageData shortTermData = bundleData.getShortTermData();
                        double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
                        return Pair.of(bundle, throughput);
                    }).filter(e -> {
                        // Only consider bundles that were not already unloaded recently
                        return !recentlyUnloadedBundles.containsKey(e.getLeft());
                    }).sorted((e1, e2) -> {
                        // Sort by throughput in reverse order
                        return Double.compare(e2.getRight(), e1.getRight());
                    }).forEach(e -> {
                        // Keep selecting until the target throughput is marked,
                        // but always select at least one bundle.
                        if (trafficMarkedToOffload.doubleValue() < minimumThroughputToOffload
                                || atLeastOneBundleSelected.isFalse()) {
                            selectedBundlesCache.put(broker, e.getLeft());
                            trafficMarkedToOffload.add(e.getRight());
                            atLeastOneBundleSelected.setTrue();
                        }
                    });
        } else if (localData.getBundles().size() == 1) {
            log.warn(
                    "HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
                            + "No Load Shedding will be done on this broker",
                    localData.getBundles().iterator().next(), broker);
        } else {
            log.warn("Broker {} is overloaded despite having no bundles", broker);
        }
    });
    return selectedBundlesCache;
}
/**
 * Attempt to shed load off brokers whose resource usage exceeds the cluster
 * average by more than the configured threshold.
 *
 * @param loadData the load data used to make the unloading decision
 * @param conf the service configuration
 * @return a map from broker to the bundles selected for unloading
 */
@Override
public Multimap<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) {
    selectedBundlesCache.clear();
    final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
    final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
    final double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
    final double avgUsage = getBrokerAvgUsage(loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf);
    if (avgUsage == 0) {
        log.warn("average max resource usage is 0");
        return selectedBundlesCache;
    }
    loadData.getBrokerData().forEach((broker, brokerData) -> {
        final LocalBrokerData localData = brokerData.getLocalData();
        final double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
        if (currentUsage < avgUsage + threshold) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] broker is not overloaded, ignoring at this point", broker);
            }
            return;
        }
        double percentOfTrafficToOffload = currentUsage - avgUsage - threshold + ADDITIONAL_THRESHOLD_PERCENT_MARGIN;
        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
        double minimumThroughputToOffload = brokerCurrentThroughput * percentOfTrafficToOffload;
        if (minimumThroughputToOffload < minThroughputThreshold) {
            if (log.isDebugEnabled()) {
                // BUG FIX: this was log.info inside an isDebugEnabled() guard, so
                // the message appeared at INFO level but only when DEBUG was on.
                log.debug("[{}] broker is planning to shed throughput {} MByte/s less than " +
                        "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
                        broker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
            }
            return;
        }
        log.info(
                "Attempting to shed load on {}, which has max resource usage above avgUsage and threshold {}%" +
                " > {}% + {}% -- Offloading at least {} MByte/s of traffic, left throughput {} MByte/s",
                broker, currentUsage, avgUsage, threshold, minimumThroughputToOffload / MB,
                (brokerCurrentThroughput - minimumThroughputToOffload) / MB);
        MutableDouble trafficMarkedToOffload = new MutableDouble(0);
        MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
        if (localData.getBundles().size() > 1) {
            // Pick the highest-throughput bundles until the offload target is met.
            loadData.getBundleData().entrySet().stream().map((e) -> {
                String bundle = e.getKey();
                BundleData bundleData = e.getValue();
                TimeAverageMessageData shortTermData = bundleData.getShortTermData();
                double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
                return Pair.of(bundle, throughput);
            }).filter(e ->
                !recentlyUnloadedBundles.containsKey(e.getLeft())
            ).filter(e ->
                localData.getBundles().contains(e.getLeft())
            ).sorted((e1, e2) ->
                Double.compare(e2.getRight(), e1.getRight())
            ).forEach(e -> {
                if (trafficMarkedToOffload.doubleValue() < minimumThroughputToOffload
                        || atLeastOneBundleSelected.isFalse()) {
                    selectedBundlesCache.put(broker, e.getLeft());
                    trafficMarkedToOffload.add(e.getRight());
                    atLeastOneBundleSelected.setTrue();
                }
            });
        } else if (localData.getBundles().size() == 1) {
            // TYPO FIX in log message: "Shadding" -> "Shedding".
            log.warn(
                    "HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. " +
                    "No Load Shedding will be done on this broker",
                    localData.getBundles().iterator().next(), broker);
        } else {
            // TYPO FIX in log message: "despit" -> "despite".
            log.warn("Broker {} is overloaded despite having no bundles", broker);
        }
    });
    return selectedBundlesCache;
}
@Override
public boolean removeBundle(QueryBundle<QUERY_TYPE, META_QUERY, MutableBoolean> queryQueueable)
{
    // A bundle is removable once its queue context flag has been set to true.
    MutableBoolean done = queryQueueable.getQueueContext();
    return done.isTrue();
}