The following code examples show how to use org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat, collected from open-source projects. They demonstrate wrapping Hadoop mapreduce OutputFormat implementations (Accumulo, Parquet, HBase TableOutputFormat, SequenceFile) as Flink DataSet sinks.
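Every snippet follows the same pattern: configure a plain Hadoop Job, wrap the mapreduce OutputFormat in Flink's HadoopOutputFormat, and hand the wrapper to DataSet#output(). A minimal sketch of that pattern, assuming a TextOutputFormat sink (the class name HadoopOutputFormatSketch and the write() signature are illustrative, not taken from the examples below):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HadoopOutputFormatSketch {
    public static void write(DataSet<Tuple2<Text, IntWritable>> data, String outputPath) throws Exception {
        // 1. Configure a plain Hadoop Job; the wrapper shares its Configuration.
        Job job = Job.getInstance();
        TextOutputFormat.setOutputPath(job, new Path(outputPath));
        // 2. Wrap the mapreduce OutputFormat so Flink can drive it as a sink.
        HadoopOutputFormat<Text, IntWritable> format =
                new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), job);
        // 3. Attach it like any native Flink sink; nothing is written until
        //    ExecutionEnvironment#execute() runs.
        data.output(format);
    }
}

The first example targets Accumulo: it wires connector credentials and the ZooKeeper instance into the Job, then wraps AccumuloOutputFormat.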
/**
 * Creates an output format that writes data from a Flink DataSet to Accumulo.
 *
 * @return a Flink HadoopOutputFormat wrapping Accumulo's AccumuloOutputFormat
 * @throws AccumuloSecurityException if the connector credentials are rejected
 * @throws IOException if the Hadoop Job cannot be created
 */
public HadoopOutputFormat<Text, Mutation> getHadoopOF() throws AccumuloSecurityException, IOException {
    if (job == null) {
        job = Job.getInstance(new Configuration(), jobName);
    }
    AccumuloOutputFormat.setConnectorInfo(job, accumuloUser, new PasswordToken(accumuloPassword));
    ClientConfiguration clientConfig = new ClientConfiguration();
    clientConfig.withInstance(accumuloInstanceName);
    clientConfig.withZkHosts(accumuloZookeeper);
    AccumuloOutputFormat.setZooKeeperInstance(job, clientConfig);
    AccumuloOutputFormat.setDefaultTableName(job, outTable);
    // Rows go to Accumulo, but the Hadoop machinery still expects an output
    // directory to be set; "/tmp" appears to serve only as that placeholder.
    AccumuloFileOutputFormat.setOutputPath(job, new Path("/tmp"));
    HadoopOutputFormat<Text, Mutation> hadoopOF =
            new HadoopOutputFormat<>(new AccumuloOutputFormat(), job);
    return hadoopOF;
}
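The next helper writes a DataSet of Avro-generated Person records to Parquet via parquet-avro's AvroParquetOutputFormat, with Snappy compression and dictionary encoding enabled: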
public static void writeAvro(DataSet<Tuple2<Void, Person>> data, String outputPath) throws IOException {
    // Set up the Hadoop job that carries the output configuration
    Job job = Job.getInstance();
    // Set up the Hadoop Output Format
    HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new AvroParquetOutputFormat(), job);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    AvroParquetOutputFormat.setSchema(job, Person.getClassSchema());
    ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ParquetOutputFormat.setEnableDictionary(job, true);
    // Emit the data set; the files are written when the environment executes
    data.output(hadoopOutputFormat);
}
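The Thrift variant is nearly identical; it swaps in parquet-thrift's ParquetThriftOutputFormat and registers the Thrift class instead of an Avro schema: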
public static void writeThrift(DataSet<Tuple2<Void, Person>> data, String outputPath) throws IOException {
    // Set up the Hadoop job that carries the output configuration
    Job job = Job.getInstance();
    // Set up the Hadoop Output Format
    HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ParquetThriftOutputFormat(), job);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ParquetOutputFormat.setEnableDictionary(job, true);
    ParquetThriftOutputFormat.setThriftClass(job, Person.class);
    // Emit the data set; the files are written when the environment executes
    data.output(hadoopOutputFormat);
}
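A complete, runnable example follows: the classic WordCount, reading text through HadoopInputFormat/TextInputFormat and writing the counts back out through HadoopOutputFormat/TextOutputFormat.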
public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: WordCount <input path> <result path>");
        return;
    }
    final String inputPath = args[0];
    final String outputPath = args[1];
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Set up the Hadoop Input Format
    Job job = Job.getInstance();
    HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
    TextInputFormat.addInputPath(job, new Path(inputPath));
    // Read the data into a Flink DataSet
    DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
    // Tokenize the line and convert from Writable "Text" to String for better handling
    DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
    // Sum up the words
    DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);
    // Convert String back to Writable "Text" for use with Hadoop Output Format
    DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());
    // Set up Hadoop Output Format
    HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job);
    // Set the separator under both the Hadoop 2 key and the legacy Hadoop 1 key
    hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
    hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " ");
    TextOutputFormat.setOutputPath(job, new Path(outputPath));
    // Output & Execute
    hadoopResult.output(hadoopOutputFormat);
    env.execute("Word Count");
}
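The Tokenizer and HadoopDatatypeMapper helpers referenced above are not shown in the snippet. A minimal sketch of what they need to do (the real classes ship with Flink's hadoop-compatibility WordCount example, so treat these as illustrative):

// Illustrative sketches of the two helpers the example assumes.
// Needs: org.apache.flink.api.common.functions.{FlatMapFunction, MapFunction},
// org.apache.flink.util.Collector, org.apache.hadoop.io.{LongWritable, Text, IntWritable}.
public static final class Tokenizer
        implements FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
    @Override
    public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
        // Normalize, split into words, and emit (word, 1) pairs
        for (String token : value.f1.toString().toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}

public static final class HadoopDatatypeMapper
        implements MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
    @Override
    public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) {
        // Convert back to Hadoop Writables for TextOutputFormat
        return new Tuple2<>(new Text(value.f0), new IntWritable(value.f1));
    }
}

The protobuf variant of the Parquet writers comes next, using parquet-protobuf's ProtoParquetOutputFormat: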
public static void writeProtobuf(DataSet<Tuple2<Void, Person>> data, String outputPath) throws IOException {
    Job job = Job.getInstance();
    // Set up the Hadoop Output Format
    HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ProtoParquetOutputFormat(), job);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    ProtoParquetOutputFormat.setProtobufClass(job, Person.class);
    ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ParquetOutputFormat.setEnableDictionary(job, true);
    // Emit the data set; the files are written when the environment executes
    data.output(hadoopOutputFormat);
}
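The same ParquetThriftOutputFormat recipe scales to a whole schema. The following helpers each write one benchmark-style table (lineitem, orders, customer, date dim, item, store sales) to its own directory under a shared outputPath field: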
private static void createLineitems(ExecutionEnvironment env) throws IOException {
    DataSet<Tuple2<Void, LineitemTable>> lineitems = getLineitemDataSet(env).map(new LineitemToParquet());
    Job job = Job.getInstance();
    HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ParquetThriftOutputFormat(), job);
    ParquetThriftOutputFormat.setOutputPath(job, new Path(outputPath + "/lineitems"));
    ParquetThriftOutputFormat.setThriftClass(job, LineitemTable.class);
    ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ParquetThriftOutputFormat.setCompressOutput(job, true);
    ParquetThriftOutputFormat.setEnableDictionary(job, true);
    lineitems.output(hadoopOutputFormat);
}

private static void createOrders(ExecutionEnvironment env) throws IOException {
    DataSet<Tuple2<Void, OrderTable>> orders = getOrdersDataSet(env).map(new OrdersToParquet());
    Job job = Job.getInstance();
    HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ParquetThriftOutputFormat(), job);
    ParquetThriftOutputFormat.setOutputPath(job, new Path(outputPath + "/orders"));
    ParquetThriftOutputFormat.setThriftClass(job, OrderTable.class);
    ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ParquetThriftOutputFormat.setCompressOutput(job, true);
    ParquetThriftOutputFormat.setEnableDictionary(job, true);
    orders.output(hadoopOutputFormat);
}

private static void createCustomers(ExecutionEnvironment env) throws IOException {
    DataSet<Tuple2<Void, CustomerTable>> customers = getCustomerDataSet(env).map(new CustomerToParquet());
    Job job = Job.getInstance();
    HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ParquetThriftOutputFormat(), job);
    ParquetThriftOutputFormat.setOutputPath(job, new Path(outputPath + "/cust"));
    ParquetThriftOutputFormat.setThriftClass(job, CustomerTable.class);
    ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ParquetThriftOutputFormat.setCompressOutput(job, true);
    ParquetThriftOutputFormat.setEnableDictionary(job, true);
    customers.output(hadoopOutputFormat);
}

private static void createDateDim(ExecutionEnvironment env) throws IOException {
    DataSet<Tuple2<Void, DateDimTable>> datedims = getDateDimDataSet(env).map(new DateDimToParquet());
    Job job = Job.getInstance();
    HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ParquetThriftOutputFormat(), job);
    ParquetThriftOutputFormat.setOutputPath(job, new Path(outputPath + "/datedim"));
    ParquetThriftOutputFormat.setThriftClass(job, DateDimTable.class);
    ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ParquetThriftOutputFormat.setCompressOutput(job, true);
    ParquetThriftOutputFormat.setEnableDictionary(job, false);
    datedims.output(hadoopOutputFormat);
}

private static void createItem(ExecutionEnvironment env) throws IOException {
    DataSet<Tuple2<Void, ItemTable>> items = getItemDataSet(env).map(new ItemToParquet());
    Job job = Job.getInstance();
    HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ParquetThriftOutputFormat(), job);
    ParquetThriftOutputFormat.setOutputPath(job, new Path(outputPath + "/item"));
    ParquetThriftOutputFormat.setThriftClass(job, ItemTable.class);
    ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ParquetThriftOutputFormat.setCompressOutput(job, true);
    ParquetThriftOutputFormat.setEnableDictionary(job, false);
    items.output(hadoopOutputFormat);
}

private static void createStoreSales(ExecutionEnvironment env) throws IOException {
    DataSet<Tuple2<Void, StoreSalesTable>> storeSales = getStoreSalesDataSet(env).map(new StoreSalesToParquet());
    Job job = Job.getInstance();
    HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ParquetThriftOutputFormat(), job);
    ParquetThriftOutputFormat.setOutputPath(job, new Path(outputPath + "/storesales"));
    ParquetThriftOutputFormat.setThriftClass(job, StoreSalesTable.class);
    ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ParquetThriftOutputFormat.setCompressOutput(job, true);
    ParquetThriftOutputFormat.setEnableDictionary(job, false);
    storeSales.output(hadoopOutputFormat);
}
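WordCount again, this time with HBase as the sink: each (word, count) pair is turned into an HBase Put and written through a wrapped TableOutputFormat. HBaseFlinkTestConstants is a small constants holder from the surrounding test code; a hypothetical sketch of the fields the example touches (the values here are invented):

// Hypothetical stand-in for the constants class the example references;
// only the fields used below are sketched, and the values are made up.
public class HBaseFlinkTestConstants {
    public static final byte[] CF_SOME = "someCf".getBytes();
    public static final byte[] Q_SOME = "someQual".getBytes();
    public static final String TMP_DIR = "/tmp/flink-hbase-test";
}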
public static void main(String[] args) throws Exception {
    if (!parseParameters(args)) {
        return;
    }
    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // get input data
    DataSet<String> text = getTextDataSet(env);
    DataSet<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word, 1)
            text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .groupBy(0)
            .sum(1);
    // emit result
    Job job = Job.getInstance();
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
    // TODO is "mapred.output.dir" really useful?
    job.getConfiguration().set("mapred.output.dir", HBaseFlinkTestConstants.TMP_DIR);
    counts.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
        private transient Tuple2<Text, Mutation> reuse;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            reuse = new Tuple2<Text, Mutation>();
        }

        @Override
        public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
            reuse.f0 = new Text(t.f0);
            Put put = new Put(t.f0.getBytes(ConfigConstants.DEFAULT_CHARSET));
            put.add(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
            reuse.f1 = put;
            return reuse;
        }
    }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
    // execute program
    env.execute("WordCount (HBase sink) Example");
}
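The last example comes from Apache Kylin's Flink cube engine: it merges segment dictionaries and statistics, mapping one task index per dictionary column and writing the resulting (column, dictionary path) pairs as a SequenceFile through HadoopOutputFormat: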
@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
    final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
    final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
    final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
    final String segmentIds = optionsHelper.getOptionValue(OPTION_MERGE_SEGMENT_IDS);
    final String dictOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_DICT);
    final String statOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_STAT);
    final String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
    boolean enableObjectReuse = enableObjectReuseOptValue != null && !enableObjectReuseOptValue.isEmpty();
    final Job job = Job.getInstance();
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (enableObjectReuse) {
        env.getConfig().enableObjectReuse();
    }
    HadoopUtil.deletePath(job.getConfiguration(), new Path(dictOutputPath));
    final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
    final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
    final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
    final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
    logger.info("Dictionary output path: {}", dictOutputPath);
    logger.info("Statistics output path: {}", statOutputPath);
    final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
    final int columnLength = tblColRefs.length;
    // columnLength + 1 work items: one per dictionary column, plus one extra
    // index that the merge function appears to use for the statistics merge
    List<Integer> indexes = Lists.newArrayListWithCapacity(columnLength + 1);
    for (int i = 0; i <= columnLength; i++) {
        indexes.add(i);
    }
    DataSource<Integer> indexDS = env.fromCollection(indexes);
    DataSet<Tuple2<Text, Text>> colToDictPathDS = indexDS.map(new MergeDictAndStatsFunction(cubeName,
            metaUrl, segmentId, StringUtil.splitByComma(segmentIds), statOutputPath, tblColRefs, sConf));
    FlinkUtil.setHadoopConfForCuboid(job, null, null);
    HadoopOutputFormat<Text, Text> hadoopOF =
            new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
    SequenceFileOutputFormat.setOutputPath(job, new Path(dictOutputPath));
    colToDictPathDS.output(hadoopOF).setParallelism(1);
    env.execute("Merge dictionary for cube:" + cubeName + ", segment " + segmentId);
}
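Across all of these examples the wiring is identical: configure a Hadoop Job, point the underlying OutputFormat at its table or directory, wrap it in HadoopOutputFormat, and pass the wrapper to DataSet#output(). Note that the wrapper keeps a reference to the Job's Configuration, which is why settings applied after the wrapper is constructed (as the Parquet and WordCount examples do) still take effect, provided they happen before the program is submitted with env.execute().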