下面列出了com.google.common.collect.MoreCollectors#build.bazel.remote.execution.v2.Platform 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@VisibleForTesting
public QueueEntry dispatchOperation(MatchListener listener)
throws IOException, InterruptedException {
while (!backplane.isStopped()) {
listener.onWaitStart();
try {
List<Platform.Property> provisions = new ArrayList<>();
QueueEntry queueEntry = backplane.dispatchOperation(provisions);
if (queueEntry != null) {
return queueEntry;
}
} catch (IOException e) {
Status status = Status.fromThrowable(e);
if (status.getCode() != Code.UNAVAILABLE && status.getCode() != Code.DEADLINE_EXCEEDED) {
throw e;
}
}
listener.onWaitEnd();
}
throw new IOException(Status.UNAVAILABLE.withDescription("backplane is stopped").asException());
}
@Test
public void requeueRemovesRequeuers() throws InterruptedException {
Digest actionDigest = createAction(Action.newBuilder());
instance.execute(
actionDigest,
/* skipCacheLookup=*/ true,
ExecutionPolicy.getDefaultInstance(),
ResultsCachePolicy.getDefaultInstance(),
RequestMetadata.getDefaultInstance(),
(operation) -> {});
MatchListener listener = mock(MatchListener.class);
when(listener.onEntry(any(QueueEntry.class))).thenReturn(true);
instance.match(Platform.getDefaultInstance(), listener);
ArgumentCaptor<QueueEntry> queueEntryCaptor = ArgumentCaptor.forClass(QueueEntry.class);
verify(listener, times(1)).onEntry(queueEntryCaptor.capture());
QueueEntry queueEntry = queueEntryCaptor.getValue();
assertThat(queueEntry).isNotNull();
String operationName = queueEntry.getExecuteEntry().getOperationName();
assertThat(requeuers).isNotEmpty();
Operation queuedOperation = outstandingOperations.get(operationName);
assertThat(instance.isQueued(queuedOperation)).isTrue();
instance.putOperation(queuedOperation); // requeue
assertThat(requeuers).isEmpty();
assertThat(outstandingOperations.get(operationName)).isEqualTo(queuedOperation);
}
WorkerContext createTestContext(Platform platform, Iterable<ExecutionPolicy> policies) {
return new ShardWorkerContext(
"test",
platform,
/* operationPollPeriod=*/ Duration.getDefaultInstance(),
/* operationPoller=*/ (queueEntry, stage, requeueAt) -> {
return false;
},
/* inlineContentLimit=*/ 0,
/* inputFetchStageWidth=*/ 0,
/* executeStageWidth=*/ 0,
backplane,
execFileSystem,
inputStreamFactory,
policies,
instance,
/* deadlineAfter=*/ 0,
/* deadlineAfterUnits=*/ SECONDS,
/* defaultActionTimeout=*/ Duration.getDefaultInstance(),
/* maximumActionTimeout=*/ Duration.getDefaultInstance(),
/* limitExecution=*/ false,
/* limitGlobalExecution=*/ false,
/* onlyMulticoreTests=*/ false);
}
@Test
public void queueEntryWithExecutionPolicyPlatformMatches() throws Exception {
WorkerContext context =
createTestContext(
Platform.getDefaultInstance(),
ImmutableList.of(ExecutionPolicy.newBuilder().setName("foo").build()));
Platform matchPlatform =
Platform.newBuilder()
.addProperties(
Property.newBuilder().setName("execution-policy").setValue("foo").build())
.build();
QueueEntry queueEntry = QueueEntry.newBuilder().setPlatform(matchPlatform).build();
when(backplane.dispatchOperation(any(List.class)))
.thenReturn(queueEntry)
.thenReturn(null); // provide a match completion in failure case
MatchListener listener = mock(MatchListener.class);
context.match(listener);
verify(listener, times(1)).onEntry(queueEntry);
}
private Optional<String> dockerContainerFromSpawn(Spawn spawn) throws ExecException {
Platform platform =
PlatformUtils.getPlatformProto(spawn, cmdEnv.getOptions().getOptions(RemoteOptions.class));
if (platform != null) {
try {
return platform
.getPropertiesList()
.stream()
.filter(p -> p.getName().equals(CONTAINER_IMAGE_ENTRY_NAME))
.map(p -> p.getValue())
.filter(r -> r.startsWith(DOCKER_IMAGE_PREFIX))
.map(r -> r.substring(DOCKER_IMAGE_PREFIX.length()))
.collect(MoreCollectors.toOptional());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
String.format(
"Platform %s contained multiple container-image entries, but only one is allowed.",
spawn.getExecutionPlatform().label()),
e);
}
} else {
return Optional.empty();
}
}
private static String dockerContainer(Command cmd) throws StatusException {
String result = null;
for (Platform.Property property : cmd.getPlatform().getPropertiesList()) {
if (property.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) {
if (result != null) {
// Multiple container name entries
throw StatusUtils.invalidArgumentError(
"platform", // Field name.
String.format(
"Multiple entries for %s in action.Platform", CONTAINER_IMAGE_ENTRY_NAME));
}
result = property.getValue();
if (!result.startsWith(DOCKER_IMAGE_PREFIX)) {
throw StatusUtils.invalidArgumentError(
"platform", // Field name.
String.format(
"%s: Docker images must be stored in gcr.io with an image spec in the form "
+ "'docker://gcr.io/{IMAGE_NAME}'",
CONTAINER_IMAGE_ENTRY_NAME));
}
result = result.substring(DOCKER_IMAGE_PREFIX.length());
}
}
return result;
}
static SetMultimap<String, String> createProvisions(Platform platform) {
ImmutableSetMultimap.Builder<String, String> provisions = ImmutableSetMultimap.builder();
for (Platform.Property property : platform.getPropertiesList()) {
provisions.put(property.getName(), property.getValue());
}
return provisions.build();
}
@Override
public void match(Platform platform, MatchListener listener) throws InterruptedException {
WorkerQueue queue = queuedOperations.MatchEligibleQueue(createProvisions(platform));
synchronized (queue.operations) {
matchSynchronized(platform, listener);
}
}
public Boolean validProperties(List<Platform.Property> provisions) {
for (ProvisionedRedisQueue provisionedQueue : queues) {
if (provisionedQueue.isEligible(toMultimap(provisions))) {
return true;
}
}
return false;
}
private BalancedRedisQueue chooseEligibleQueue(List<Platform.Property> provisions) {
for (ProvisionedRedisQueue provisionedQueue : queues) {
if (provisionedQueue.isEligible(toMultimap(provisions))) {
return provisionedQueue.queue();
}
}
throw new RuntimeException(
"there are no eligible queues for the provided execution requirements. One solution to is to configure a provision queue with no requirements which would be eligible to all operations.");
}
private SetMultimap<String, String> toMultimap(List<Platform.Property> provisions) {
SetMultimap<String, String> set = LinkedHashMultimap.create();
for (Platform.Property property : provisions) {
set.put(property.getName(), property.getValue());
}
return set;
}
private SetMultimap<String, String> toMultimap(List<Platform.Property> provisions) {
SetMultimap<String, String> set = LinkedHashMultimap.create();
for (Platform.Property property : provisions) {
set.put(property.getName(), property.getValue());
}
return set;
}
private void queue(
JedisCluster jedis,
String operationName,
List<Platform.Property> provisions,
String queueEntryJson) {
if (jedis.hdel(config.getDispatchedOperationsHashName(), operationName) == 1) {
logger.log(Level.WARNING, format("removed dispatched operation %s", operationName));
}
operationQueue.push(jedis, provisions, queueEntryJson);
}
@Override
public QueueEntry dispatchOperation(List<Platform.Property> provisions)
throws IOException, InterruptedException {
return client.blockingCall(
jedis -> {
return dispatchOperation(jedis, provisions);
});
}
@Override
public void match(Platform platform, MatchListener listener) throws InterruptedException {
throwIfStopped();
TakeOperationRequest request =
TakeOperationRequest.newBuilder().setInstanceName(getName()).setPlatform(platform).build();
boolean complete = false;
while (!complete) {
listener.onWaitStart();
try {
QueueEntry queueEntry;
try {
queueEntry = deadlined(operationQueueBlockingStub).take(request);
} finally {
listener.onWaitEnd();
}
listener.onEntry(queueEntry);
complete = true;
} catch (Exception e) {
Status status = Status.fromThrowable(e);
if (status.getCode() != Status.Code.DEADLINE_EXCEEDED) {
listener.onError(e);
complete = true;
}
// ignore DEADLINE_EXCEEDED to prevent long running request behavior
}
}
}
public static boolean satisfiesRequirements(
SetMultimap<String, String> provisions, Platform requirements) {
for (Platform.Property property : requirements.getPropertiesList()) {
if (!satisfiesRequirement(provisions, property.getName(), property.getValue())) {
return false;
}
}
return true;
}
static Platform getMatchPlatform(Platform platform, Iterable<ExecutionPolicy> policies) {
Platform.Builder builder = platform.toBuilder();
for (ExecutionPolicy policy : policies) {
String name = policy.getName();
if (!name.isEmpty()) {
builder.addPropertiesBuilder().setName("execution-policy").setValue(policy.getName());
}
}
return builder.build();
}
static SetMultimap<String, String> getMatchProvisions(
Platform platform, Iterable<ExecutionPolicy> policyNames, int executeStageWidth) {
ImmutableSetMultimap.Builder<String, String> provisions = ImmutableSetMultimap.builder();
for (Platform.Property property : platform.getPropertiesList()) {
provisions.put(property.getName(), property.getValue());
}
for (ExecutionPolicy policy : policyNames) {
String name = policy.getName();
if (!name.isEmpty()) {
provisions.put("execution-policy", name);
}
}
provisions.put("cores", String.format("%d", executeStageWidth));
return provisions.build();
}
@Test
public void matchCancelRemovesFromWorkers() throws InterruptedException {
MatchListener listener = mock(MatchListener.class);
instance.match(Platform.getDefaultInstance(), listener);
ArgumentCaptor<Runnable> onCancelHandlerCaptor = ArgumentCaptor.forClass(Runnable.class);
verify(listener, times(1)).setOnCancelHandler(onCancelHandlerCaptor.capture());
onCancelHandlerCaptor.getValue().run();
assertThat(workers).isEmpty();
}
private static Protos.Platform buildPlatform(Platform platform) {
Protos.Platform.Builder platformBuilder = Protos.Platform.newBuilder();
for (Platform.Property p : platform.getPropertiesList()) {
platformBuilder.addPropertiesBuilder().setName(p.getName()).setValue(p.getValue());
}
return platformBuilder.build();
}
private static void sortPlatformProperties(Platform.Builder builder) {
List<Platform.Property> properties =
Ordering.from(Comparator.comparing(Platform.Property::getName))
.sortedCopy(builder.getPropertiesList());
builder.clearProperties();
builder.addAllProperties(properties);
}
@Nullable
public static Platform buildPlatformProto(Map<String, String> executionProperties) {
if (executionProperties.isEmpty()) {
return null;
}
Platform.Builder builder = Platform.newBuilder();
for (Map.Entry<String, String> keyValue : executionProperties.entrySet()) {
Property property =
Property.newBuilder().setName(keyValue.getKey()).setValue(keyValue.getValue()).build();
builder.addProperties(property);
}
sortPlatformProperties(builder);
return builder.build();
}
static Command buildCommand(
Collection<? extends ActionInput> outputs,
List<String> arguments,
ImmutableMap<String, String> env,
@Nullable Platform platform,
@Nullable String workingDirectory) {
Command.Builder command = Command.newBuilder();
ArrayList<String> outputFiles = new ArrayList<>();
ArrayList<String> outputDirectories = new ArrayList<>();
for (ActionInput output : outputs) {
String pathString = output.getExecPathString();
if (output instanceof Artifact && ((Artifact) output).isTreeArtifact()) {
outputDirectories.add(pathString);
} else {
outputFiles.add(pathString);
}
}
Collections.sort(outputFiles);
Collections.sort(outputDirectories);
command.addAllOutputFiles(outputFiles);
command.addAllOutputDirectories(outputDirectories);
if (platform != null) {
command.setPlatform(platform);
}
command.addAllArguments(arguments);
// Sorting the environment pairs by variable name.
TreeSet<String> variables = new TreeSet<>(env.keySet());
for (String var : variables) {
command.addEnvironmentVariablesBuilder().setName(var).setValue(env.get(var));
}
if (!Strings.isNullOrEmpty(workingDirectory)) {
command.setWorkingDirectory(workingDirectory);
}
return command.build();
}
private static String platformAsString(@Nullable Platform platform) {
if (platform == null) {
return "";
}
String separator = "";
StringBuilder value = new StringBuilder();
for (Property property : platform.getPropertiesList()) {
value.append(separator).append(property.getName()).append("=").append(property.getValue());
separator = ",";
}
return value.toString();
}
@Test
public void testParsePlatformLegacyOptions() throws Exception {
Platform expected =
Platform.newBuilder()
.addProperties(Platform.Property.newBuilder().setName("a").setValue("1"))
.addProperties(Platform.Property.newBuilder().setName("b").setValue("2"))
.build();
PlatformInfo platform =
PlatformInfo.builder().setRemoteExecutionProperties(platformOptionsString()).build();
Spawn s = new SpawnBuilder("dummy").withPlatform(platform).build();
assertThat(PlatformUtils.getPlatformProto(s, null)).isEqualTo(expected);
}
@Test
public void testParsePlatformSortsProperties() throws Exception {
Platform expected =
Platform.newBuilder()
.addProperties(Platform.Property.newBuilder().setName("a").setValue("1"))
.addProperties(Platform.Property.newBuilder().setName("b").setValue("2"))
.build();
Spawn s = new SpawnBuilder("dummy").build();
assertThat(PlatformUtils.getPlatformProto(s, remoteOptions())).isEqualTo(expected);
}
@Test
public void testParsePlatformSortsProperties_ExecProperties() throws Exception {
// execProperties are chosen even if there are remoteOptions
ImmutableMap<String, String> map = ImmutableMap.of("aa", "99", "zz", "66", "dd", "11");
Spawn s = new SpawnBuilder("dummy").withExecProperties(map).build();
Platform expected =
Platform.newBuilder()
.addProperties(Platform.Property.newBuilder().setName("aa").setValue("99"))
.addProperties(Platform.Property.newBuilder().setName("dd").setValue("11"))
.addProperties(Platform.Property.newBuilder().setName("zz").setValue("66"))
.build();
// execProperties are sorted by key
assertThat(PlatformUtils.getPlatformProto(s, remoteOptions())).isEqualTo(expected);
}
@Override
public Command newCommand(
ImmutableList<String> command,
ImmutableSortedMap<String, String> commandEnvironment,
Set<Path> outputs,
WorkerRequirements workerRequirements) {
List<String> outputStrings =
outputs.stream().map(Path::toString).sorted().collect(Collectors.toList());
Platform.Builder platformBuilder = Platform.newBuilder();
platformBuilder.addProperties(
Property.newBuilder()
.setName("SIZE")
.setValue(workerRequirements.getWorkerSize().name())
.build());
platformBuilder.addProperties(
Property.newBuilder()
.setName("PLATFORM")
.setValue(workerRequirements.getPlatformType().name())
.build());
return new GrpcCommand(
build.bazel.remote.execution.v2.Command.newBuilder()
.addAllArguments(command)
.addAllEnvironmentVariables(
commandEnvironment.entrySet().stream()
.map(
entry ->
EnvironmentVariable.newBuilder()
.setName(entry.getKey())
.setValue(entry.getValue())
.build())
.collect(Collectors.toList()))
.addAllOutputFiles(outputStrings)
.addAllOutputDirectories(outputStrings)
.setPlatform(platformBuilder.build())
.build());
}
@Override
protected void validatePlatform(
Platform platform, PreconditionFailure.Builder preconditionFailure) {
int minCores = 0;
int maxCores = -1;
// check that the platform properties correspond to valid provisions for the OperationQeueue.
// if the operation is eligible to be put anywhere in the OperationQueue, it passes validation.
boolean validForOperationQueue = backplane.validQueueProperties(platform.getPropertiesList());
if (!validForOperationQueue) {
preconditionFailure
.addViolationsBuilder()
.setType(VIOLATION_TYPE_INVALID)
.setSubject(INVALID_PLATFORM)
.setDescription(
format(
"properties are not valid for queue eligibility: %s",
platform.getPropertiesList()));
}
for (Property property : platform.getPropertiesList()) {
/* FIXME generalize with config */
if (property.getName().equals("min-cores") || property.getName().equals("max-cores")) {
try {
int intValue = Integer.parseInt(property.getValue());
if (intValue <= 0 || intValue > 80) {
preconditionFailure
.addViolationsBuilder()
.setType(VIOLATION_TYPE_INVALID)
.setSubject(INVALID_PLATFORM)
.setDescription(
format(
"property '%s' value was out of range: %d", property.getName(), intValue));
}
if (property.getName().equals("min-cores")) {
minCores = intValue;
} else {
maxCores = intValue;
}
} catch (NumberFormatException e) {
preconditionFailure
.addViolationsBuilder()
.setType(VIOLATION_TYPE_INVALID)
.setSubject(INVALID_PLATFORM)
.setDescription(
format(
"property '%s' value was not a valid integer: %s",
property.getName(), property.getValue()));
}
// An individual platform property may not be valid on its own,
// but instead, valid in the context of the full platform where the configured
// OperationQueue checks the eligibility.
// Therefore, we do not consider an individual property invalid when it has been previously
// validated against the OperationQueue.
} else if (validForOperationQueue) {
} else {
preconditionFailure
.addViolationsBuilder()
.setType(VIOLATION_TYPE_INVALID)
.setSubject(INVALID_PLATFORM)
.setDescription(format("property name '%s' is invalid", property.getName()));
}
}
if (maxCores != -1 && minCores > 0 && maxCores < minCores) {
preconditionFailure
.addViolationsBuilder()
.setType(VIOLATION_TYPE_INVALID)
.setSubject(INVALID_PLATFORM)
.setDescription(format("max-cores (%d) must be >= min-cores (%d)", maxCores, minCores));
}
}
@Override
public void match(Platform platform, MatchListener listener) {
throw new UnsupportedOperationException();
}