下面列出了如何使用 org.apache.hadoop.util.ShutdownHookManager 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。
/**
 * Logs a startup banner and registers a shutdown hook that logs a matching
 * shutdown banner.
 *
 * <p>When {@code SystemUtils.IS_OS_UNIX} is set, it also tries to register
 * the signal logger with {@code log}; any failure there is reported as a
 * warning and does not abort the method.
 *
 * @param versionInfo version details forwarded to the startup banner builder
 * @param clazz class whose simple name is used in both banners
 * @param args command-line arguments forwarded to the startup banner builder
 * @param log logger receiving all messages emitted here
 */
public static void startupShutdownMessage(VersionInfo versionInfo,
    Class<?> clazz, String[] args, Logger log) {
  final String hostname = NetUtils.getHostname();
  final String className = clazz.getSimpleName();

  if (log.isInfoEnabled()) {
    log.info(
        createStartupShutdownMessage(versionInfo, className, hostname, args));
  }

  if (SystemUtils.IS_OS_UNIX) {
    try {
      SignalLogger.INSTANCE.register(log);
    } catch (Throwable t) {
      log.warn("failed to register any UNIX signal loggers: ", t);
    }
  }

  // The banner text is built when the hook runs, not at registration time.
  final Runnable shutdownBanner = () -> {
    final String text = "Shutting down " + className + " at " + hostname;
    log.info(toStartupShutdownString("SHUTDOWN_MSG: ", text));
  };
  ShutdownHookManager.get().addShutdownHook(shutdownBanner,
      SHUTDOWN_HOOK_PRIORITY);
}
/**
 * Creates, initializes and starts an {@link AMSApplicationServer}.
 *
 * <p>Installs {@link YarnUncaughtExceptionHandler} as the default uncaught
 * exception handler, logs the startup banner and registers a shutdown hook
 * for the new server before it is initialized. Any {@link Throwable} raised
 * while doing so is logged as fatal and passed to
 * {@code ExitUtil.terminate} with status -1.
 *
 * @param args command-line arguments, used only for the startup banner
 * @return the server; {@code null} only if construction failed and
 *         {@code ExitUtil.terminate} returned normally
 */
static AMSApplicationServer launchAMSApplicationServer(String[] args) {
  Thread.setDefaultUncaughtExceptionHandler(
      new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(AMSApplicationServer.class, args, LOG);

  AMSApplicationServer server = null;
  try {
    server = new AMSApplicationServer();
    final CompositeServiceShutdownHook stopHook =
        new CompositeServiceShutdownHook(server);
    ShutdownHookManager.get().addShutdownHook(stopHook,
        SHUTDOWN_HOOK_PRIORITY);

    final YarnConfiguration conf = new YarnConfiguration();
    server.init(conf);
    server.start();
  } catch (Throwable t) {
    LOG.fatal("Error starting AMSApplicationServer", t);
    ExitUtil.terminate(-1, "Error starting AMSApplicationServer");
  }
  return server;
}
/**
 * Creates, initializes and starts a {@link JobHistoryServer}.
 *
 * <p>Installs {@link YarnUncaughtExceptionHandler} as the default uncaught
 * exception handler, logs the startup banner, registers a shutdown hook for
 * the new server, runs {@code args} through a {@code GenericOptionsParser}
 * bound to the configuration, then initializes and starts the server. Any
 * {@link Throwable} raised along the way is logged as fatal and passed to
 * {@code ExitUtil.terminate} with status -1.
 *
 * @param args command-line arguments
 * @return the server; {@code null} only if construction failed and
 *         {@code ExitUtil.terminate} returned normally
 */
static JobHistoryServer launchJobHistoryServer(String[] args) {
  Thread.setDefaultUncaughtExceptionHandler(
      new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);

  JobHistoryServer server = null;
  try {
    server = new JobHistoryServer();
    final CompositeServiceShutdownHook stopHook =
        new CompositeServiceShutdownHook(server);
    ShutdownHookManager.get().addShutdownHook(stopHook,
        SHUTDOWN_HOOK_PRIORITY);

    final YarnConfiguration conf = new YarnConfiguration(new JobConf());
    // The parser instance is discarded; it is created only for what its
    // constructor does with conf and args.
    new GenericOptionsParser(conf, args);
    server.init(conf);
    server.start();
  } catch (Throwable t) {
    LOG.fatal("Error starting JobHistoryServer", t);
    ExitUtil.terminate(-1, "Error starting JobHistoryServer");
  }
  return server;
}
/**
 * Entry point that brings up a {@link SharedCacheManager}.
 *
 * <p>Installs {@link YarnUncaughtExceptionHandler} as the default uncaught
 * exception handler, logs the startup banner, registers a shutdown hook for
 * the manager, then initializes and starts it. Any {@link Throwable} is
 * logged as fatal and followed by {@code System.exit(-1)}.
 *
 * @param args command-line arguments, used only for the startup banner
 */
public static void main(String[] args) {
  Thread.setDefaultUncaughtExceptionHandler(
      new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG);

  try {
    final Configuration conf = new YarnConfiguration();
    final SharedCacheManager manager = new SharedCacheManager();
    final CompositeServiceShutdownHook stopHook =
        new CompositeServiceShutdownHook(manager);
    ShutdownHookManager.get().addShutdownHook(stopHook,
        SHUTDOWN_HOOK_PRIORITY);
    manager.init(conf);
    manager.start();
  } catch (Throwable t) {
    LOG.fatal("Error starting SharedCacheManager", t);
    System.exit(-1);
  }
}
/**
 * Stops this NodeManager on a newly started thread.
 *
 * <p>Errors thrown by {@code stop()} are logged. Afterwards, if
 * {@code shouldExitOnShutdownEvent} is set and no shutdown is already in
 * progress according to {@code ShutdownHookManager}, the thread calls
 * {@code ExitUtil.terminate(-1)}.
 */
protected void shutDown() {
  final Runnable stopTask = new Runnable() {
    @Override
    public void run() {
      try {
        NodeManager.this.stop();
      } catch (Throwable t) {
        LOG.error("Error while shutting down NodeManager", t);
      } finally {
        final boolean exitRequested = shouldExitOnShutdownEvent
            && !ShutdownHookManager.get().isShutdownInProgress();
        if (exitRequested) {
          ExitUtil.terminate(-1);
        }
      }
    }
  };
  new Thread(stopTask).start();
}
/**
 * Registers a shutdown hook for this NodeManager, then initializes and
 * starts it.
 *
 * <p>Any {@link Throwable} is logged as fatal and followed by
 * {@code System.exit(-1)}.
 *
 * @param conf configuration passed to {@code init}
 * @param hasToReboot when {@code true} and a hook from a previous start is
 *        still recorded, that hook is removed before the new one is added
 */
private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
  try {
    // On reboot, drop the hook left over from the previous start.
    final boolean staleHookPresent =
        hasToReboot && nodeManagerShutdownHook != null;
    if (staleHookPresent) {
      ShutdownHookManager.get().removeShutdownHook(nodeManagerShutdownHook);
    }

    nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
    ShutdownHookManager.get().addShutdownHook(
        nodeManagerShutdownHook, SHUTDOWN_HOOK_PRIORITY);

    // System exit should be called only when the NodeManager is
    // instantiated from the main() function.
    shouldExitOnShutdownEvent = true;
    init(conf);
    start();
  } catch (Throwable t) {
    LOG.fatal("Error starting NodeManager", t);
    System.exit(-1);
  }
}
/**
 * Entry point for the ResourceManager.
 *
 * <p>After installing {@link YarnUncaughtExceptionHandler} and logging the
 * startup banner, the arguments are run through a
 * {@code GenericOptionsParser}. If exactly one argument remains and it is
 * {@code -format-state-store}, {@code deleteRMStateStore} is called;
 * otherwise a ResourceManager is created, given a shutdown hook,
 * initialized and started. Any {@link Throwable} is logged as fatal and
 * followed by {@code System.exit(-1)}.
 *
 * @param argv command-line arguments
 */
public static void main(String argv[]) {
  Thread.setDefaultUncaughtExceptionHandler(
      new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);

  try {
    final Configuration conf = new YarnConfiguration();
    final String[] remaining =
        new GenericOptionsParser(conf, argv).getRemainingArgs();

    // If -format-state-store, then delete RMStateStore; else start normally.
    final boolean formatStateStore =
        remaining.length == 1 && remaining[0].equals("-format-state-store");
    if (formatStateStore) {
      deleteRMStateStore(conf);
      return;
    }

    final ResourceManager rm = new ResourceManager();
    final CompositeServiceShutdownHook stopHook =
        new CompositeServiceShutdownHook(rm);
    ShutdownHookManager.get().addShutdownHook(stopHook,
        SHUTDOWN_HOOK_PRIORITY);
    rm.init(conf);
    rm.start();
  } catch (Throwable t) {
    LOG.fatal("Error starting ResourceManager", t);
    System.exit(-1);
  }
}
/**
 * Creates, initializes and starts an {@link ApplicationHistoryServer}.
 *
 * <p>Installs {@link YarnUncaughtExceptionHandler} as the default uncaught
 * exception handler, logs the startup banner, registers a shutdown hook for
 * the new server, runs {@code args} through a {@code GenericOptionsParser}
 * bound to the configuration, then initializes and starts the server. Any
 * {@link Throwable} raised along the way is logged as fatal and passed to
 * {@code ExitUtil.terminate} with status -1.
 *
 * @param args command-line arguments
 * @return the server; {@code null} only if construction failed and
 *         {@code ExitUtil.terminate} returned normally
 */
static ApplicationHistoryServer launchAppHistoryServer(String[] args) {
  Thread.setDefaultUncaughtExceptionHandler(
      new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(
      ApplicationHistoryServer.class, args, LOG);

  ApplicationHistoryServer server = null;
  try {
    server = new ApplicationHistoryServer();
    final CompositeServiceShutdownHook stopHook =
        new CompositeServiceShutdownHook(server);
    ShutdownHookManager.get().addShutdownHook(stopHook,
        SHUTDOWN_HOOK_PRIORITY);

    final YarnConfiguration conf = new YarnConfiguration();
    // The parser instance is discarded; it is created only for what its
    // constructor does with conf and args.
    new GenericOptionsParser(conf, args);
    server.init(conf);
    server.start();
  } catch (Throwable t) {
    LOG.fatal("Error starting ApplicationHistoryServer", t);
    ExitUtil.terminate(-1, "Error starting ApplicationHistoryServer");
  }
  return server;
}
/**
 * Reports a fatal error to the parent (task) tracker over the umbilical.
 *
 * <p>The message is always logged. Nothing is sent when a shutdown is
 * already in progress according to {@code ShutdownHookManager}. Otherwise
 * the stringified cause of {@code throwable} (or {@code throwable} itself
 * when it has no cause) is sent; if that call fails with an
 * {@link IOException}, the failure is logged and {@code System.exit(-1)}
 * is called.
 *
 * @param id the task attempt the error belongs to
 * @param throwable the error being reported
 * @param logMsg the message written to the log
 */
protected void reportFatalError(TaskAttemptID id, Throwable throwable,
    String logMsg) {
  LOG.fatal(logMsg);

  if (ShutdownHookManager.get().isShutdownInProgress()) {
    return;
  }

  // Prefer the nested cause when there is one.
  final Throwable nested = throwable.getCause();
  final Throwable reported = (nested != null) ? nested : throwable;
  final String cause = StringUtils.stringifyException(reported);

  try {
    umbilical.fatalError(id, cause);
  } catch (IOException ioe) {
    LOG.fatal("Failed to contact the tasktracker", ioe);
    System.exit(-1);
  }
}
/**
 * Creates, initializes and starts a {@link JobHistoryServer}.
 *
 * <p>Installs {@link YarnUncaughtExceptionHandler} as the default uncaught
 * exception handler, logs the startup banner, registers a shutdown hook for
 * the new server, runs {@code args} through a {@code GenericOptionsParser}
 * bound to the configuration, then initializes and starts the server. Any
 * {@link Throwable} raised along the way is logged as fatal and passed to
 * {@code ExitUtil.terminate} with status -1.
 *
 * @param args command-line arguments
 * @return the server; {@code null} only if construction failed and
 *         {@code ExitUtil.terminate} returned normally
 */
static JobHistoryServer launchJobHistoryServer(String[] args) {
  Thread.setDefaultUncaughtExceptionHandler(
      new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);

  JobHistoryServer server = null;
  try {
    server = new JobHistoryServer();
    final CompositeServiceShutdownHook stopHook =
        new CompositeServiceShutdownHook(server);
    ShutdownHookManager.get().addShutdownHook(stopHook,
        SHUTDOWN_HOOK_PRIORITY);

    final YarnConfiguration conf = new YarnConfiguration(new JobConf());
    // The parser instance is discarded; it is created only for what its
    // constructor does with conf and args.
    new GenericOptionsParser(conf, args);
    server.init(conf);
    server.start();
  } catch (Throwable t) {
    LOG.fatal("Error starting JobHistoryServer", t);
    ExitUtil.terminate(-1, "Error starting JobHistoryServer");
  }
  return server;
}
/**
 * Stores the configuration, builds the client cache and the input-stream
 * cache, and registers a {@code CacheFinalizer} as a shutdown hook.
 *
 * @param config configuration kept in {@code this.config}
 * @param clientCache value passed to {@code maximumSize} of the client cache
 */
DFSClientCache(NfsConfiguration config, int clientCache) {
this.config = config;
// Client cache. Note that the int parameter shadows the field of the same
// name, so the bare clientCache below is the size argument.
this.clientCache = CacheBuilder.newBuilder()
.maximumSize(clientCache)
.removalListener(clientRemovalListener())
.build(clientLoader());
// Input-stream cache: sized by DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE, with an
// expire-after-access value of DEFAULT_DFS_INPUTSTREAM_CACHE_TTL seconds.
this.inputstreamCache = CacheBuilder.newBuilder()
.maximumSize(DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE)
.expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS)
.removalListener(inputStreamRemovalListener())
.build(inputStreamLoader());
// Registered once both caches exist.
ShutdownHookManager.get().addShutdownHook(new CacheFinalizer(),
SHUTDOWN_HOOK_PRIORITY);
}
/**
 * Main function of the DistCp program.
 *
 * <p>Creates a {@code DistCp} instance, registers a {@code Cleanup} shutdown
 * hook for it and runs it through {@code ToolRunner} with the default
 * configuration. An {@link Exception} from any of these steps is logged and
 * mapped to {@code DistCpConstants.UNKNOWN_ERROR}. The method always ends
 * with {@code System.exit} using the resulting exit code.
 *
 * @param argv Command-line arguments sent to DistCp.
 */
public static void main(String argv[]) {
  int exitCode;
  try {
    final DistCp distCp = new DistCp();
    final Cleanup cleanupHook = new Cleanup(distCp);
    ShutdownHookManager.get().addShutdownHook(cleanupHook,
        SHUTDOWN_HOOK_PRIORITY);
    exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
  } catch (Exception e) {
    LOG.error("Couldn't complete DistCp operation: ", e);
    exitCode = DistCpConstants.UNKNOWN_ERROR;
  }
  System.exit(exitCode);
}
/**
 * Marks a path to be deleted on JVM shutdown.
 *
 * <p>The path is recorded in {@code DELETE_ON_EXIT} under this instance.
 * The {@code FINALIZER} shutdown hook is registered when that map is found
 * empty at the time of the call.
 *
 * @param f the existing path to delete.
 *
 * @return {@code false} if {@code f} does not exist, {@code true} once the
 *         path has been recorded.
 *
 * @throws AccessControlException If access is denied
 * @throws UnsupportedFileSystemException If file system for <code>f</code> is
 *           not supported
 * @throws IOException If an I/O error occurred
 *
 * Exceptions applicable to file systems accessed over RPC:
 * @throws RpcClientException If an exception occurred in the RPC client
 * @throws RpcServerException If an exception occurred in the RPC server
 * @throws UnexpectedServerException If server implementation throws
 *           undeclared exception to RPC server
 */
public boolean deleteOnExit(Path f) throws AccessControlException,
    IOException {
  if (this.util().exists(f)) {
    synchronized (DELETE_ON_EXIT) {
      if (DELETE_ON_EXIT.isEmpty()) {
        ShutdownHookManager.get().addShutdownHook(
            FINALIZER, SHUTDOWN_HOOK_PRIORITY);
      }

      Set<Path> pending = DELETE_ON_EXIT.get(this);
      if (pending == null) {
        pending = new TreeSet<Path>();
        DELETE_ON_EXIT.put(this, pending);
      }
      pending.add(f);
    }
    return true;
  }
  return false;
}
/**
 * Returns the {@code SpanReceiverHost} registered under {@code confPrefix},
 * creating it if none is recorded yet.
 *
 * <p>A new host loads its span receivers from {@code conf}, gets a shutdown
 * hook (priority 0) that calls {@code closeReceivers()}, and is stored in
 * {@code hosts}. The whole lookup-or-create sequence runs while holding the
 * {@code SpanReceiverHost.class} monitor.
 *
 * @param conf configuration handed to {@code loadSpanReceivers}
 * @param confPrefix key under which the host is looked up and stored
 * @return the existing or newly created host
 */
public static SpanReceiverHost get(Configuration conf, String confPrefix) {
  synchronized (SpanReceiverHost.class) {
    final SpanReceiverHost cached = hosts.get(confPrefix);
    if (cached != null) {
      return cached;
    }

    final SpanReceiverHost created = new SpanReceiverHost(confPrefix);
    created.loadSpanReceivers(conf);

    final Runnable closeOnShutdown = new Runnable() {
      @Override
      public void run() {
        created.closeReceivers();
      }
    };
    ShutdownHookManager.get().addShutdownHook(closeOnShutdown, 0);

    hosts.put(confPrefix, created);
    return created;
  }
}
/**
 * Entry point that brings up a {@link SharedCacheManager}.
 *
 * <p>Installs {@link YarnUncaughtExceptionHandler} as the default uncaught
 * exception handler, logs the startup banner, registers a shutdown hook for
 * the manager, then initializes and starts it. Any {@link Throwable} is
 * logged as fatal and followed by {@code System.exit(-1)}.
 *
 * @param args command-line arguments, used only for the startup banner
 */
public static void main(String[] args) {
  Thread.setDefaultUncaughtExceptionHandler(
      new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG);

  try {
    final Configuration conf = new YarnConfiguration();
    final SharedCacheManager manager = new SharedCacheManager();
    final CompositeServiceShutdownHook stopHook =
        new CompositeServiceShutdownHook(manager);
    ShutdownHookManager.get().addShutdownHook(stopHook,
        SHUTDOWN_HOOK_PRIORITY);
    manager.init(conf);
    manager.start();
  } catch (Throwable t) {
    LOG.fatal("Error starting SharedCacheManager", t);
    System.exit(-1);
  }
}
/**
 * Stops this NodeManager on a newly started thread.
 *
 * <p>Errors thrown by {@code stop()} are logged. Afterwards, if
 * {@code shouldExitOnShutdownEvent} is set and no shutdown is already in
 * progress according to {@code ShutdownHookManager}, the thread calls
 * {@code ExitUtil.terminate(-1)}.
 */
protected void shutDown() {
  final Runnable stopTask = new Runnable() {
    @Override
    public void run() {
      try {
        NodeManager.this.stop();
      } catch (Throwable t) {
        LOG.error("Error while shutting down NodeManager", t);
      } finally {
        final boolean exitRequested = shouldExitOnShutdownEvent
            && !ShutdownHookManager.get().isShutdownInProgress();
        if (exitRequested) {
          ExitUtil.terminate(-1);
        }
      }
    }
  };
  new Thread(stopTask).start();
}
/**
 * Registers a shutdown hook for this NodeManager, then initializes and
 * starts it.
 *
 * <p>Any {@link Throwable} is logged as fatal and followed by
 * {@code System.exit(-1)}.
 *
 * @param conf configuration passed to {@code init}
 * @param hasToReboot when {@code true} and a hook from a previous start is
 *        still recorded, that hook is removed before the new one is added
 */
private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
  try {
    // On reboot, drop the hook left over from the previous start.
    final boolean staleHookPresent =
        hasToReboot && nodeManagerShutdownHook != null;
    if (staleHookPresent) {
      ShutdownHookManager.get().removeShutdownHook(nodeManagerShutdownHook);
    }

    nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
    ShutdownHookManager.get().addShutdownHook(
        nodeManagerShutdownHook, SHUTDOWN_HOOK_PRIORITY);

    // System exit should be called only when the NodeManager is
    // instantiated from the main() function.
    shouldExitOnShutdownEvent = true;
    init(conf);
    start();
  } catch (Throwable t) {
    LOG.fatal("Error starting NodeManager", t);
    System.exit(-1);
  }
}
/**
 * Entry point for the ResourceManager.
 *
 * <p>After installing {@link YarnUncaughtExceptionHandler} and logging the
 * startup banner, the arguments are run through a
 * {@code GenericOptionsParser}. If exactly one argument remains and it is
 * {@code -format-state-store}, {@code deleteRMStateStore} is called;
 * otherwise a ResourceManager is created, given a shutdown hook,
 * initialized and started. Any {@link Throwable} is logged as fatal and
 * followed by {@code System.exit(-1)}.
 *
 * @param argv command-line arguments
 */
public static void main(String argv[]) {
  Thread.setDefaultUncaughtExceptionHandler(
      new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);

  try {
    final Configuration conf = new YarnConfiguration();
    final String[] remaining =
        new GenericOptionsParser(conf, argv).getRemainingArgs();

    // If -format-state-store, then delete RMStateStore; else start normally.
    final boolean formatStateStore =
        remaining.length == 1 && remaining[0].equals("-format-state-store");
    if (formatStateStore) {
      deleteRMStateStore(conf);
      return;
    }

    final ResourceManager rm = new ResourceManager();
    final CompositeServiceShutdownHook stopHook =
        new CompositeServiceShutdownHook(rm);
    ShutdownHookManager.get().addShutdownHook(stopHook,
        SHUTDOWN_HOOK_PRIORITY);
    rm.init(conf);
    rm.start();
  } catch (Throwable t) {
    LOG.fatal("Error starting ResourceManager", t);
    System.exit(-1);
  }
}
/**
 * Creates, initializes and starts an {@link ApplicationHistoryServer}.
 *
 * <p>Installs {@link YarnUncaughtExceptionHandler} as the default uncaught
 * exception handler, logs the startup banner, registers a shutdown hook for
 * the new server, runs {@code args} through a {@code GenericOptionsParser}
 * bound to the configuration, then initializes and starts the server. Any
 * {@link Throwable} raised along the way is logged as fatal and passed to
 * {@code ExitUtil.terminate} with status -1.
 *
 * @param args command-line arguments
 * @return the server; {@code null} only if construction failed and
 *         {@code ExitUtil.terminate} returned normally
 */
static ApplicationHistoryServer launchAppHistoryServer(String[] args) {
  Thread.setDefaultUncaughtExceptionHandler(
      new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(
      ApplicationHistoryServer.class, args, LOG);

  ApplicationHistoryServer server = null;
  try {
    server = new ApplicationHistoryServer();
    final CompositeServiceShutdownHook stopHook =
        new CompositeServiceShutdownHook(server);
    ShutdownHookManager.get().addShutdownHook(stopHook,
        SHUTDOWN_HOOK_PRIORITY);

    final YarnConfiguration conf = new YarnConfiguration();
    // The parser instance is discarded; it is created only for what its
    // constructor does with conf and args.
    new GenericOptionsParser(conf, args);
    server.init(conf);
    server.start();
  } catch (Throwable t) {
    LOG.fatal("Error starting ApplicationHistoryServer", t);
    ExitUtil.terminate(-1, "Error starting ApplicationHistoryServer");
  }
  return server;
}
/**
 * Reports a fatal error to the parent (task) tracker over the umbilical.
 *
 * <p>The message is always logged. Nothing is sent when a shutdown is
 * already in progress according to {@code ShutdownHookManager}. Otherwise
 * the stringified cause of {@code throwable} (or {@code throwable} itself
 * when it has no cause) is sent; if that call fails with an
 * {@link IOException}, the failure is logged and {@code System.exit(-1)}
 * is called.
 *
 * @param id the task attempt the error belongs to
 * @param throwable the error being reported
 * @param logMsg the message written to the log
 */
protected void reportFatalError(TaskAttemptID id, Throwable throwable,
    String logMsg) {
  LOG.fatal(logMsg);

  if (ShutdownHookManager.get().isShutdownInProgress()) {
    return;
  }

  // Prefer the nested cause when there is one.
  final Throwable nested = throwable.getCause();
  final Throwable reported = (nested != null) ? nested : throwable;
  final String cause = StringUtils.stringifyException(reported);

  try {
    umbilical.fatalError(id, cause);
  } catch (IOException ioe) {
    LOG.fatal("Failed to contact the tasktracker", ioe);
    System.exit(-1);
  }
}
/**
 * Creates, initializes and starts a {@link JobHistoryServer}.
 *
 * <p>Installs {@link YarnUncaughtExceptionHandler} as the default uncaught
 * exception handler, logs the startup banner, registers a shutdown hook for
 * the new server, runs {@code args} through a {@code GenericOptionsParser}
 * bound to the configuration, then initializes and starts the server. Any
 * {@link Throwable} raised along the way is logged as fatal and passed to
 * {@code ExitUtil.terminate} with status -1.
 *
 * @param args command-line arguments
 * @return the server; {@code null} only if construction failed and
 *         {@code ExitUtil.terminate} returned normally
 */
static JobHistoryServer launchJobHistoryServer(String[] args) {
  Thread.setDefaultUncaughtExceptionHandler(
      new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);

  JobHistoryServer server = null;
  try {
    server = new JobHistoryServer();
    final CompositeServiceShutdownHook stopHook =
        new CompositeServiceShutdownHook(server);
    ShutdownHookManager.get().addShutdownHook(stopHook,
        SHUTDOWN_HOOK_PRIORITY);

    final YarnConfiguration conf = new YarnConfiguration(new JobConf());
    // The parser instance is discarded; it is created only for what its
    // constructor does with conf and args.
    new GenericOptionsParser(conf, args);
    server.init(conf);
    server.start();
  } catch (Throwable t) {
    LOG.fatal("Error starting JobHistoryServer", t);
    ExitUtil.terminate(-1, "Error starting JobHistoryServer");
  }
  return server;
}
/**
 * Stores the configuration, builds the client cache and the input-stream
 * cache, and registers a {@code CacheFinalizer} as a shutdown hook.
 *
 * @param config configuration kept in {@code this.config}
 * @param clientCache value passed to {@code maximumSize} of the client cache
 */
DFSClientCache(NfsConfiguration config, int clientCache) {
this.config = config;
// Client cache. Note that the int parameter shadows the field of the same
// name, so the bare clientCache below is the size argument.
this.clientCache = CacheBuilder.newBuilder()
.maximumSize(clientCache)
.removalListener(clientRemovalListener())
.build(clientLoader());
// Input-stream cache: sized by DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE, with an
// expire-after-access value of DEFAULT_DFS_INPUTSTREAM_CACHE_TTL seconds.
this.inputstreamCache = CacheBuilder.newBuilder()
.maximumSize(DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE)
.expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS)
.removalListener(inputStreamRemovalListener())
.build(inputStreamLoader());
// Registered once both caches exist.
ShutdownHookManager.get().addShutdownHook(new CacheFinalizer(),
SHUTDOWN_HOOK_PRIORITY);
}
/**
 * Main function of the DistCp program.
 *
 * <p>Creates a {@code DistCp} instance, registers a {@code Cleanup} shutdown
 * hook for it and runs it through {@code ToolRunner} with the default
 * configuration. An {@link Exception} from any of these steps is logged and
 * mapped to {@code DistCpConstants.UNKNOWN_ERROR}. The method always ends
 * with {@code System.exit} using the resulting exit code.
 *
 * @param argv Command-line arguments sent to DistCp.
 */
public static void main(String argv[]) {
  int exitCode;
  try {
    final DistCp distCp = new DistCp();
    final Cleanup cleanupHook = new Cleanup(distCp);
    ShutdownHookManager.get().addShutdownHook(cleanupHook,
        SHUTDOWN_HOOK_PRIORITY);
    exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
  } catch (Exception e) {
    LOG.error("Couldn't complete DistCp operation: ", e);
    exitCode = DistCpConstants.UNKNOWN_ERROR;
  }
  System.exit(exitCode);
}
/**
 * Marks a path to be deleted on JVM shutdown.
 *
 * <p>The path is recorded in {@code DELETE_ON_EXIT} under this instance.
 * The {@code FINALIZER} shutdown hook is registered when that map is found
 * empty at the time of the call.
 *
 * @param f the existing path to delete.
 *
 * @return {@code false} if {@code f} does not exist, {@code true} once the
 *         path has been recorded.
 *
 * @throws AccessControlException If access is denied
 * @throws UnsupportedFileSystemException If file system for <code>f</code> is
 *           not supported
 * @throws IOException If an I/O error occurred
 *
 * Exceptions applicable to file systems accessed over RPC:
 * @throws RpcClientException If an exception occurred in the RPC client
 * @throws RpcServerException If an exception occurred in the RPC server
 * @throws UnexpectedServerException If server implementation throws
 *           undeclared exception to RPC server
 */
public boolean deleteOnExit(Path f) throws AccessControlException,
    IOException {
  if (this.util().exists(f)) {
    synchronized (DELETE_ON_EXIT) {
      if (DELETE_ON_EXIT.isEmpty()) {
        ShutdownHookManager.get().addShutdownHook(
            FINALIZER, SHUTDOWN_HOOK_PRIORITY);
      }

      Set<Path> pending = DELETE_ON_EXIT.get(this);
      if (pending == null) {
        pending = new TreeSet<Path>();
        DELETE_ON_EXIT.put(this, pending);
      }
      pending.add(f);
    }
    return true;
  }
  return false;
}
/**
 * Returns the {@code SpanReceiverHost} registered under {@code confPrefix},
 * creating it if none is recorded yet.
 *
 * <p>A new host loads its span receivers from {@code conf}, gets a shutdown
 * hook (priority 0) that calls {@code closeReceivers()}, and is stored in
 * {@code hosts}. The whole lookup-or-create sequence runs while holding the
 * {@code SpanReceiverHost.class} monitor.
 *
 * @param conf configuration handed to {@code loadSpanReceivers}
 * @param confPrefix key under which the host is looked up and stored
 * @return the existing or newly created host
 */
public static SpanReceiverHost get(Configuration conf, String confPrefix) {
  synchronized (SpanReceiverHost.class) {
    final SpanReceiverHost cached = hosts.get(confPrefix);
    if (cached != null) {
      return cached;
    }

    final SpanReceiverHost created = new SpanReceiverHost(confPrefix);
    created.loadSpanReceivers(conf);

    final Runnable closeOnShutdown = new Runnable() {
      @Override
      public void run() {
        created.closeReceivers();
      }
    };
    ShutdownHookManager.get().addShutdownHook(closeOnShutdown, 0);

    hosts.put(confPrefix, created);
    return created;
  }
}
/**
 * Routes one event to the handler registered for its event-type enum.
 *
 * <p>A missing handler is treated like any other dispatch failure: the
 * error is logged as fatal and, when {@code exitOnDispatchException} is set
 * and no shutdown is already in progress, the process exits with status -1.
 *
 * @param event the event to dispatch
 */
@SuppressWarnings("unchecked")
protected void dispatch(Event event) {
  // All events go through this method.
  if (LOG.isDebugEnabled()) {
    LOG.debug("Dispatching the event " + event.getClass().getName() + "."
        + event.toString());
  }

  Class<? extends Enum> type = event.getType().getDeclaringClass();

  try {
    EventHandler handler = eventDispatchers.get(type);
    if (handler == null) {
      throw new Exception("No handler for registered for " + type);
    }
    handler.handle(event);
  } catch (Throwable t) {
    //TODO Maybe log the state of the queue
    LOG.fatal("Error in dispatcher thread:" + event.getType(), t);
    if (exitOnDispatchException
        && !ShutdownHookManager.get().isShutdownInProgress()) {
      LOG.info("Exiting, bye..");
      System.exit(-1);
    }
  }
}
/**
 * Shuts down the volume set.
 *
 * <p>Saves the volume-set usage, stops the disk checker and, if a shutdown
 * hook is recorded, removes it from {@code ShutdownHookManager}.
 */
public void shutdown() {
  saveVolumeSetUsed();
  stopDiskChecker();

  if (shutdownHook == null) {
    return;
  }
  ShutdownHookManager.get().removeShutdownHook(shutdownHook);
}
/**
 * Routes one event to the handler registered for its event-type enum.
 *
 * <p>A missing handler is treated like any other dispatch failure: the
 * error is logged as fatal and, when {@code exitOnDispatchException} is
 * set, no shutdown is already in progress and the dispatcher has not been
 * stopped, a thread named "AsyncDispatcher ShutDown handler" is started
 * with the runnable from {@code createShutDownThread()}.
 *
 * @param event the event to dispatch
 */
@SuppressWarnings("unchecked")
protected void dispatch(Event event) {
  // All events go through this method.
  if (LOG.isDebugEnabled()) {
    LOG.debug("Dispatching the event " + event.getClass().getName() + "."
        + event.toString());
  }

  Class<? extends Enum> type = event.getType().getDeclaringClass();

  try {
    EventHandler handler = eventDispatchers.get(type);
    if (handler == null) {
      throw new Exception("No handler for registered for " + type);
    }
    handler.handle(event);
  } catch (Throwable t) {
    //TODO Maybe log the state of the queue
    LOG.fatal("Error in dispatcher thread", t);
    // If serviceStop is called, we should exit this thread gracefully.
    if (exitOnDispatchException
        && !ShutdownHookManager.get().isShutdownInProgress()
        && !stopped) {
      Thread shutdownHandler = new Thread(createShutDownThread());
      shutdownHandler.setName("AsyncDispatcher ShutDown handler");
      shutdownHandler.start();
    }
  }
}
/**
 * Starts the proxy server.
 *
 * <p>Creates a {@link WebAppProxyServer}, registers a shutdown hook for it,
 * then initializes it with the given configuration and starts it.
 *
 * @param configuration configuration passed to {@code init}
 * @return the started proxy server instance
 * @throws Exception propagated unchanged from any of the steps above
 */
protected static WebAppProxyServer startServer(Configuration configuration)
    throws Exception {
  final WebAppProxyServer server = new WebAppProxyServer();
  final CompositeServiceShutdownHook stopHook =
      new CompositeServiceShutdownHook(server);
  ShutdownHookManager.get().addShutdownHook(stopHook,
      SHUTDOWN_HOOK_PRIORITY);

  server.init(configuration);
  server.start();
  return server;
}
/**
 * Event loop: takes scheduler events off {@code eventQueue} and hands each
 * one to {@code scheduler.handle} until the dispatcher is stopped or the
 * current thread is interrupted.
 *
 * <p>If waiting on the queue is interrupted, the interrupt status is
 * restored before returning so that whoever runs this {@code Runnable} can
 * still observe it. A {@link Throwable} from the scheduler is logged as
 * fatal and, when {@code shouldExitOnError} is set and no shutdown is
 * already in progress, followed by {@code System.exit(-1)}. If the
 * dispatcher has already been stopped, the error is only logged as a
 * warning and the loop ends.
 */
@Override
public void run() {
  SchedulerEvent event;
  while (!stopped && !Thread.currentThread().isInterrupted()) {
    try {
      event = eventQueue.take();
    } catch (InterruptedException e) {
      LOG.error("Returning, interrupted : " + e);
      // Catching InterruptedException clears the flag; set it again so the
      // interruption is not lost to the caller.
      Thread.currentThread().interrupt();
      return; // TODO: Kill RM.
    }

    try {
      scheduler.handle(event);
    } catch (Throwable t) {
      // An error occurred, but we are shutting down anyway.
      // If it was an InterruptedException, the very act of
      // shutdown could have caused it and is probably harmless.
      if (stopped) {
        LOG.warn("Exception during shutdown: ", t);
        break;
      }
      LOG.fatal("Error in handling event type " + event.getType()
          + " to the scheduler", t);
      if (shouldExitOnError
          && !ShutdownHookManager.get().isShutdownInProgress()) {
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    }
  }
}