下面列出了org.apache.hadoop.mapreduce.v2.LogParams#org.apache.hadoop.yarn.api.records.NodeId 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testNodeRegistrationFailure() throws Exception {
writeToHostsFile("host1");
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
rm = new MockRM(conf);
rm.start();
ResourceTrackerService resourceTrackerService = rm.getResourceTrackerService();
RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class);
NodeId nodeId = NodeId.newInstance("host2", 1234);
req.setNodeId(nodeId);
req.setHttpPort(1234);
// trying to register a invalid node.
RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction());
Assert
.assertEquals(
"Disallowed NodeManager from host2, Sending SHUTDOWN signal to the NodeManager.",
response.getDiagnosticsMessage());
}
private void sendFinishedContainersToNM() {
for (NodeId nodeId : finishedContainersSentToAM.keySet()) {
// Clear and get current values
List<ContainerStatus> currentSentContainers =
finishedContainersSentToAM.put(nodeId,
new ArrayList<ContainerStatus>());
List<ContainerId> containerIdList =
new ArrayList<ContainerId>(currentSentContainers.size());
for (ContainerStatus containerStatus : currentSentContainers) {
containerIdList.add(containerStatus.getContainerId());
}
eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
containerIdList));
}
}
@Test
public void testSuccessfulFinishingToFinished() {
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
String trackingUrl = "mytrackingurl";
String diagnostics = "Successful";
unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
diagnostics);
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(
new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(),
BuilderUtils.newContainerStatus(amContainer.getId(),
ContainerState.COMPLETE, "", 0), anyNodeId));
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
diagnostics, 0, false);
}
@Test(timeout=5000)
public void testHealthUpdateUnknownNode() {
AppContext appContext = mock(AppContext.class);
AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
amNodeMap.init(new Configuration(false));
amNodeMap.start();
NodeId nodeId = NodeId.newInstance("unknownhost", 2342);
NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
amNodeMap.handle(new AMNodeEventStateChanged(nodeReport));
dispatcher.await();
amNodeMap.stop();
// No exceptions - the status update was ignored. Not bothering to capture
// the log message for verification.
}
@Test
public void testFinishingToFinishing() {
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
String trackingUrl = "mytrackingurl";
String diagnostics = "Successful";
unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
diagnostics);
// container must be AM container to move from FINISHING to FINISHED
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(
new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(),
BuilderUtils.newContainerStatus(
BuilderUtils.newContainerId(
applicationAttempt.getAppAttemptId(), 42),
ContainerState.COMPLETE, "", 0), anyNodeId));
testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl,
diagnostics);
}
private NodeId verifyAndGetNodeId(Block html) {
String nodeIdStr = $(NM_NODENAME);
if (nodeIdStr == null || nodeIdStr.isEmpty()) {
html.h1()._("Cannot get container logs without a NodeId")._();
return null;
}
NodeId nodeId = null;
try {
nodeId = ConverterUtils.toNodeId(nodeIdStr);
} catch (IllegalArgumentException e) {
html.h1()._("Cannot get container logs. Invalid nodeId: " + nodeIdStr)
._();
return null;
}
return nodeId;
}
private String testStopContainer(YarnRPC rpc,
ApplicationAttemptId appAttemptId, NodeId nodeId,
ContainerId containerId, Token nmToken, boolean isExceptionExpected) {
try {
stopContainer(rpc, nmToken,
Arrays.asList(new ContainerId[] { containerId }), appAttemptId,
nodeId);
if (isExceptionExpected) {
fail("Exception was expected!!");
}
return "";
} catch (Exception e) {
e.printStackTrace();
return e.getMessage();
}
}
@Test(timeout=5000)
public void testGetLabelResourceWhenMultipleNMsExistingInSameHost() throws IOException {
// active two NM to n1, one large and one small
mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("n1", 2), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("n1", 3), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("n1", 4), SMALL_RESOURCE);
// check resource of no label, it should be small * 4
Assert.assertEquals(
mgr.getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, null),
Resources.multiply(SMALL_RESOURCE, 4));
// change two of these nodes to p1, check resource of no_label and P1
mgr.addToCluserNodeLabels(toSet("p1"));
mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p1"),
toNodeId("n1:2"), toSet("p1")));
// check resource
Assert.assertEquals(
mgr.getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, null),
Resources.multiply(SMALL_RESOURCE, 2));
Assert.assertEquals(
mgr.getResourceByLabel("p1", null),
Resources.multiply(SMALL_RESOURCE, 2));
}
private void startContainer(final YarnRPC rpc,
org.apache.hadoop.yarn.api.records.Token nmToken,
org.apache.hadoop.yarn.api.records.Token containerToken,
NodeId nodeId, String user) throws Exception {
ContainerLaunchContext context =
Records.newRecord(ContainerLaunchContext.class);
StartContainerRequest scRequest =
StartContainerRequest.newInstance(context,containerToken);
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
ContainerManagementProtocol proxy = null;
try {
proxy = getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
StartContainersResponse response = proxy.startContainers(allRequests);
for(SerializedException ex : response.getFailedRequests().values()){
parseAndThrowException(ex.deSerialize());
}
} finally {
if (proxy != null) {
rpc.stopProxy(proxy, conf);
}
}
}
@Test
public void testRMHARecoverNodeLabels() throws Exception {
// start two RMs, and transit rm1 to active, rm2 to standby
startRMs();
// Add labels to rm1
rm1.getRMContext()
.getNodeLabelManager()
.addToCluserNodeLabels(ImmutableSet.of("a", "b", "c"));
Map<NodeId, Set<String>> nodeToLabels = new HashMap<>();
nodeToLabels.put(NodeId.newInstance("host1", 0), ImmutableSet.of("a"));
nodeToLabels.put(NodeId.newInstance("host2", 0), ImmutableSet.of("b"));
rm1.getRMContext().getNodeLabelManager().replaceLabelsOnNode(nodeToLabels);
// Do the failover
explicitFailover();
// Check labels in rm2
Assert
.assertTrue(rm2.getRMContext().getNodeLabelManager()
.getClusterNodeLabels()
.containsAll(ImmutableSet.of("a", "b", "c")));
Assert.assertTrue(rm2.getRMContext().getNodeLabelManager()
.getNodeLabels().get(NodeId.newInstance("host1", 0)).contains("a"));
Assert.assertTrue(rm2.getRMContext().getNodeLabelManager()
.getNodeLabels().get(NodeId.newInstance("host2", 0)).contains("b"));
}
@SuppressWarnings("deprecation")
@Test
public void testContainersCleanupForLastAttempt() {
// create a failed attempt.
applicationAttempt =
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext,
scheduler, masterService, submissionContext, new Configuration(),
true, BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1));
when(submissionContext.getKeepContainersAcrossApplicationAttempts())
.thenReturn(true);
when(submissionContext.getMaxAppAttempts()).thenReturn(1);
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
ContainerStatus cs1 =
ContainerStatus.newInstance(amContainer.getId(),
ContainerState.COMPLETE, "some error", 123);
ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
appAttemptId, cs1, anyNodeId));
assertEquals(YarnApplicationAttemptState.RUNNING,
applicationAttempt.createApplicationAttemptState());
sendAttemptUpdateSavedEvent(applicationAttempt);
assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState());
assertFalse(transferStateFromPreviousAttempt);
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
}
@Test
public void testNodeStatusWithEmptyNodeLabels() throws Exception {
NodeId nodeId = NodeId.newInstance("host0", 0);
NodeCLI cli = new NodeCLI();
when(client.getNodeReports()).thenReturn(
getNodeReports(3, NodeState.RUNNING));
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
int result = cli.run(new String[] { "-status", nodeId.toString() });
assertEquals(0, result);
verify(client).getNodeReports();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos);
pw.println("Node Report : ");
pw.println("\tNode-Id : host0:0");
pw.println("\tRack : rack1");
pw.println("\tNode-State : RUNNING");
pw.println("\tNode-Http-Address : host1:8888");
pw.println("\tLast-Health-Update : "
+ DateFormatUtils.format(new Date(0), "E dd/MMM/yy hh:mm:ss:SSzz"));
pw.println("\tHealth-Report : ");
pw.println("\tContainers : 0");
pw.println("\tMemory-Used : 0MB");
pw.println("\tMemory-Capacity : 0MB");
pw.println("\tCPU-Used : 0 vcores");
pw.println("\tCPU-Capacity : 0 vcores");
pw.println("\tNode-Labels : ");
pw.close();
String nodeStatusStr = baos.toString("UTF-8");
verify(sysOut, times(1)).println(isA(String.class));
verify(sysOut).println(nodeStatusStr);
}
@SuppressWarnings("deprecation")
@Test
public void testContainersCleanupForLastAttempt() {
// create a failed attempt.
applicationAttempt =
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext,
scheduler, masterService, submissionContext, new Configuration(),
true, BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1));
when(submissionContext.getKeepContainersAcrossApplicationAttempts())
.thenReturn(true);
when(submissionContext.getMaxAppAttempts()).thenReturn(1);
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
ContainerStatus cs1 =
ContainerStatus.newInstance(amContainer.getId(),
ContainerState.COMPLETE, "some error", 123);
ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
appAttemptId, cs1, anyNodeId));
assertEquals(YarnApplicationAttemptState.RUNNING,
applicationAttempt.createApplicationAttemptState());
sendAttemptUpdateSavedEvent(applicationAttempt);
assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState());
assertFalse(transferStateFromPreviousAttempt);
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
}
@Override
public int compare(NodeId n1, NodeId n2) {
if (!nodes.containsKey(n1)) {
return 1;
}
if (!nodes.containsKey(n2)) {
return -1;
}
return RESOURCE_CALCULATOR.compare(clusterResource,
nodes.get(n2).getAvailableResource(),
nodes.get(n1).getAvailableResource());
}
@SuppressWarnings("unchecked")
public synchronized void containerLaunchedOnNode(ContainerId containerId,
NodeId nodeId) {
// Inform the container
RMContainer rmContainer = getRMContainer(containerId);
if (rmContainer == null) {
// Some unknown container sneaked into the system. Kill it.
rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(nodeId, containerId));
return;
}
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.LAUNCHED));
}
@Override
public void setNodeId(NodeId nodeId) {
maybeInitBuilder();
if (nodeId == null)
builder.clearNodeId();
this.nodeId = nodeId;
}
public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Resource perNode, String rackName, String healthReport,
int cmdPort, String hostName, NodeState state) {
this.nodeId = nodeId;
this.nodeAddr = nodeAddr;
this.httpAddress = httpAddress;
this.perNode = perNode;
this.rackName = rackName;
this.healthReport = healthReport;
this.cmdPort = cmdPort;
this.hostName = hostName;
this.state = state;
toCleanUpApplications = new ArrayList<ApplicationId>();
toCleanUpContainers = new ArrayList<ContainerId>();
}
private void replaceNodeForLabels(NodeId node, Set<String> oldLabels,
Set<String> newLabels) {
if(oldLabels != null) {
removeNodeFromLabels(node, oldLabels);
}
addNodeToLabels(node, newLabels);
}
@Override
public int compare(NodeId n1, NodeId n2) {
if (!nodes.containsKey(n1)) {
return 1;
}
if (!nodes.containsKey(n2)) {
return -1;
}
return RESOURCE_CALCULATOR.compare(clusterResource,
nodes.get(n2).getAvailableResource(),
nodes.get(n1).getAvailableResource());
}
protected Map<NodeId, Set<String>> normalizeNodeIdToLabels(
Map<NodeId, Set<String>> nodeIdToLabels) {
Map<NodeId, Set<String>> newMap = new HashMap<NodeId, Set<String>>();
for (Entry<NodeId, Set<String>> entry : nodeIdToLabels.entrySet()) {
NodeId id = entry.getKey();
Set<String> labels = entry.getValue();
newMap.put(id, normalizeLabels(labels));
}
return newMap;
}
@Override
public void handle(ContainerAllocatorEvent event) {
ContainerId cId =
ContainerId.newContainerId(getContext().getApplicationAttemptId(),
containerCount++);
NodeId nodeId = NodeId.newInstance(NM_HOST, NM_PORT);
Resource resource = Resource.newInstance(1234, 2);
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(cId, nodeId.toString(), "user",
resource, System.currentTimeMillis() + 10000, 42, 42,
Priority.newInstance(0), 0);
Token containerToken = newContainerToken(nodeId, "password".getBytes(),
containerTokenIdentifier);
Container container = Container.newInstance(cId, nodeId,
NM_HOST + ":" + NM_HTTP_PORT, resource, null, containerToken);
JobID id = TypeConverter.fromYarn(applicationId);
JobId jobId = TypeConverter.toYarn(id);
getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE,
100)));
getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.MAP,
100)));
getContext().getEventHandler().handle(
new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
container, null));
}
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user,
NMContainerTokenSecretManager containerTokenSecretManager)
throws IOException {
return createContainerToken(cId, rmIdentifier, nodeId, user,
containerTokenSecretManager, null);
}
@Override
public void setAssignedNode(NodeId nodeId) {
maybeInitBuilder();
if (nodeId == null) {
builder.clearAssignedNodeId();
}
this.nodeId = nodeId;
}
public static NodeStatus newInstance(NodeId nodeId, int responseId,
List<ContainerStatus> containerStatuses,
List<ApplicationId> keepAliveApplications,
NodeHealthStatus nodeHealthStatus) {
NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
nodeStatus.setResponseId(responseId);
nodeStatus.setNodeId(nodeId);
nodeStatus.setContainersStatuses(containerStatuses);
nodeStatus.setKeepAliveApplications(keepAliveApplications);
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
return nodeStatus;
}
public NodeId toNodeId(String str) {
if (str.contains(":")) {
int idx = str.indexOf(':');
NodeId id =
NodeId.newInstance(str.substring(0, idx),
Integer.valueOf(str.substring(idx + 1)));
return id;
} else {
return NodeId.newInstance(str, CommonNodeLabelsManager.WILDCARD_PORT);
}
}
private void sendIngoreBlacklistingStateToNodes() {
AMNodeEventType eventType =
ignoreBlacklisting ? AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED
: AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED;
for (NodeId nodeId : nodeMap.keySet()) {
sendEvent(new AMNodeEvent(nodeId, sourceId, eventType));
}
}
@Test
public void testReplaceLabelsOnNode() throws Exception {
// Successfully replace labels
dummyNodeLabelsManager
.addToCluserNodeLabels(ImmutableSet.of("x", "y", "Y"));
String[] args =
{ "-replaceLabelsOnNode",
"node1:8000,x node2:8000=y node3,x node4=Y",
"-directlyAccessNodeLabelStore" };
assertEquals(0, rmAdminCLI.run(args));
assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey(
NodeId.newInstance("node1", 8000)));
assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey(
NodeId.newInstance("node2", 8000)));
assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey(
NodeId.newInstance("node3", 0)));
assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey(
NodeId.newInstance("node4", 0)));
// no labels, should fail
args = new String[] { "-replaceLabelsOnNode" };
assertTrue(0 != rmAdminCLI.run(args));
// no labels, should fail
args =
new String[] { "-replaceLabelsOnNode", "-directlyAccessNodeLabelStore" };
assertTrue(0 != rmAdminCLI.run(args));
// no labels, should fail
args = new String[] { "-replaceLabelsOnNode", " " };
assertTrue(0 != rmAdminCLI.run(args));
args = new String[] { "-replaceLabelsOnNode", ", " };
assertTrue(0 != rmAdminCLI.run(args));
}
/**
* @param host Host.
* @param cpu Cpu count.
* @param mem Memory.
* @return Container.
*/
private Container createContainer(String host, int cpu, int mem) {
return Container.newInstance(
ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0),
ThreadLocalRandom.current().nextLong()),
NodeId.newInstance(host, 0),
"example.com",
new MockResource(mem, cpu),
Priority.newInstance(0),
null
);
}
private int replaceLabelsOnNodes(Map<NodeId, Set<String>> map)
throws IOException, YarnException {
if (directlyAccessNodeLabelStore) {
getNodeLabelManagerInstance(getConf()).replaceLabelsOnNode(map);
} else {
ResourceManagerAdministrationProtocol adminProtocol =
createAdminProtocol();
ReplaceLabelsOnNodeRequest request =
ReplaceLabelsOnNodeRequest.newInstance(map);
adminProtocol.replaceLabelsOnNode(request);
}
return 0;
}
public static void assertMapEquals(Map<NodeId, Set<String>> m1,
ImmutableMap<NodeId, Set<String>> m2) {
Assert.assertEquals(m1.size(), m2.size());
for (NodeId k : m1.keySet()) {
Assert.assertTrue(m2.containsKey(k));
assertCollectionEquals(m1.get(k), m2.get(k));
}
}