下面列出了怎么用com.google.common.util.concurrent.Runnables的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public CompletableFuture<Void> remove(@NonNull String segmentName, @NonNull Collection<TableKey> keys, Duration timeout) {
Exceptions.checkNotClosed(this.closed.get(), this);
TimeoutTimer timer = new TimeoutTimer(timeout);
// Generate an Update Batch for all the keys (since we need to know their Key Hashes and relative offsets in
// the batch itself).
val removeBatch = batch(keys, key -> key, this.serializer::getRemovalLength, TableKeyBatch.removal());
logRequest("remove", segmentName, removeBatch.isConditional(), removeBatch.isRemoval(), keys.size(), removeBatch.getLength());
return this.segmentContainer
.forSegment(segmentName, timer.getRemaining())
.thenComposeAsync(segment -> this.keyIndex.update(segment, removeBatch,
() -> commit(keys, this.serializer::serializeRemoval, segment, timer.getRemaining()), timer),
this.executor)
.thenRun(Runnables.doNothing());
}
/**
* Tests the ability of the MemoryStateUpdater to delegate Enter/Exit recovery mode to the read index.
*/
@Test
public void testRecoveryMode() throws Exception {
// Check it's properly delegated to Read index.
SequencedItemList<Operation> opLog = new SequencedItemList<>();
ArrayList<TestReadIndex.MethodInvocation> methodInvocations = new ArrayList<>();
TestReadIndex readIndex = new TestReadIndex(methodInvocations::add);
MemoryStateUpdater updater = new MemoryStateUpdater(opLog, readIndex, Runnables.doNothing());
UpdateableContainerMetadata metadata1 = new MetadataBuilder(1).build();
updater.enterRecoveryMode(metadata1);
updater.exitRecoveryMode(true);
Assert.assertEquals("Unexpected number of method invocations.", 2, methodInvocations.size());
TestReadIndex.MethodInvocation enterRecovery = methodInvocations.get(0);
Assert.assertEquals("ReadIndex.enterRecoveryMode was not called when expected.", TestReadIndex.ENTER_RECOVERY_MODE, enterRecovery.methodName);
Assert.assertEquals("ReadIndex.enterRecoveryMode was called with the wrong arguments.", metadata1, enterRecovery.args.get("recoveryMetadataSource"));
TestReadIndex.MethodInvocation exitRecovery = methodInvocations.get(1);
Assert.assertEquals("ReadIndex.exitRecoveryMode was not called when expected.", TestReadIndex.EXIT_RECOVERY_MODE, exitRecovery.methodName);
Assert.assertEquals("ReadIndex.exitRecoveryMode was called with the wrong arguments.", true, exitRecovery.args.get("successfulRecovery"));
}
@Test
void messageOnValidIsInformation() {
givenValidationResults(true, true);
final SaveFunction.SaveResult result =
SaveFunction.saveSettings(
List.of(mockSelectionComponent, mockSelectionComponent2), Runnables.doNothing());
assertThat(
"There will always be a message back to the user",
result.message,
is(not(emptyString())));
assertThat(
"All valid, message type should informational",
result.dialogType,
is(JOptionPane.INFORMATION_MESSAGE));
}
private void waitForTask(final String taskId) {
boolean success = Repeater.create()
.repeat(Runnables.doNothing())
.until(new Callable<Boolean>() {
@Override public Boolean call() {
Response response = client().path("/activities/"+taskId).get();
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()) {
return true;
}
TaskSummary summary = response.readEntity(TaskSummary.class);
return summary != null && summary.getEndTimeUtc() != null;
}})
.every(10L, TimeUnit.MILLISECONDS)
.limitTimeTo(TIMEOUT_MS, TimeUnit.MILLISECONDS)
.run();
assertTrue(success, "task "+taskId+" not finished");
}
private void confirmKickPlayer(final String kickPlayerName)
{
chatboxPanelManager.openTextMenuInput("Attempting to kick: " + kickPlayerName)
.option("1. Confirm kick", () ->
clientThread.invoke(() ->
{
kickConfirmed = true;
client.runScript(ScriptID.FRIENDS_CHAT_SEND_KICK, kickPlayerName);
kickConfirmed = false;
})
)
.option("2. Cancel", Runnables::doNothing)
.build();
}
public boolean mouseClicked(double double_1, double double_2, int int_1) {
if (super.mouseClicked(double_1, double_2, int_1)) {
return true;
} else if (this.warning != null && this.warning.onClick(double_1, double_2)) {
return true;
} else {
if (double_1 > (double) this.copyrightTextX && double_1 < (double) (this.copyrightTextX + this.copyrightTextWidth) && double_2 > (double) (this.height - 10) && double_2 < (double) this.height) {
this.minecraft.openScreen(new CreditsScreen(false, Runnables.doNothing()));
}
return false;
}
}
public void savePlaylist(Command command, TextChannel channel, String ownerId, boolean overwriteAllowed, String name, List<String> songs) {
CassandraController.runTask(session -> {
if (savePlaylistStatement == null)
savePlaylistStatement = session.prepare("SELECT * FROM flarebot.playlist " +
"WHERE playlist_name = ? AND guild_id = ?");
ResultSet set =
session.execute(savePlaylistStatement.bind().setString(0, name).setString(1, channel.getGuild().getId()));
if (set.one() != null) {
if (ConfirmUtil.checkExists(ownerId, command.getClass())) {
MessageUtils.sendWarningMessage("Overwriting playlist!", channel);
} else if (!overwriteAllowed) {
MessageUtils.sendErrorMessage("That name is already taken! You need the `flarebot.queue.save.overwrite` permission to overwrite", channel);
return;
} else {
MessageUtils.sendErrorMessage("That name is already taken! Do this again within 1 minute to overwrite!", channel);
ConfirmUtil.pushAction(ownerId, new RunnableWrapper(Runnables.doNothing(), command.getClass()));
return;
}
}
if (insertPlaylistStatement == null)
insertPlaylistStatement = session.prepare("INSERT INTO flarebot.playlist" +
" (playlist_name, guild_id, owner, songs, scope, times_played) VALUES (?, ?, ?, ?, ?, ?)");
session.execute(insertPlaylistStatement.bind().setString(0, name).setString(1, channel.getGuild().getId())
.setString(2, ownerId).setList(3, songs).setString(4, "local").setInt(5, 0));
channel.sendMessage(MessageUtils.getEmbed(Getters.getUserById(ownerId))
.setDescription("Successfully saved the playlist: " + MessageUtils.escapeMarkdown(name)).build()).queue();
});
}
public LambdaStreamObserver(Consumer<T> onNext, Consumer<Throwable> onError) {
this(
onNext,
onError,
Runnables.doNothing()
);
}
public LambdaStreamObserver(Consumer<T> onNext) {
this(
onNext,
t -> {
throw new OnErrorNotImplementedException(t);
},
Runnables.doNothing()
);
}
@Test
public void testShutdown() throws InterruptedException {
DynamicScheduledThreadConfig dynamicScheduledThreadConfig = new DynamicScheduledThreadConfig(ArtemisClientConstants.Properties,
new RangePropertyConfig<Integer>(20, 0, 200), new RangePropertyConfig<Integer>(500, 500, 5 * 1000));
DynamicScheduledThread t = new DynamicScheduledThread("client", Runnables.doNothing(), dynamicScheduledThreadConfig);
t.setDaemon(true);
t.start();
t.shutdown();
Thread.sleep(500);
Assert.assertFalse(t.isAlive());
}
public void reloadProvider(String providerKey, boolean startSpinners) {
SensorDiscoverer discoverer = discoverers.get(providerKey);
if (discoverer == null) {
throw new IllegalArgumentException("Couldn't find " + providerKey + " in " + discoverers);
}
startScanning(
providerKey,
discoverer,
new TaskPool(Runnables.doNothing()),
new HashSet<String>(),
startSpinners);
}
private void confirmKickPlayer(final String kickPlayerName)
{
chatboxPanelManager.openTextMenuInput("Attempting to kick: " + kickPlayerName)
.option("1. Confirm kick", () ->
clientThread.invoke(() ->
{
kickConfirmed = true;
client.runScript(ScriptID.FRIENDS_CHAT_SEND_KICK, kickPlayerName);
kickConfirmed = false;
})
)
.option("2. Cancel", Runnables::doNothing)
.build();
}
private MetadataCheckpointPolicy getNoOpCheckpointPolicy() {
// Turn off any MetadataCheckpointing. In these tests, we are doing that manually.
DurableLogConfig dlConfig = DurableLogConfig
.builder()
.with(DurableLogConfig.CHECKPOINT_COMMIT_COUNT, Integer.MAX_VALUE)
.with(DurableLogConfig.CHECKPOINT_TOTAL_COMMIT_LENGTH, Long.MAX_VALUE)
.build();
return new MetadataCheckpointPolicy(dlConfig, Runnables.doNothing(), executorService());
}
TestContext() {
this.storage = InMemoryStorageFactory.newStorage(executorService());
this.storage.initialize(1);
this.metadata = new MetadataBuilder(CONTAINER_ID).build();
ReadIndexConfig readIndexConfig = ReadIndexConfig.builder().with(ReadIndexConfig.STORAGE_READ_ALIGNMENT, 1024).build();
this.cacheStorage = new DirectMemoryCache(Integer.MAX_VALUE);
this.cacheManager = new CacheManager(CachePolicy.INFINITE, this.cacheStorage, executorService());
this.readIndex = new ContainerReadIndex(readIndexConfig, this.metadata, this.storage, this.cacheManager, executorService());
this.memoryLog = new SequencedItemList<>();
this.stateUpdater = new MemoryStateUpdater(this.memoryLog, this.readIndex, Runnables.doNothing());
}
private CompletableFuture<Void> waitForSegmentInStorage(SegmentProperties metadataProps, TestContext context) {
if (metadataProps.getLength() == 0) {
// Empty segments may or may not exist in Storage, so don't bother complicating ourselves with this.
return CompletableFuture.completedFuture(null);
}
// Check if the Storage Segment is caught up. If sealed, we want to make sure that both the Segment and its
// Attribute Segment are sealed (or the latter has been deleted - for transactions). For all other, we want to
// ensure that the length and truncation offsets have caught up.
BiFunction<SegmentProperties, SegmentProperties, Boolean> meetsConditions = (segmentProps, attrProps) ->
metadataProps.isSealed() == (segmentProps.isSealed() && (attrProps.isSealed() || attrProps.isDeleted()))
&& segmentProps.getLength() >= metadataProps.getLength()
&& context.storageFactory.truncationOffsets.getOrDefault(metadataProps.getName(), 0L) >= metadataProps.getStartOffset();
String attributeSegmentName = NameUtils.getAttributeSegmentName(metadataProps.getName());
AtomicBoolean canContinue = new AtomicBoolean(true);
TimeoutTimer timer = new TimeoutTimer(TIMEOUT);
return Futures.loop(
canContinue::get,
() -> {
val segInfo = getStorageSegmentInfo(metadataProps.getName(), timer, context);
val attrInfo = getStorageSegmentInfo(attributeSegmentName, timer, context);
return CompletableFuture.allOf(segInfo, attrInfo)
.thenCompose(v -> {
if (meetsConditions.apply(segInfo.join(), attrInfo.join())) {
canContinue.set(false);
return CompletableFuture.completedFuture(null);
} else if (!timer.hasRemaining()) {
return Futures.failedFuture(new TimeoutException());
} else {
return Futures.delayedFuture(Duration.ofMillis(10), executorService());
}
}).thenRun(Runnables.doNothing());
},
executorService());
}
@Test
void title() {
final String value = "testing title";
final JButton button =
new JButtonBuilder().title(value).actionListener(Runnables.doNothing()).build();
assertThat(button.getText(), is(value));
}
@Test
void messageOnNotValidResultIsWarning() {
givenValidationResults(false, false);
final SaveFunction.SaveResult result =
SaveFunction.saveSettings(
List.of(mockSelectionComponent, mockSelectionComponent2), Runnables.doNothing());
assertThat(result.message, is(not(emptyString())));
assertThat(result.dialogType, is(JOptionPane.WARNING_MESSAGE));
}
@Test
void messageOnMixedResultIsWarning() {
givenValidationResults(true, false);
final SaveFunction.SaveResult result =
SaveFunction.saveSettings(
List.of(mockSelectionComponent, mockSelectionComponent2), Runnables.doNothing());
assertThat(result.message, is(not(emptyString())));
assertThat(
"At least one value was not updated, should be warning message type",
result.dialogType,
is(JOptionPane.WARNING_MESSAGE));
}
@Test
void shouldWaitUntilThreadIsDead() {
final Thread thread = new Thread(Runnables.doNothing());
thread.start();
assertTimeoutPreemptively(
Duration.ofSeconds(5L), () -> assertThat(Interruptibles.join(thread), is(true)));
}
@Override
public void eval(InputStreamReader inputStreamReader, Consumer<Exception> errorCallback) {
try {
String script = IOUtils.toString(inputStreamReader);
eval(script, Runnables.doNothing(), errorCallback);
} catch (IOException ioe) {
handleError(errorCallback, ioe);
}
}
@VisibleForTesting
// Integration test for this in BrooklynNodeIntegrationTest in this project doesn't use this method,
// but a Unit test for this does, in DeployBlueprintTest -- but in the REST server project (since it runs against local)
public String submitPlan(final String plan) {
final MutableMap<String, String> headers = MutableMap.of(com.google.common.net.HttpHeaders.CONTENT_TYPE, "application/yaml");
final AtomicReference<byte[]> response = new AtomicReference<byte[]>();
Repeater.create()
.every(Duration.ONE_SECOND)
.backoffTo(Duration.FIVE_SECONDS)
.limitTimeTo(Duration.minutes(5))
.repeat(Runnables.doNothing())
.rethrowExceptionImmediately()
.until(new Callable<Boolean>() {
@Override
public Boolean call() {
HttpToolResponse result = ((BrooklynNode)entity()).http()
//will throw on non-{2xx, 403} response
.responseSuccess(Predicates.<Integer>or(ResponseCodePredicates.success(), Predicates.equalTo(HttpStatus.SC_FORBIDDEN)))
.post("/v1/applications", headers, plan.getBytes());
if (result.getResponseCode() == HttpStatus.SC_FORBIDDEN) {
log.debug("Remote is not ready to accept requests, response is " + result.getResponseCode());
return false;
} else {
byte[] content = result.getContent();
response.set(content);
return true;
}
}
})
.runRequiringTrue();
return (String)new Gson().fromJson(new String(response.get()), Map.class).get("entityId");
}
public static CreationResult<List<Entity>,List<String>> addChildrenStarting(final Entity parent, String yaml) {
final List<Entity> children = addChildrenUnstarted(parent, yaml);
String childrenCountString;
int size = children.size();
childrenCountString = size+" "+(size!=1 ? "children" : "child");
TaskBuilder<List<String>> taskM = Tasks.<List<String>>builder().displayName("add children")
.dynamic(true)
.tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
.body(new Callable<List<String>>() {
@Override public List<String> call() throws Exception {
return ImmutableList.copyOf(Iterables.transform(children, EntityFunctions.id()));
}})
.description("Add and start "+childrenCountString);
TaskBuilder<?> taskS = Tasks.builder().parallel(true).displayName("add (parallel)").description("Start each new entity");
// autostart if requested
for (Entity child: children) {
if (child instanceof Startable) {
taskS.add(Effectors.invocation(child, Startable.START, ImmutableMap.of("locations", ImmutableList.of())));
} else {
// include a task, just to give feedback in the GUI
taskS.add(Tasks.builder().displayName("create").description("Skipping start (not a Startable Entity)")
.body(Runnables.doNothing())
.tag(BrooklynTaskTags.tagForTargetEntity(child))
.build());
}
}
taskM.add(taskS.build());
Task<List<String>> task = Entities.submit(parent, taskM.build());
return CreationResult.of(children, task);
}
@Test
public void testReportsActionExecutedEvent() throws Exception {
Artifact pear = createDerivedArtifact("pear");
ActionEventRecorder recorder = new ActionEventRecorder();
eventBus.register(recorder);
Action action =
registerAction(
new TestAction(Runnables.doNothing(), emptyNestedSet, ImmutableSet.of(pear)));
buildArtifacts(createBuilder(DEFAULT_NUM_JOBS, true), pear);
assertThat(recorder.actionExecutedEvents).hasSize(1);
assertThat(recorder.actionExecutedEvents.get(0).getAction()).isEqualTo(action);
}
@Test
public void testNamedThread() {
String name = "test";
Runnable runnable = Runnables.doNothing();
Thread thread = Threads.namedThread(name, runnable);
assertNotNull(thread);
assertFalse(thread.isDaemon());
assertEquals(State.NEW, thread.getState());
assertEquals(name, thread.getName());
}
private static JimfsInputStream newInputStream(int... bytes) throws IOException {
byte[] b = new byte[bytes.length];
for (int i = 0; i < bytes.length; i++) {
b[i] = (byte) bytes[i];
}
RegularFile file = regularFile(0);
file.write(0, b, 0, b.length);
return new JimfsInputStream(file, new FileSystemState(Runnables.doNothing()));
}
@Before
public void setUp() {
fs = (JimfsFileSystem) Jimfs.newFileSystem(Configuration.unix());
watcher =
new PollingWatchService(
fs.getDefaultView(),
fs.getPathService(),
new FileSystemState(Runnables.doNothing()),
4,
MILLISECONDS);
}
private void handleTagTab(ScriptEvent event)
{
switch (event.getOp())
{
case Tab.OPEN_TAG:
client.setVarbit(Varbits.CURRENT_BANK_TAB, 0);
Widget clicked = event.getSource();
TagTab tab = tabManager.find(Text.removeTags(clicked.getName()));
if (tab.equals(activeTab))
{
bankSearch.reset(true);
rememberedSearch = "";
clientThread.invokeLater(() -> client.runScript(ScriptID.MESSAGE_LAYER_CLOSE, 0, 0));
}
else
{
openTag(Text.removeTags(clicked.getName()));
}
client.playSoundEffect(SoundEffectID.UI_BOOP);
break;
case Tab.CHANGE_ICON:
final String tag = Text.removeTags(event.getOpbase());
searchProvider
.tooltipText(CHANGE_ICON + " (" + tag + ")")
.onItemSelected((itemId) ->
{
TagTab iconToSet = tabManager.find(tag);
if (iconToSet != null)
{
iconToSet.setIconItemId(itemId);
iconToSet.getIcon().setItemId(itemId);
iconToSet.getMenu().setItemId(itemId);
tabManager.setIcon(iconToSet.getTag(), itemId + "");
}
})
.build();
break;
case Tab.DELETE_TAB:
String target = Text.standardize(event.getOpbase());
chatboxPanelManager.openTextMenuInput("Delete " + target)
.option("1. Tab and tag from all items", () ->
clientThread.invoke(() ->
{
tagManager.removeTag(target);
deleteTab(target);
})
)
.option("2. Only tab", () -> clientThread.invoke(() -> deleteTab(target)))
.option("3. Cancel", Runnables::doNothing)
.build();
break;
case Tab.EXPORT_TAB:
final List<String> data = new ArrayList<>();
final TagTab tagTab = tabManager.find(Text.removeTags(event.getOpbase()));
data.add(tagTab.getTag());
data.add(String.valueOf(tagTab.getIconItemId()));
for (Integer item : tagManager.getItemsForTag(tagTab.getTag()))
{
data.add(String.valueOf(item));
}
final StringSelection stringSelection = new StringSelection(Text.toCSV(data));
Toolkit.getDefaultToolkit().getSystemClipboard().setContents(stringSelection, null);
notifier.notify("Tag tab " + tagTab.getTag() + " has been copied to your clipboard!");
break;
case Tab.RENAME_TAB:
String renameTarget = Text.standardize(event.getOpbase());
renameTab(renameTarget);
break;
}
}
private void handleTagTab(ScriptEvent event)
{
switch (event.getOp())
{
case Tab.OPEN_TAG:
client.setVarbit(Varbits.CURRENT_BANK_TAB, 0);
Widget clicked = event.getSource();
TagTab tab = tabManager.find(Text.removeTags(clicked.getName()));
if (tab.equals(activeTab))
{
bankSearch.reset(true);
rememberedSearch = "";
clientThread.invokeLater(() -> client.runScript(ScriptID.MESSAGE_LAYER_CLOSE, 0, 0));
}
else
{
openTag(Text.removeTags(clicked.getName()));
}
client.playSoundEffect(SoundEffectID.UI_BOOP);
break;
case Tab.CHANGE_ICON:
final String tag = Text.removeTags(event.getOpbase());
searchProvider
.tooltipText(CHANGE_ICON + " (" + tag + ")")
.onItemSelected((itemId) ->
{
TagTab iconToSet = tabManager.find(tag);
if (iconToSet != null)
{
iconToSet.setIconItemId(itemId);
iconToSet.getIcon().setItemId(itemId);
iconToSet.getMenu().setItemId(itemId);
tabManager.setIcon(iconToSet.getTag(), itemId + "");
}
})
.build();
break;
case Tab.DELETE_TAB:
String target = Text.standardize(event.getOpbase());
chatboxPanelManager.openTextMenuInput("Delete " + target)
.option("1. Tab and tag from all items", () ->
clientThread.invoke(() ->
{
tagManager.removeTag(target);
deleteTab(target);
})
)
.option("2. Only tab", () -> clientThread.invoke(() -> deleteTab(target)))
.option("3. Cancel", Runnables::doNothing)
.build();
break;
case Tab.EXPORT_TAB:
final List<String> data = new ArrayList<>();
final TagTab tagTab = tabManager.find(Text.removeTags(event.getOpbase()));
data.add(tagTab.getTag());
data.add(String.valueOf(tagTab.getIconItemId()));
for (Integer item : tagManager.getItemsForTag(tagTab.getTag()))
{
data.add(String.valueOf(item));
}
final StringSelection stringSelection = new StringSelection(Text.toCSV(data));
Toolkit.getDefaultToolkit().getSystemClipboard().setContents(stringSelection, null);
notifier.notify("Tag tab " + tagTab.getTag() + " has been copied to your clipboard!");
break;
case Tab.RENAME_TAB:
String renameTarget = Text.standardize(event.getOpbase());
renameTab(renameTarget);
break;
}
}
@Override
protected CompletableFuture<Void> updateSegmentInfo(String segmentName, ArrayView segmentInfo, Duration timeout) {
ensureInitialized();
TableEntry entry = TableEntry.unversioned(getTableKey(segmentName), segmentInfo);
return this.tableStore.put(this.metadataSegmentName, Collections.singletonList(entry), timeout).thenRun(Runnables.doNothing());
}
/**
* Tests the ability of the OperationProcessor to process Operations when a simulated DataCorruptionException
* is generated.
*/
@Test
public void testWithDataCorruptionFailures() throws Exception {
// If a DataCorruptionException is thrown for a particular Operation, the OperationQueueProcessor should
// immediately shut down and stop accepting other ops.
int streamSegmentCount = 10;
int appendsPerStreamSegment = 80;
int failAtOperationIndex = 123; // Fail Operation at index X.
@Cleanup
TestContext context = new TestContext();
// Create a different state updater and Memory log - and use these throughout this test.
CorruptedMemoryOperationLog corruptedMemoryLog = new CorruptedMemoryOperationLog(failAtOperationIndex);
MemoryStateUpdater stateUpdater = new MemoryStateUpdater(corruptedMemoryLog, context.readIndex, Runnables.doNothing());
// Generate some test data (no need to complicate ourselves with Transactions here; that is tested in the no-failure test).
HashSet<Long> streamSegmentIds = createStreamSegmentsInMetadata(streamSegmentCount, context.metadata);
List<Operation> operations = generateOperations(streamSegmentIds, new HashMap<>(), appendsPerStreamSegment,
METADATA_CHECKPOINT_EVERY, false, false);
// Setup an OperationProcessor and start it.
@Cleanup
TestDurableDataLog dataLog = TestDurableDataLog.create(CONTAINER_ID, MAX_DATA_LOG_APPEND_SIZE, executorService());
dataLog.initialize(TIMEOUT);
@Cleanup
OperationProcessor operationProcessor = new OperationProcessor(context.metadata, stateUpdater,
dataLog, getNoOpCheckpointPolicy(), executorService());
operationProcessor.startAsync().awaitRunning();
// Process all generated operations.
List<OperationWithCompletion> completionFutures = processOperations(operations, operationProcessor);
// Wait for the store to fail (and make sure it failed).
AssertExtensions.assertThrows(
"Operation Processor did not shut down with failure.",
() -> ServiceListeners.awaitShutdown(operationProcessor, true),
ex -> ex instanceof IllegalStateException);
Assert.assertEquals("Unexpected service state after encountering DataCorruptionException.", Service.State.FAILED, operationProcessor.state());
// Verify that the "right" operations failed, while the others succeeded.
int successCount = 0;
boolean encounteredFirstFailure = false;
for (int i = 0; i < completionFutures.size(); i++) {
OperationWithCompletion oc = completionFutures.get(i);
// Once an operation failed (in our scenario), no other operation can succeed.
if (encounteredFirstFailure) {
Assert.assertTrue("Encountered successful operation after a failed operation: " + oc.operation, oc.completion.isCompletedExceptionally());
}
// The operation that failed may have inadvertently failed other operations that were aggregated together
// with it, which is why it's hard to determine precisely what the first expected failed operation is.
if (oc.completion.isCompletedExceptionally()) {
// If we do find a failed one in this area, make sure it is failed with DataCorruptionException.
AssertExtensions.assertThrows(
"Unexpected exception for failed Operation in the same DataFrame as intentionally failed operation.",
oc.completion::join,
super::isExpectedExceptionForDataCorruption);
encounteredFirstFailure = true;
} else {
successCount++;
}
}
AssertExtensions.assertGreaterThan("No operation succeeded.", 0, successCount);
performLogOperationChecks(completionFutures, corruptedMemoryLog, dataLog, context.metadata, failAtOperationIndex - 1);
// There is no point in performing metadata checks. A DataCorruptionException means the Metadata (and the general
// state of the Container) is in an undefined state.
}