Listed below are example code snippets showing how to use the org.apache.hadoop.io.ArrayPrimitiveWritable API class; you can also follow the links to GitHub to view the original source code.
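Before the project snippets, here is a minimal, self-contained sketch of the basic round trip (class and variable names are illustrative): wrap a primitive array in ArrayPrimitiveWritable, serialize it through a DataOutput, and read it back with readFields.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.ArrayPrimitiveWritable;

public class ArrayPrimitiveWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        byte[] payload = {1, 2, 3, 4};

        // Wrap the primitive array and serialize it with the Writable contract.
        ArrayPrimitiveWritable out = new ArrayPrimitiveWritable(payload);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        out.write(new DataOutputStream(baos));

        // Deserialize into a fresh instance and unwrap the array again.
        ArrayPrimitiveWritable in = new ArrayPrimitiveWritable();
        in.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
        byte[] restored = (byte[]) in.get();

        System.out.println(Arrays.equals(payload, restored)); // prints true
    }
}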
private void outputDict(TblColRef col, Dictionary<String> dict,
        List<Tuple2<String, Tuple3<Writable, Writable, String>>> result) throws IOException {
    // output written to baseDir/colName/colName.rldict-r-00000 (etc)
    String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream outputStream = new DataOutputStream(baos)) {
        outputStream.writeUTF(dict.getClass().getName());
        dict.write(outputStream);
        result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_DICT,
                new Tuple3<Writable, Writable, String>(NullWritable.get(),
                        new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName)));
    }
}
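For context, a hedged sketch of the read side of this pattern: the helper below (readDict is a hypothetical name, not from the projects above) unwraps the ArrayPrimitiveWritable payload, reads back the class name written by writeUTF, and lets the dictionary deserialize itself. It assumes the concrete Dictionary class has a no-arg constructor and a readFields(DataInput) counterpart to the write(DataOutput) call above, and it reuses Kylin's ClassUtil.newInstance as seen in the later snippets.

// Hypothetical read-side helper: restore a Dictionary from the bytes
// carried by the ArrayPrimitiveWritable value.
private Dictionary<String> readDict(ArrayPrimitiveWritable value) throws IOException {
    byte[] bytes = (byte[]) value.get();
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
        String dictClassName = in.readUTF(); // class name written first by outputDict
        Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName);
        dict.readFields(in); // assumed counterpart to dict.write(outputStream)
        return dict;
    }
}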
@Override
public ArrayPrimitiveWritable arrayAllReduce(ArrayPrimitiveWritable byteSerialArray, int rank) throws Mp4jException {
    try {
        byte[] obj = (byte[]) byteSerialArray.get();
        allReduceByteArray.write(obj);
        barrier.await();
        if (rank == 0) {
            allReduceReturnArray = new ArrayPrimitiveWritable(allReduceByteArray.toByteArray());
            allReduceByteArray.reset();
        }
        barrier.await();
        return allReduceReturnArray;
    } catch (Exception e) {
        throw new Mp4jException("array allreduce exception!");
    }
}
private void testNormalDim() throws IOException {
    setConfigurations();
    setMultipleOutputs(BatchConstants.CFG_OUTPUT_COLUMN, reduceDriver.getConfiguration(),
            SequenceFileOutputFormat.class, NullWritable.class, Text.class);
    setMultipleOutputs(BatchConstants.CFG_OUTPUT_DICT, reduceDriver.getConfiguration(),
            SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
    setMultipleOutputs(BatchConstants.CFG_OUTPUT_PARTITION, reduceDriver.getConfiguration(), TextOutputFormat.class,
            NullWritable.class, LongWritable.class);
    int nDimReducers = cubeDesc.getRowkey().getRowKeyColumns().length;
    setContextTaskId(nDimReducers - 1);
    ByteBuffer tmpBuf = ByteBuffer.allocate(4096);
    String val = "100";
    tmpBuf.put(Bytes.toBytes(val));
    Text outputKey1 = new Text();
    outputKey1.set(tmpBuf.array(), 0, tmpBuf.position());
    SelfDefineSortableKey key1 = new SelfDefineSortableKey();
    key1.init(outputKey1, (byte) 0);
    reduceDriver.setInput(key1, ImmutableList.of(new Text()));
    List<Pair<NullWritable, Text>> result = reduceDriver.run();
    assertEquals(0, result.size());
}
private void setupReducer(Path output, CubeSegment cubeSeg) throws IOException {
    FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeSeg.getCubeInstance());
    int numberOfReducers = reducerMapping.getTotalReducerNum();
    logger.info("{} has reducers {}.", this.getClass().getName(), numberOfReducers);
    if (numberOfReducers > 250) {
        throw new IllegalArgumentException(
                "The max reducer number for FactDistinctColumnsJob is 250, but now it is "
                        + numberOfReducers
                        + ", decrease 'kylin.engine.mr.uhc-reducer-count'");
    }

    job.setReducerClass(FactDistinctColumnsReducer.class);
    job.setPartitionerClass(FactDistinctColumnPartitioner.class);
    job.setNumReduceTasks(numberOfReducers);

    // make each reducer output to its respective dir
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class);

    FileOutputFormat.setOutputPath(job, output);
    job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());

    // prevent creating zero-sized default output
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

    deletePath(job.getConfiguration(), output);
}
private void setupReducer(Path output, int numberOfReducers) throws IOException {
    job.setReducerClass(UHCDictionaryReducer.class);
    job.setPartitionerClass(UHCDictionaryPartitioner.class);
    job.setNumReduceTasks(numberOfReducers);

    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
    FileOutputFormat.setOutputPath(job, output);
    job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());

    // prevent creating zero-sized default output
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

    deletePath(job.getConfiguration(), output);
}
private void outputDict(TblColRef col, Dictionary<String> dict) throws IOException, InterruptedException {
    // output written to baseDir/colName/colName.rldict-r-00000 (etc)
    String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream outputStream = new DataOutputStream(baos)) {
        outputStream.writeUTF(dict.getClass().getName());
        dict.write(outputStream);
        mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName);
    }
    mos.close();
}
private void outputDict(TblColRef col, Dictionary<String> dict) throws IOException, InterruptedException {
    // output written to baseDir/colName/colName.rldict-r-00000 (etc)
    String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream outputStream = new DataOutputStream(baos)) {
        outputStream.writeUTF(dict.getClass().getName());
        dict.write(outputStream);
        mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(),
                new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName);
    }
}
@Override
public Tuple2<String, Tuple3<Writable, Writable, String>> call(Tuple2<Integer, List<String>> columnValues) throws Exception {
    if (initialized == false) {
        synchronized (SparkFactDistinct.class) {
            if (initialized == false) {
                init();
            }
        }
    }

    try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig.setAndUnsetThreadLocalConfig(config);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream outputStream = new DataOutputStream(baos)) {
        TblColRef col = uhcColumns.get(columnValues._1);
        logger.info("Processing column " + col.getName());

        if (cube.getDescriptor().getShardByColumns().contains(col)) {
            // for ShardByColumns
            builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
            builder.init(null, 0, null);
        } else {
            // for GlobalDictionaryColumns
            DictionaryInfo dictionaryInfo = new DictionaryInfo(col.getColumnDesc(), col.getDatatype());
            String builderClass = cubeDesc.getDictionaryBuilderClass(col);
            builder = (IDictionaryBuilder) ClassUtil.newInstance(builderClass);
            builder.init(dictionaryInfo, 0, hdfsDir);
        }

        Iterator<String> values = columnValues._2.iterator();
        while (values.hasNext()) {
            builder.addValue(values.next());
        }

        Dictionary<String> dict = builder.build();
        String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
        logger.info("Dictionary file name is " + dictFileName);

        outputStream.writeUTF(dict.getClass().getName());
        dict.write(outputStream);

        Tuple3 tuple3 = new Tuple3(NullWritable.get(), new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName);
        return new Tuple2<>(BatchConstants.CFG_OUTPUT_DICT, tuple3);
    }
}
private void outputDict(TblColRef col, Dictionary<String> dict, Visitor visitor) throws IOException {
    // output written to baseDir/colName/colName.rldict-r-00000 (etc)
    String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream outputStream = new DataOutputStream(baos)) {
        outputStream.writeUTF(dict.getClass().getName());
        dict.write(outputStream);
        visitor.collect(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(),
                new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName);
    }
}
@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
    String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
    String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
    String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
    String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
    String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
    String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);

    Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"),
            Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey") };

    SparkConf conf = new SparkConf().setAppName("Build uhc dictionary with spark for:" + cubeName + " segment " + segmentId);
    // serialization conf
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
    conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);

    KylinSparkJobListener jobListener = new KylinSparkJobListener();
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
        sc.sc().addSparkListener(jobListener);
        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));

        Configuration hadoopConf = sc.hadoopConfiguration();
        hadoopConf.set("mapreduce.input.pathFilter.class", "org.apache.kylin.engine.mr.steps.filter.UHCDictPathFilter");

        final SerializableConfiguration sConf = new SerializableConfiguration(hadoopConf);
        KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);

        CubeManager cubeMgr = CubeManager.getInstance(config);
        CubeInstance cube = cubeMgr.getCube(cubeName);

        final Job job = Job.getInstance(sConf.get());

        // calculate source record bytes size
        final LongAccumulator bytesWritten = sc.sc().longAccumulator();
        String hdfsDir = sc.hadoopConfiguration().get(BatchConstants.CFG_GLOBAL_DICT_BASE_DIR);

        List<TblColRef> uhcColumns = cube.getDescriptor().getAllUHCColumns();
        int reducerCount = uhcColumns.size();
        if (reducerCount == 0) {
            return;
        }

        logger.info("RDD Output path: {}", outputPath);
        logger.info("getTotalReducerNum: {}", reducerCount);
        logger.info("counter path {}", counterPath);

        JavaPairRDD<String, String> wholeSequenceFileNames = null;
        for (TblColRef tblColRef : uhcColumns) {
            String columnPath = inputPath + "/" + tblColRef.getIdentity();
            if (!HadoopUtil.getFileSystem(columnPath).exists(new Path(columnPath))) {
                continue;
            }
            if (wholeSequenceFileNames == null) {
                wholeSequenceFileNames = sc.wholeTextFiles(columnPath);
            } else {
                wholeSequenceFileNames = wholeSequenceFileNames.union(sc.wholeTextFiles(columnPath));
            }
        }

        if (wholeSequenceFileNames == null) {
            logger.error("There are no sequence files at " + inputPath + " !");
            return;
        }

        JavaPairRDD<String, Tuple3<Writable, Writable, String>> pairRDD = wholeSequenceFileNames.map(tuple -> tuple._1)
                .mapToPair(new InputPathAndFilterAddFunction2(config, uhcColumns))
                .filter(tuple -> tuple._1 != -1)
                .reduceByKey((list1, list2) -> combineAllColumnDistinctValues(list1, list2))
                .mapToPair(new ProcessUHCColumnValues(cubeName, config, hdfsDir, uhcColumns));

        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
                NullWritable.class, ArrayPrimitiveWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, outputPath);

        // prevent creating zero-sized default output
        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

        MultipleOutputsRDD multipleOutputsRDD = MultipleOutputsRDD.rddToMultipleOutputsRDD(pairRDD);
        multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());

        logger.info("Map input records={}", reducerCount);
        logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());

        Map<String, String> counterMap = Maps.newHashMap();
        counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(reducerCount));
        counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten.value()));

        // save counter to hdfs
        HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);

        HadoopUtil.deleteHDFSMeta(metaUrl);
    }
}
@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
    String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
    String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
    String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
    String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
    String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
    String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
    String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
    int samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
    String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);

    Job job = Job.getInstance();
    FileSystem fs = HadoopUtil.getWorkingFileSystem(job.getConfiguration());
    HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));

    final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
    KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);

    final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);

    final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
    final int totalReducer = reducerMapping.getTotalReducerNum();

    logger.info("getTotalReducerNum: {}", totalReducer);
    logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
    logger.info("counter path {}", counterPath);

    boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());

    // calculate source record bytes size
    final String bytesWrittenName = "byte-writer-counter";
    final String recordCounterName = "record-counter";

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (!StringUtil.isEmpty(enableObjectReuseOptValue) &&
            enableObjectReuseOptValue.equalsIgnoreCase("true")) {
        env.getConfig().enableObjectReuse();
    }

    DataSet<String[]> recordDataSet = FlinkUtil.readHiveRecords(isSequenceFile, env, inputPath, hiveTable, job);

    // read record from flat table
    // output:
    //   1, statistic
    //   2, field value of dict col
    //   3, min/max field value of not dict col
    DataSet<Tuple2<SelfDefineSortableKey, Text>> flatOutputDataSet = recordDataSet.mapPartition(
            new FlatOutputMapPartitionFunction(sConf, cubeName, segmentId, metaUrl, samplingPercent,
                    bytesWrittenName, recordCounterName));

    // repartition data, make each reducer handle only one col data or the statistic data
    DataSet<Tuple2<SelfDefineSortableKey, Text>> partitionDataSet = flatOutputDataSet
            .partitionCustom(new FactDistinctColumnPartitioner(cubeName, metaUrl, sConf), 0)
            .setParallelism(totalReducer);

    // multiple output result
    //   1, CFG_OUTPUT_COLUMN: field values of dict col, which will not be built in reducer, like globalDictCol
    //   2, CFG_OUTPUT_DICT: dictionary object built in reducer
    //   3, CFG_OUTPUT_STATISTICS: cube statistic: hll of cuboids ...
    //   4, CFG_OUTPUT_PARTITION: dimension value range (min, max)
    DataSet<Tuple2<String, Tuple3<Writable, Writable, String>>> outputDataSet = partitionDataSet
            .mapPartition(new MultiOutputMapPartitionFunction(sConf, cubeName, segmentId, metaUrl, samplingPercent))
            .setParallelism(totalReducer);

    // make each reducer output to its respective dir
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
            NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
            NullWritable.class, ArrayPrimitiveWritable.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
            LongWritable.class, BytesWritable.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
            NullWritable.class, LongWritable.class);

    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    FileOutputFormat.setCompressOutput(job, false);

    // prevent creating zero-sized default output
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

    outputDataSet.output(new HadoopMultipleOutputFormat(new LazyOutputFormat(), job));

    JobExecutionResult jobExecutionResult =
            env.execute("Fact distinct columns for:" + cubeName + " segment " + segmentId);
    Map<String, Object> accumulatorResults = jobExecutionResult.getAllAccumulatorResults();
    Long recordCount = (Long) accumulatorResults.get(recordCounterName);
    Long bytesWritten = (Long) accumulatorResults.get(bytesWrittenName);
    logger.info("Map input records={}", recordCount);
    logger.info("HDFS Read: {} HDFS Write", bytesWritten);
    logger.info("HDFS: Number of bytes written=" + FlinkBatchCubingJobBuilder2.getFileSize(outputPath, fs));

    Map<String, String> counterMap = Maps.newHashMap();
    counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
    counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten));

    // save counter to hdfs
    HadoopUtil.writeToSequenceFile(job.getConfiguration(), counterPath, counterMap);
}
@Test(expected = IllegalArgumentException.class)
public void testNoSuchMethod() {
    new ValueWritableConverter<>(byte[].class, ArrayPrimitiveWritable.class);
}
public ArrayPrimitiveWritable primitiveArrayAllReduce(ArrayPrimitiveWritable arrayPrimitiveWritable, int rank) throws Mp4jException;
public ArrayPrimitiveWritable arrayAllReduce(ArrayPrimitiveWritable byteSerialArray, int rank) throws Mp4jException;
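Finally, a hedged sketch of how a caller might use the arrayAllReduce signature shown above; comm, rank, and serializeLocalState() are hypothetical placeholders for whatever the surrounding mp4j program provides.

// Hypothetical caller: each rank contributes its serialized bytes, and the
// returned ArrayPrimitiveWritable carries the bytes gathered from all ranks
// (the implementation above concatenates them into allReduceByteArray).
byte[] localBytes = serializeLocalState(); // placeholder for the caller's own serialization
ArrayPrimitiveWritable merged = comm.arrayAllReduce(new ArrayPrimitiveWritable(localBytes), rank);
byte[] allBytes = (byte[]) merged.get();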