Listed below are examples of how to use the org.apache.hadoop.mapred.SkipBadRecords API class, showing the typical calling patterns; you can also follow the links to view the full source code on GitHub.
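Before the per-task examples, it may help to see how a job driver turns skip mode on in the first place. Below is a minimal sketch assuming an old-style (org.apache.hadoop.mapred) job; the class name SkipModeDriver is hypothetical, and the mapper/reducer and input/output settings a real job needs are omitted:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SkipBadRecords;

public class SkipModeDriver {  // hypothetical driver class
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(SkipModeDriver.class);
    // ... set mapper, reducer, input and output paths here ...

    // Enter skip mode after two failed attempts of the same task.
    SkipBadRecords.setAttemptsToStartSkipping(conf, 2);
    // Upper bounds on how many records/groups may be skipped around a
    // failure while the framework narrows down the bad ones.
    SkipBadRecords.setMapperMaxSkipRecords(conf, 100L);
    SkipBadRecords.setReducerMaxSkipGroups(conf, 10L);
    // Persist the skipped records for later inspection (optional).
    SkipBadRecords.setSkipOutputPath(conf, new Path("/tmp/skip"));

    JobClient.runJob(conf);
  }
}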
@SuppressWarnings("unchecked")
public void configure(JobConf jobConf) {
  int numberOfThreads =
      jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Configuring jobConf " + jobConf.getJobName() +
              " to use " + numberOfThreads + " threads");
  }

  this.job = jobConf;
  // Increment the processed counter only if the skipping feature is enabled.
  this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job) > 0 &&
      SkipBadRecords.getAutoIncrMapperProcCount(job);
  this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
      jobConf);

  // Create a thread pool of the configured size to run the Mapper's
  // map method in parallel.
  executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
                                           0L, TimeUnit.MILLISECONDS,
                                           new BlockingArrayQueue(numberOfThreads));
}
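When incrProcCount ends up true, the runner advances the processed-record counter itself after each record is handed off; the increment follows this pattern (a sketch of the pattern, not a verbatim quote of the runner's run() method):

if (incrProcCount) {
  // Tell the framework this record was fully processed, so skip mode
  // can tell bad records apart from merely slow ones.
  reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
      SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
}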
public void configure(JobConf job) {
  super.configure(job);
  // Disable auto-increment of the processed-group counter. For streaming,
  // the number of processed records can differ from (be equal to or less
  // than) the number of input records.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);

  try {
    // job_ is the JobConf field set by super.configure(job).
    reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
    reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8");
    this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
public void configure(JobConf job) {
  super.configure(job);
  // Disable auto-increment of the processed-record counter. For streaming,
  // the number of processed records can differ from (be equal to or less
  // than) the number of input records.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);

  if (mapInputWriterClass_.getCanonicalName().equals(TextInputWriter.class.getCanonicalName())) {
    String inputFormatClassName =
        job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
    // By default, drop the input key when the job uses the plain TextInputFormat.
    ignoreKey = job.getBoolean("stream.map.input.ignoreKey",
        inputFormatClassName.equals(TextInputFormat.class.getCanonicalName()));
  }

  try {
    mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
    mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
    numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
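The separator and key-field settings read above can be overridden from the driver (or with -jobconf on the command line); the values below are illustrative only:

JobConf conf = new JobConf();
conf.set("stream.map.output.field.separator", "|");  // use '|' instead of the default tab
conf.setInt("stream.num.map.output.key.fields", 2);  // treat the first two fields as the key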
public App(String[] args) throws Exception {
  if (args.length > 0) {
    isReducer = Boolean.parseBoolean(args[0]);
  }
  String counter = SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS;
  if (isReducer) {
    counter = SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS;
  }
  BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
  String line;
  int count = 0;
  while ((line = in.readLine()) != null) {
    processLine(line);
    count++;
    // Flush the processed count to the framework every ten records.
    if (count >= 10) {
      System.err.println("reporter:counter:" + SkipBadRecords.COUNTER_GROUP +
                         "," + counter + "," + count);
      count = 0;
    }
  }
}
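The reporter:counter:&lt;group&gt;,&lt;counter&gt;,&lt;amount&gt; lines written to standard error are the Hadoop streaming protocol for updating framework counters. Flushing the count every ten records keeps the framework's processed-record counter close to the task's real position, which matters once skipping is active.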
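The next three snippets show the same methods as taken from an older Hadoop release, which reads raw configuration keys such as "mapred.skip.on" and "mapred.map.multithreadedrunner.threads" instead of the MRJobConfig and MultithreadedMapper constants.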
@SuppressWarnings("unchecked")
public void configure(JobConf jobConf) {
  int numberOfThreads =
      jobConf.getInt("mapred.map.multithreadedrunner.threads", 10);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Configuring jobConf " + jobConf.getJobName() +
              " to use " + numberOfThreads + " threads");
  }

  this.job = jobConf;
  // Increment the processed counter only if the skipping feature is enabled.
  this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job) > 0 &&
      SkipBadRecords.getAutoIncrMapperProcCount(job);
  this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
      jobConf);

  // Create a thread pool of the configured size to run the Mapper's
  // map method in parallel.
  executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
                                           0L, TimeUnit.MILLISECONDS,
                                           new BlockingArrayQueue(numberOfThreads));
}
public void configure(JobConf job) {
  super.configure(job);
  // Disable auto-increment of the processed-group counter. For streaming,
  // the number of processed records can differ from (be equal to or less
  // than) the number of input records.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean("mapred.skip.on", false);

  try {
    // job_ is the JobConf field set by super.configure(job).
    reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
    reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8");
    this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
public void configure(JobConf job) {
  super.configure(job);
  // Disable auto-increment of the processed-record counter. For streaming,
  // the number of processed records can differ from (be equal to or less
  // than) the number of input records.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
  skipping = job.getBoolean("mapred.skip.on", false);

  String inputFormatClassName =
      job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
  // Drop the input key when the job uses the plain TextInputFormat.
  ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());

  try {
    mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
    mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
    numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
public void configure(JobConf job) {
  this.job = job;
  // Disable auto-increment of the processed-group counter. For pipes, the
  // number of processed records can differ from (be equal to or less than)
  // the number of input records.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
}
/**
 * Save the job's configuration and disable auto-increment of the
 * processed-record counter.
 * @param job the job's configuration
 */
public void configure(JobConf job) {
  this.job = job;
  // Disable auto-increment of the processed-record counter. For pipes, the
  // number of processed records can differ from (be equal to or less than)
  // the number of input records.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
}
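With auto-increment disabled, responsibility for the processed-record counter moves to whatever code actually consumes the records. On the Java side the update uses the same counter constants seen throughout this page; a sketch of the pattern (not the pipes wire protocol itself):

// Advance the skip-mode counter once a record has really been processed;
// reporter is the task's org.apache.hadoop.mapred.Reporter.
reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
    SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);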
public void configure(JobConf job) {
  this.job = job;
  // Disable auto-increment of the processed-group counter. For pipes, the
  // number of processed records can differ from (be equal to or less than)
  // the number of input records.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean("mapred.skip.on", false);
}
public void testSkip() throws Exception {
  JobConf clusterConf = createJobConf();
  createInput();
  int attSkip = 0;
  SkipBadRecords.setAttemptsToStartSkipping(clusterConf, attSkip);
  // The number of attempts needed to complete the task successfully
  // depends on the number of bad records.
  int mapperAttempts = attSkip + 1 + MAPPER_BAD_RECORDS.size();
  int reducerAttempts = attSkip + 1 + REDUCER_BAD_RECORDS.size();

  String[] args = new String[] {
    "-input", (new Path(getInputDir(), "text.txt")).toString(),
    "-output", getOutputDir().toString(),
    "-mapper", badMapper,
    "-reducer", badReducer,
    "-verbose",
    "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
    "-jobconf", "mapred.skip.attempts.to.start.skipping=" + attSkip,
    "-jobconf", "mapred.skip.out.dir=none",
    "-jobconf", "mapred.map.max.attempts=" + mapperAttempts,
    "-jobconf", "mapred.reduce.max.attempts=" + reducerAttempts,
    "-jobconf", "mapred.skip.map.max.skip.records=" + Long.MAX_VALUE,
    "-jobconf", "mapred.skip.reduce.max.skip.groups=" + Long.MAX_VALUE,
    "-jobconf", "mapred.map.tasks=1",
    "-jobconf", "mapred.reduce.tasks=1",
    "-jobconf", "fs.default.name=" + clusterConf.get("fs.default.name"),
    "-jobconf", "mapred.job.tracker=" + clusterConf.get("mapred.job.tracker"),
    "-jobconf", "mapred.job.tracker.http.address="
                + clusterConf.get("mapred.job.tracker.http.address"),
    "-jobconf", "stream.debug=set",
    "-jobconf", "keep.failed.task.files=true",
    "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data", "/tmp")
  };

  StreamJob job = new StreamJob(args, false);
  job.go();
  validateOutput(job.running_, false);
  // Validate that there is no skip directory, as it has been set to "none".
  assertNull(SkipBadRecords.getSkipOutputPath(job.jobConf_));
}
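Because mapred.skip.out.dir is set to "none", the skipped records are not persisted anywhere; that is exactly what the final assertion verifies, since SkipBadRecords.getSkipOutputPath returns null when the skip output directory is disabled.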
public void testNarrowDown() throws Exception {
  createInput();
  JobConf clusterConf = createJobConf();
  String[] args = new String[] {
    "-input", (new Path(getInputDir(), "text.txt")).toString(),
    "-output", getOutputDir().toString(),
    "-mapper", badMapper,
    "-reducer", badReducer,
    "-verbose",
    "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
    "-jobconf", "mapred.skip.attempts.to.start.skipping=1",
    // Fewer attempts than specified are actually required, but extra
    // attempts are allowed to cover slow processed-counter updates.
    "-jobconf", "mapred.map.max.attempts=20",
    "-jobconf", "mapred.reduce.max.attempts=15",
    "-jobconf", "mapred.skip.map.max.skip.records=1",
    "-jobconf", "mapred.skip.reduce.max.skip.groups=1",
    "-jobconf", "mapred.map.tasks=1",
    "-jobconf", "mapred.reduce.tasks=1",
    "-jobconf", "fs.default.name=" + clusterConf.get("fs.default.name"),
    "-jobconf", "mapred.job.tracker=" + clusterConf.get("mapred.job.tracker"),
    "-jobconf", "mapred.job.tracker.http.address="
                + clusterConf.get("mapred.job.tracker.http.address"),
    "-jobconf", "stream.debug=set",
    "-jobconf", "keep.failed.task.files=true",
    "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data", "/tmp")
  };

  StreamJob job = new StreamJob(args, false);
  job.go();
  validateOutput(job.running_, true);
  assertNotNull(SkipBadRecords.getSkipOutputPath(job.jobConf_));
}
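Setting both max-skip values to 1 forces the framework to narrow every failure down to a single record (map side) or group (reduce side) through repeated re-execution, which is why the attempt limits here are far higher than the bad-record counts alone would require.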