org.apache.hadoop.fs.ParentNotDirectoryException#org.apache.hadoop.ipc.RPC源码实例Demo

下面列出了org.apache.hadoop.fs.ParentNotDirectoryException#org.apache.hadoop.ipc.RPC 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: big-c   文件: TestClientToAMTokens.java
/**
 * Rebuilds {@code token} around a {@link ClientToAMTokenIdentifierForTest}
 * identifier and checks that the AM can still be pinged with it.
 *
 * <p>The new token copies the service of the original one and is attached
 * to a remote-user UGI named "me"; the ping is then issued as that user.
 * The {@code rm} parameter is not used by this method.
 *
 * @param conf  configuration handed to the RPC proxy
 * @param am    application master whose address is contacted and whose
 *              {@code pinged} flag is asserted afterwards
 * @param token original client-to-AM token
 * @param rm    unused
 */
private void verifyNewVersionToken(final Configuration conf, final CustomAM am,
    Token<ClientToAMTokenIdentifier> token, MockRM rm) throws IOException,
    InterruptedException {
  UserGroupInformation remoteUser = UserGroupInformation.createRemoteUser("me");

  // Wrap the decoded identifier in the test identifier type.
  ClientToAMTokenIdentifierForTest wrappedId =
      new ClientToAMTokenIdentifierForTest(token.decodeIdentifier(), "message");
  Token<ClientToAMTokenIdentifier> rewrappedToken =
      new Token<ClientToAMTokenIdentifier>(
          wrappedId, am.getClientToAMTokenSecretManager());
  rewrappedToken.setService(token.getService());
  remoteUser.addToken(rewrappedToken);

  remoteUser.doAs(new PrivilegedExceptionAction<Void>() {
    @Override
    public Void run() throws Exception {
      CustomProtocol amClient = (CustomProtocol) RPC.getProxy(
          CustomProtocol.class, 1L, am.address, conf);
      amClient.ping();
      Assert.assertTrue(am.pinged);
      return null;
    }
  });
}
 
源代码2 项目: hadoop-ozone   文件: HddsServerUtil.java
/**
 * Creates an SCM security protocol client.
 *
 * <p>Registers {@link ProtobufRpcEngine} for {@link SCMSecurityProtocolPB}
 * on {@code conf}, then builds a proxy for the current user that retries
 * forever with a fixed 1000 ms sleep between attempts.
 *
 * @param conf Ozone configuration; it is modified by the protocol-engine
 *             registration.
 * @return {@link SCMSecurityProtocolClientSideTranslatorPB} wrapping the
 *         RPC proxy.
 * @throws IOException if any of the underlying calls fails.
 */
public static SCMSecurityProtocolClientSideTranslatorPB getScmSecurityClient(
    OzoneConfiguration conf) throws IOException {
  RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
      ProtobufRpcEngine.class);
  // The version must come from the protocol that is actually proxied below,
  // not from the unrelated block-location protocol.
  long scmVersion =
      RPC.getProtocolVersion(SCMSecurityProtocolPB.class);
  InetSocketAddress address =
      getScmAddressForSecurityProtocol(conf);
  RetryPolicy retryPolicy =
      RetryPolicies.retryForeverWithFixedSleep(
          1000, TimeUnit.MILLISECONDS);
  return new SCMSecurityProtocolClientSideTranslatorPB(
      RPC.getProtocolProxy(SCMSecurityProtocolPB.class, scmVersion,
          address, UserGroupInformation.getCurrentUser(),
          conf, NetUtils.getDefaultSocketFactory(conf),
          Client.getRpcTimeout(conf), retryPolicy).getProxy());
}
 
源代码3 项目: RDFS   文件: UtilizationCollectorCached.java
/**
 * Opens the RPC proxy to the utilization collector and stores it in
 * {@code rpcCollector}.
 *
 * <p>Before connecting, the UGI property on {@code conf} is set to the pair
 * "hadoop", "hadoop". If the proxy cannot be created, the failure is logged
 * (including the causing exception) and the method returns normally; no
 * retry is performed here.
 */
protected void connect() {
  LOG.info("Connecting to collector...");
  try {
    conf.setStrings(UnixUserGroupInformation.UGI_PROPERTY_NAME,
                    new String[]{"hadoop", "hadoop"});
    rpcCollector =
          (UtilizationCollectorProtocol) RPC.getProxy(UtilizationCollectorProtocol.class,
                                           UtilizationCollectorProtocol.versionID,
                                           UtilizationCollector.getAddress(conf),
                                           conf);
  } catch (IOException e) {
    // Keep the cause in the log so connection problems can be diagnosed.
    LOG.error("Cannot connect to UtilizationCollector server. Retry in " +
             DEFAULT_MIRROR_PERIOD + " milliseconds.", e);
    return;
  }
  LOG.info("Connection established");
}
 
源代码4 项目: hadoop-ozone   文件: Hadoop3OmTransport.java
/**
 * Builds the transport: registers the protobuf RPC engine for the OM
 * protocol, creates the failover proxy provider and wraps it in a retry
 * proxy limited by the configured maximum number of failover attempts.
 *
 * @param conf        configuration source
 * @param ugi         user passed to the failover proxy provider
 * @param omServiceId OM service id passed to the failover proxy provider
 * @throws IOException if one of the underlying calls fails
 */
public Hadoop3OmTransport(ConfigurationSource conf,
    UserGroupInformation ugi, String omServiceId) throws IOException {

  OzoneConfiguration ozoneConf = OzoneConfiguration.of(conf);
  RPC.setProtocolEngine(
      ozoneConf, OzoneManagerProtocolPB.class, ProtobufRpcEngine.class);

  omFailoverProxyProvider =
      new OMFailoverProxyProvider(conf, ugi, omServiceId);

  int failoverLimit = conf.getInt(
      OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
      OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);

  rpcProxy = createRetryProxy(omFailoverProxyProvider, failoverLimit);
}
 
/**
 * Builds and starts the RPC server for {@link TezTaskUmbilicalProtocol},
 * bound to "0.0.0.0" with port 0 and this object as the protocol instance,
 * then records the server's connect address in {@code address}.
 *
 * <p>Service ACLs are refreshed before start when Hadoop security
 * authorization is enabled in the configuration.
 *
 * @throws TezUncheckedException wrapping any {@link IOException}
 */
protected void startRpcServer() {
  Configuration conf = getConfig();
  try {
    int listenerThreads = conf.getInt(
        TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
        TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT);

    RPC.Builder rpcBuilder = new RPC.Builder(conf)
        .setProtocol(TezTaskUmbilicalProtocol.class)
        .setBindAddress("0.0.0.0")
        .setPort(0)
        .setInstance(this)
        .setNumHandlers(listenerThreads)
        .setSecretManager(jobTokenSecretManager);
    server = rpcBuilder.build();

    // Enable service authorization?
    boolean authorizationEnabled = conf.getBoolean(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
    if (authorizationEnabled) {
      refreshServiceAcls(conf, new TezAMPolicyProvider());
    }

    server.start();
    this.address = NetUtils.getConnectAddress(server);
  } catch (IOException ioe) {
    throw new TezUncheckedException(ioe);
  }
}
 
源代码6 项目: hadoop-ozone   文件: OzoneManager.java
/**
 * Returns the OM RPC server, reusing {@code omRpcServer} when
 * {@code isOmRpcServerRunning} is set and otherwise starting a new one.
 *
 * <p>NOTE(review): the address and handler count are read from the
 * {@code conf} parameter while the protocol engine and server use the
 * {@code configuration} field; confirm both are meant to be the same object.
 *
 * @param conf configuration used for the OM address and handler count
 * @return the running or newly started RPC server
 * @throws IOException if one of the underlying calls fails
 */
private RPC.Server getRpcServer(OzoneConfiguration conf) throws IOException {
  if (isOmRpcServerRunning) {
    return omRpcServer;
  }

  final InetSocketAddress rpcAddress = OmUtils.getOmAddress(conf);
  final int handlerCount = conf.getInt(
      OZONE_OM_HANDLER_COUNT_KEY, OZONE_OM_HANDLER_COUNT_DEFAULT);

  RPC.setProtocolEngine(
      configuration, OzoneManagerProtocolPB.class, ProtobufRpcEngine.class);
  omServerProtocol = new OzoneManagerProtocolServerSideTranslatorPB(
      this, omRatisServer, omClientProtocolMetrics, isRatisEnabled);

  return startRpcServer(
      configuration,
      rpcAddress,
      OzoneManagerProtocolPB.class,
      newReflectiveBlockingService(omServerProtocol),
      handlerCount);
}
 
源代码7 项目: hadoop   文件: TestRMAuditLogger.java
/**
 * Test {@link RMAuditLogger} with IP set.
 *
 * <p>Starts an IPC server backed by {@code MyTestRPCServer}, connects a
 * client proxy and issues one {@code ping()}. The proxy and the server are
 * released in a {@code finally} block so that a failing ping does not leave
 * them running.
 */
@Test
public void testRMAuditLoggerWithIP() throws Exception {
  Configuration conf = new Configuration();
  // start the IPC server
  Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
      .setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0")
      .setPort(0).setNumHandlers(5).setVerbose(true).build();
  server.start();

  TestProtocol proxy = null;
  try {
    InetSocketAddress addr = NetUtils.getConnectAddress(server);

    // Make a client connection and test the audit log
    proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
        TestProtocol.versionID, addr, conf);
    // Start the testcase
    proxy.ping();
  } finally {
    if (proxy != null) {
      RPC.stopProxy(proxy);
    }
    server.stop();
  }
}
 
源代码8 项目: RDFS   文件: CoronaTaskTracker.java
/**
 * Starts the Hadoop RPC server that serves JobTrackers and resolves the
 * system directory.
 *
 * <p>The server is created on the local hostname with port 0; after start,
 * {@code actionServerAddr} is replaced by the server's listener address.
 * The reporter map is then created and the system directory and its file
 * system are looked up from {@code fConf}.
 *
 * @throws IOException if the system directory setting resolves to
 *         {@code null} or one of the underlying calls fails
 */
private synchronized void initializeTaskActionServer() throws IOException {
  // Create Hadoop RPC to serve JobTrackers
  actionServerAddr = NetUtils.createSocketAddr(getLocalHostname(), 0);
  final int numHandlers =
      fConf.getInt(CORONA_TASK_TRACKER_HANDLER_COUNT_KEY, 10);
  actionServer = RPC.getServer(
      this, actionServerAddr.getHostName(), 0, numHandlers, false, fConf);
  actionServer.start();

  // Read the address back from the started server.
  actionServerAddr = actionServer.getListenerAddress();
  LOG.info("TaskActionServer up at " +
      actionServerAddr.getHostName() + ":" + actionServerAddr.getPort());

  jobTrackerReporters = new ConcurrentHashMap<JobID, JobTrackerReporter>();

  final String systemDirName = fConf.get(
      JobTracker.MAPRED_SYSTEM_DIR_KEY, JobTracker.DEFAULT_MAPRED_SYSTEM_DIR);
  if (systemDirName == null) {
    throw new IOException("Failed to get system directory");
  }
  systemDirectory = new Path(systemDirName);
  systemFS = systemDirectory.getFileSystem(fConf);
}
 
源代码9 项目: RDFS   文件: CoronaJobTracker.java
/**
 * Stores the identifying fields and waits for an
 * {@link InterCoronaJobTrackerProtocol} proxy to the parent address, using
 * the timeout returned by {@code RemoteJTProxy.getRemotJTTimeout(conf)}.
 *
 * @param conf       configuration used for the timeout and the proxy
 * @param attemptId  task attempt id to remember
 * @param myAddr     this side's address
 * @param parentAddr address of the parent to connect to
 * @param sessionId  session id to remember
 * @throws IOException if obtaining the proxy fails
 */
public ParentHeartbeat(
    Configuration conf,
    TaskAttemptID attemptId,
    InetSocketAddress myAddr,
    InetSocketAddress parentAddr,
    String sessionId) throws IOException {
  this.attemptId = attemptId;
  this.myAddr = myAddr;
  this.parentAddr = parentAddr;
  this.sessionId = sessionId;

  final long proxyTimeout = RemoteJTProxy.getRemotJTTimeout(conf);
  this.parent = RPC.waitForProxy(
      InterCoronaJobTrackerProtocol.class,
      InterCoronaJobTrackerProtocol.versionID,
      parentAddr, conf, proxyTimeout);
}
 
源代码10 项目: hadoop   文件: TestInterDatanodeProtocol.java
/**
 * Verifies that an InterDatanode RPC times out as expected when the server
 * DN does not respond: the test passes only if a
 * {@link SocketTimeoutException} escapes the method.
 *
 * <p>NOTE(review): the 500 passed to the proxy factory is presumably the
 * socket timeout in milliseconds - confirm against the factory signature.
 */
@Test(expected=SocketTimeoutException.class)
public void testInterDNProtocolTimeout() throws Throwable {
  final Server silentServer = new TestServer(1, true);
  silentServer.start();

  final InetSocketAddress serverAddr = NetUtils.getConnectAddress(silentServer);
  DatanodeID localDnId = DFSTestUtil.getLocalDatanodeID(serverAddr.getPort());
  DatanodeInfo targetDn = new DatanodeInfo(localDnId);
  InterDatanodeProtocol dnProxy = null;

  try {
    dnProxy = DataNode.createInterDataNodeProtocolProxy(
        targetDn, conf, 500, false);
    RecoveringBlock block =
        new RecoveringBlock(new ExtendedBlock("bpid", 1), null, 100);
    dnProxy.initReplicaRecovery(block);
    fail("Expected SocketTimeoutException exception, but did not get.");
  } finally {
    // Always release the proxy (when one was created) and the server.
    if (dnProxy != null) {
      RPC.stopProxy(dnProxy);
    }
    silentServer.stop();
  }
}
 
源代码11 项目: RDFS   文件: DataNode.java
/**
 * Connects to the NameNode, performs the handshake and prepares namespace
 * storage and registration ports.
 *
 * <p>The {@code dataDirs} parameter is not referenced in this method body.
 *
 * @param conf     configuration used to obtain the NameNode proxy
 * @param dataDirs data directories (unused here)
 * @throws IOException if one of the underlying calls fails
 */
void setupNS(Configuration conf, AbstractList<File> dataDirs)
    throws IOException {
  // Obtain the NameNode proxy (waits until it is available).
  DatanodeProtocol namenodeProxy = (DatanodeProtocol) RPC.waitForProxy(
      DatanodeProtocol.class, DatanodeProtocol.versionID, nnAddr, conf);
  setNameNode(namenodeProxy);

  // Handshake with the NameNode and keep the namespace info it returns.
  setNamespaceInfo(handshake());

  synchronized (DataNode.this) {
    setupNSStorage();
  }

  int ipcPort = ipcServer.getListenerAddress().getPort();
  nsRegistration.setIpcPort(ipcPort);
  nsRegistration.setInfoPort(infoServer.getPort());
}
 
源代码12 项目: big-c   文件: TestIsMethodSupported.java
/**
 * Checks method-support detection on a non-HA {@link NamenodeProtocol}
 * proxy: "rollEditLog" must be reported as supported and "bogusMethod" as
 * unsupported.
 */
@Test
public void testNamenodeProtocol() throws IOException {
  NamenodeProtocol namenode = NameNodeProxies.createNonHAProxy(
      conf, nnAddress, NamenodeProtocol.class,
      UserGroupInformation.getCurrentUser(), true).getProxy();

  final long pbVersion = RPC.getProtocolVersion(NamenodeProtocolPB.class);

  boolean knownMethodSupported = RpcClientUtil.isMethodSupported(
      namenode, NamenodeProtocolPB.class,
      RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbVersion, "rollEditLog");
  assertTrue(knownMethodSupported);

  boolean bogusMethodSupported = RpcClientUtil.isMethodSupported(
      namenode, NamenodeProtocolPB.class,
      RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbVersion, "bogusMethod");
  assertFalse(bogusMethodSupported);
}
 
源代码13 项目: RDFS   文件: DataNode.java
/**
 * Creates an {@link InterDatanodeProtocol} RPC proxy to the given datanode.
 *
 * <p>The target address is built from the datanode's host and IPC port.
 * The proxy is created for the user obtained from
 * {@code UserGroupInformation.login(conf)} with the default socket factory
 * and the supplied socket timeout.
 *
 * @param datanodeid    datanode to connect to
 * @param conf          configuration used for login, socket factory and RPC
 * @param socketTimeout timeout value forwarded to {@code RPC.getProxy}
 * @return the RPC proxy
 * @throws IOException      if creating the proxy fails
 * @throws RuntimeException if login fails; the {@link LoginException} is
 *                          attached as the cause
 */
public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
    DatanodeID datanodeid, Configuration conf, final int socketTimeout)
    throws IOException {
  InetSocketAddress addr = NetUtils.createSocketAddr(
      datanodeid.getHost() + ":" + datanodeid.getIpcPort());
  if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
    // Log at the level the guard checks for.
    InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr);
  }
  UserGroupInformation ugi;
  try {
    ugi = UserGroupInformation.login(conf);
  } catch (LoginException le) {
    // Preserve the cause so the login failure can be diagnosed.
    throw new RuntimeException("Couldn't login!", le);
  }
  return (InterDatanodeProtocol) RPC.getProxy(InterDatanodeProtocol.class,
      InterDatanodeProtocol.versionID, addr,
      ugi, conf,
      NetUtils.getDefaultSocketFactory(conf), socketTimeout);
}
 
源代码14 项目: RDFS   文件: AvatarShell.java
/**
 * Command-line entry point: creates an {@link AvatarShell}, runs it through
 * {@link ToolRunner} with the given arguments and exits with the result.
 *
 * <p>If the shell cannot be created, a message is written to standard error
 * and the process exits with status -1. The shell is closed after the run
 * whether or not it succeeded.
 */
public static void main(String[] argv) throws Exception {
  AvatarShell shell = null;
  try {
    shell = new AvatarShell();
  } catch (RPC.VersionMismatch mismatch) {
    System.err.println(
        "Version Mismatch between client and server... command aborted.");
    System.exit(-1);
  } catch (IOException ioe) {
    System.err.println("Bad connection to AvatarNode. command aborted.");
    System.exit(-1);
  }

  final int exitCode;
  try {
    exitCode = ToolRunner.run(shell, argv);
  } finally {
    shell.close();
  }
  System.exit(exitCode);
}
 
源代码15 项目: hadoop-gpu   文件: DFSAdmin.java
/**
 * Asks the {@link NameNode} to refresh its authorization policy.
 *
 * <p>A {@link RefreshAuthorizationPolicyProtocol} proxy is created against
 * the NameNode address from the current configuration and its
 * {@code refreshServiceAcl()} method is invoked once.
 *
 * @return 0 when the call completes; failures surface as exceptions
 * @throws IOException if creating the proxy or the remote call fails
 */
public int refreshServiceAcl() throws IOException {
  // Current configuration drives the address, user and socket factory.
  Configuration conf = getConf();

  // Build the client proxy.
  RefreshAuthorizationPolicyProtocol policyClient =
      (RefreshAuthorizationPolicyProtocol) RPC.getProxy(
          RefreshAuthorizationPolicyProtocol.class,
          RefreshAuthorizationPolicyProtocol.versionID,
          NameNode.getAddress(conf),
          getUGI(conf),
          conf,
          NetUtils.getSocketFactory(
              conf, RefreshAuthorizationPolicyProtocol.class));

  // Trigger the refresh of the policy in effect.
  policyClient.refreshServiceAcl();

  return 0;
}
 
源代码16 项目: hadoop   文件: IPCLoggerChannel.java
/**
 * Creates the {@link QJournalProtocol} client for {@code addr}.
 *
 * <p>Works on a copy of {@code conf} so that enabling TCP_NODELAY and
 * registering the protobuf RPC engine do not modify the original
 * configuration. The proxy itself is created as the login user.
 *
 * @return a translator wrapping the protobuf RPC proxy
 * @throws IOException if creating the proxy fails
 */
protected QJournalProtocol createProxy() throws IOException {
  final Configuration confCopy = new Configuration(conf);

  // Need to set NODELAY or else batches larger than MTU can trigger
  // 40ms nagling delays.
  confCopy.setBoolean(
      CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
      true);

  // Registered once here; the same confCopy instance is used inside run().
  RPC.setProtocolEngine(confCopy,
      QJournalProtocolPB.class, ProtobufRpcEngine.class);
  return SecurityUtil.doAsLoginUser(
      new PrivilegedExceptionAction<QJournalProtocol>() {
        @Override
        public QJournalProtocol run() throws IOException {
          QJournalProtocolPB pbproxy = RPC.getProxy(
              QJournalProtocolPB.class,
              RPC.getProtocolVersion(QJournalProtocolPB.class),
              addr, confCopy);
          return new QJournalProtocolTranslatorPB(pbproxy);
        }
      });
}
 
源代码17 项目: RDFS   文件: AvatarZKShell.java
/**
 * Creates an {@link AvatarProtocol} RPC proxy to the given AvatarNode
 * address, logging the target address first.
 *
 * @param avatarNodeAddr address to connect to
 * @param conf           configuration for the RPC layer and socket factory
 * @param ugi            user on whose behalf the proxy is created
 * @return the RPC proxy
 * @throws IOException if creating the proxy fails
 */
private static AvatarProtocol createRPCAvatarnode(
    InetSocketAddress avatarNodeAddr, Configuration conf,
    UnixUserGroupInformation ugi) throws IOException {
  LOG.info("AvatarShell connecting to " + avatarNodeAddr);
  AvatarProtocol avatarProxy = (AvatarProtocol) RPC.getProxy(
      AvatarProtocol.class,
      AvatarProtocol.versionID,
      avatarNodeAddr,
      ugi,
      conf,
      NetUtils.getSocketFactory(conf, AvatarProtocol.class));
  return avatarProxy;
}
 
源代码18 项目: incubator-tez   文件: DAGClientServer.java
/**
 * Builds (but does not start) an RPC server for the given protobuf
 * protocol.
 *
 * <p>The protobuf engine is registered for {@code pbProtocol} on
 * {@code conf}; the server is bound to the host name and port of
 * {@code addr} and uses the {@code secretManager} field. After building,
 * the protocol is additionally registered on the server via
 * {@code addProtocol}.
 *
 * @param pbProtocol      protobuf protocol interface
 * @param addr            bind address
 * @param conf            configuration (modified by engine registration)
 * @param numHandlers     number of handler threads
 * @param blockingService service implementation
 * @param portRangeConfig value forwarded to {@code setPortRangeConfig}
 * @return the built server
 * @throws IOException if building the server fails
 */
private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
    int numHandlers,
    BlockingService blockingService, String portRangeConfig) throws IOException {
  RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);

  RPC.Builder rpcBuilder = new RPC.Builder(conf)
      .setProtocol(pbProtocol)
      .setInstance(blockingService)
      .setBindAddress(addr.getHostName())
      .setPort(addr.getPort())
      .setNumHandlers(numHandlers)
      .setVerbose(false)
      .setPortRangeConfig(portRangeConfig)
      .setSecretManager(secretManager);

  RPC.Server rpcServer = rpcBuilder.build();
  rpcServer.addProtocol(
      RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
  return rpcServer;
}
 
源代码19 项目: RDFS   文件: DataNode.java
/**
 * Rejects a client whose protocol version is older than the server's and
 * not compatible with it.
 *
 * @param protocol      protocol name reported in the exception
 * @param clientVersion version announced by the client
 * @param serverVersion version implemented by the server
 * @throws IOException {@link RPC.VersionIncompatible} when the server
 *         version is greater than the client version and
 *         {@code ProtocolCompatible} reports the pair as incompatible
 */
private void checkVersion(String protocol, long clientVersion,
    long serverVersion) throws IOException {
  // Equal or newer clients are accepted without a compatibility lookup.
  if (serverVersion <= clientVersion) {
    return;
  }
  if (ProtocolCompatible.isCompatibleClientDatanodeProtocol(
          clientVersion, serverVersion)) {
    return;
  }
  throw new RPC.VersionIncompatible(protocol, clientVersion, serverVersion);
}
 
public ZKFCProtocolClientSideTranslatorPB(
    InetSocketAddress addr, Configuration conf,
    SocketFactory socketFactory, int timeout) throws IOException {
  RPC.setProtocolEngine(conf, ZKFCProtocolPB.class,
      ProtobufRpcEngine.class);
  rpcProxy = RPC.getProxy(ZKFCProtocolPB.class,
      RPC.getProtocolVersion(ZKFCProtocolPB.class), addr,
      UserGroupInformation.getCurrentUser(), conf, socketFactory, timeout);
}
 
源代码21 项目: hadoop   文件: TestDoAsEffectiveUser.java
/**
 * Starts a test RPC server with the proxy-superuser group for
 * {@code REAL_USER_SHORT_NAME} set to "group1" and checks the remote UGI
 * for both the real user and a proxy user created on top of it.
 *
 * <p>Any exception inside the try block is printed and turned into a test
 * failure. The server is stopped, and the {@code proxy} field released when
 * non-null, in every case.
 */
@Test(timeout=4000)
public void testRealUserSetup() throws IOException {
  final Configuration conf = new Configuration();
  final String superuserGroupKey = DefaultImpersonationProvider
      .getTestProvider()
      .getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME);
  conf.setStrings(superuserGroupKey, "group1");
  configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);

  Server server = new RPC.Builder(conf)
      .setProtocol(TestProtocol.class)
      .setInstance(new TestImpl())
      .setBindAddress(ADDRESS)
      .setPort(0)
      .setNumHandlers(5)
      .setVerbose(true)
      .build();

  refreshConf(conf);
  try {
    server.start();

    UserGroupInformation realUser =
        UserGroupInformation.createRemoteUser(REAL_USER_NAME);
    checkRemoteUgi(server, realUser, conf);

    UserGroupInformation proxyUser =
        UserGroupInformation.createProxyUserForTesting(
            PROXY_USER_NAME, realUser, GROUP_NAMES);
    checkRemoteUgi(server, proxyUser, conf);
  } catch (Exception ex) {
    ex.printStackTrace();
    Assert.fail();
  } finally {
    server.stop();
    if (proxy != null) {
      RPC.stopProxy(proxy);
    }
  }
}
 
/**
 * Creates the translator by registering the protobuf RPC engine for
 * {@link DatanodeProtocolPB} on {@code conf} and connecting to the NameNode
 * as the current user via {@code createNamenode}.
 *
 * @param nameNodeAddr NameNode address to connect to
 * @param conf         configuration (modified by engine registration)
 * @throws IOException if one of the underlying calls fails
 */
public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
    Configuration conf) throws IOException {
  RPC.setProtocolEngine(
      conf, DatanodeProtocolPB.class, ProtobufRpcEngine.class);
  rpcProxy = createNamenode(
      nameNodeAddr, conf, UserGroupInformation.getCurrentUser());
}
 
源代码23 项目: hadoop   文件: GetGroupsBase.java
/**
 * Builds a {@link GetUserMappingsProtocol} client proxy for the current
 * user, using the address from {@code getProtocolAddress} and a socket
 * factory derived from the current configuration.
 *
 * @return a {@link GetUserMappingsProtocol} client proxy
 * @throws IOException if one of the underlying calls fails
 */
protected GetUserMappingsProtocol getUgmProtocol() throws IOException {
  return RPC.getProxy(
      GetUserMappingsProtocol.class,
      GetUserMappingsProtocol.versionID,
      getProtocolAddress(getConf()),
      UserGroupInformation.getCurrentUser(),
      getConf(),
      NetUtils.getSocketFactory(getConf(), GetUserMappingsProtocol.class));
}
 
源代码24 项目: big-c   文件: NameNodeProxies.java
/**
 * Creates an RPC proxy for the protocol interface {@code xface} after
 * registering the protobuf RPC engine for it on {@code conf}.
 *
 * @param address address to connect to
 * @param conf    configuration (modified by engine registration)
 * @param ugi     user on whose behalf the proxy is created
 * @param xface   protocol interface to proxy
 * @return the proxy, typed as {@code Object}
 * @throws IOException if creating the proxy fails
 */
private static Object createNameNodeProxy(InetSocketAddress address,
    Configuration conf, UserGroupInformation ugi, Class<?> xface)
    throws IOException {
  RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
  final long protocolVersion = RPC.getProtocolVersion(xface);
  return RPC.getProxy(
      xface, protocolVersion, address,
      ugi, conf, NetUtils.getDefaultSocketFactory(conf));
}
 
/**
 * Reports whether {@code methodName} is supported over {@code rpcProxy}
 * for {@link DatanodeProtocolPB} with the protobuf RPC kind.
 *
 * @param methodName name of the method to look up
 * @return the result of {@code RpcClientUtil.isMethodSupported}
 * @throws IOException if the lookup fails
 */
@Override // ProtocolMetaInterface
public boolean isMethodSupported(String methodName)
    throws IOException {
  final long protocolVersion =
      RPC.getProtocolVersion(DatanodeProtocolPB.class);
  return RpcClientUtil.isMethodSupported(
      rpcProxy,
      DatanodeProtocolPB.class,
      RPC.RpcKind.RPC_PROTOCOL_BUFFER,
      protocolVersion,
      methodName);
}
 
源代码26 项目: tez   文件: TezTaskCommunicatorImpl.java
/**
 * Builds and starts the RPC server for {@link TezTaskUmbilicalProtocol}
 * and stores its address in {@code address}.
 *
 * <p>A fresh {@link JobTokenSecretManager} holding the session token is
 * used as the secret manager. Service ACLs are refreshed before start when
 * Hadoop security authorization is enabled. The stored address is rebuilt
 * from the canonical host name and port of the server's connect address.
 *
 * @throws TezUncheckedException wrapping any {@link IOException}
 */
protected void startRpcServer() {
  try {
    JobTokenSecretManager tokenSecrets = new JobTokenSecretManager();
    tokenSecrets.addTokenForJob(tokenIdentifier, sessionToken);

    int listenerThreads = conf.getInt(
        TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
        TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT);

    RPC.Builder rpcBuilder = new RPC.Builder(conf)
        .setProtocol(TezTaskUmbilicalProtocol.class)
        .setBindAddress("0.0.0.0")
        .setPort(0)
        .setInstance(taskUmbilical)
        .setNumHandlers(listenerThreads)
        .setPortRangeConfig(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE)
        .setSecretManager(tokenSecrets);
    server = rpcBuilder.build();

    // Enable service authorization?
    boolean authorizationEnabled = conf.getBoolean(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
    if (authorizationEnabled) {
      refreshServiceAcls(conf, new TezAMPolicyProvider());
    }

    server.start();

    InetSocketAddress connectAddr = NetUtils.getConnectAddress(server);
    String canonicalHost = connectAddr.getAddress().getCanonicalHostName();
    this.address =
        NetUtils.createSocketAddrForHost(canonicalHost, connectAddr.getPort());
    LOG.info("Instantiated TezTaskCommunicator RPC at " + this.address);
  } catch (IOException ioe) {
    throw new TezUncheckedException(ioe);
  }
}
 
源代码27 项目: big-c   文件: ZKFCProtocolServerSideTranslatorPB.java
/**
 * Returns the protocol signature for {@link ZKFCProtocolPB}.
 *
 * @param protocol          protocol name requested by the client
 * @param clientVersion     client protocol version (not used here)
 * @param clientMethodsHash hash of the client's method set
 * @return the signature computed for {@link ZKFCProtocolPB}
 * @throws IOException if {@code protocol} is not the name of
 *         {@link ZKFCProtocolPB}
 */
@Override
public ProtocolSignature getProtocolSignature(String protocol,
    long clientVersion, int clientMethodsHash) throws IOException {
  if (!protocol.equals(RPC.getProtocolName(ZKFCProtocolPB.class))) {
    throw new IOException("Serverside implements " +
        RPC.getProtocolName(ZKFCProtocolPB.class) +
        ". The following requested protocol is unknown: " + protocol);
  }

  // The signature must describe the protocol this translator serves
  // (ZKFCProtocolPB), matching the version and name checked above.
  return ProtocolSignature.getProtocolSignature(clientMethodsHash,
      RPC.getProtocolVersion(ZKFCProtocolPB.class),
      ZKFCProtocolPB.class);
}
 
源代码28 项目: attic-apex-core   文件: RecoverableRpcProxy.java
/**
 * Stops the umbilical RPC proxy, if one is present, and clears the
 * reference. The last connect URI is logged at debug level first.
 */
@Override
public void close()
{
  LOG.debug("Closing RPC connection {}", lastConnectURI);
  if (umbilical == null) {
    return;
  }
  RPC.stopProxy(umbilical);
  umbilical = null;
}
 
源代码29 项目: hadoop   文件: TestDoAsEffectiveUser.java
/**
 * Expects an RPC issued by a proxy user to fail when no proxy-superuser
 * group has been configured for the real user.
 *
 * <p>{@code Assert.fail} is reached only when the call returns normally.
 * Exceptions raised inside the try block are printed and otherwise ignored.
 * The server is stopped, and the {@code proxy} field released when
 * non-null, in every case.
 */
@Test
public void testRealUserGroupNotSpecified() throws IOException {
  final Configuration conf = new Configuration();
  configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
  Server server = new RPC.Builder(conf)
      .setProtocol(TestProtocol.class)
      .setInstance(new TestImpl())
      .setBindAddress(ADDRESS)
      .setPort(0)
      .setNumHandlers(2)
      .setVerbose(false)
      .build();

  try {
    server.start();

    final InetSocketAddress serverAddr = NetUtils.getConnectAddress(server);

    UserGroupInformation realUser =
        UserGroupInformation.createRemoteUser(REAL_USER_NAME);
    UserGroupInformation proxyUser =
        UserGroupInformation.createProxyUserForTesting(
            PROXY_USER_NAME, realUser, GROUP_NAMES);

    String result = proxyUser.doAs(new PrivilegedExceptionAction<String>() {
      @Override
      public String run() throws IOException {
        proxy = (TestProtocol) RPC.getProxy(
            TestProtocol.class, TestProtocol.versionID, serverAddr, conf);
        return proxy.aMethod();
      }
    });

    Assert.fail("The RPC must have failed " + result);
  } catch (Exception ex) {
    ex.printStackTrace();
  } finally {
    server.stop();
    if (proxy != null) {
      RPC.stopProxy(proxy);
    }
  }
}
 
源代码30 项目: ratis   文件: Proxy.java
/**
 * Creates an RPC proxy of type {@code clazz} for the current user.
 *
 * <p>Registers {@code ProtobufRpcEngineShaded} for {@code clazz} on
 * {@code conf} before the proxy is created.
 *
 * @param clazz      protocol interface to proxy
 * @param addressStr address string turned into a socket address
 * @param conf       configuration (modified by engine registration)
 * @param <PROTOCOL> protocol interface type
 * @return the RPC proxy
 * @throws IOException if creating the proxy fails
 */
public static <PROTOCOL> PROTOCOL getProxy(
    Class<PROTOCOL> clazz, String addressStr, Configuration conf)
    throws IOException {
  RPC.setProtocolEngine(conf, clazz, ProtobufRpcEngineShaded.class);
  final long protocolVersion = RPC.getProtocolVersion(clazz);
  return RPC.getProxy(
      clazz,
      protocolVersion,
      org.apache.ratis.util.NetUtils.createSocketAddr(addressStr),
      UserGroupInformation.getCurrentUser(),
      conf,
      NetUtils.getSocketFactory(conf, clazz));
}