下面列出了 org.apache.hadoop.util.ServicePlugin 类的 API 用法示例代码；也可以点击链接到 GitHub 查看完整源代码。
/**
 * Instantiates every {@link ServicePlugin} configured under
 * OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY and starts each of them
 * against this service.
 *
 * <p>If the plugin list cannot be instantiated, the raw configuration value
 * is logged together with the failure and the exception is rethrown. If an
 * individual plugin fails to start, the failure is logged as a warning and
 * the remaining plugins are still started.
 */
private void startPlugins() {
  try {
    plugins = conf.getInstances(HDDS_DATANODE_PLUGINS_KEY, ServicePlugin.class);
  } catch (RuntimeException loadFailure) {
    // Log the raw configured value so a misconfiguration is easy to spot.
    LOG.error(
        "Unable to load HDDS DataNode plugins. Specified list of plugins: {}",
        conf.get(HDDS_DATANODE_PLUGINS_KEY), loadFailure);
    throw loadFailure;
  }

  for (ServicePlugin servicePlugin : plugins) {
    try {
      servicePlugin.start(this);
      LOG.info("Started plug-in {}", servicePlugin);
    } catch (Throwable startFailure) {
      // Best effort: one broken plugin must not block the others.
      LOG.warn("ServicePlugin {} could not be started", servicePlugin,
          startFailure);
    }
  }
}
/**
 * Stops this service: first every loaded plugin, then the datanode state
 * machine daemon, then the HTTP server.
 *
 * <p>The shutdown sequence runs at most once. The stopped flag is tested
 * and set in a single atomic step, so two callers racing into this method
 * cannot both observe "not stopped" and execute the sequence twice.
 *
 * <p>Failures while stopping an individual plugin or the HTTP server are
 * logged and do not abort the rest of the shutdown.
 */
@Override
public void stop() {
  // NOTE(review): assumes isStopped is a
  // java.util.concurrent.atomic.AtomicBoolean - confirm against the field
  // declaration. getAndSet replaces the former separate get()/set(true)
  // pair, which was a non-atomic check-then-act.
  if (isStopped.getAndSet(true)) {
    return;
  }

  if (plugins != null) {
    for (ServicePlugin plugin : plugins) {
      try {
        plugin.stop();
        LOG.info("Stopped plug-in {}", plugin);
      } catch (Throwable t) {
        // Best effort: keep stopping the remaining plugins.
        LOG.warn("ServicePlugin {} could not be stopped", plugin, t);
      }
    }
  }

  if (datanodeStateMachine != null) {
    datanodeStateMachine.stopDaemon();
  }

  if (httpServer != null) {
    try {
      httpServer.stop();
    } catch (Exception e) {
      LOG.error("Stopping HttpServer is failed.", e);
    }
  }
}
/**
 * Starts the services common to active and standby states.
 *
 * <p>In order: starts the namesystem's common services, registers the
 * NNSMX bean, starts and wires the HTTP server when this node's role is not
 * {@code NamenodeRole.NAMENODE}, starts the RPC server, and finally loads
 * and starts the plugins configured under DFS_NAMENODE_PLUGINS_KEY. A plugin
 * that fails to start is logged and skipped.
 *
 * @param conf configuration handed to the namesystem and HTTP server
 *             start-up and used to look up the plugin list
 * @throws IOException if one of the start-up calls made here fails with it
 */
private void startCommonServices(Configuration conf) throws IOException {
  namesystem.startCommonServices(conf, haContext);
  registerNNSMXBean();

  if (role != NamenodeRole.NAMENODE) {
    startHttpServer(conf);
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }

  rpcServer.start();

  plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY, ServicePlugin.class);
  for (ServicePlugin servicePlugin : plugins) {
    try {
      servicePlugin.start(this);
    } catch (Throwable startFailure) {
      // Best effort: a failing plugin must not stop the node from starting.
      LOG.warn("ServicePlugin " + servicePlugin + " could not be started",
          startFailure);
    }
  }

  LOG.info(getRole() + " RPC up at: " + rpcServer.getRpcAddress());
  if (rpcServer.getServiceRpcAddress() != null) {
    LOG.info(getRole() + " service RPC up at: "
        + rpcServer.getServiceRpcAddress());
  }
}
/**
 * Brings up the services that run in both the active and the standby state.
 *
 * <p>The sequence is: namesystem common services, NNSMX bean registration,
 * HTTP server (only when the role differs from
 * {@code NamenodeRole.NAMENODE}), RPC server, then every plugin configured
 * under DFS_NAMENODE_PLUGINS_KEY. Plugin start failures are logged as
 * warnings and otherwise ignored.
 *
 * @param conf configuration used for service start-up and plugin lookup
 * @throws IOException if one of the start-up calls made here fails with it
 */
private void startCommonServices(Configuration conf) throws IOException {
  namesystem.startCommonServices(conf, haContext);
  registerNNSMXBean();

  final boolean needsHttpServer = NamenodeRole.NAMENODE != role;
  if (needsHttpServer) {
    startHttpServer(conf);
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }

  rpcServer.start();

  plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY, ServicePlugin.class);
  for (ServicePlugin current : plugins) {
    try {
      current.start(this);
    } catch (Throwable cause) {
      // Deliberately tolerant: keep starting the remaining plugins.
      LOG.warn("ServicePlugin " + current + " could not be started", cause);
    }
  }

  LOG.info(getRole() + " RPC up at: " + rpcServer.getRpcAddress());
  if (rpcServer.getServiceRpcAddress() != null) {
    LOG.info(getRole() + " service RPC up at: "
        + rpcServer.getServiceRpcAddress());
  }
}
/**
 * Closes every loaded plugin.
 *
 * <p>Does nothing when no plugin list has been loaded. A plugin that fails
 * to close is logged as a warning and the remaining plugins are still
 * closed.
 */
@Override
public void close() {
  if (plugins == null) {
    return;
  }
  for (ServicePlugin servicePlugin : plugins) {
    try {
      servicePlugin.close();
    } catch (Throwable closeFailure) {
      // Best effort: continue with the remaining plugins.
      LOG.warn("ServicePlugin {} could not be closed", servicePlugin,
          closeFailure);
    }
  }
}
/**
 * Prepares a fresh test directory and configuration before each test.
 *
 * <p>The configuration points the Ozone metadata directory at the test
 * directory, uses {@code <testDir>/disk1} as the datanode data directory,
 * and registers {@code MockService} as the datanode service plugin.
 */
@Before
public void setUp() {
  testDir = GenericTestUtils.getRandomizedTestDir();

  conf = new OzoneConfiguration();
  conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getPath());
  conf.set(DFSConfigKeysLegacy.DFS_DATANODE_DATA_DIR_KEY, testDir + "/disk1");
  conf.setClass(OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY,
      MockService.class, ServicePlugin.class);
}
/**
 * One-time fixture for the test class.
 *
 * <p>Builds a security-enabled configuration rooted at a randomized test
 * directory, creates the datanode service, starts it and initializes its
 * certificate client (both via {@code callQuietly}), and then derives the
 * key pair, codecs and a generated test certificate that the tests share.
 *
 * @throws Exception if any step of the fixture set-up fails
 */
@BeforeClass
public static void setUp() throws Exception {
  testDir = GenericTestUtils.getRandomizedTestDir();

  conf = new OzoneConfiguration();
  conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getPath());
  conf.set(DFSConfigKeysLegacy.DFS_DATANODE_DATA_DIR_KEY, testDir + "/disk1");
  conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
  conf.setClass(OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY,
      TestHddsDatanodeService.MockService.class, ServicePlugin.class);

  securityConfig = new SecurityConfig(conf);
  service = HddsDatanodeService.createHddsDatanodeService(args);
  dnLogs = GenericTestUtils.LogCapturer.captureLogs(getLogger());

  // Start-up and certificate client initialization go through callQuietly.
  callQuietly(() -> {
    service.start(conf);
    return null;
  });
  callQuietly(() -> {
    service.initializeCertificateClient(conf);
    return null;
  });

  certCodec = new CertificateCodec(securityConfig, DN_COMPONENT);
  keyCodec = new KeyCodec(securityConfig, DN_COMPONENT);
  // Clear the log output captured during set-up.
  dnLogs.clearOutput();

  privateKey = service.getCertificateClient().getPrivateKey();
  publicKey = service.getCertificateClient().getPublicKey();

  final KeyPair testKeyPair = new KeyPair(publicKey, privateKey);
  final X509Certificate generated = KeyStoreTestUtil.generateCertificate(
      "CN=Test", testKeyPair, 10, securityConfig.getSignatureAlgo());
  certHolder = new X509CertificateHolder(generated.getEncoded());
}
/**
 * Stops the common services in this order: RPC server, namesystem, pause
 * monitor, plugins, HTTP server.
 *
 * <p>Components that are {@code null} are skipped. A plugin that fails to
 * stop is logged as a warning and the remaining plugins are still stopped.
 */
private void stopCommonServices() {
  if (rpcServer != null) {
    rpcServer.stop();
  }
  if (namesystem != null) {
    namesystem.close();
  }
  if (pauseMonitor != null) {
    pauseMonitor.stop();
  }

  if (plugins != null) {
    for (ServicePlugin servicePlugin : plugins) {
      try {
        servicePlugin.stop();
      } catch (Throwable stopFailure) {
        // Best effort: keep stopping the remaining plugins.
        LOG.warn("ServicePlugin " + servicePlugin + " could not be stopped",
            stopFailure);
      }
    }
  }

  stopHttpServer();
}
/**
 * Loads the plugins configured under DFS_DATANODE_PLUGINS_KEY and starts
 * each of them against this instance.
 *
 * <p>A plugin that fails to start is logged as a warning; the remaining
 * plugins are still started.
 *
 * @param conf configuration from which the plugin instances are created
 */
private void startPlugins(Configuration conf) {
  plugins = conf.getInstances(DFS_DATANODE_PLUGINS_KEY, ServicePlugin.class);

  for (ServicePlugin servicePlugin : plugins) {
    try {
      servicePlugin.start(this);
      LOG.info("Started plug-in " + servicePlugin);
    } catch (Throwable startFailure) {
      // Best effort: one broken plugin must not block the others.
      LOG.warn("ServicePlugin " + servicePlugin + " could not be started",
          startFailure);
    }
  }
}
/**
 * Shuts down the common services: the RPC server, the namesystem, the pause
 * monitor, every loaded plugin, and finally the HTTP server, in that order.
 *
 * <p>Any of these that is {@code null} is skipped. Failures while stopping
 * a plugin are logged as warnings and do not interrupt the shutdown.
 */
private void stopCommonServices() {
  if (rpcServer != null) {
    rpcServer.stop();
  }

  if (namesystem != null) {
    namesystem.close();
  }

  if (pauseMonitor != null) {
    pauseMonitor.stop();
  }

  if (plugins != null) {
    for (ServicePlugin current : plugins) {
      try {
        current.stop();
      } catch (Throwable cause) {
        // Deliberately tolerant: continue with the next plugin.
        LOG.warn("ServicePlugin " + current + " could not be stopped", cause);
      }
    }
  }

  stopHttpServer();
}
/**
 * Creates the plugin instances listed under DFS_DATANODE_PLUGINS_KEY and
 * starts them one by one, passing this instance to each.
 *
 * <p>Start failures are logged as warnings and do not stop the loop.
 *
 * @param conf configuration used to instantiate the plugins
 */
private void startPlugins(Configuration conf) {
  plugins = conf.getInstances(DFS_DATANODE_PLUGINS_KEY, ServicePlugin.class);

  for (ServicePlugin current : plugins) {
    try {
      current.start(this);
      LOG.info("Started plug-in " + current);
    } catch (Throwable cause) {
      // Deliberately tolerant: continue with the next plugin.
      LOG.warn("ServicePlugin " + current + " could not be started", cause);
    }
  }
}