下面列出了 org.apache.commons.lang.mutable.MutableObject#getValue() 的实例代码;您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Evaluates {@code function} on the JavaFX application thread and returns its result.
 *
 * <p>If the caller is already on the JavaFX application thread the supplier is invoked
 * directly. Otherwise it is queued with {@code Platform.runLater} and the calling thread
 * blocks on a latch until the queued task has finished.
 *
 * <p>If the calling thread is interrupted while waiting, the wait is abandoned, the thread's
 * interrupt status is restored so callers can react to it, and whatever the result holder
 * contains at that moment is returned (the holder may not have been populated yet).
 * If the supplier throws on the JavaFX thread, the latch is still released and the holder is
 * never populated.
 *
 * @param function the computation to evaluate on the JavaFX application thread
 * @param <R> the result type of the computation
 * @return the supplier's result, or the holder's initial value if the supplier did not complete
 */
@Override
public <R> R run(Supplier<R> function) {
    // Already on the JavaFX thread: run synchronously, queuing would deadlock on the latch.
    if (Platform.isFxApplicationThread()) {
        return function.get();
    }
    // Queue on the JavaFX thread and wait for completion.
    final CountDownLatch doneLatch = new CountDownLatch(1);
    final MutableObject result = new MutableObject();
    Platform.runLater(() -> {
        try {
            result.setValue(function.get());
        } finally {
            // Always release the waiter, even when the supplier throws.
            doneLatch.countDown();
        }
    });
    try {
        doneLatch.await();
    } catch (InterruptedException e) {
        // Do not swallow the interruption: restore the flag for the caller.
        Thread.currentThread().interrupt();
    }
    // The holder is untyped; the only value ever stored in it comes from Supplier<R>.
    @SuppressWarnings("unchecked")
    final R value = (R) result.getValue();
    return value;
}
/**
 * Registers the checkers that validate {@code ScheduleGroupProperty} assignments.
 *
 * <p>Two checkers are added:
 * <ul>
 *   <li>a global DAG checker that fails when, starting a DFS from a vertex that has a schedule
 *       group, some visited vertex has a strictly smaller schedule group;</li>
 *   <li>a single-edge checker that fails when a non-control edge whose data flow is
 *       {@code PULL} connects two vertices that have the same schedule group.</li>
 * </ul>
 */
void addScheduleGroupCheckers() {
    final GlobalDAGChecker scheduleGroupTopoOrdering = (irdag -> {
        for (final IRVertex v : irdag.getVertices()) {
            // Holder written from inside the traversal lambda; stays null when nothing violates.
            final MutableObject violatingReachableVertex = new MutableObject();
            v.getPropertyValue(ScheduleGroupProperty.class).ifPresent(startingScheduleGroup ->
                irdag.dfsDo(
                    v,
                    visited -> {
                        if (visited.getPropertyValue(ScheduleGroupProperty.class).isPresent()
                            && visited.getPropertyValue(ScheduleGroupProperty.class).get() < startingScheduleGroup) {
                            violatingReachableVertex.setValue(visited);
                        }
                    },
                    DAGInterface.TraversalOrder.PreOrder,
                    new HashSet<>()));
            if (violatingReachableVertex.getValue() != null) {
                return failure(
                    "A reachable vertex with a smaller schedule group ",
                    v,
                    ScheduleGroupProperty.class,
                    violatingReachableVertex.getValue(),
                    ScheduleGroupProperty.class);
            }
        }
        return success();
    });
    globalDAGCheckerList.add(scheduleGroupTopoOrdering);

    final SingleEdgeChecker splitByPull = (edge -> {
        // Control edges are exempt from this rule.
        if (Util.isControlEdge(edge)) {
            return success();
        }
        if (Optional.of(DataFlowProperty.Value.PULL).equals(edge.getPropertyValue(DataFlowProperty.class))) {
            final Optional<Integer> srcSG = edge.getSrc().getPropertyValue(ScheduleGroupProperty.class);
            final Optional<Integer> dstSG = edge.getDst().getPropertyValue(ScheduleGroupProperty.class);
            // Only comparable when both endpoints actually carry a schedule group.
            if (srcSG.isPresent() && dstSG.isPresent()) {
                if (srcSG.get().equals(dstSG.get())) {
                    return failure("Schedule group must split by Pull",
                        edge.getSrc(), ScheduleGroupProperty.class, edge.getDst(), ScheduleGroupProperty.class);
                }
            }
        }
        return success();
    });
    singleEdgeCheckerList.add(splitByPull);
}
/**
 * Tries to allocate a container for {@code application} at {@code priority} on {@code node},
 * attempting node-local, then rack-local, then off-switch placement.
 *
 * <p>The node-local and rack-local attempts return only when the assigned resource compares
 * greater than {@code Resources.none()}; otherwise the next locality level is tried. The
 * off-switch attempt always returns its outcome. A rack-local or off-switch request whose
 * relax-locality flag is false ends the attempt with {@code SKIP_ASSIGNMENT}.
 *
 * @return the assignment made at the first successful locality level, or
 *         {@code SKIP_ASSIGNMENT} when no request applies
 */
private CSAssignment assignContainersOnNode(Resource clusterResource,
    FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
    RMContainer reservedContainer, ResourceLimits currentResoureLimits) {
  Resource granted = Resources.none();
  // Most specific locality level for which a request has been seen so far.
  NodeType requestType = null;
  // Shared across all three attempts; the assign* helpers receive it as an out-parameter.
  MutableObject allocatedContainer = new MutableObject();

  // ---- Node-local ----
  ResourceRequest nodeLocalAsk =
      application.getResourceRequest(priority, node.getNodeName());
  if (nodeLocalAsk != null) {
    requestType = NodeType.NODE_LOCAL;
    granted = assignNodeLocalContainers(clusterResource, nodeLocalAsk,
        node, application, priority, reservedContainer,
        allocatedContainer, currentResoureLimits);
    boolean nodeLocalGranted = Resources.greaterThan(resourceCalculator,
        clusterResource, granted, Resources.none());
    if (nodeLocalGranted) {
      // Update locality statistics only when a container was actually produced.
      boolean containerProduced = allocatedContainer.getValue() != null;
      if (containerProduced) {
        application.incNumAllocatedContainers(NodeType.NODE_LOCAL, requestType);
      }
      return new CSAssignment(granted, NodeType.NODE_LOCAL);
    }
  }

  // ---- Rack-local ----
  ResourceRequest rackLocalAsk =
      application.getResourceRequest(priority, node.getRackName());
  if (rackLocalAsk != null) {
    if (!rackLocalAsk.getRelaxLocality()) {
      return SKIP_ASSIGNMENT;
    }
    // Keep NODE_LOCAL as the request type if a node-local request was seen above.
    if (requestType != NodeType.NODE_LOCAL) {
      requestType = NodeType.RACK_LOCAL;
    }
    granted = assignRackLocalContainers(clusterResource, rackLocalAsk,
        node, application, priority, reservedContainer,
        allocatedContainer, currentResoureLimits);
    boolean rackLocalGranted = Resources.greaterThan(resourceCalculator,
        clusterResource, granted, Resources.none());
    if (rackLocalGranted) {
      // Update locality statistics only when a container was actually produced.
      boolean containerProduced = allocatedContainer.getValue() != null;
      if (containerProduced) {
        application.incNumAllocatedContainers(NodeType.RACK_LOCAL, requestType);
      }
      return new CSAssignment(granted, NodeType.RACK_LOCAL);
    }
  }

  // ---- Off-switch ----
  ResourceRequest offSwitchAsk =
      application.getResourceRequest(priority, ResourceRequest.ANY);
  if (offSwitchAsk != null) {
    if (!offSwitchAsk.getRelaxLocality()) {
      return SKIP_ASSIGNMENT;
    }
    // Only fall back to OFF_SWITCH when no more specific request type was recorded.
    if (requestType != NodeType.NODE_LOCAL
        && requestType != NodeType.RACK_LOCAL) {
      requestType = NodeType.OFF_SWITCH;
    }
    granted = assignOffSwitchContainers(clusterResource, offSwitchAsk,
        node, application, priority, reservedContainer,
        allocatedContainer, currentResoureLimits);
    // Update locality statistics only when a container was actually produced.
    boolean containerProduced = allocatedContainer.getValue() != null;
    if (containerProduced) {
      application.incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType);
    }
    return new CSAssignment(granted, NodeType.OFF_SWITCH);
  }

  return SKIP_ASSIGNMENT;
}
/**
 * Attempts a container allocation on {@code node} for the given application and priority.
 *
 * <p>Note the priority order: node-local placement is tried first, then rack-local, and
 * finally off-switch. The first two levels return only if the assigned resource compares
 * greater than {@code Resources.none()}; the off-switch level returns unconditionally.
 * {@code SKIP_ASSIGNMENT} is returned when a rack-local or off-switch request does not relax
 * locality, or when no off-switch request exists.
 *
 * @return the assignment for the locality level that was used, or {@code SKIP_ASSIGNMENT}
 */
private CSAssignment assignContainersOnNode(Resource clusterResource,
    FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
    RMContainer reservedContainer, ResourceLimits currentResoureLimits) {
  Resource allotted = Resources.none();
  NodeType requestType = null;
  // Out-parameter handed to each assign* helper; reused across all attempts.
  MutableObject containerHolder = new MutableObject();

  // 1) Data-local (node-local)
  ResourceRequest nodeReq =
      application.getResourceRequest(priority, node.getNodeName());
  if (nodeReq != null) {
    requestType = NodeType.NODE_LOCAL;
    allotted = assignNodeLocalContainers(
        clusterResource, nodeReq, node, application, priority,
        reservedContainer, containerHolder, currentResoureLimits);
    if (Resources.greaterThan(
        resourceCalculator, clusterResource, allotted, Resources.none())) {
      // Record locality statistics when a container came back.
      if (containerHolder.getValue() != null) {
        application.incNumAllocatedContainers(NodeType.NODE_LOCAL, requestType);
      }
      return new CSAssignment(allotted, NodeType.NODE_LOCAL);
    }
  }

  // 2) Rack-local
  ResourceRequest rackReq =
      application.getResourceRequest(priority, node.getRackName());
  if (rackReq != null) {
    if (!rackReq.getRelaxLocality()) {
      return SKIP_ASSIGNMENT;
    }
    // A previously recorded NODE_LOCAL request type takes precedence over RACK_LOCAL.
    requestType =
        (requestType == NodeType.NODE_LOCAL) ? requestType : NodeType.RACK_LOCAL;
    allotted = assignRackLocalContainers(
        clusterResource, rackReq, node, application, priority,
        reservedContainer, containerHolder, currentResoureLimits);
    if (Resources.greaterThan(
        resourceCalculator, clusterResource, allotted, Resources.none())) {
      // Record locality statistics when a container came back.
      if (containerHolder.getValue() != null) {
        application.incNumAllocatedContainers(NodeType.RACK_LOCAL, requestType);
      }
      return new CSAssignment(allotted, NodeType.RACK_LOCAL);
    }
  }

  // 3) Off-switch
  ResourceRequest anyReq =
      application.getResourceRequest(priority, ResourceRequest.ANY);
  if (anyReq == null || !anyReq.getRelaxLocality()) {
    return SKIP_ASSIGNMENT;
  }
  boolean localityAlreadyRecorded =
      requestType == NodeType.NODE_LOCAL || requestType == NodeType.RACK_LOCAL;
  if (!localityAlreadyRecorded) {
    requestType = NodeType.OFF_SWITCH;
  }
  allotted = assignOffSwitchContainers(
      clusterResource, anyReq, node, application, priority,
      reservedContainer, containerHolder, currentResoureLimits);
  // Record locality statistics when a container came back.
  if (containerHolder.getValue() != null) {
    application.incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType);
  }
  return new CSAssignment(allotted, NodeType.OFF_SWITCH);
}
/**
 * Fetches {@code url} with the configured HTTP client and wraps the outcome in a
 * {@code ProtocolResponse}.
 *
 * <p>The request carries every configured custom header. When {@code metadata} is non-null
 * it may additionally contribute {@code If-Modified-Since}, {@code If-None-Match},
 * {@code Accept} and {@code Accept-Language} headers, cookies (if cookie support is enabled)
 * and a JSON POST body. Response headers are copied into the returned metadata under
 * lower-cased names; the verbatim request/response header entries are Base64-decoded first.
 * When the body reader reports that content was trimmed, the call is cancelled and the
 * trimming flag and reason are recorded in the metadata.
 *
 * @param url the address to fetch
 * @param metadata per-URL metadata used to build the request; may be {@code null}
 * @return the response body, status code and response metadata
 * @throws Exception if executing the call or reading the response fails
 */
@Override
public ProtocolResponse getProtocolOutput(String url, final Metadata metadata) throws Exception {
    Builder requestBuilder = new Request.Builder().url(url);

    customRequestHeaders.forEach((header) -> {
        requestBuilder.header(header[0], header[1]);
    });

    if (metadata != null) {
        // Conditional-request headers derived from what was stored for this URL.
        String lastModified = metadata.getFirstValue("last-modified");
        if (StringUtils.isNotBlank(lastModified)) {
            requestBuilder.header("If-Modified-Since",
                    HttpHeaders.formatHttpDate(lastModified));
        }

        String etag = metadata.getFirstValue("etag", protocolMDprefix);
        if (StringUtils.isNotBlank(etag)) {
            requestBuilder.header("If-None-Match", etag);
        }

        // Per-URL content negotiation overrides.
        String acceptHeader = metadata.getFirstValue("http.accept");
        if (StringUtils.isNotBlank(acceptHeader)) {
            requestBuilder.header("Accept", acceptHeader);
        }

        String acceptLanguageHeader = metadata.getFirstValue("http.accept.language");
        if (StringUtils.isNotBlank(acceptLanguageHeader)) {
            requestBuilder.header("Accept-Language", acceptLanguageHeader);
        }

        if (useCookies) {
            addCookiesToRequest(requestBuilder, url, metadata);
        }

        // A JSON payload in the metadata turns the request into a POST.
        String jsonPayload = metadata.getFirstValue("http.post.json");
        if (StringUtils.isNotBlank(jsonPayload)) {
            requestBuilder.post(RequestBody.create(JSON, jsonPayload));
        }
    }

    Call call = client.newCall(requestBuilder.build());

    try (Response response = call.execute()) {
        Metadata responseMetadata = new Metadata();

        Headers responseHeaders = response.headers();
        final int headerCount = responseHeaders.size();
        for (int idx = 0; idx < headerCount; idx++) {
            String name = responseHeaders.name(idx);
            String headerValue = responseHeaders.value(idx);
            boolean isVerbatimHeaderDump = name.equals(ProtocolResponse.REQUEST_HEADERS_KEY)
                    || name.equals(ProtocolResponse.RESPONSE_HEADERS_KEY);
            if (isVerbatimHeaderDump) {
                // These entries arrive Base64-encoded; store them decoded.
                headerValue = new String(Base64.getDecoder().decode(headerValue));
            }
            responseMetadata.addValue(name.toLowerCase(Locale.ROOT), headerValue);
        }

        // The holder starts as NOT_TRIMMED and is passed to the body reader as an out-parameter.
        MutableObject trimmed = new MutableObject(TrimmedContentReason.NOT_TRIMMED);
        byte[] content = toByteArray(response.body(), trimmed);

        boolean contentWasTrimmed = trimmed.getValue() != TrimmedContentReason.NOT_TRIMMED;
        if (contentWasTrimmed) {
            if (!call.isCanceled()) {
                call.cancel();
            }
            responseMetadata.setValue(ProtocolResponse.TRIMMED_RESPONSE_KEY, "true");
            String trimReason = trimmed.getValue().toString().toLowerCase(Locale.ROOT);
            responseMetadata.setValue(ProtocolResponse.TRIMMED_RESPONSE_REASON_KEY, trimReason);
            LOG.warn("HTTP content trimmed to {}", content.length);
        }

        return new ProtocolResponse(content, response.code(), responseMetadata);
    }
}