The following are example usages of org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#setCompressOutput(), collected from open-source projects.
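For orientation, here is a minimal, self-contained driver sketch (the class name, the identity map/reduce pipeline, and the command-line paths are illustrative assumptions, not taken from any of the examples below) showing how setCompressOutput() is typically paired with setOutputCompressorClass():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressedOutputDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "compressed-output");
    job.setJarByClass(CompressedOutputDriver.class);
    // No mapper/reducer set: the identity Mapper and Reducer pass TextInputFormat's
    // (LongWritable, Text) records straight through to the output.
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Enable compression of the final output files...
    FileOutputFormat.setCompressOutput(job, true);
    // ...and choose the codec; BZip2Codec, SnappyCodec, LzopCodec, etc. are set the same way.
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}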
/**
* Configure the {@link Job} for enabling compression emulation.
*/
static void configure(final Job job) throws IOException, InterruptedException,
ClassNotFoundException {
// set the random text mapper
job.setMapperClass(RandomTextDataMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(GenDataFormat.class);
job.setJarByClass(GenerateData.class);
// set the output compression true
FileOutputFormat.setCompressOutput(job, true);
try {
FileInputFormat.addInputPath(job, new Path("ignored"));
} catch (IOException e) {
LOG.error("Error while adding input path ", e);
}
}
@Override
public void setStoreLocation(String location, Job job) throws IOException {
job.getConfiguration().set(MRConfiguration.TEXTOUTPUTFORMAT_SEPARATOR, "");
FileOutputFormat.setOutputPath(job, new Path(location));
if( "true".equals( job.getConfiguration().get( "output.compression.enabled" ) ) ) {
FileOutputFormat.setCompressOutput( job, true );
String codec = job.getConfiguration().get( "output.compression.codec" );
try {
FileOutputFormat.setOutputCompressorClass( job, (Class<? extends CompressionCodec>) Class.forName( codec ) );
} catch (ClassNotFoundException e) {
throw new RuntimeException("Class not found: " + codec );
}
} else {
// This makes it so that storing to a directory ending with ".gz" or ".bz2" works.
setCompression(new Path(location), job);
}
}
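For context, the output.compression.enabled and output.compression.codec keys read above are custom properties of this store function rather than standard Hadoop configuration keys. A hedged sketch of how a caller could set them on the job before the store location is applied:

// Hedged sketch: enable the custom compression switch consumed by setStoreLocation() above.
// The codec class name must resolve on the cluster classpath; BZip2Codec is only an example.
Configuration conf = job.getConfiguration();
conf.set("output.compression.enabled", "true");
conf.set("output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec");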
public int run(String[] args) throws Exception {
// create the job
Configuration config = getConf();
Job job = Job.getInstance(config);
// configure job parameters
job.setJarByClass(ParseLogJob.class);
job.setJobName("parselog");
job.setMapperClass(LogMapper.class);
// set the reducer class
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(LogWritable.class);
job.setOutputValueClass(Text.class);
// add input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
// set the output compression codec
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// run the job
if (!job.waitForCompletion(true)) {
throw new RuntimeException(job.getJobName() + " failed!");
}
return 0;
}
/**
* Job configurator
*
* @param job job instance
* @param jarByClass class of the jar
* @param mapperClass mapper
* @param reducerClass reducer
* @param commaSeparatedInputFiles input paths
* @param outputPath output
* @throws IOException I/O exception
*/
public static void configureJob(Job job, Class<?> jarByClass,
Class<? extends Mapper> mapperClass, Class<? extends Reducer> reducerClass,
String commaSeparatedInputFiles, String outputPath)
throws IOException
{
job.setJarByClass(jarByClass);
job.setJobName(jarByClass.getName());
// mapper
job.setMapperClass(mapperClass);
// reducer
job.setReducerClass(reducerClass);
// input-output is warc
job.setInputFormatClass(WARCInputFormat.class);
// prevent producing empty files
LazyOutputFormat.setOutputFormatClass(job, WARCOutputFormat.class);
// intermediate data
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WARCWritable.class);
// output data
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(WARCWritable.class);
// set output compression to GZip
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
FileInputFormat.addInputPaths(job, commaSeparatedInputFiles);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
}
@Override
public int run(String[] args)
throws Exception
{
Job job = Job.getInstance(getConf());
//set from the command line
job.setJarByClass(Phase2ExactMatchDeDuplication.class);
job.setJobName(Phase2ExactMatchDeDuplication.class.getName());
// mapper
job.setMapperClass(ExactMatchDetectionMapper.class);
// we will compress the mapper's output (use fast Snappy compressor)
job.getConfiguration().setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
job.getConfiguration()
.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, SnappyCodec.class, CompressionCodec.class);
// reducer
job.setReducerClass(UniqueWarcWriterReducer.class);
// no combiner, as the output classes in mapper and reducer are different!
// input-output is warc
job.setInputFormatClass(WARCInputFormat.class);
job.setOutputFormatClass(WARCOutputFormat.class);
// mapper output data
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WARCWritable.class);
// set output compression to GZip
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
FileInputFormat.addInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
@Override
public int run(String[] args)
throws Exception
{
Job job = Job.getInstance(getConf());
// set from the command line
job.setJarByClass(Phase1FullJob.class);
job.setJobName(Phase1FullJob.class.getName());
// mapper
job.setMapperClass(MapperClass.class);
// we will compress the mapper's output (use fast Snappy compressor)
job.getConfiguration().setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
job.getConfiguration()
.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, SnappyCodec.class, CompressionCodec.class);
// reducer
job.setReducerClass(SimpleWarcWriterReducer.class);
// input-output is warc
job.setInputFormatClass(WARCInputFormat.class);
job.setOutputFormatClass(WARCOutputFormat.class);
// mapper output data
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(WARCWritable.class);
// set output compression to GZip
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
FileInputFormat.addInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
/**
* {@inheritDoc}
*/
@Override
public int run(String[] args)
throws Exception
{
Job job = Job.getInstance(getConf());
// set from the command line
job.setJarByClass(URIExtractor.class);
job.setJobName(URIExtractor.class.getName());
// mapper
job.setMapperClass(URIExtractorMapper.class);
job.setReducerClass(URIExtractorReducer.class);
// input-output is warc
job.setInputFormatClass(WARCInputFormat.class);
// necessary so that Hadoop does not mix up the map input format.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// set output compression to GZip
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
FileInputFormat.addInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
private Job createJob(String inputPath, String outputPath) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf);
job.setJarByClass(VisualJob.class);
job.setNumReduceTasks(90);
FileSystem fs = FileSystem.get(new URI(outputPath), conf);
if (fs.exists(new Path(outputPath))) {
fs.delete(new Path(outputPath), true);
}
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FloatArrayWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(VisualThreadedMapper.class);
job.setReducerClass(VisualReducer.class);
return job;
}
@Override
protected void configure(Job job, KV<AvroKey<IndexedRecord>, NullWritable> sample) {
super.configure(job, sample);
AvroKey<IndexedRecord> k = sample.getKey();
AvroJob.setOutputKeySchema(job, k.datum().getSchema());
FileOutputFormat.setCompressOutput(job, true);
job.getConfiguration().set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC);
}
/** Test that an error is thrown when the user wants their output compressed. */
@Test
public void testCheckOutputSpecsCompressedOutput() throws IOException {
// Setup configuration.
FileOutputFormat.setCompressOutput(job, true);
IOException thrown =
assertThrows(
IOException.class, () -> outputFormat.checkOutputSpecs(mockTaskAttemptContext));
assertThat(thrown)
.hasMessageThat()
.contains("Compression isn't supported for this OutputFormat.");
}
private void setCompression(Path path, Job job) {
String location=path.getName();
if (location.endsWith(".bz2") || location.endsWith(".bz")) {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
} else if (location.endsWith(".gz")) {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
} else {
FileOutputFormat.setCompressOutput( job, false);
}
}
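For illustration, a hedged usage sketch of the private helper above (the paths are hypothetical): the codec is inferred purely from the suffix of the output name, which is what makes storing to a directory ending in ".gz" or ".bz2" work.

// Hedged sketch: codec selection follows the output name's extension.
setCompression(new Path("/tmp/results.gz"), job);   // gzip output
setCompression(new Path("/tmp/results.bz2"), job);  // bzip2 output
setCompression(new Path("/tmp/results"), job);      // compression left disabled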
@Override
public void setStoreLocation(String location, Job job) throws IOException {
job.getConfiguration().set(MRConfiguration.TEXTOUTPUTFORMAT_SEPARATOR, "");
FileOutputFormat.setOutputPath(job, new Path(location));
if (comp == Compression.bz2 || comp == Compression.bz) {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
} else if (comp == Compression.gz) {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
}
}
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_SEGMENT_ID);
parseOptions(options, args);
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
String cubeName = getOptionValue(OPTION_CUBE_NAME);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
String segmentId = getOptionValue(OPTION_SEGMENT_ID);
// ----------------------------------------------------------------------------
// add metadata to distributed cache
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = cubeMgr.getCube(cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentId);
logger.info("Starting: " + job.getJobName());
setJobClasspath(job, cube.getConfig());
KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cube.getRootFactTable());
String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
String topic = kafkaConfig.getTopic();
if (brokers == null || brokers.length() == 0 || topic == null) {
throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic);
}
JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf()));
job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
job.getConfiguration().set(CONFIG_KAFKA_SPLIT_ROWS, String.valueOf(kafkaConfig.getSplitRows()));
job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
appendKafkaOverrideProperties(cube.getConfig(), job.getConfiguration());
setupMapper(cube.getSegmentById(segmentId));
job.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(job, output);
FileOutputFormat.setCompressOutput(job, true);
org.apache.log4j.Logger.getRootLogger().info("Output hdfs location: " + output);
org.apache.log4j.Logger.getRootLogger().info("Output hdfs compression: " + true);
job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
attachCubeMetadata(cube, job.getConfiguration());
deletePath(job.getConfiguration(), output);
return waitForCompletion(job);
} catch (Exception e) {
logger.error("error in KafkaFlatTableJob", e);
printUsage(options);
throw e;
} finally {
if (job != null)
cleanupTempConfFile(job.getConfiguration());
}
}
public static void main(String[] args) throws Exception {
// 1. HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// 2. MapReduce job configuration
String jobName = "CompressOutput"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(CompressOutput.class); // job class used at runtime
job.setJar("export\\CompressOutput.jar"); // local jar to submit
job.setMapperClass(CompressOutputMapper.class); // Mapper class
job.setMapOutputKeyClass(Text.class); // Mapper output key type
job.setMapOutputValueClass(IntWritable.class); // Mapper output value type
job.setReducerClass(CompressOutputReducer.class); // Reducer class
job.setOutputKeyClass(Text.class); // Reducer output key type
job.setOutputValueClass(IntWritable.class); // Reducer output value type
// compress the job output and specify the codec
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// 3. job input and output paths
String dataDir = "/expr/compress/data"; // input data directory
String outputDir = "/expr/compress/output"; // output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
// 4. run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
@Override
public int run(String[] args)
throws Exception
{
Job job = Job.getInstance(getConf());
job.setJarByClass(Phase4RemoveDuplicatesUsingReduceSideJoins.class);
job.setJobName(Phase4RemoveDuplicatesUsingReduceSideJoins.class.getName());
// paths
// text files of ids to be deleted
String textFilePath = args[0];
// corpus with *.warc.gz
String commaSeparatedInputFiles = args[1];
// output
String outputPath = args[2];
//second input the look up text file
MultipleInputs.addInputPath(job, new Path(textFilePath), TextInputFormat.class,
JoinTextMapper.class);
//first input the data set (check comma separated availability)
MultipleInputs.addInputPath(job, new Path(commaSeparatedInputFiles), WARCInputFormat.class,
JoinWARCMapper.class);
job.setPartitionerClass(SourceJoiningKeyPartitioner.class);
job.setGroupingComparatorClass(SourceJoiningGroupingComparator.class);
job.setMapOutputKeyClass(CompositeKey.class);
job.setMapOutputValueClass(WARCWritable.class);
job.setReducerClass(JoinReducer.class);
job.setOutputFormatClass(WARCOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(WARCWritable.class);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
return job.waitForCompletion(true) ? 0 : 1;
}
@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
int samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
Job job = Job.getInstance();
FileSystem fs = HadoopUtil.getWorkingFileSystem(job.getConfiguration());
HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));
final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
final int totalReducer = reducerMapping.getTotalReducerNum();
logger.info("getTotalReducerNum: {}", totalReducer);
logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
logger.info("counter path {}", counterPath);
boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
// calculate source record bytes size
final String bytesWrittenName = "byte-writer-counter";
final String recordCounterName = "record-counter";
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
if (!StringUtil.isEmpty(enableObjectReuseOptValue) &&
enableObjectReuseOptValue.equalsIgnoreCase("true")) {
env.getConfig().enableObjectReuse();
}
DataSet<String[]> recordDataSet = FlinkUtil.readHiveRecords(isSequenceFile, env, inputPath, hiveTable, job);
// read record from flat table
// output:
// 1, statistic
// 2, field value of dict col
// 3, min/max field value of not dict col
DataSet<Tuple2<SelfDefineSortableKey, Text>> flatOutputDataSet = recordDataSet.mapPartition(
new FlatOutputMapPartitionFunction(sConf, cubeName, segmentId, metaUrl, samplingPercent,
bytesWrittenName, recordCounterName));
// repartition data, make each reducer handle only one col data or the statistic data
DataSet<Tuple2<SelfDefineSortableKey, Text>> partitionDataSet = flatOutputDataSet
.partitionCustom(new FactDistinctColumnPartitioner(cubeName, metaUrl, sConf), 0)
.setParallelism(totalReducer);
// multiple output result
// 1, CFG_OUTPUT_COLUMN: field values of dict col, which will not be built in reducer, like globalDictCol
// 2, CFG_OUTPUT_DICT: dictionary object built in reducer
// 3, CFG_OUTPUT_STATISTICS: cube statistic: hll of cuboids ...
// 4, CFG_OUTPUT_PARTITION: dimension value range(min,max)
DataSet<Tuple2<String, Tuple3<Writable, Writable, String>>> outputDataSet = partitionDataSet
.mapPartition(new MultiOutputMapPartitionFunction(sConf, cubeName, segmentId, metaUrl, samplingPercent))
.setParallelism(totalReducer);
// make each reducer output to respective dir
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
NullWritable.class, ArrayPrimitiveWritable.class);
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
LongWritable.class, BytesWritable.class);
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
NullWritable.class, LongWritable.class);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
FileOutputFormat.setCompressOutput(job, false);
// prevent to create zero-sized default output
LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
outputDataSet.output(new HadoopMultipleOutputFormat(new LazyOutputFormat(), job));
JobExecutionResult jobExecutionResult =
env.execute("Fact distinct columns for:" + cubeName + " segment " + segmentId);
Map<String, Object> accumulatorResults = jobExecutionResult.getAllAccumulatorResults();
Long recordCount = (Long) accumulatorResults.get(recordCounterName);
Long bytesWritten = (Long) accumulatorResults.get(bytesWrittenName);
logger.info("Map input records={}", recordCount);
logger.info("HDFS Read: {} HDFS Write", bytesWritten);
logger.info("HDFS: Number of bytes written=" + FlinkBatchCubingJobBuilder2.getFileSize(outputPath, fs));
Map<String, String> counterMap = Maps.newHashMap();
counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten));
// save counter to hdfs
HadoopUtil.writeToSequenceFile(job.getConfiguration(), counterPath, counterMap);
}
/**
* Enable output compression using the deflate codec and
* specify its level.
*/
public static void setDeflateLevel(Job job, int level) {
FileOutputFormat.setCompressOutput(job, true);
job.getConfiguration().setInt(DEFLATE_LEVEL_KEY, level);
}