下面列出了 org.apache.hadoop.mapreduce.lib.input.InvalidInputException 类的 API 用法示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Translates a wrapped failure into a more specific runtime exception.
 *
 * <p>Only the immediate cause of {@code e} is inspected (the cause chain is not walked).
 * The specific types are tested before the generic {@link IOException} branch so that, where
 * those types are IOException subclasses, their dedicated mapping takes precedence.
 *
 * @param e the exception to adapt
 * @return the result of the matching {@code createFrom} overload; for a generic IOException
 *     cause, the result of {@code createFromOrNull} when non-null; otherwise {@code e} itself
 */
private RuntimeException adapt(RuntimeException e) {
    final Throwable wrapped = e.getCause();
    if (wrapped instanceof AccessControlException) {
        return createFrom((AccessControlException) wrapped);
    } else if (wrapped instanceof LoginException) {
        return createFrom((LoginException) wrapped);
    } else if (wrapped instanceof InvalidInputException) {
        return createFrom((InvalidInputException) wrapped);
    } else if (wrapped instanceof IOException) {
        // Generic I/O failures are only translated when a specific mapping exists.
        final RuntimeException translated = createFromOrNull((IOException) wrapped);
        return (translated != null) ? translated : e;
    }
    return e;
}
/**
 * Expands the given input paths (which may be glob patterns) into the list of input files.
 *
 * <p>For every glob match that is a directory, its children (filtered by {@code inputFilter})
 * are added; child directories are handed to {@code simpleAddInputPathRecursively} when
 * {@code recursive} is true and otherwise added as-is. Non-directory matches are added directly.
 *
 * <p>Problems with individual paths (invalid pattern, path that does not exist, pattern that
 * matches nothing) do not stop processing. They are collected and reported together at the end.
 *
 * @param job job context supplying the Hadoop configuration used to resolve file systems
 * @param dirs input paths or glob patterns to expand
 * @param inputFilter filter applied to glob matches and to directory listings
 * @param recursive whether sub-directories of matched directories are descended into
 * @return the collected file statuses
 * @throws InvalidInputException if any input path was invalid, missing, or matched no files
 * @throws IOException if resolving a file system or listing a directory fails
 */
private List<FileStatus> simpleListStatus(JobContext job, Path[] dirs,
        PathFilter inputFilter, boolean recursive) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    Configuration conf = job.getConfiguration();
    for (Path p : dirs) {
        FileSystem fs = p.getFileSystem(conf);
        FileStatus[] matches;
        try {
            matches = fs.globStatus(p, inputFilter);
        } catch (IllegalArgumentException e) {
            // Keep the original exception as the cause so its stack trace is not lost.
            errors.add(new IOException(e.getMessage(), e));
            continue;
        }
        if (matches == null) {
            errors.add(new IOException("Input path does not exist: " + p));
        } else if (matches.length == 0) {
            errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
        } else {
            for (FileStatus globStat : matches) {
                if (globStat.isDirectory()) {
                    for (FileStatus child : fs.listStatus(globStat.getPath(), inputFilter)) {
                        if (recursive && child.isDirectory()) {
                            simpleAddInputPathRecursively(result, fs, child.getPath(), inputFilter);
                        } else {
                            result.add(child);
                        }
                    }
                } else {
                    result.add(globStat);
                }
            }
        }
    }
    // Report every bad input path at once rather than failing on the first one.
    if (!errors.isEmpty()) {
        throw new InvalidInputException(errors);
    }
    return result;
}
/**
 * Maps an {@link InvalidInputException} to a component error, choosing the input or output
 * variant according to {@code accessType}.
 *
 * <p>NOTE(review): an InvalidInputException is currently reported through the
 * "not authorized" error codes even though it may signal a missing or empty input path;
 * confirm whether a dedicated file-not-found error code should be used instead.
 *
 * @param cause the Hadoop exception being translated
 * @return the error for a read when {@code accessType} is {@code Read}, otherwise the error
 *     for a write
 */
private TalendRuntimeException createFrom(InvalidInputException cause) {
    // TODO: more specific method for file not found errors.
    final boolean reading = accessType == AccessType.Read;
    return reading
            ? SimpleFileIOErrorCode.createInputNotAuthorized(cause, username, path)
            : SimpleFileIOErrorCode.createOutputNotAuthorized(cause, username, path);
}
/**
 * Verifies that a skewed join whose right-hand input does not exist fails, and that the
 * failure can be traced to an {@link InvalidInputException}, either in the front-end
 * exception chain or in the Pig log output.
 */
@Test
public void testNonExistingInputPathInSkewJoin() throws Exception {
    String script =
        "exists = LOAD '" + INPUT_FILE2 + "' AS (a:long, x:chararray);" +
        "missing = LOAD '/non/existing/directory' AS (a:long);" +
        "missing = FOREACH ( GROUP missing BY a ) GENERATE $0 AS a, COUNT_STAR($1);" +
        "joined = JOIN exists BY a, missing BY a USING 'skewed';";
    // Capture org.apache.pig logging in a temp file so a back-end failure that never
    // reaches the front-end exception chain can still be detected.
    String logFile = Util.createTempFileDelOnExit("tmp", ".log").getAbsolutePath();
    Logger logger = Logger.getLogger("org.apache.pig");
    logger.setLevel(Level.INFO);
    SimpleLayout layout = new SimpleLayout();
    FileAppender appender = new FileAppender(layout, logFile, false, false, 0);
    logger.addAppender(appender);
    boolean jobFailed = false;
    try {
        pigServer.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")));
        pigServer.openIterator("joined");
    } catch (Exception e) {
        jobFailed = true;
        boolean foundInvalidInputException = false;
        // Search the whole chain, starting with e itself: InvalidInputException is an
        // IOException and may be thrown directly when input splits are computed on the
        // front-end.
        for (Throwable t = e; t != null; t = t.getCause()) {
            if (t instanceof InvalidInputException) {
                foundInvalidInputException = true;
                break;
            }
        }
        // InvalidInputException was not found in the stack trace. But it's possible that
        // the exception was thrown in the back-end, and Pig couldn't retrieve it in the
        // front-end. To be safe, search the log file before declaring a failure.
        if (!foundInvalidInputException) {
            // readAllBytes reads the whole file and closes it; a single read() sized by
            // available() may return only part of the file and leaked the stream on error.
            byte[] logBytes = java.nio.file.Files.readAllBytes(new File(logFile).toPath());
            String logContents = new String(logBytes, "UTF-8");
            foundInvalidInputException =
                logContents.contains(InvalidInputException.class.getName());
        }
        assertTrue("This exception was not caused by InvalidInputException: " + e,
                foundInvalidInputException);
    } finally {
        LogManager.resetConfiguration();
    }
    // Without this check the test would pass silently if the join unexpectedly succeeded.
    assertTrue("Expected the skewed join over a non-existing input path to fail", jobFailed);
}