Listed below are example usages of org.apache.hadoop.io.Stringifier#org.apache.hadoop.io.DefaultStringifier. You can also follow the link to view the source code on GitHub, or leave a comment on the right.
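Before the project-specific examples, here is a minimal, self-contained sketch of the basic store/load round trip. The key name "example.column.types" and the class name are invented for illustration; it relies only on MapWritable being a Writable, which the default io.serializations setting can serialize.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;

public class DefaultStringifierRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Serialize a Writable into the configuration under a made-up key.
    MapWritable columnTypes = new MapWritable();
    columnTypes.put(new Text("Age"), new Text("Integer"));
    DefaultStringifier.store(conf, columnTypes, "example.column.types");

    // Later (typically in a mapper's setup()) restore it from the same key.
    MapWritable restored =
        DefaultStringifier.load(conf, "example.column.types", MapWritable.class);
    System.out.println(restored.get(new Text("Age"))); // prints "Integer"
  }
}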
private void configureGenericRecordExportInputFormat(Job job, String tableName)
throws IOException {
ConnManager connManager = context.getConnManager();
Map<String, Integer> columnTypeInts;
if (options.getCall() == null) {
columnTypeInts = connManager.getColumnTypes(
tableName,
options.getSqlQuery());
} else {
columnTypeInts = connManager.getColumnTypesForProcedure(
options.getCall());
}
String[] specifiedColumns = options.getColumns();
MapWritable columnTypes = new MapWritable();
for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
String column = e.getKey();
column = (specifiedColumns == null) ? column : options.getColumnNameCaseInsensitive(column);
if (column != null) {
Text columnName = new Text(column);
Text columnType = new Text(connManager.toJavaType(tableName, column, e.getValue()));
columnTypes.put(columnName, columnType);
}
}
DefaultStringifier.store(job.getConfiguration(), columnTypes,
AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
}
protected LinkedMapWritable getForestStatusMap(Configuration conf)
throws IOException {
String forestHost = conf.get(OUTPUT_FOREST_HOST);
if (forestHost != null) {
//Restores the object from the configuration.
LinkedMapWritable fhmap = DefaultStringifier.load(conf, OUTPUT_FOREST_HOST,
LinkedMapWritable.class);
// must be in fast load mode, otherwise won't reach here
String s = conf.get(ASSIGNMENT_POLICY);
//EXECUTION_MODE must have a value in mlcp;
//default is "distributed" in hadoop connector
String mode = conf.get(EXECUTION_MODE, MODE_DISTRIBUTED);
if (MODE_DISTRIBUTED.equals(mode)) {
AssignmentPolicy.Kind policy =
AssignmentPolicy.Kind.forName(s);
am.initialize(policy, fhmap, conf.getInt(BATCH_SIZE, 10));
}
return fhmap;
} else {
throw new IOException("Forest host map not found");
}
}
@Override
public void checkOutputSpecs(Configuration conf, ContentSource cs)
throws IOException {
// check for required configuration
if (conf.get(OUTPUT_QUERY) == null) {
throw new IllegalArgumentException(OUTPUT_QUERY +
" is not specified.");
}
// warn against unsupported configuration
if (conf.get(BATCH_SIZE) != null) {
LOG.warn("Config entry for " +
"\"mapreduce.marklogic.output.batchsize\" is not " +
"supported for " + this.getClass().getName() +
" and will be ignored.");
}
String queryLanguage = conf.get(OUTPUT_QUERY_LANGUAGE);
if (queryLanguage != null) {
InternalUtilities.checkQueryLanguage(queryLanguage);
}
// store hosts into config system
DefaultStringifier.store(conf, queryHosts(cs), OUTPUT_FOREST_HOST);
}
/**
* Creates a {@link JobConf} for one of the Maps or Reduce in the chain.
* <p/>
* It creates a new JobConf using the chain job's JobConf as base and adds to
* it the configuration properties for the chain element. The keys of the
* chain element jobConf have precedence over the given JobConf.
*
* @param jobConf the chain job's JobConf.
* @param confKey the key for chain element configuration serialized in the
* chain job's JobConf.
* @return a new JobConf aggregating the chain job's JobConf with the chain
* element configuration properties.
*/
private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
JobConf conf;
try {
Stringifier<JobConf> stringifier =
new DefaultStringifier<JobConf>(jobConf, JobConf.class);
conf = stringifier.fromString(jobConf.get(confKey, null));
} catch (IOException ioex) {
throw new RuntimeException(ioex);
}
// we have to do this because the Writable deserialization clears all
// values set in the conf, making it impossible to do a new JobConf(jobConf)
// in the creation of the conf above
jobConf = new JobConf(jobConf);
for(Map.Entry<String, String> entry : conf) {
jobConf.set(entry.getKey(), entry.getValue());
}
return jobConf;
}
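The fromString() call above reads a value that was written with the instance API's toString(). Below is a minimal sketch of that write path, using a hypothetical helper and key parameter (the real write side in Hadoop's chain code appears in the setReducer example further down):
import java.io.IOException;

import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.Stringifier;
import org.apache.hadoop.mapred.JobConf;

public class ChainConfWriterSketch {
  // Hypothetical helper: serializes a chain element's private JobConf into
  // the chain job's JobConf under the given key, mirroring the read path above.
  static void storeElementConf(JobConf chainJobConf, JobConf elementConf,
      String confKey) {
    try {
      Stringifier<JobConf> stringifier =
          new DefaultStringifier<JobConf>(chainJobConf, JobConf.class);
      chainJobConf.set(confKey, stringifier.toString(new JobConf(elementConf)));
    } catch (IOException ioex) {
      throw new RuntimeException(ioex);
    }
  }
}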
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
// Instantiate a copy of the user's class to hold and parse the record.
String recordClassName = conf.get(
ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
if (null == recordClassName) {
throw new IOException("Export table class name ("
+ ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+ ") is not set!");
}
try {
Class cls = Class.forName(recordClassName, true,
Thread.currentThread().getContextClassLoader());
recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
if (null == recordImpl) {
throw new IOException("Could not instantiate object of type "
+ recordClassName);
}
columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
MapWritable.class);
}
@Test
public void testAvroWithNoColumnsSpecified() throws Exception {
SqoopOptions opts = new SqoopOptions();
opts.setExportDir("myexportdir");
JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
Job job = new Job();
jdbcExportJob.configureInputFormat(job, null, null, null);
assertEquals(asSetOfText("Age", "Name", "Gender"),
    DefaultStringifier.load(job.getConfiguration(),
        AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
@Test
public void testAvroWithAllColumnsSpecified() throws Exception {
SqoopOptions opts = new SqoopOptions();
opts.setExportDir("myexportdir");
String[] columns = { "Age", "Name", "Gender" };
opts.setColumns(columns);
JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
Job job = new Job();
jdbcExportJob.configureInputFormat(job, null, null, null);
assertEquals(asSetOfText("Age", "Name", "Gender"),
    DefaultStringifier.load(job.getConfiguration(),
        AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
@Test
public void testAvroWithOneColumnSpecified() throws Exception {
SqoopOptions opts = new SqoopOptions();
opts.setExportDir("myexportdir");
String[] columns = { "Gender" };
opts.setColumns(columns);
JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
Job job = new Job();
jdbcExportJob.configureInputFormat(job, null, null, null);
assertEquals(asSetOfText("Gender"),
    DefaultStringifier.load(job.getConfiguration(),
        AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
@Test
public void testAvroWithSomeColumnsSpecified() throws Exception {
SqoopOptions opts = new SqoopOptions();
opts.setExportDir("myexportdir");
String[] columns = { "Age", "Name" };
opts.setColumns(columns);
JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
Job job = new Job();
jdbcExportJob.configureInputFormat(job, null, null, null);
assertEquals(asSetOfText("Age", "Name"),
    DefaultStringifier.load(job.getConfiguration(),
        AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
@Test
public void testAvroWithMoreColumnsSpecified() throws Exception {
SqoopOptions opts = new SqoopOptions();
opts.setExportDir("myexportdir");
String[] columns = { "Age", "Name", "Gender", "Address" };
opts.setColumns(columns);
JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
Job job = new Job();
jdbcExportJob.configureInputFormat(job, null, null, null);
assertEquals(asSetOfText("Age", "Name", "Gender"),
    DefaultStringifier.load(job.getConfiguration(),
        AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
@Override
public void checkOutputSpecs(Configuration conf, ContentSource cs)
throws IOException {
super.checkOutputSpecs(conf, cs);
// store mimetypes map into config system
DefaultStringifier.store(conf, getMimetypesMap(),
ConfigConstants.CONF_MIMETYPES);
}
protected LinkedMapWritable getRoleMap(TaskAttemptContext context) throws IOException{
//Restores the object from the configuration.
Configuration conf = context.getConfiguration();
LinkedMapWritable fhmap = null;
if(conf.get(ConfigConstants.CONF_ROLE_MAP)!=null) {
fhmap = DefaultStringifier.load(conf, ConfigConstants.CONF_ROLE_MAP,
LinkedMapWritable.class);
}
return fhmap;
}
protected String getServerVersion(TaskAttemptContext context) throws IOException{
//Restores the object from the configuration.
Configuration conf = context.getConfiguration();
Text version = DefaultStringifier.load(conf, ConfigConstants.CONF_ML_VERSION,
Text.class);
return version.toString();
}
@Override
public void checkOutputSpecs(Configuration conf, ContentSource cs)
throws IOException {
// warn against unsupported configuration
if (conf.get(BATCH_SIZE) != null) {
LOG.warn("Config entry for " +
"\"mapreduce.marklogic.output.batchsize\" is not " +
"supported for " + this.getClass().getName() +
" and will be ignored.");
}
// store hosts into config system
DefaultStringifier.store(conf, queryHosts(cs), OUTPUT_FOREST_HOST);
}
protected TextArrayWritable getHosts(Configuration conf) throws IOException {
String forestHost = conf.get(OUTPUT_FOREST_HOST);
if (forestHost != null) {
// Restores the object from the configuration.
TextArrayWritable hosts = DefaultStringifier.load(conf,
OUTPUT_FOREST_HOST, TextArrayWritable.class);
return hosts;
} else {
throw new IOException("Forest host map not found");
}
}
@Override
public void checkOutputSpecs(Configuration conf, ContentSource cs)
throws IOException {
// warn against unsupported configuration
if (conf.get(BATCH_SIZE) != null) {
LOG.warn("Config entry for " +
"\"mapreduce.marklogic.output.batchsize\" is not " +
"supported for " + this.getClass().getName() +
" and will be ignored.");
}
// store hosts into config system
DefaultStringifier.store(conf, queryHosts(cs), OUTPUT_FOREST_HOST);
}
public SqoopHCatExportHelper(Configuration conf, boolean isOdps)
throws IOException, InterruptedException {
this.isOdps = isOdps;
if (!isOdps) {
colTypesJava =
DefaultStringifier.load(conf, SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_JAVA,
MapWritable.class);
colTypesSql =
DefaultStringifier.load(conf, SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_SQL,
MapWritable.class);
}
// Instantiate a copy of the user's class to hold and parse the record.
String recordClassName = conf.get(
ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
if (null == recordClassName) {
throw new IOException("Export table class name ("
+ ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+ ") is not set!");
}
bigDecimalFormatString = conf.getBoolean(
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
debugHCatExportMapper = conf.getBoolean(
SqoopHCatUtilities.DEBUG_HCAT_EXPORT_MAPPER_PROP, false);
try {
Class<?> cls = Class.forName(recordClassName, true,
Thread.currentThread().getContextClassLoader());
sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
if (null == sqoopRecord) {
throw new IOException("Could not instantiate object of type "
+ recordClassName);
}
String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
jobInfo =
(InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
HCatSchema tableSchema = jobInfo.getTableInfo().getDataColumns();
HCatSchema partitionSchema =
jobInfo.getTableInfo().getPartitionColumns();
hCatFullTableSchema = new HCatSchema(tableSchema.getFields());
for (HCatFieldSchema hfs : partitionSchema.getFields()) {
hCatFullTableSchema.append(hfs);
}
}
public static void configureImportOutputFormat(SqoopOptions opts, Job job,
ConnManager connMgr, String dbTable, Configuration config)
throws IOException {
LOG.info("Configuring HCatalog for import job");
SqoopHCatUtilities.instance().configureHCat(opts, job, connMgr, dbTable,
job.getConfiguration());
LOG.info("Validating dynamic partition keys");
SqoopHCatUtilities.instance().validateFieldAndColumnMappings();
SqoopHCatUtilities.instance().validateDynamicPartitionKeysMapping();
job.setOutputFormatClass(getOutputFormatClass());
IntWritable[] delimChars = new IntWritable[5];
String hiveReplacement = "";
LOG.debug("Hive delimiters will be fixed during import");
DelimiterSet delims = opts.getOutputDelimiters();
if (!opts.explicitOutputDelims()) {
delims = DelimiterSet.HIVE_DELIMITERS;
}
delimChars = new IntWritable[] {
new IntWritable(delims.getFieldsTerminatedBy()),
new IntWritable(delims.getLinesTerminatedBy()),
new IntWritable(delims.getEnclosedBy()),
new IntWritable(delims.getEscapedBy()),
new IntWritable(delims.isEncloseRequired() ? 1 : 0), };
hiveReplacement = opts.getHiveDelimsReplacement();
if (hiveReplacement == null) {
hiveReplacement = "";
}
LOG.debug("Setting hive delimiters information");
DefaultStringifier.storeArray(config, delimChars,
HIVE_DELIMITERS_TO_REPLACE_PROP);
config.set(HIVE_DELIMITERS_REPLACEMENT_PROP, hiveReplacement);
if (opts.doHiveDropDelims() || opts.getHiveDelimsReplacement() != null) {
LOG.debug("Enabling hive delimter replacement");
config.set(HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP, "true");
} else {
LOG.debug("Disabling hive delimter replacement");
config.set(HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP, "false");
}
}
public SqoopHCatImportHelper(Configuration conf) throws IOException,
InterruptedException {
String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
jobInfo = (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
dataColsSchema = jobInfo.getTableInfo().getDataColumns();
partitionSchema = jobInfo.getTableInfo().getPartitionColumns();
StringBuilder storerInfoStr = new StringBuilder(1024);
StorerInfo storerInfo = jobInfo.getTableInfo().getStorerInfo();
storerInfoStr.append("HCatalog Storer Info : ").append("\n\tHandler = ")
.append(storerInfo.getStorageHandlerClass())
.append("\n\tInput format class = ").append(storerInfo.getIfClass())
.append("\n\tOutput format class = ").append(storerInfo.getOfClass())
.append("\n\tSerde class = ").append(storerInfo.getSerdeClass());
Properties storerProperties = storerInfo.getProperties();
if (!storerProperties.isEmpty()) {
storerInfoStr.append("\nStorer properties ");
for (Map.Entry<Object, Object> entry : storerProperties.entrySet()) {
String key = (String) entry.getKey();
Object val = entry.getValue();
storerInfoStr.append("\n\t").append(key).append('=').append(val);
}
}
storerInfoStr.append("\n");
LOG.info(storerInfoStr);
hCatFullTableSchema = new HCatSchema(dataColsSchema.getFields());
for (HCatFieldSchema hfs : partitionSchema.getFields()) {
hCatFullTableSchema.append(hfs);
}
fieldCount = hCatFullTableSchema.size();
lobLoader = new LargeObjectLoader(conf, new Path(jobInfo.getTableInfo()
.getTableLocation()));
bigDecimalFormatString = conf.getBoolean(
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
debugHCatImportMapper = conf.getBoolean(
SqoopHCatUtilities.DEBUG_HCAT_IMPORT_MAPPER_PROP, false);
IntWritable[] delimChars = DefaultStringifier.loadArray(conf,
SqoopHCatUtilities.HIVE_DELIMITERS_TO_REPLACE_PROP, IntWritable.class);
hiveDelimiters = new DelimiterSet((char) delimChars[0].get(),
(char) delimChars[1].get(), (char) delimChars[2].get(),
(char) delimChars[3].get(), delimChars[4].get() == 1 ? true : false);
hiveDelimsReplacement = conf
.get(SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_PROP);
if (hiveDelimsReplacement == null) {
hiveDelimsReplacement = "";
}
doHiveDelimsReplacement = Boolean.valueOf(conf
.get(SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP));
IntWritable[] fPos = DefaultStringifier.loadArray(conf,
SqoopHCatUtilities.HCAT_FIELD_POSITIONS_PROP, IntWritable.class);
hCatFieldPositions = new int[fPos.length];
for (int i = 0; i < fPos.length; ++i) {
hCatFieldPositions[i] = fPos[i].get();
}
LOG.debug("Hive delims replacement enabled : " + doHiveDelimsReplacement);
LOG.debug("Hive Delimiters : " + hiveDelimiters.toString());
LOG.debug("Hive delimiters replacement : " + hiveDelimsReplacement);
staticPartitionKeys = conf
.getStrings(SqoopHCatUtilities.HCAT_STATIC_PARTITION_KEY_PROP);
String partKeysString = staticPartitionKeys == null ? ""
: Arrays.toString(staticPartitionKeys);
LOG.debug("Static partition key used : " + partKeysString);
}
/**
 * Initializes the mimetype map if it has not been initialized yet, and returns it.
 *
 * @return the mimetype map
 * @throws IOException
 */
private LinkedMapWritable getMimetypesMap() throws IOException {
if (mimetypeMap != null)
return mimetypeMap;
String mtmap = conf.get(ConfigConstants.CONF_MIMETYPES);
if (mtmap != null) {
mimetypeMap = DefaultStringifier.load(conf,
ConfigConstants.CONF_MIMETYPES, LinkedMapWritable.class);
return mimetypeMap;
}
String[] hosts = conf.getStrings(OUTPUT_HOST);
Session session = null;
ResultSequence result = null;
for (int i = 0; i < hosts.length; i++) {
try {
String host = hosts[i];
ContentSource cs = InternalUtilities.getOutputContentSource(conf,
host);
session = cs.newSession();
AdhocQuery query = session.newAdhocQuery(MIMETYPES_QUERY);
RequestOptions options = new RequestOptions();
options.setDefaultXQueryVersion("1.0-ml");
query.setOptions(options);
result = session.submitRequest(query);
if (!result.hasNext())
throw new IOException(
"Server-side transform requires MarkLogic 7 or later");
mimetypeMap = new LinkedMapWritable();
while (result.hasNext()) {
String suffs = result.next().asString();
Text format = new Text(result.next().asString());
// some extensions are in a space separated string
for (String s : suffs.split(" ")) {
Text suff = new Text(s);
mimetypeMap.put(suff, format);
}
}
return mimetypeMap;
} catch (Exception e) {
if (e.getCause() instanceof ServerConnectionException) {
LOG.warn("Unable to connect to " + hosts[i]
+ " to query destination information");
continue;
}
LOG.error(e.getMessage(), e);
throw new IOException(e);
} finally {
if (result != null) {
result.close();
}
if (session != null) {
session.close();
}
}
}
return null;
}
public List<InputSplit> getSplits(JobContext job) throws IOException {
boolean delimSplit = isSplitInput(job.getConfiguration());
//if delimSplit is true, size of each split is determined by
//Math.max(minSize, Math.min(maxSize, blockSize)) in FileInputFormat
List<InputSplit> splits = super.getSplits(job);
if (!delimSplit) {
return splits;
}
if (splits.size() >= SPLIT_COUNT_LIMIT) {
//if #splits > 1 million, there is enough parallelism
//therefore no point to split
LOG.warn("Exceeding SPLIT_COUNT_LIMIT, input_split is off:"
+ SPLIT_COUNT_LIMIT);
DefaultStringifier.store(job.getConfiguration(), false, ConfigConstants.CONF_SPLIT_INPUT);
return splits;
}
// add header info into splits
List<InputSplit> populatedSplits = new ArrayList<InputSplit>();
LOG.info(splits.size() + " DelimitedSplits generated");
Configuration conf = job.getConfiguration();
char delimiter = 0;
ArrayList<Text> hlist = new ArrayList<Text>();
for (InputSplit file: splits) {
FileSplit fsplit = ((FileSplit)file);
Path path = fsplit.getPath();
FileSystem fs = path.getFileSystem(conf);
if (fsplit.getStart() == 0) {
// parse the inSplit, get the header
FSDataInputStream fileIn = fs.open(path);
String delimStr = conf.get(ConfigConstants.CONF_DELIMITER,
ConfigConstants.DEFAULT_DELIMITER);
if (delimStr.length() == 1) {
delimiter = delimStr.charAt(0);
} else {
LOG.error("Incorrect delimitor: " + delimiter
+ ". Expects single character.");
}
String encoding = conf.get(
MarkLogicConstants.OUTPUT_CONTENT_ENCODING,
MarkLogicConstants.DEFAULT_OUTPUT_CONTENT_ENCODING);
InputStreamReader instream = new InputStreamReader(fileIn, encoding);
CSVParser parser = new CSVParser(instream, CSVParserFormatter.
getFormat(delimiter, DelimitedTextReader.encapsulator,
true, true));
Iterator<CSVRecord> it = parser.iterator();
String[] header = null;
if (it.hasNext()) {
CSVRecord record = (CSVRecord)it.next();
Iterator<String> recordIterator = record.iterator();
int recordSize = record.size();
header = new String[recordSize];
for (int i = 0; i < recordSize; i++) {
if (recordIterator.hasNext()) {
header[i] = (String)recordIterator.next();
} else {
throw new IOException("Record size doesn't match the real size");
}
}
EncodingUtil.handleBOMUTF8(header, 0);
hlist.clear();
for (String s : header) {
hlist.add(new Text(s));
}
}
instream.close();
}
DelimitedSplit ds = new DelimitedSplit(new TextArrayWritable(
hlist.toArray(new Text[hlist.size()])), path,
fsplit.getStart(), fsplit.getLength(),
fsplit.getLocations());
populatedSplits.add(ds);
}
return populatedSplits;
}
/**
* Sets the Reducer class to the chain job's JobConf.
* <p/>
* The configuration properties of the chain job have precedence over the
* configuration properties of the Reducer.
*
* @param jobConf chain job's JobConf to add the Reducer class.
* @param klass the Reducer class to add.
* @param inputKeyClass reducer input key class.
* @param inputValueClass reducer input value class.
* @param outputKeyClass reducer output key class.
* @param outputValueClass reducer output value class.
* @param byValue indicates if key/values should be passed by value
* to the next Mapper in the chain, if any.
* @param reducerConf a JobConf with the configuration for the Reducer
* class. It is recommended to use a JobConf without default values using the
* <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
*/
public static <K1, V1, K2, V2> void setReducer(JobConf jobConf,
Class<? extends Reducer<K1, V1, K2, V2>> klass,
Class<? extends K1> inputKeyClass,
Class<? extends V1> inputValueClass,
Class<? extends K2> outputKeyClass,
Class<? extends V2> outputValueClass,
boolean byValue, JobConf reducerConf) {
String prefix = getPrefix(false);
if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
throw new IllegalStateException("Reducer has been already set");
}
jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
// if the Reducer does not have a private JobConf create an empty one
if (reducerConf == null) {
// using a JobConf without defaults to make it lightweight.
// still the chain JobConf may have all defaults and this conf is
// overlapped to the chain JobConf one.
reducerConf = new JobConf(false);
}
// store in the private reducer conf the input/output classes of the reducer
// and if it works by value or by reference
reducerConf.setBoolean(MAPPER_BY_VALUE, byValue);
reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
Object.class);
reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
Object.class);
reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
Object.class);
// serialize the private reducer jobconf in the chain jobconf.
Stringifier<JobConf> stringifier =
new DefaultStringifier<JobConf>(jobConf, JobConf.class);
try {
jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
stringifier.toString(new JobConf(reducerConf)));
}
catch (IOException ioEx) {
throw new RuntimeException(ioEx);
}
}
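Several of the examples above also use the array variants, storeArray() and loadArray(). Here is a minimal, self-contained sketch of that round trip, with an invented key name:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.IntWritable;

public class StringifierArrayRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Store an array of Writables under a made-up key...
    IntWritable[] delims = {
        new IntWritable(','), new IntWritable('\n'), new IntWritable('"') };
    DefaultStringifier.storeArray(conf, delims, "example.delim.chars");

    // ...and load it back, for example in a task's setup().
    IntWritable[] restored = DefaultStringifier.loadArray(conf,
        "example.delim.chars", IntWritable.class);
    for (IntWritable w : restored) {
      System.out.println((char) w.get());
    }
  }
}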