org.apache.hadoop.io.Stringifier#org.apache.hadoop.io.DefaultStringifier Source Code Examples

Listed below are example usages of org.apache.hadoop.io.Stringifier#org.apache.hadoop.io.DefaultStringifier collected from open-source projects; follow the project links to view the full source on GitHub.
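
All of the snippets below revolve around the same pair of calls: DefaultStringifier.store(), which serializes a Writable into a Configuration entry (Base64-encoded, using the serialization framework configured via io.serializations), and DefaultStringifier.load(), which restores it on the task side. The following minimal round trip is only a sketch of that pattern; the class name and the configuration key "example.column.types.map" are illustrative and do not come from any of the projects below.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;

public class DefaultStringifierRoundTrip {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();

    // Build a Writable value that should travel to the tasks via the job configuration.
    MapWritable columnTypes = new MapWritable();
    columnTypes.put(new Text("Age"), new Text("Integer"));
    columnTypes.put(new Text("Name"), new Text("String"));

    // Serialize the map into the configuration under an illustrative key.
    DefaultStringifier.store(conf, columnTypes, "example.column.types.map");

    // Later, e.g. in a Mapper's setup(), restore the same object from the configuration.
    MapWritable restored =
        DefaultStringifier.load(conf, "example.column.types.map", MapWritable.class);
    System.out.println("Restored " + restored.size() + " column type entries");
  }
}

For arrays of Writables there are matching DefaultStringifier.storeArray() and DefaultStringifier.loadArray() variants, which the Sqoop HCatalog examples further down this page use for the Hive delimiter characters.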

private void configureGenericRecordExportInputFormat(Job job, String tableName)
    throws IOException {
  ConnManager connManager = context.getConnManager();
  Map<String, Integer> columnTypeInts;
  if (options.getCall() == null) {
    columnTypeInts = connManager.getColumnTypes(
        tableName,
        options.getSqlQuery());
  } else {
    columnTypeInts = connManager.getColumnTypesForProcedure(
        options.getCall());
  }
  String[] specifiedColumns = options.getColumns();
  MapWritable columnTypes = new MapWritable();
  for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
    String column = e.getKey();
    column = (specifiedColumns == null) ? column : options.getColumnNameCaseInsensitive(column);
    if (column != null) {
      Text columnName = new Text(column);
      Text columnType = new Text(connManager.toJavaType(tableName, column, e.getValue()));
      columnTypes.put(columnName, columnType);
    }
  }
  // Serialize the column name -> Java type map into the job configuration
  // so the export mapper can restore it with DefaultStringifier.load().
  DefaultStringifier.store(job.getConfiguration(), columnTypes,
      AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
}
 
protected LinkedMapWritable getForestStatusMap(Configuration conf) 
throws IOException {
    String forestHost = conf.get(OUTPUT_FOREST_HOST);
    if (forestHost != null) {
        //Restores the object from the configuration.
        LinkedMapWritable fhmap = DefaultStringifier.load(conf, OUTPUT_FOREST_HOST, 
            LinkedMapWritable.class);
        // must be in fast load mode, otherwise won't reach here
        String s = conf.get(ASSIGNMENT_POLICY);
        //EXECUTION_MODE must have a value in mlcp;
        //default is "distributed" in hadoop connector
        String mode = conf.get(EXECUTION_MODE, MODE_DISTRIBUTED);
        if (MODE_DISTRIBUTED.equals(mode)) {
            AssignmentPolicy.Kind policy =
                AssignmentPolicy.Kind.forName(s);
            am.initialize(policy, fhmap, conf.getInt(BATCH_SIZE, 10));
        }
        return fhmap;
    } else {
        throw new IOException("Forest host map not found");
    }
}
 
@Override
public void checkOutputSpecs(Configuration conf, ContentSource cs) 
throws IOException {
    // check for required configuration
    if (conf.get(OUTPUT_QUERY) == null) {
        throw new IllegalArgumentException(OUTPUT_QUERY + 
        " is not specified.");
    }
    // warn against unsupported configuration
    if (conf.get(BATCH_SIZE) != null) {
        LOG.warn("Config entry for " +
                "\"mapreduce.marklogic.output.batchsize\" is not " +
                "supported for " + this.getClass().getName() + 
                " and will be ignored.");
    }
    String queryLanguage = conf.get(OUTPUT_QUERY_LANGUAGE);
    if (queryLanguage != null) {
        InternalUtilities.checkQueryLanguage(queryLanguage);
    }
    // store hosts into config system
    DefaultStringifier.store(conf, queryHosts(cs), OUTPUT_FOREST_HOST);
}
 
Example 4 (project: RDFS, file: Chain.java)
/**
 * Creates a {@link JobConf} for one of the Maps or Reduce in the chain.
 * <p/>
 * It creates a new JobConf using the chain job's JobConf as base and adds to
 * it the configuration properties for the chain element. The keys of the
 * chain element jobConf have precedence over the given JobConf.
 *
 * @param jobConf the chain job's JobConf.
 * @param confKey the key for chain element configuration serialized in the
 *                chain job's JobConf.
 * @return a new JobConf aggregating the chain job's JobConf with the chain
 *         element configuration properties.
 */
private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
  JobConf conf;
  try {
    Stringifier<JobConf> stringifier =
      new DefaultStringifier<JobConf>(jobConf, JobConf.class);
    conf = stringifier.fromString(jobConf.get(confKey, null));
  } catch (IOException ioex) {
    throw new RuntimeException(ioex);
  }
  // We have to do this because the Writable deserialization clears all
  // values set in the conf, making it impossible to do a new JobConf(jobConf)
  // in the creation of the conf above.
  jobConf = new JobConf(jobConf);

  for(Map.Entry<String, String> entry : conf) {
    jobConf.set(entry.getKey(), entry.getValue());
  }
  return jobConf;
}
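
For context, the storing side that this method undoes looks roughly like the sketch below: the value read from confKey would have been written earlier with stringifier.toString(), as the setReducer example near the end of this page shows. The helper name and the generic confKey parameter are illustrative and not part of Chain.java; in Chain.java the key is built from the chain prefix plus a per-element suffix.

// Hedged sketch of the storing side (illustrative helper; mirrors setReducer below).
private static void storeChainElementConf(JobConf chainJobConf, String confKey,
                                          JobConf elementConf) {
  Stringifier<JobConf> stringifier =
      new DefaultStringifier<JobConf>(chainJobConf, JobConf.class);
  try {
    // Copy the element conf so serialization does not touch the caller's instance.
    chainJobConf.set(confKey, stringifier.toString(new JobConf(elementConf)));
  } catch (IOException ioEx) {
    throw new RuntimeException(ioEx);
  }
}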
 
Example 5 (project: hadoop-gpu, file: Chain.java)
/**
 * Creates a {@link JobConf} for one of the Maps or Reduce in the chain.
 * <p/>
 * It creates a new JobConf using the chain job's JobConf as base and adds to
 * it the configuration properties for the chain element. The keys of the
 * chain element jobConf have precedence over the given JobConf.
 *
 * @param jobConf the chain job's JobConf.
 * @param confKey the key for chain element configuration serialized in the
 *                chain job's JobConf.
 * @return a new JobConf aggregating the chain job's JobConf with the chain
 *         element configuration properties.
 */
private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
  JobConf conf;
  try {
    Stringifier<JobConf> stringifier =
      new DefaultStringifier<JobConf>(jobConf, JobConf.class);
    conf = stringifier.fromString(jobConf.get(confKey, null));
  } catch (IOException ioex) {
    throw new RuntimeException(ioex);
  }
  // We have to do this because the Writable deserialization clears all
  // values set in the conf, making it impossible to do a new JobConf(jobConf)
  // in the creation of the conf above.
  jobConf = new JobConf(jobConf);

  for(Map.Entry<String, String> entry : conf) {
    jobConf.set(entry.getKey(), entry.getValue());
  }
  return jobConf;
}
 
@Override
protected void setup(Context context) throws IOException, InterruptedException {
  super.setup(context);

  Configuration conf = context.getConfiguration();

  // Instantiate a copy of the user's class to hold and parse the record.
  String recordClassName = conf.get(
      ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
  if (null == recordClassName) {
    throw new IOException("Export table class name ("
        + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
        + ") is not set!");
  }

  try {
    Class<?> cls = Class.forName(recordClassName, true,
        Thread.currentThread().getContextClassLoader());
    recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
  } catch (ClassNotFoundException cnfe) {
    throw new IOException(cnfe);
  }

  if (null == recordImpl) {
    throw new IOException("Could not instantiate object of type "
        + recordClassName);
  }

  columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
      MapWritable.class);
}
 
@Override
protected void setup(Context context) throws IOException, InterruptedException {
  super.setup(context);

  Configuration conf = context.getConfiguration();

  // Instantiate a copy of the user's class to hold and parse the record.
  String recordClassName = conf.get(
      ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
  if (null == recordClassName) {
    throw new IOException("Export table class name ("
        + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
        + ") is not set!");
  }

  try {
    Class<?> cls = Class.forName(recordClassName, true,
        Thread.currentThread().getContextClassLoader());
    recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
  } catch (ClassNotFoundException cnfe) {
    throw new IOException(cnfe);
  }

  if (null == recordImpl) {
    throw new IOException("Could not instantiate object of type "
        + recordClassName);
  }

  columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
      MapWritable.class);
}
 
@Test
public void testAvroWithNoColumnsSpecified() throws Exception {
  SqoopOptions opts = new SqoopOptions();
  opts.setExportDir("myexportdir");
  JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
  Job job = new Job();
  jdbcExportJob.configureInputFormat(job, null, null, null);
  assertEquals(asSetOfText("Age", "Name", "Gender"), DefaultStringifier.load(job.getConfiguration(), AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
 
@Test
public void testAvroWithAllColumnsSpecified() throws Exception {
  SqoopOptions opts = new SqoopOptions();
  opts.setExportDir("myexportdir");
  String[] columns = { "Age", "Name", "Gender" };
  opts.setColumns(columns);
  JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
  Job job = new Job();
  jdbcExportJob.configureInputFormat(job, null, null, null);
  assertEquals(asSetOfText("Age", "Name", "Gender"), DefaultStringifier.load(job.getConfiguration(), AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
 
@Test
public void testAvroWithOneColumnSpecified() throws Exception {
  SqoopOptions opts = new SqoopOptions();
  opts.setExportDir("myexportdir");
  String[] columns = { "Gender" };
  opts.setColumns(columns);
  JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
  Job job = new Job();
  jdbcExportJob.configureInputFormat(job, null, null, null);
  assertEquals(asSetOfText("Gender"), DefaultStringifier.load(job.getConfiguration(), AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
 
@Test
public void testAvroWithSomeColumnsSpecified() throws Exception {
  SqoopOptions opts = new SqoopOptions();
  opts.setExportDir("myexportdir");
  String[] columns = { "Age", "Name" };
  opts.setColumns(columns);
  JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
  Job job = new Job();
  jdbcExportJob.configureInputFormat(job, null, null, null);
  assertEquals(asSetOfText("Age", "Name"), DefaultStringifier.load(job.getConfiguration(), AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
 
@Test
public void testAvroWithMoreColumnsSpecified() throws Exception {
  SqoopOptions opts = new SqoopOptions();
  opts.setExportDir("myexportdir");
  String[] columns = { "Age", "Name", "Gender", "Address" };
  opts.setColumns(columns);
  JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
  Job job = new Job();
  jdbcExportJob.configureInputFormat(job, null, null, null);
  assertEquals(asSetOfText("Age", "Name", "Gender"), DefaultStringifier.load(job.getConfiguration(), AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
 
@Override
public void checkOutputSpecs(Configuration conf, ContentSource cs)
    throws IOException {
    super.checkOutputSpecs(conf, cs);

    // store mimetypes map into config system
    DefaultStringifier.store(conf, getMimetypesMap(),
        ConfigConstants.CONF_MIMETYPES);
}
 
Example 14 (project: marklogic-contentpump, file: RDFInputFormat.java)
protected LinkedMapWritable getRoleMap(TaskAttemptContext context) throws IOException {
    //Restores the object from the configuration.
    Configuration conf = context.getConfiguration();
    LinkedMapWritable fhmap = null;
    if (conf.get(ConfigConstants.CONF_ROLE_MAP) != null) {
        fhmap = DefaultStringifier.load(conf, ConfigConstants.CONF_ROLE_MAP, 
            LinkedMapWritable.class);
    }
    return fhmap;
}
 
Example 15 (project: marklogic-contentpump, file: RDFInputFormat.java)
protected String getServerVersion(TaskAttemptContext context) throws IOException {
    //Restores the object from the configuration.
    Configuration conf = context.getConfiguration();
    Text version = DefaultStringifier.load(conf, ConfigConstants.CONF_ML_VERSION, 
        Text.class);
    return version.toString();
}
 
Example 16 (project: marklogic-contentpump, file: NodeOutputFormat.java)
@Override
public void checkOutputSpecs(Configuration conf, ContentSource cs) 
throws IOException {
    // warn against unsupported configuration
    if (conf.get(BATCH_SIZE) != null) {
        LOG.warn("Config entry for " +
                "\"mapreduce.marklogic.output.batchsize\" is not " +
                "supported for " + this.getClass().getName() + 
                " and will be ignored.");
    }     
    // store hosts into config system
    DefaultStringifier.store(conf, queryHosts(cs), OUTPUT_FOREST_HOST);
}
 
protected TextArrayWritable getHosts(Configuration conf) throws IOException {
    String forestHost = conf.get(OUTPUT_FOREST_HOST);
    if (forestHost != null) {
        // Restores the object from the configuration.
        TextArrayWritable hosts = DefaultStringifier.load(conf,
            OUTPUT_FOREST_HOST, TextArrayWritable.class);
        return hosts;
    } else {
        throw new IOException("Forest host map not found");
    }
}
 
@Override
public void checkOutputSpecs(Configuration conf, ContentSource cs) 
throws IOException {
    // warn against unsupported configuration
    if (conf.get(BATCH_SIZE) != null) {
        LOG.warn("Config entry for " +
                "\"mapreduce.marklogic.output.batchsize\" is not " +
                "supported for " + this.getClass().getName() + 
                " and will be ignored.");
    }      
    // store hosts into config system
    DefaultStringifier.store(conf, queryHosts(cs), OUTPUT_FOREST_HOST);
}
 
public SqoopHCatExportHelper(Configuration conf, boolean isOdps)
  throws IOException, InterruptedException {
  this.isOdps = isOdps;

  if (!isOdps) {
    colTypesJava =
        DefaultStringifier.load(conf, SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_JAVA,
            MapWritable.class);
    colTypesSql =
        DefaultStringifier.load(conf, SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_SQL,
            MapWritable.class);
  }
  // Instantiate a copy of the user's class to hold and parse the record.

  String recordClassName = conf.get(
    ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
  if (null == recordClassName) {
    throw new IOException("Export table class name ("
      + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
      + ") is not set!");
  }

  bigDecimalFormatString = conf.getBoolean(
    ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
    ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);

  debugHCatExportMapper = conf.getBoolean(
    SqoopHCatUtilities.DEBUG_HCAT_EXPORT_MAPPER_PROP, false);
  try {
    Class<?> cls = Class.forName(recordClassName, true,
      Thread.currentThread().getContextClassLoader());
    sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
  } catch (ClassNotFoundException cnfe) {
    throw new IOException(cnfe);
  }

  if (null == sqoopRecord) {
    throw new IOException("Could not instantiate object of type "
      + recordClassName);
  }

  String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
  jobInfo =
    (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
  HCatSchema tableSchema = jobInfo.getTableInfo().getDataColumns();
  HCatSchema partitionSchema =
    jobInfo.getTableInfo().getPartitionColumns();
  hCatFullTableSchema = new HCatSchema(tableSchema.getFields());
  for (HCatFieldSchema hfs : partitionSchema.getFields()) {
    hCatFullTableSchema.append(hfs);
  }
}
 
public static void configureImportOutputFormat(SqoopOptions opts, Job job,
  ConnManager connMgr, String dbTable, Configuration config)
  throws IOException {

  LOG.info("Configuring HCatalog for import job");
  SqoopHCatUtilities.instance().configureHCat(opts, job, connMgr, dbTable,
    job.getConfiguration());
  LOG.info("Validating dynamic partition keys");
  SqoopHCatUtilities.instance().validateFieldAndColumnMappings();
  SqoopHCatUtilities.instance().validateDynamicPartitionKeysMapping();
  job.setOutputFormatClass(getOutputFormatClass());
  IntWritable[] delimChars = new IntWritable[5];
  String hiveReplacement = "";
  LOG.debug("Hive delimiters will be fixed during import");
  DelimiterSet delims = opts.getOutputDelimiters();
  if (!opts.explicitOutputDelims()) {
    delims = DelimiterSet.HIVE_DELIMITERS;
  }
  delimChars = new IntWritable[] {
    new IntWritable(delims.getFieldsTerminatedBy()),
    new IntWritable(delims.getLinesTerminatedBy()),
    new IntWritable(delims.getEnclosedBy()),
    new IntWritable(delims.getEscapedBy()),
    new IntWritable(delims.isEncloseRequired() ? 1 : 0), };
  hiveReplacement = opts.getHiveDelimsReplacement();
  if (hiveReplacement == null) {
    hiveReplacement = "";
  }

  LOG.debug("Setting hive delimiters information");
  DefaultStringifier.storeArray(config, delimChars,
    HIVE_DELIMITERS_TO_REPLACE_PROP);
  config.set(HIVE_DELIMITERS_REPLACEMENT_PROP, hiveReplacement);
  if (opts.doHiveDropDelims() || opts.getHiveDelimsReplacement() != null) {
    LOG.debug("Enabling hive delimiter replacement");
    config.set(HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP, "true");
  } else {
    LOG.debug("Disabling hive delimiter replacement");
    config.set(HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP, "false");
  }
}
 
public SqoopHCatImportHelper(Configuration conf) throws IOException,
  InterruptedException {

  String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
  jobInfo = (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
  dataColsSchema = jobInfo.getTableInfo().getDataColumns();
  partitionSchema = jobInfo.getTableInfo().getPartitionColumns();
  StringBuilder storerInfoStr = new StringBuilder(1024);
  StorerInfo storerInfo = jobInfo.getTableInfo().getStorerInfo();
  storerInfoStr.append("HCatalog Storer Info : ").append("\n\tHandler = ")
    .append(storerInfo.getStorageHandlerClass())
    .append("\n\tInput format class = ").append(storerInfo.getIfClass())
    .append("\n\tOutput format class = ").append(storerInfo.getOfClass())
    .append("\n\tSerde class = ").append(storerInfo.getSerdeClass());
  Properties storerProperties = storerInfo.getProperties();
  if (!storerProperties.isEmpty()) {
    storerInfoStr.append("\nStorer properties ");
    for (Map.Entry<Object, Object> entry : storerProperties.entrySet()) {
      String key = (String) entry.getKey();
      Object val = entry.getValue();
      storerInfoStr.append("\n\t").append(key).append('=').append(val);
    }
  }
  storerInfoStr.append("\n");
  LOG.info(storerInfoStr);

  hCatFullTableSchema = new HCatSchema(dataColsSchema.getFields());
  for (HCatFieldSchema hfs : partitionSchema.getFields()) {
    hCatFullTableSchema.append(hfs);
  }
  fieldCount = hCatFullTableSchema.size();
  lobLoader = new LargeObjectLoader(conf, new Path(jobInfo.getTableInfo()
    .getTableLocation()));
  bigDecimalFormatString = conf.getBoolean(
    ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
    ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
  debugHCatImportMapper = conf.getBoolean(
    SqoopHCatUtilities.DEBUG_HCAT_IMPORT_MAPPER_PROP, false);
  IntWritable[] delimChars = DefaultStringifier.loadArray(conf,
    SqoopHCatUtilities.HIVE_DELIMITERS_TO_REPLACE_PROP, IntWritable.class);
  hiveDelimiters = new DelimiterSet((char) delimChars[0].get(),
    (char) delimChars[1].get(), (char) delimChars[2].get(),
    (char) delimChars[3].get(), delimChars[4].get() == 1);
  hiveDelimsReplacement = conf
    .get(SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_PROP);
  if (hiveDelimsReplacement == null) {
    hiveDelimsReplacement = "";
  }
  doHiveDelimsReplacement = Boolean.valueOf(conf
    .get(SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP));

  IntWritable[] fPos = DefaultStringifier.loadArray(conf,
    SqoopHCatUtilities.HCAT_FIELD_POSITIONS_PROP, IntWritable.class);
  hCatFieldPositions = new int[fPos.length];
  for (int i = 0; i < fPos.length; ++i) {
    hCatFieldPositions[i] = fPos[i].get();
  }

  LOG.debug("Hive delims replacement enabled : " + doHiveDelimsReplacement);
  LOG.debug("Hive Delimiters : " + hiveDelimiters.toString());
  LOG.debug("Hive delimiters replacement : " + hiveDelimsReplacement);
  staticPartitionKeys = conf
    .getStrings(SqoopHCatUtilities.HCAT_STATIC_PARTITION_KEY_PROP);
  String partKeysString = staticPartitionKeys == null ? ""
    : Arrays.toString(staticPartitionKeys);
  LOG.debug("Static partition key used : "  + partKeysString);
}
 
/**
 * Initialize the mimetype map if it has not been initialized yet, and return it.
 *
 * @return the mimetype map
 * @throws IOException if the mimetype information cannot be retrieved
 */
private LinkedMapWritable getMimetypesMap() throws IOException {
    if (mimetypeMap != null)
        return mimetypeMap;
    String mtmap = conf.get(ConfigConstants.CONF_MIMETYPES);
    if (mtmap != null) {
        mimetypeMap = DefaultStringifier.load(conf,
            ConfigConstants.CONF_MIMETYPES, LinkedMapWritable.class);
        return mimetypeMap;
    }
    String[] hosts = conf.getStrings(OUTPUT_HOST);
    Session session = null;
    ResultSequence result = null;

    for (int i = 0; i < hosts.length; i++) {
        try {
            String host = hosts[i];
            ContentSource cs = InternalUtilities.getOutputContentSource(conf,
                host);
            session = cs.newSession();
            AdhocQuery query = session.newAdhocQuery(MIMETYPES_QUERY);
            RequestOptions options = new RequestOptions();
            options.setDefaultXQueryVersion("1.0-ml");
            query.setOptions(options);
            result = session.submitRequest(query);
            if (!result.hasNext())
                throw new IOException(
                    "Server-side transform requires MarkLogic 7 or later");
            mimetypeMap = new LinkedMapWritable();
            while (result.hasNext()) {
                String suffs = result.next().asString();
                Text format = new Text(result.next().asString());
                // some extensions are in a space separated string
                for (String s : suffs.split(" ")) {
                    Text suff = new Text(s);
                    mimetypeMap.put(suff, format);
                }
            }
            return mimetypeMap;
        } catch (Exception e) {
            if (e.getCause() instanceof ServerConnectionException) {
                LOG.warn("Unable to connect to " + hosts[i]
                        + " to query destination information");
                continue;
            }
            LOG.error(e.getMessage(), e);
            throw new IOException(e);
        } finally {
            if (result != null) {
                result.close();
            }
            if (session != null) {
                session.close();
            }
        }
    }
    return null;
}
 
public List<InputSplit> getSplits(JobContext job) throws IOException {
    boolean delimSplit = isSplitInput(job.getConfiguration());
    //if delimSplit is true, size of each split is determined by 
    //Math.max(minSize, Math.min(maxSize, blockSize)) in FileInputFormat
    List<InputSplit> splits = super.getSplits(job);
    if (!delimSplit) {
        return splits;
    }

    if (splits.size() >= SPLIT_COUNT_LIMIT) {
        // if #splits > 1 million, there is enough parallelism,
        // therefore no point to split further
        LOG.warn("Exceeding SPLIT_COUNT_LIMIT, input_split is off:"
            + SPLIT_COUNT_LIMIT);
        DefaultStringifier.store(job.getConfiguration(), false, ConfigConstants.CONF_SPLIT_INPUT);
        return splits;
    }
    // add header info into splits
    List<InputSplit> populatedSplits = new ArrayList<InputSplit>();
    LOG.info(splits.size() + " DelimitedSplits generated");
    Configuration conf = job.getConfiguration();
    char delimiter = 0;
    ArrayList<Text> hlist = new ArrayList<Text>();
    for (InputSplit file: splits) {
        FileSplit fsplit = ((FileSplit)file);
        Path path = fsplit.getPath();
        FileSystem fs = path.getFileSystem(conf);
        
        if (fsplit.getStart() == 0) {
            // parse the input split to get the header
            FSDataInputStream fileIn = fs.open(path);

            String delimStr = conf.get(ConfigConstants.CONF_DELIMITER,
                ConfigConstants.DEFAULT_DELIMITER);
            if (delimStr.length() == 1) {
                delimiter = delimStr.charAt(0);
            } else {
                LOG.error("Incorrect delimiter: " + delimStr
                    + ". Expects a single character.");
            }
            String encoding = conf.get(
                MarkLogicConstants.OUTPUT_CONTENT_ENCODING,
                MarkLogicConstants.DEFAULT_OUTPUT_CONTENT_ENCODING);
            InputStreamReader instream = new InputStreamReader(fileIn, encoding);
            CSVParser parser = new CSVParser(instream,
                CSVParserFormatter.getFormat(delimiter,
                    DelimitedTextReader.encapsulator, true, true));
            Iterator<CSVRecord> it = parser.iterator();

            String[] header = null;
            if (it.hasNext()) {
                CSVRecord record = it.next();
                Iterator<String> recordIterator = record.iterator();
                int recordSize = record.size();
                header = new String[recordSize];
                for (int i = 0; i < recordSize; i++) {
                    if (recordIterator.hasNext()) {
                        header[i] = recordIterator.next();
                    } else {
                        throw new IOException("Record size doesn't match the real size");
                    }
                    }
                }
                
                EncodingUtil.handleBOMUTF8(header, 0);
                
                hlist.clear();
                for (String s : header) {
                    hlist.add(new Text(s));
                }
            }
            instream.close();
        }
        
        DelimitedSplit ds = new DelimitedSplit(new TextArrayWritable(
            hlist.toArray(new Text[hlist.size()])), path,
            fsplit.getStart(), fsplit.getLength(),
            fsplit.getLocations());
        populatedSplits.add(ds);
    }
    
    return populatedSplits;
}
 
Example 24 (project: RDFS, file: Chain.java)
/**
 * Sets the Reducer class to the chain job's JobConf.
 * <p/>
 * The configuration properties of the chain job have precedence over the
 * configuration properties of the Reducer.
 *
 * @param jobConf              chain job's JobConf to add the Reducer class.
 * @param klass            the Reducer class to add.
 * @param inputKeyClass    reducer input key class.
 * @param inputValueClass  reducer input value class.
 * @param outputKeyClass   reducer output key class.
 * @param outputValueClass reducer output value class.
 * @param byValue          indicates if key/values should be passed by value
 * to the next Mapper in the chain, if any.
 * @param reducerConf      a JobConf with the configuration for the Reducer
 * class. It is recommended to use a JobConf without default values using the
 * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
 */
public static <K1, V1, K2, V2> void setReducer(JobConf jobConf,
                        Class<? extends Reducer<K1, V1, K2, V2>> klass,
                        Class<? extends K1> inputKeyClass,
                        Class<? extends V1> inputValueClass,
                        Class<? extends K2> outputKeyClass,
                        Class<? extends V2> outputValueClass,
                        boolean byValue, JobConf reducerConf) {
  String prefix = getPrefix(false);

  if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
    throw new IllegalStateException("Reducer has been already set");
  }

  jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);

  // if the Reducer does not have a private JobConf create an empty one
  if (reducerConf == null) {
    // using a JobConf without defaults to make it lightweight.
    // still the chain JobConf may have all defaults and this conf is
    // overlapped to the chain JobConf one.
    reducerConf = new JobConf(false);
  }

  // store in the private reducer conf the input/output classes of the reducer
  // and if it works by value or by reference
  reducerConf.setBoolean(MAPPER_BY_VALUE, byValue);
  reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
  reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
                       Object.class);
  reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
                       Object.class);
  reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
                       Object.class);

  // serialize the private reducer jobconf in the chain jobconf.
  Stringifier<JobConf> stringifier =
    new DefaultStringifier<JobConf>(jobConf, JobConf.class);
  try {
    jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
                stringifier.toString(new JobConf(reducerConf)));
  }
  catch (IOException ioEx) {
    throw new RuntimeException(ioEx);
  }
}
 
Example 25 (project: hadoop-gpu, file: Chain.java)
/**
 * Sets the Reducer class to the chain job's JobConf.
 * <p/>
 * The configuration properties of the chain job have precedence over the
 * configuration properties of the Reducer.
 *
 * @param jobConf              chain job's JobConf to add the Reducer class.
 * @param klass            the Reducer class to add.
 * @param inputKeyClass    reducer input key class.
 * @param inputValueClass  reducer input value class.
 * @param outputKeyClass   reducer output key class.
 * @param outputValueClass reducer output value class.
 * @param byValue          indicates if key/values should be passed by value
 * to the next Mapper in the chain, if any.
 * @param reducerConf      a JobConf with the configuration for the Reducer
 * class. It is recommended to use a JobConf without default values using the
 * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
 */
public static <K1, V1, K2, V2> void setReducer(JobConf jobConf,
                        Class<? extends Reducer<K1, V1, K2, V2>> klass,
                        Class<? extends K1> inputKeyClass,
                        Class<? extends V1> inputValueClass,
                        Class<? extends K2> outputKeyClass,
                        Class<? extends V2> outputValueClass,
                        boolean byValue, JobConf reducerConf) {
  String prefix = getPrefix(false);

  if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
    throw new IllegalStateException("Reducer has been already set");
  }

  jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);

  // if the Reducer does not have a private JobConf create an empty one
  if (reducerConf == null) {
    // using a JobConf without defaults to make it lightweight.
    // still the chain JobConf may have all defaults and this conf is
    // overlapped to the chain JobConf one.
    reducerConf = new JobConf(false);
  }

  // store in the private reducer conf the input/output classes of the reducer
  // and if it works by value or by reference
  reducerConf.setBoolean(MAPPER_BY_VALUE, byValue);
  reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
  reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
                       Object.class);
  reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
                       Object.class);
  reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
                       Object.class);

  // serialize the private reducer jobconf in the chain jobconf.
  Stringifier<JobConf> stringifier =
    new DefaultStringifier<JobConf>(jobConf, JobConf.class);
  try {
    jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
                stringifier.toString(new JobConf(reducerConf)));
  }
  catch (IOException ioEx) {
    throw new RuntimeException(ioEx);
  }
}