下面列出了怎么用org.apache.hadoop.util.StringUtils的API类实例代码及写法,或者点击链接到github查看源代码。
@Test (timeout = 120000)
public void testSetClasspathWithNoUserPrecendence() {
Configuration conf = new Configuration();
conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
Map<String, String> env = new HashMap<String, String>();
try {
MRApps.setClasspath(env, conf);
} catch (Exception e) {
fail("Got exception while setting classpath");
}
String env_str = env.get("CLASSPATH");
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$$() + "/*"));
assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in"
+ " the classpath!", env_str.contains(expectedClasspath));
assertFalse("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
env_str.startsWith(expectedClasspath));
}
/**
* Abort the application and wait for it to finish.
* @param t the exception that signalled the problem
* @throws IOException A wrapper around the exception that was passed in
*/
void abort(Throwable t) throws IOException {
LOG.info("Aborting because of " + StringUtils.stringifyException(t));
try {
downlink.abort();
downlink.flush();
} catch (IOException e) {
// IGNORE cleanup problems
}
try {
handler.waitForFinish();
} catch (Throwable ignored) {
process.destroy();
}
IOException wrapper = new IOException("pipe child exception");
wrapper.initCause(t);
throw wrapper;
}
/**
* Creates a server instance.
* <p>
* It uses the provided configuration instead loading it from the config dir.
*
* @param name server name.
* @param homeDir server home directory.
* @param configDir config directory.
* @param logDir log directory.
* @param tempDir temp directory.
* @param config server configuration.
*/
public Server(String name, String homeDir, String configDir, String logDir, String tempDir, Configuration config) {
this.name = StringUtils.toLowerCase(Check.notEmpty(name, "name").trim());
this.homeDir = Check.notEmpty(homeDir, "homeDir");
this.configDir = Check.notEmpty(configDir, "configDir");
this.logDir = Check.notEmpty(logDir, "logDir");
this.tempDir = Check.notEmpty(tempDir, "tempDir");
checkAbsolutePath(homeDir, "homeDir");
checkAbsolutePath(configDir, "configDir");
checkAbsolutePath(logDir, "logDir");
checkAbsolutePath(tempDir, "tempDir");
if (config != null) {
this.config = new Configuration(false);
ConfigurationUtils.copy(config, this.config);
}
status = Status.UNDEF;
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
LOG.info("POST");
resp.setContentType("text/html");
PrintWriter out = resp.getWriter();
Reconfigurable reconf = getReconfigurable(req);
String nodeName = reconf.getClass().getCanonicalName();
printHeader(out, nodeName);
try {
applyChanges(out, reconf, req);
} catch (ReconfigurationException e) {
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
StringUtils.stringifyException(e));
return;
}
out.println("<p><a href=\"" + req.getServletPath() + "\">back</a></p>");
printFooter(out);
}
@Test
public void testFailOnExternalEntities() throws Exception {
final String externalEntitiesXml =
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
+ " <!DOCTYPE foo [ <!ENTITY xxe SYSTEM \"/tmp/foo\"> ] >"
+ " <ClusterVersion>&xee;</ClusterVersion>";
Client client = mock(Client.class);
RemoteAdmin admin = new RemoteAdmin(client, HBaseConfiguration.create(), null);
Response resp = new Response(200, null, Bytes.toBytes(externalEntitiesXml));
when(client.get("/version/cluster", Constants.MIMETYPE_XML)).thenReturn(resp);
try {
admin.getClusterVersion();
fail("Expected getClusterVersion() to throw an exception");
} catch (IOException e) {
assertEquals("Cause of exception ought to be a failure to parse the stream due to our " +
"invalid external entity. Make sure this isn't just a false positive due to " +
"implementation. see HBASE-19020.", UnmarshalException.class, e.getCause().getClass());
final String exceptionText = StringUtils.stringifyException(e);
final String expectedText = "\"xee\"";
LOG.debug("exception text: '" + exceptionText + "'", e);
assertTrue("Exception does not contain expected text", exceptionText.contains(expectedText));
}
}
@Override
public int run(Configuration conf, List<String> args) throws IOException {
String name = StringUtils.popFirstNonOption(args);
if (name == null) {
System.err.println("You must specify a name when deleting a " +
"cache pool.");
return 1;
}
if (!args.isEmpty()) {
System.err.print("Can't understand arguments: " +
Joiner.on(" ").join(args) + "\n");
System.err.println("Usage is " + getShortUsage());
return 1;
}
DistributedFileSystem dfs = AdminHelper.getDFS(conf);
try {
dfs.removeCachePool(name);
} catch (IOException e) {
System.err.println(AdminHelper.prettifyException(e));
return 2;
}
System.out.println("Successfully removed cache pool " + name + ".");
return 0;
}
private boolean markMovedIfGoodBlock(BalancerBlock block) {
synchronized(block) {
synchronized(movedBlocks) {
if (isGoodBlockCandidate(source, target, block)) {
this.block = block;
if ( chooseProxySource() ) {
movedBlocks.add(block);
if (LOG.isDebugEnabled()) {
LOG.debug("Decided to move block "+ block.getBlockId()
+" with a length of "+StringUtils.byteDesc(block.getNumBytes())
+ " bytes from " + source.getName()
+ " to " + target.getName()
+ " using proxy source " + proxySource.getName() );
}
return true;
}
}
}
}
return false;
}
/**
* Build ACL from the given two Strings.
* The Strings contain comma separated values.
*
* @param aclString build ACL from array of Strings
*/
private void buildACL(String[] userGroupStrings) {
users = new HashSet<String>();
groups = new HashSet<String>();
for (String aclPart : userGroupStrings) {
if (aclPart != null && isWildCardACLValue(aclPart)) {
allAllowed = true;
break;
}
}
if (!allAllowed) {
if (userGroupStrings.length >= 1 && userGroupStrings[0] != null) {
users = StringUtils.getTrimmedStringCollection(userGroupStrings[0]);
}
if (userGroupStrings.length == 2 && userGroupStrings[1] != null) {
groups = StringUtils.getTrimmedStringCollection(userGroupStrings[1]);
groupsMapping.cacheGroupsAdd(new LinkedList<String>(groups));
}
}
}
/**
* Submit this job to mapred. The state becomes RUNNING if submission
* is successful, FAILED otherwise.
*/
protected synchronized void submit() {
try {
if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
FileSystem fs = FileSystem.get(theJobConf);
Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
for (int i = 0; i < inputPaths.length; i++) {
if (!fs.exists(inputPaths[i])) {
try {
fs.mkdirs(inputPaths[i]);
} catch (IOException e) {
}
}
}
}
RunningJob running = jc.submitJob(theJobConf);
this.mapredJobID = running.getID();
this.state = Job.RUNNING;
} catch (IOException ioe) {
this.state = Job.FAILED;
this.message = StringUtils.stringifyException(ioe);
}
}
/**
* Change the permissions on a file / directory, recursively, if
* needed.
* @param filename name of the file whose permissions are to change
* @param perm permission string
* @param recursive true, if permissions should be changed recursively
* @return the exit code from the command.
* @throws IOException
*/
public static int chmod(String filename, String perm, boolean recursive)
throws IOException {
String [] cmd = Shell.getSetPermissionCommand(perm, recursive);
String[] args = new String[cmd.length + 1];
System.arraycopy(cmd, 0, args, 0, cmd.length);
args[cmd.length] = new File(filename).getPath();
ShellCommandExecutor shExec = new ShellCommandExecutor(args);
try {
shExec.execute();
}catch(IOException e) {
if(LOG.isDebugEnabled()) {
LOG.debug("Error while changing permission : " + filename
+" Exception: " + StringUtils.stringifyException(e));
}
}
return shExec.getExitCode();
}
public static void main(String argv[]) {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
try {
Configuration conf = new YarnConfiguration();
GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
argv = hParser.getRemainingArgs();
// If -format-state-store, then delete RMStateStore; else startup normally
if (argv.length == 1 && argv[0].equals("-format-state-store")) {
deleteRMStateStore(conf);
} else {
ResourceManager resourceManager = new ResourceManager();
ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager),
SHUTDOWN_HOOK_PRIORITY);
resourceManager.init(conf);
resourceManager.start();
}
} catch (Throwable t) {
LOG.fatal("Error starting ResourceManager", t);
System.exit(-1);
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//value: 马老师 一名老师 刚老师 周老师
String[] strs = StringUtils.split(value.toString(), ' ');
for (int i = 1; i < strs.length; i++) {
mkey.set(getFof(strs[0],strs[i]));
mval.set(0);
context.write(mkey,mval);
for (int j = i+1; j < strs.length; j++) {
mkey.set(getFof(strs[i],strs[j]));
mval.set(1);
context.write(mkey,mval);
}
}
}
/**
* Returns an array of column ids(start from zero) which is set in the given
* parameter <tt>conf</tt>.
*/
public static ArrayList<Integer> getReadColumnIDs(Configuration conf) {
if (conf == null) {
return new ArrayList<Integer>(0);
}
String skips = conf.get(READ_COLUMN_IDS_CONF_STR, "");
String[] list = StringUtils.split(skips);
ArrayList<Integer> result = new ArrayList<Integer>(list.length);
for (String element : list) {
// it may contain duplicates, remove duplicates
Integer toAdd = Integer.parseInt(element);
if (!result.contains(toAdd)) {
result.add(toAdd);
}
}
return result;
}
@Test
@TestException(exception = FileSystemAccessException.class, msgRegExp = "H05.*")
@TestDir
public void NameNodeNotinWhitelists() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.name.node.whitelist", "NN");
Server server = new Server("server", dir, dir, dir, dir, conf);
server.init();
FileSystemAccessService fsAccess = (FileSystemAccessService) server.get(FileSystemAccess.class);
fsAccess.validateNamenode("NNx");
}
static String stringifyDataStatistics(DataStatistics stats) {
if (stats != null) {
StringBuffer buffer = new StringBuffer();
String compressionStatus = stats.isDataCompressed()
? "Compressed"
: "Uncompressed";
buffer.append(compressionStatus).append(" input data size: ");
buffer.append(StringUtils.humanReadableInt(stats.getDataSize()));
buffer.append(", ");
buffer.append("Number of files: ").append(stats.getNumFiles());
return buffer.toString();
} else {
return Summarizer.NA;
}
}
@Test
public void testMoveThrowsPleaseHoldException() throws IOException {
final TableName tableName = TableName.valueOf(name.getMethodName());
HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
TableDescriptorBuilder tableDescriptorBuilder =
TableDescriptorBuilder.newBuilder(tableName);
ColumnFamilyDescriptor columnFamilyDescriptor =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("value")).build();
tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
admin.createTable(tableDescriptorBuilder.build());
try {
List<RegionInfo> tableRegions = admin.getRegions(tableName);
master.setInitialized(false); // fake it, set back later
admin.move(tableRegions.get(0).getEncodedNameAsBytes());
fail("Region should not be moved since master is not initialized");
} catch (IOException ioe) {
assertTrue(StringUtils.stringifyException(ioe).contains("PleaseHoldException"));
} finally {
master.setInitialized(true);
TEST_UTIL.deleteTable(tableName);
}
}
private static void createStorageDirs(DataStorage storage, Configuration conf,
int numDirs) throws IOException {
List<Storage.StorageDirectory> dirs =
new ArrayList<Storage.StorageDirectory>();
List<String> dirStrings = new ArrayList<String>();
for (int i = 0; i < numDirs; i++) {
File loc = new File(BASE_DIR + "/data" + i);
dirStrings.add(new Path(loc.toString()).toUri().toString());
loc.mkdirs();
dirs.add(createStorageDirectory(loc));
when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
}
String dataDir = StringUtils.join(",", dirStrings);
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
when(storage.dirIterator()).thenReturn(dirs.iterator());
when(storage.getNumStorageDirs()).thenReturn(numDirs);
}
@Test
public void testWithDuplicateProxyGroups() throws Exception {
Configuration conf = new Configuration();
conf.set(
DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_NAME),
StringUtils.join(",", Arrays.asList(GROUP_NAMES,GROUP_NAMES)));
conf.set(
DefaultImpersonationProvider.getTestProvider().
getProxySuperuserIpConfKey(REAL_USER_NAME),
PROXY_IP);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
Collection<String> groupsToBeProxied =
ProxyUsers.getDefaultImpersonationProvider().getProxyGroups().get(
DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_NAME));
assertEquals (1,groupsToBeProxied.size());
}
static AMSApplicationServer launchAMSApplicationServer(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(AMSApplicationServer.class, args, LOG);
AMSApplicationServer amsApplicationServer = null;
try {
amsApplicationServer = new AMSApplicationServer();
ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(amsApplicationServer),
SHUTDOWN_HOOK_PRIORITY);
YarnConfiguration conf = new YarnConfiguration();
amsApplicationServer.init(conf);
amsApplicationServer.start();
} catch (Throwable t) {
LOG.fatal("Error starting AMSApplicationServer", t);
ExitUtil.terminate(-1, "Error starting AMSApplicationServer");
}
return amsApplicationServer;
}
static String stringifyDataStatistics(DataStatistics stats) {
if (stats != null) {
StringBuffer buffer = new StringBuffer();
String compressionStatus = stats.isDataCompressed()
? "Compressed"
: "Uncompressed";
buffer.append(compressionStatus).append(" input data size: ");
buffer.append(StringUtils.humanReadableInt(stats.getDataSize()));
buffer.append(", ");
buffer.append("Number of files: ").append(stats.getNumFiles());
return buffer.toString();
} else {
return Summarizer.NA;
}
}
public void metadataStuff(String table) {
Connection dbcon = this.getConnection();
String sql = "select top 1 * from " + table;
Statement st;
try {
st = dbcon.createStatement();
ResultSet rs = st.executeQuery(sql);
ResultSetMetaData rsmd = rs.getMetaData();
for (int i = 1; i <= rsmd.getColumnCount(); i++) {
System.out.println(rsmd.getColumnName(i) + "\t"
+ rsmd.getColumnClassName(i) + "\t"
+ rsmd.getColumnType(i) + "\t"
+ rsmd.getColumnTypeName(i) + "\n");
}
} catch (SQLException e) {
LOG.error(StringUtils.stringifyException(e));
}
}
protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
int oldNumEntries = this.numEntries.getAndSet(0);
String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
if (oldPath != null) {
this.walFile2Props.put(oldPath,
new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
this.totalLogSize.addAndGet(oldFileLen);
LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
newPathString);
} else {
LOG.info("New WAL {}", newPathString);
}
}
public HealthCheckChore(int sleepTime, Stoppable stopper, Configuration conf) {
super("HealthChecker", stopper, sleepTime);
LOG.info("Health Check Chore runs every " + StringUtils.formatTime(sleepTime));
this.config = conf;
String healthCheckScript = this.config.get(HConstants.HEALTH_SCRIPT_LOC);
long scriptTimeout = this.config.getLong(HConstants.HEALTH_SCRIPT_TIMEOUT,
HConstants.DEFAULT_HEALTH_SCRIPT_TIMEOUT);
healthChecker = new HealthChecker();
healthChecker.init(healthCheckScript, scriptTimeout);
this.threshold = config.getInt(HConstants.HEALTH_FAILURE_THRESHOLD,
HConstants.DEFAULT_HEALTH_FAILURE_THRESHOLD);
this.failureWindow = (long)this.threshold * (long)sleepTime;
}
private void checkAndInit() throws IOException {
String jobStatus = this.job.get(Keys.JOB_STATUS);
if (Values.PREP.name().equals(jobStatus)) {
hasUpdates = true;
LOG.info("Calling init from RM for job " + jip.getJobID().toString());
try {
initJob(jip);
} catch (Throwable t) {
LOG.error("Job initialization failed : \n"
+ StringUtils.stringifyException(t));
failJob(jip);
throw new IOException(t);
}
}
}
@Test
public void testNoHostsForUsers() throws Exception {
Configuration conf = new Configuration(false);
conf.set("y." + REAL_USER_NAME + ".users",
StringUtils.join(",", Arrays.asList(AUTHORIZED_PROXY_USER_NAME)));
ProxyUsers.refreshSuperUserGroupsConfiguration(conf, "y");
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
UserGroupInformation proxyUserUgi = UserGroupInformation.createProxyUserForTesting(
AUTHORIZED_PROXY_USER_NAME, realUserUgi, GROUP_NAMES);
// IP doesn't matter
assertNotAuthorized(proxyUserUgi, "1.2.3.4");
}
/**
* get the uris of all the files/caches
*/
protected void getURIs(String lcacheArchives, String lcacheFiles) {
String archives[] = StringUtils.getStrings(lcacheArchives);
String files[] = StringUtils.getStrings(lcacheFiles);
fileURIs = StringUtils.stringToURI(files);
archiveURIs = StringUtils.stringToURI(archives);
}
private void verifyRead(FSDataInputStream stm, byte[] fileContents,
int seekOff, int toRead) throws IOException {
byte[] out = new byte[toRead];
stm.seek(seekOff);
stm.readFully(out);
byte[] expected = Arrays.copyOfRange(fileContents, seekOff, seekOff+toRead);
if (!Arrays.equals(out, expected)) {
String s ="\nExpected: " +
StringUtils.byteToHexString(expected) +
"\ngot: " +
StringUtils.byteToHexString(out) +
"\noff=" + seekOff + " len=" + toRead;
fail(s);
}
}
private static void assertExceptionContains(
Throwable t, String substring) {
String msg = StringUtils.stringifyException(t);
assertTrue("Exception should contain substring '" + substring + "':\n" +
msg, msg.contains(substring));
LOG.info("Got expected exception", t);
}
@Override
public void run() {
LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
do {
try {
int numNewMaps = getMapCompletionEvents();
if (numNewMaps == 0) {
if (getNumMapsCopyCompleted() == numMaps) {
break;
}
}
else if (numNewMaps > 0) {
LOG.info(reduceTask.getTaskID() + ": " +
"Got " + numNewMaps + " new map-outputs");
synchronized (mapLocations) {
mapLocations.notify();
}
}
Thread.sleep(SLEEP_TIME);
}
catch (InterruptedException e) {
// ignore. if we are shutting down - the while condition
// will check for it and exit. otherwise this could be a
// spurious interrupt due to log4j interaction
}
catch (Throwable t) {
String msg = reduceTask.getTaskID()
+ " GetMapEventsThread Ignoring exception : "
+ StringUtils.stringifyException(t);
reportFatalError(getTaskID(), t, msg);
}
} while (!exitGetMapEvents);
LOG.info("GetMapEventsThread exiting");
}
@Test
@TestException(exception = ServiceException.class, msgRegExp = "H09.*")
@TestDir
public void invalidSecurity() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.authentication.type", "foo");
Server server = new Server("server", dir, dir, dir, dir, conf);
server.init();
}