Source-code examples for the class org.apache.hadoop.security.token.Token

The examples below show how the org.apache.hadoop.security.token.Token API is used in practice. Each snippet names the project and file it was taken from, so the full source can be looked up on GitHub.
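Before the per-project snippets, here is a minimal, hypothetical sketch (the class name and service address are invented) of the serialization round trip that many of them rely on: a Token can be turned into a URL-safe string with encodeToUrlString() and rebuilt on the other side with decodeFromUrlString().

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

/** Sketch only: round-trip a token through its URL-safe string form. */
public class TokenRoundTrip {
  static String encode(Token<? extends TokenIdentifier> token) throws IOException {
    // Base64-encodes the identifier, password, kind and service fields.
    return token.encodeToUrlString();
  }

  static Token<TokenIdentifier> decode(String encoded) throws IOException {
    Token<TokenIdentifier> token = new Token<TokenIdentifier>();
    token.decodeFromUrlString(encoded);
    // Callers usually point the service at the address the token will be used
    // against; the host:port below is a placeholder.
    token.setService(new Text("namenode.example.com:8020"));
    return token;
  }
}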

Example 1 - Project: hadoop, File: TestUserGroupInformation.java
/**
 * In some scenario, such as HA, delegation tokens are associated with a
 * logical name. The tokens are cloned and are associated with the
 * physical address of the server where the service is provided.
 * This test ensures cloned delegated tokens are locally used
 * and are not returned in {@link UserGroupInformation#getCredentials()}
 */
@Test
public void testPrivateTokenExclusion() throws Exception  {
  UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
  TestTokenIdentifier tokenId = new TestTokenIdentifier();
  Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(
          tokenId.getBytes(), "password".getBytes(),
          tokenId.getKind(), null);
  ugi.addToken(new Text("regular-token"), token);

  // Now add cloned private token
  ugi.addToken(new Text("private-token"), new Token.PrivateToken<TestTokenIdentifier>(token));
  ugi.addToken(new Text("private-token1"), new Token.PrivateToken<TestTokenIdentifier>(token));

  // Ensure only non-private tokens are returned
  Collection<Token<? extends TokenIdentifier>> tokens = ugi.getCredentials().getAllTokens();
  assertEquals(1, tokens.size());
}
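As a hedged follow-up to the test above (reusing its ugi variable), the non-private tokens it counts can be inspected by kind and service:

for (Token<? extends TokenIdentifier> t : ugi.getCredentials().getAllTokens()) {
  // Print kind and service only; getPassword() would expose the secret bytes.
  System.out.println(t.getKind() + " for service " + t.getService());
}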
 
Example 2 - Project: hadoop, File: WebHdfsFileSystem.java
protected synchronized Token<?> getDelegationToken() throws IOException {
  if (canRefreshDelegationToken && delegationToken == null) {
    Token<?> token = tokenSelector.selectToken(
        new Text(getCanonicalServiceName()), ugi.getTokens());
    // ugi tokens are usually indicative of a task which can't
    // refetch tokens.  even if ugi has credentials, don't attempt
    // to get another token to match hdfs/rpc behavior
    if (token != null) {
      LOG.debug("Using UGI token: " + token);
      canRefreshDelegationToken = false; 
    } else {
      token = getDelegationToken(null);
      if (token != null) {
        LOG.debug("Fetched new token: " + token);
      } else { // security is disabled
        canRefreshDelegationToken = false;
      }
    }
    setDelegationToken(token);
  }
  return delegationToken;
}
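A delegation token obtained this way is normally renewed periodically and cancelled when the client no longer needs it. A minimal sketch, assuming a Token<?> token and a Configuration conf are in scope as in the method above:

// renew(conf) contacts the token's issuer and returns the next expiration time in milliseconds.
long nextExpiration = token.renew(conf);
// When finished, cancel the token so the server can drop it before it expires.
token.cancel(conf);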
 
Example 3
private void cancelNameNodeToken(final Token<? extends TokenIdentifier> t,
    String userToProxy) throws HadoopSecurityManagerException {
  try {
    getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        cancelToken(t);
        return null;
      }

      private void cancelToken(Token<?> nt) throws IOException,
          InterruptedException {
        nt.cancel(conf);
      }
    });
  } catch (Exception e) {
    throw new HadoopSecurityManagerException("Failed to cancel token. "
        + e.getMessage() + e.getCause(), e);
  }
}
 
Example 4 - Project: hbase, File: ClientTokenUtil.java
/**
 * Obtain and return an authentication token for the current user.
 * @param conn The HBase cluster connection
 * @throws IOException if a remote error or serialization problem occurs.
 * @return the authentication token instance
 */
@InterfaceAudience.Private
static Token<AuthenticationTokenIdentifier> obtainToken(
    Connection conn) throws IOException {
  Table meta = null;
  try {
    injectFault();

    meta = conn.getTable(TableName.META_TABLE_NAME);
    CoprocessorRpcChannel rpcChannel = meta.coprocessorService(
            HConstants.EMPTY_START_ROW);
    AuthenticationProtos.AuthenticationService.BlockingInterface service =
        AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
    AuthenticationProtos.GetAuthenticationTokenResponse response =
            service.getAuthenticationToken(null,
        AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());

    return toToken(response.getToken());
  } catch (ServiceException se) {
    throw ProtobufUtil.handleRemoteException(se);
  } finally {
    if (meta != null) {
      meta.close();
    }
  }
}
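Once obtained, the token is usually attached to the current user's credentials so that subsequent RPCs can authenticate with it. The method above is package-private, so applications normally reach it through HBase's public token utilities; the hedged sketch below only shows what typically happens to the returned token.

Token<AuthenticationTokenIdentifier> token = obtainToken(conn);
// Make the token visible to this process; a MapReduce or Spark job would add it
// to the job's Credentials instead.
UserGroupInformation.getCurrentUser().addToken(token);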
 
Example 5 - Project: big-c, File: TcpPeerServer.java
public static Peer peerFromSocketAndKey(
      SaslDataTransferClient saslClient, Socket s,
      DataEncryptionKeyFactory keyFactory,
      Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
      throws IOException {
  Peer peer = null;
  boolean success = false;
  try {
    peer = peerFromSocket(s);
    peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
    success = true;
    return peer;
  } finally {
    if (!success) {
      IOUtils.cleanup(null, peer);
    }
  }
}
 
Example 6 - Project: hadoop, File: TestClientRMTokens.java
private long renewDelegationToken(final UserGroupInformation loggedInUser,
    final ApplicationClientProtocol clientRMService,
    final org.apache.hadoop.yarn.api.records.Token dToken)
    throws IOException, InterruptedException {
  long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction<Long>() {
    @Override
    public Long run() throws YarnException, IOException {
      RenewDelegationTokenRequest request = Records
          .newRecord(RenewDelegationTokenRequest.class);
      request.setDelegationToken(dToken);
      return clientRMService.renewDelegationToken(request)
          .getNextExpirationTime();
    }
  });
  return nextExpTime;
}
 
Example 7 - Project: big-c, File: DataXceiver.java
@Override
public void transferBlock(final ExtendedBlock blk,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientName,
    final DatanodeInfo[] targets,
    final StorageType[] targetStorageTypes) throws IOException {
  checkAccess(socketOut, true, blk, blockToken,
      Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
  previousOpClientName = clientName;
  updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);

  final DataOutputStream out = new DataOutputStream(
      getOutputStream());
  try {
    datanode.transferReplicaForPipelineRecovery(blk, targets,
        targetStorageTypes, clientName);
    writeResponse(Status.SUCCESS, null, out);
  } catch (IOException ioe) {
    LOG.info("transferBlock " + blk + " received exception " + ioe);
    incrDatanodeNetworkErrors();
    throw ioe;
  } finally {
    IOUtils.closeStream(out);
  }
}
 
Example 8
@Before
public void initialize() throws Exception {
  startHACluster(0, false, false, true);
  attemptId = this.cluster.createFakeApplicationAttemptId();
  amClient = ClientRMProxy
      .createRMProxy(this.conf, ApplicationMasterProtocol.class);

  Token<AMRMTokenIdentifier> appToken =
      this.cluster.getResourceManager().getRMContext()
        .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
  appToken.setService(ClientRMProxy.getAMRMTokenService(conf));
  UserGroupInformation.setLoginUser(UserGroupInformation
      .createRemoteUser(UserGroupInformation.getCurrentUser()
          .getUserName()));
  UserGroupInformation.getCurrentUser().addToken(appToken);
  syncToken(appToken);
}
 
Example 9
/**
 * Cancel an existing delegation token.
 *
 * @param token delegation token
 */
@Override
public void cancelDelegationToken(Token<OzoneTokenIdentifier> token)
    throws OMException {
  CancelDelegationTokenRequestProto req = CancelDelegationTokenRequestProto
      .newBuilder()
      .setToken(OMPBHelper.convertToTokenProto(token))
      .build();

  OMRequest omRequest = createOMRequest(Type.CancelDelegationToken)
      .setCancelDelegationTokenRequest(req)
      .build();

  final CancelDelegationTokenResponseProto resp;
  try {
    handleError(submitRequest(omRequest));
  } catch (IOException e) {
    if (e instanceof OMException) {
      throw (OMException)e;
    }
    throw new OMException("Cancel delegation token failed.", e,
        TOKEN_ERROR_OTHER);
  }
}
 
Example 10 - Project: hadoop, File: DFSClient.java
@Override // RemotePeerFactory
public Peer newConnectedPeer(InetSocketAddress addr,
    Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
    throws IOException {
  Peer peer = null;
  boolean success = false;
  Socket sock = null;
  try {
    sock = socketFactory.createSocket();
    NetUtils.connect(sock, addr,
      getRandomLocalInterfaceAddr(),
      dfsClientConf.socketTimeout);
    peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
        blockToken, datanodeId);
    peer.setReadTimeout(dfsClientConf.socketTimeout);
    success = true;
    return peer;
  } finally {
    if (!success) {
      IOUtils.cleanup(LOG, peer);
      IOUtils.closeSocket(sock);
    }
  }
}
 
Example 11 - Project: hadoop, File: TestBinaryTokenFile.java
private static void createBinaryTokenFile(Configuration conf) {
  // Fetch delegation tokens and store in binary token file.
  try {
    Credentials cred1 = new Credentials();
    Credentials cred2 = new Credentials();
    TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 },
        conf);
    for (Token<? extends TokenIdentifier> t : cred1.getAllTokens()) {
      cred2.addToken(new Text(DELEGATION_TOKEN_KEY), t);
    }
    DataOutputStream os = new DataOutputStream(new FileOutputStream(
        binaryTokenFileName.toString()));
    try {
      cred2.writeTokenStorageToStream(os);
    } finally {
      os.close();
    }
  } catch (IOException e) {
    Assert.fail("Exception " + e);
  }
}
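The binary file written above can be read back with Credentials.readTokenStorageFile, and the token retrieved under the same alias. A short sketch reusing binaryTokenFileName, DELEGATION_TOKEN_KEY and conf from the test:

Credentials read = Credentials.readTokenStorageFile(
    new Path(binaryTokenFileName.toString()), conf);
// Returns null if no token was stored under that alias.
Token<? extends TokenIdentifier> restored =
    read.getToken(new Text(DELEGATION_TOKEN_KEY));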
 
Example 12 - Project: Bats, File: StramClientUtils.java
public void addRMDelegationToken(final String renewer, final Credentials credentials) throws IOException, YarnException
{
  // Get the ResourceManager delegation rmToken
  final org.apache.hadoop.yarn.api.records.Token rmDelegationToken = clientRM.getRMDelegationToken(new Text(renewer));

  Token<RMDelegationTokenIdentifier> token;
  // TODO: Use the utility method getRMDelegationTokenService in ClientRMProxy to remove the separate handling of
  // TODO: HA and non-HA cases when hadoop dependency is changed to hadoop 2.4 or above
  if (ConfigUtils.isRMHAEnabled(conf)) {
    LOG.info("Yarn Resource Manager HA is enabled");
    token = getRMHAToken(rmDelegationToken);
  } else {
    LOG.info("Yarn Resource Manager HA is not enabled");
    InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_PORT);

    token = ConverterUtils.convertFromYarn(rmDelegationToken, rmAddress);
  }

  LOG.info("RM dt {}", token);

  credentials.addToken(token.getService(), token);
}
 
Example 13 - Project: hbase, File: AsyncClusterConnectionImpl.java
@Override
public CompletableFuture<Boolean> bulkLoad(TableName tableName,
  List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
  String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
  return callerFactory.<Boolean> single().table(tableName).row(row)
    .action((controller, loc, stub) -> ConnectionUtils
      .<Void, BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(controller, loc, stub,
        null,
        (rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
          userToken, bulkToken, copyFiles, clusterIds, replicate),
        (s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
    .call();
}
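The userToken argument is typically a filesystem delegation token fetched by the bulk-load client for the files being loaded. A minimal, hypothetical sketch of obtaining one (the Configuration and renewer name are assumptions):

FileSystem fs = FileSystem.get(conf);
// Returns null when security is disabled.
Token<?> userToken = fs.getDelegationToken("hbase");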
 
Example 14
@Override
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
    throws IOException {
  GetDelegationTokenRequestProto req = GetDelegationTokenRequestProto
      .newBuilder()
      .setRenewer(renewer.toString())
      .build();
  try {
    GetDelegationTokenResponseProto resp = rpcProxy.getDelegationToken(null, req);
    return resp.hasToken() ? PBHelper.convertDelegationToken(resp.getToken())
        : null;
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
 
Example 15 - Project: hadoop, File: MockAM.java
public AllocateResponse allocate(AllocateRequest allocateRequest)
          throws Exception {
  UserGroupInformation ugi =
      UserGroupInformation.createRemoteUser(attemptId.toString());
  Token<AMRMTokenIdentifier> token =
      context.getRMApps().get(attemptId.getApplicationId())
          .getRMAppAttempt(attemptId).getAMRMToken();
  ugi.addTokenIdentifier(token.decodeIdentifier());
  lastResponse = doAllocateAs(ugi, allocateRequest);
  return lastResponse;
}
 
Example 16 - Project: big-c, File: MockAM.java
public AllocateResponse allocate(AllocateRequest allocateRequest)
          throws Exception {
  UserGroupInformation ugi =
      UserGroupInformation.createRemoteUser(attemptId.toString());
  Token<AMRMTokenIdentifier> token =
      context.getRMApps().get(attemptId.getApplicationId())
          .getRMAppAttempt(attemptId).getAMRMToken();
  ugi.addTokenIdentifier(token.decodeIdentifier());
  lastResponse = doAllocateAs(ugi, allocateRequest);
  return lastResponse;
}
 
Example 17 - Project: hadoop, File: HAUtil.java
/**
 * Parse the file system URI out of the provided token.
 */
public static URI getServiceUriFromToken(final String scheme, Token<?> token) {
  String tokStr = token.getService().toString();
  final String prefix = buildTokenServicePrefixForLogicalUri(scheme);
  if (tokStr.startsWith(prefix)) {
    tokStr = tokStr.replaceFirst(prefix, "");
  }
  return URI.create(scheme + "://" + tokStr);
}
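The inverse operation, stamping a token with the service name for a concrete address, goes through SecurityUtil, as Example 18 below also shows. A short sketch with an invented address:

InetSocketAddress nnAddr = new InetSocketAddress("nn1.example.com", 8020);
// Writes "host:port" (or the resolved IP, depending on hadoop.security.token.service.use_ip)
// into the token's service field.
SecurityUtil.setTokenService(token, nnAddr);
// Equivalent lower-level form:
token.setService(SecurityUtil.buildTokenService(nnAddr));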
 
Example 18 - Project: hadoop, File: MockRMWithCustomAMLauncher.java
@Override
protected ApplicationMasterLauncher createAMLauncher() {
  return new ApplicationMasterLauncher(getRMContext()) {
    @Override
    protected Runnable createRunnableLauncher(RMAppAttempt application,
        AMLauncherEventType event) {
      return new AMLauncher(context, application, event, getConfig()) {
        @Override
        protected ContainerManagementProtocol getContainerMgrProxy(
            ContainerId containerId) {
          return containerManager;
        }
        @Override
        protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
          Token<AMRMTokenIdentifier> amRmToken =
              super.createAndSetAMRMToken();
          InetSocketAddress serviceAddr =
              getConfig().getSocketAddr(
                YarnConfiguration.RM_SCHEDULER_ADDRESS,
                YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
                YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
          SecurityUtil.setTokenService(amRmToken, serviceAddr);
          return amRmToken;
        }
      };
    }
  };
}
 
Example 19 - Project: big-c, File: DelegationTokenManager.java
@SuppressWarnings("unchecked")
public void cancelToken(
    Token<? extends AbstractDelegationTokenIdentifier> token,
    String canceler) throws IOException {
  canceler = (canceler != null) ? canceler :
             verifyToken(token).getShortUserName();
  secretManager.cancelToken(token, canceler);
}
 
Example 20 - Project: hadoop, File: TestRecovery.java
private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {

    ApplicationId appId = ApplicationId.newInstance(clusterTimestamp, 1);
    JobId jobId = MRBuilderUtils.newJobId(appId, 1);

    int partitions = 2;

    Path remoteJobConfFile = mock(Path.class);
    JobConf conf = new JobConf();
    TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class);
    Token<JobTokenIdentifier> jobToken =
        (Token<JobTokenIdentifier>) mock(Token.class);
    Credentials credentials = null;
    Clock clock = new SystemClock();
    int appAttemptId = 3;
    MRAppMetrics metrics = mock(MRAppMetrics.class);
    Resource minContainerRequirements = mock(Resource.class);
    when(minContainerRequirements.getMemory()).thenReturn(1000);

    ClusterInfo clusterInfo = mock(ClusterInfo.class);
    AppContext appContext = mock(AppContext.class);
    when(appContext.getClusterInfo()).thenReturn(clusterInfo);

    TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
    MapTaskImpl mapTask = new MapTaskImpl(jobId, partitions,
        eh, remoteJobConfFile, conf,
        taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock,
        appAttemptId, metrics, appContext);
    return mapTask;
  }
 
Example 21 - Project: incubator-tez, File: TezClientUtils.java
@Private
public static DAGClientAMProtocolBlockingPB getAMProxy(final Configuration conf, String amHost,
    int amRpcPort, org.apache.hadoop.yarn.api.records.Token clientToAMToken) throws IOException {

  final InetSocketAddress serviceAddr = new InetSocketAddress(amHost, amRpcPort);
  UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(UserGroupInformation
      .getCurrentUser().getUserName());
  if (clientToAMToken != null) {
    Token<ClientToAMTokenIdentifier> token = ConverterUtils.convertFromYarn(clientToAMToken,
        serviceAddr);
    userUgi.addToken(token);
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Connecting to Tez AM at " + serviceAddr);
  }
  DAGClientAMProtocolBlockingPB proxy = null;
  try {
    proxy = userUgi.doAs(new PrivilegedExceptionAction<DAGClientAMProtocolBlockingPB>() {
      @Override
      public DAGClientAMProtocolBlockingPB run() throws IOException {
        RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class, ProtobufRpcEngine.class);
        return (DAGClientAMProtocolBlockingPB) RPC.getProxy(DAGClientAMProtocolBlockingPB.class,
            0, serviceAddr, conf);
      }
    });
  } catch (InterruptedException e) {
    throw new IOException("Failed to connect to AM", e);
  }
  return proxy;
}
 
Example 22 - Project: big-c, File: TokenAspect.java
synchronized void initDelegationToken(UserGroupInformation ugi) {
  Token<?> token = selectDelegationToken(ugi);
  if (token != null) {
    LOG.debug("Found existing DT for " + token.getService());
    fs.setDelegationToken(token);
    hasInitedToken = true;
  }
}
 
Example 23 - Project: hadoop, File: AMLauncher.java
@VisibleForTesting
protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
  Token<AMRMTokenIdentifier> amrmToken =
      this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
        application.getAppAttemptId());
  ((RMAppAttemptImpl)application).setAMRMToken(amrmToken);
  return amrmToken;
}
 
Example 24 - Project: big-c, File: TestZKDelegationTokenSecretManager.java
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testStopThreads() throws Exception {
  DelegationTokenManager tm1 = null;
  String connectString = zkServer.getConnectString();

  // let's make the update interval short and the shutdown interval
  // comparatively longer, so if the update thread runs after shutdown,
  // it will cause an error.
  final long updateIntervalSeconds = 1;
  final long shutdownTimeoutMillis = updateIntervalSeconds * 1000 * 5;
  Configuration conf = getSecretConf(connectString);
  conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, updateIntervalSeconds);
  conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, updateIntervalSeconds);
  conf.setLong(DelegationTokenManager.RENEW_INTERVAL, updateIntervalSeconds);

  conf.setLong(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, shutdownTimeoutMillis);
  tm1 = new DelegationTokenManager(conf, new Text("foo"));
  tm1.init();

  Token<DelegationTokenIdentifier> token =
    (Token<DelegationTokenIdentifier>)
  tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
  Assert.assertNotNull(token);

  AbstractDelegationTokenSecretManager sm = tm1.getDelegationTokenSecretManager();
  ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager)sm;
  ExecutorService es = zksm.getListenerThreadPool();
  es.submit(new Callable<Void>() {
    public Void call() throws Exception {
      Thread.sleep(shutdownTimeoutMillis * 2); // force this to be shutdownNow
      return null;
    }
  });

  tm1.destroy();
}
 
Example 25 - Project: gcp-token-broker, File: PingServer.java
private static Token<BrokerTokenIdentifier> getTokenBTI(String sessionToken) throws IOException {
    BrokerTokenIdentifier identifier = getBTI(sessionToken);
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    DataOutputStream data = new DataOutputStream(byteArrayOutputStream);
    identifier.write(data);
    return new Token<>(byteArrayOutputStream.toByteArray(), new byte[0], BrokerTokenIdentifier.KIND, SERVICE);
}
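Going back from the wire form to a typed identifier reverses the same Writable serialization. A hedged sketch, assuming BrokerTokenIdentifier has the usual no-argument constructor that Writable identifiers provide:

Token<BrokerTokenIdentifier> token = getTokenBTI(sessionToken);
BrokerTokenIdentifier decoded = new BrokerTokenIdentifier();
// getIdentifier() returns the raw identifier bytes written by identifier.write(data) above.
decoded.readFields(new DataInputStream(
    new ByteArrayInputStream(token.getIdentifier())));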
 
Example 26 - Project: incubator-tez, File: ShuffleUtils.java
public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
    throws IOException {
  DataInputByteBuffer in = new DataInputByteBuffer();
  in.reset(meta);
  Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
  jt.readFields(in);
  SecretKey sk = JobTokenSecretManager.createSecretKey(jt.getPassword());
  return sk;
}
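The opposite direction, packing a job token into a ByteBuffer so it can be shipped as service metadata, follows the same Writable contract. A minimal sketch, not taken from ShuffleUtils:

public static ByteBuffer jobTokenToBytes(Token<JobTokenIdentifier> jobToken)
    throws IOException {
  DataOutputBuffer out = new DataOutputBuffer();
  // Token is Writable: this serializes identifier, password, kind and service.
  jobToken.write(out);
  return ByteBuffer.wrap(out.getData(), 0, out.getLength());
}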
 
Example 27 - Project: big-c, File: BlockTokenSecretManager.java
/** Generate a block token for the current user. */
public Token<BlockTokenIdentifier> generateToken(ExtendedBlock block,
    EnumSet<AccessMode> modes) throws IOException {
  UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
  String userID = (ugi == null ? null : ugi.getShortUserName());
  return generateToken(userID, block, modes);
}
 
Example 28 - Project: hadoop, File: TestDelegationTokensWithHA.java
@Test(timeout = 300000)
public void testDelegationTokenWithDoAs() throws Exception {
  final Token<DelegationTokenIdentifier> token =
      getDelegationToken(fs, "JobTracker");
  final UserGroupInformation longUgi = UserGroupInformation
      .createRemoteUser("JobTracker/[email protected]");
  final UserGroupInformation shortUgi = UserGroupInformation
      .createRemoteUser("JobTracker");
  longUgi.doAs(new PrivilegedExceptionAction<Void>() {
    @Override
    public Void run() throws Exception {
      // try renew with long name
      token.renew(conf);
      return null;
    }
  });
  shortUgi.doAs(new PrivilegedExceptionAction<Void>() {
    @Override
    public Void run() throws Exception {
      token.renew(conf);
      return null;
    }
  });
  longUgi.doAs(new PrivilegedExceptionAction<Void>() {
    @Override
    public Void run() throws Exception {
      token.cancel(conf);
      return null;
    }
  });
}
 
Example 29 - Project: big-c, File: BlockStorageLocationUtil.java
VolumeBlockLocationCallable(Configuration configuration,
    DatanodeInfo datanode, String poolId, long []blockIds,
    List<Token<BlockTokenIdentifier>> dnTokens, int timeout, 
    boolean connectToDnViaHostname, Span parentSpan) {
  this.configuration = configuration;
  this.timeout = timeout;
  this.datanode = datanode;
  this.poolId = poolId;
  this.blockIds = blockIds;
  this.dnTokens = dnTokens;
  this.connectToDnViaHostname = connectToDnViaHostname;
  this.parentSpan = parentSpan;
}
 
Example 30 - Project: big-c, File: ReduceTaskImpl.java
public ReduceTaskImpl(JobId jobId, int partition,
    EventHandler eventHandler, Path jobFile, JobConf conf,
    int numMapTasks, TaskAttemptListener taskAttemptListener,
    Token<JobTokenIdentifier> jobToken,
    Credentials credentials, Clock clock,
    int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
  super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
      taskAttemptListener, jobToken, credentials, clock,
      appAttemptId, metrics, appContext);
  this.numMapTasks = numMapTasks;
}
 