下面列出了 org.apache.hadoop.io.Stringifier API 类的使用实例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a {@link JobConf} for one of the Maps or Reduce in the chain.
 * <p/>
 * The value stored in the chain job's JobConf under <code>confKey</code> is
 * deserialized into a JobConf with a {@link DefaultStringifier}. A copy of
 * the chain job's JobConf is then created and every entry of the
 * deserialized JobConf is set on that copy, so the keys of the chain element
 * JobConf have precedence over the given JobConf. The given JobConf itself is
 * not modified by this method.
 * <p/>
 * If nothing is stored under <code>confKey</code>, the result is a plain
 * copy of the chain job's JobConf.
 *
 * @param jobConf the chain job's JobConf.
 * @param confKey the key for chain element configuration serialized in the
 * chain job's JobConf.
 * @return a new JobConf aggregating the chain job's JobConf with the chain
 * element configuration properties.
 * @throws RuntimeException wrapping the IOException raised if the serialized
 * configuration cannot be read or the stringifier cannot be closed.
 */
private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
  JobConf elementConf = null;
  try {
    Stringifier<JobConf> stringifier =
        new DefaultStringifier<JobConf>(jobConf, JobConf.class);
    try {
      String serialized = jobConf.get(confKey, null);
      // a missing key means there is no element configuration to overlay;
      // do not hand null to the stringifier
      if (serialized != null) {
        elementConf = stringifier.fromString(serialized);
      }
    } finally {
      // Stringifier is Closeable: release it on every path
      stringifier.close();
    }
  } catch (IOException ioex) {
    throw new RuntimeException(ioex);
  }
  // we have to do this because the Writable deserialization clears all
  // values set in the conf, making it impossible to do a new JobConf(jobConf)
  // in the creation of the element conf above
  JobConf merged = new JobConf(jobConf);
  if (elementConf != null) {
    for (Map.Entry<String, String> entry : elementConf) {
      merged.set(entry.getKey(), entry.getValue());
    }
  }
  return merged;
}
/**
 * Creates a {@link JobConf} for one of the Maps or Reduce in the chain.
 * <p/>
 * The value stored in the chain job's JobConf under <code>confKey</code> is
 * deserialized into a JobConf with a {@link DefaultStringifier}. A copy of
 * the chain job's JobConf is then created and every entry of the
 * deserialized JobConf is set on that copy, so the keys of the chain element
 * JobConf have precedence over the given JobConf. The given JobConf itself is
 * not modified by this method.
 * <p/>
 * If nothing is stored under <code>confKey</code>, the result is a plain
 * copy of the chain job's JobConf.
 *
 * @param jobConf the chain job's JobConf.
 * @param confKey the key for chain element configuration serialized in the
 * chain job's JobConf.
 * @return a new JobConf aggregating the chain job's JobConf with the chain
 * element configuration properties.
 * @throws RuntimeException wrapping the IOException raised if the serialized
 * configuration cannot be read or the stringifier cannot be closed.
 */
private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
  JobConf elementConf = null;
  try {
    Stringifier<JobConf> stringifier =
        new DefaultStringifier<JobConf>(jobConf, JobConf.class);
    try {
      String serialized = jobConf.get(confKey, null);
      // a missing key means there is no element configuration to overlay;
      // do not hand null to the stringifier
      if (serialized != null) {
        elementConf = stringifier.fromString(serialized);
      }
    } finally {
      // Stringifier is Closeable: release it on every path
      stringifier.close();
    }
  } catch (IOException ioex) {
    throw new RuntimeException(ioex);
  }
  // we have to do this because the Writable deserialization clears all
  // values set in the conf, making it impossible to do a new JobConf(jobConf)
  // in the creation of the element conf above
  JobConf merged = new JobConf(jobConf);
  if (elementConf != null) {
    for (Map.Entry<String, String> entry : elementConf) {
      merged.set(entry.getKey(), entry.getValue());
    }
  }
  return merged;
}
/**
 * Sets the Reducer class to the chain job's JobConf.
 * <p/>
 * The Reducer class is registered in the chain job's JobConf, the reducer's
 * input/output classes and the by-value flag are stored in the Reducer's
 * private JobConf, and a copy of that private JobConf is serialized into the
 * chain job's JobConf.
 * <p/>
 * NOTE(review): earlier documentation stated that the configuration
 * properties of the chain job have precedence over those of the Reducer.
 * How the two are merged is not decided in this method; confirm against the
 * code that reads the serialized configuration back.
 *
 * @param jobConf chain job's JobConf to add the Reducer class.
 * @param klass the Reducer class to add.
 * @param inputKeyClass reducer input key class.
 * @param inputValueClass reducer input value class.
 * @param outputKeyClass reducer output key class.
 * @param outputValueClass reducer output value class.
 * @param byValue indicates if key/values should be passed by value
 * to the next Mapper in the chain, if any.
 * @param reducerConf a JobConf with the configuration for the Reducer
 * class, or <code>null</code> to use an empty one. A non-null JobConf is
 * modified in place: the input/output classes and the by-value flag are set
 * on it. It is recommended to use a JobConf without default values using the
 * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
 * @throws IllegalStateException if a Reducer class is already set in the
 * chain job's JobConf.
 * @throws RuntimeException wrapping the IOException raised if the Reducer's
 * JobConf cannot be serialized or the stringifier cannot be closed.
 */
public static <K1, V1, K2, V2> void setReducer(JobConf jobConf,
    Class<? extends Reducer<K1, V1, K2, V2>> klass,
    Class<? extends K1> inputKeyClass,
    Class<? extends V1> inputValueClass,
    Class<? extends K2> outputKeyClass,
    Class<? extends V2> outputValueClass,
    boolean byValue, JobConf reducerConf) {
  String prefix = getPrefix(false);
  if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
    throw new IllegalStateException("Reducer has been already set");
  }
  jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);

  // if the Reducer does not have a private JobConf create an empty one
  if (reducerConf == null) {
    // using a JobConf without defaults to make it lightweight.
    // still the chain JobConf may have all defaults and this conf is
    // overlapped to the chain JobConf one.
    reducerConf = new JobConf(false);
  }

  // store in the private reducer conf the input/output classes of the reducer
  // and if it works by value or by reference
  reducerConf.setBoolean(MAPPER_BY_VALUE, byValue);
  reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
  reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
      Object.class);
  reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
      Object.class);
  reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
      Object.class);

  // serialize the private reducer jobconf in the chain jobconf.
  try {
    Stringifier<JobConf> stringifier =
        new DefaultStringifier<JobConf>(jobConf, JobConf.class);
    try {
      jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
          stringifier.toString(new JobConf(reducerConf)));
    } finally {
      // Stringifier is Closeable: release it on every path
      stringifier.close();
    }
  } catch (IOException ioEx) {
    throw new RuntimeException(ioEx);
  }
}
/**
 * Sets the Reducer class to the chain job's JobConf.
 * <p/>
 * The Reducer class is registered in the chain job's JobConf, the reducer's
 * input/output classes and the by-value flag are stored in the Reducer's
 * private JobConf, and a copy of that private JobConf is serialized into the
 * chain job's JobConf.
 * <p/>
 * NOTE(review): earlier documentation stated that the configuration
 * properties of the chain job have precedence over those of the Reducer.
 * How the two are merged is not decided in this method; confirm against the
 * code that reads the serialized configuration back.
 *
 * @param jobConf chain job's JobConf to add the Reducer class.
 * @param klass the Reducer class to add.
 * @param inputKeyClass reducer input key class.
 * @param inputValueClass reducer input value class.
 * @param outputKeyClass reducer output key class.
 * @param outputValueClass reducer output value class.
 * @param byValue indicates if key/values should be passed by value
 * to the next Mapper in the chain, if any.
 * @param reducerConf a JobConf with the configuration for the Reducer
 * class, or <code>null</code> to use an empty one. A non-null JobConf is
 * modified in place: the input/output classes and the by-value flag are set
 * on it. It is recommended to use a JobConf without default values using the
 * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
 * @throws IllegalStateException if a Reducer class is already set in the
 * chain job's JobConf.
 * @throws RuntimeException wrapping the IOException raised if the Reducer's
 * JobConf cannot be serialized or the stringifier cannot be closed.
 */
public static <K1, V1, K2, V2> void setReducer(JobConf jobConf,
    Class<? extends Reducer<K1, V1, K2, V2>> klass,
    Class<? extends K1> inputKeyClass,
    Class<? extends V1> inputValueClass,
    Class<? extends K2> outputKeyClass,
    Class<? extends V2> outputValueClass,
    boolean byValue, JobConf reducerConf) {
  String prefix = getPrefix(false);
  if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
    throw new IllegalStateException("Reducer has been already set");
  }
  jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);

  // if the Reducer does not have a private JobConf create an empty one
  if (reducerConf == null) {
    // using a JobConf without defaults to make it lightweight.
    // still the chain JobConf may have all defaults and this conf is
    // overlapped to the chain JobConf one.
    reducerConf = new JobConf(false);
  }

  // store in the private reducer conf the input/output classes of the reducer
  // and if it works by value or by reference
  reducerConf.setBoolean(MAPPER_BY_VALUE, byValue);
  reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
  reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
      Object.class);
  reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
      Object.class);
  reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
      Object.class);

  // serialize the private reducer jobconf in the chain jobconf.
  try {
    Stringifier<JobConf> stringifier =
        new DefaultStringifier<JobConf>(jobConf, JobConf.class);
    try {
      jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
          stringifier.toString(new JobConf(reducerConf)));
    } finally {
      // Stringifier is Closeable: release it on every path
      stringifier.close();
    }
  } catch (IOException ioEx) {
    throw new RuntimeException(ioEx);
  }
}