Source code examples: org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat (including use with org.apache.hadoop.hbase.mapreduce.TableOutputFormat)

Listed below are example usages of org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat, collected from the GitHub projects named in each listing; the full source can be viewed in those repositories.
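Every listing follows the same basic pattern: a Hadoop mapreduce OutputFormat is wrapped in Flink's HadoopOutputFormat, configured through a Hadoop Job, and registered as the sink of a DataSet. As a minimal sketch of that pattern (not taken from any of the projects below; the class name HadoopOutputFormatSketch, the method name writeAsHadoopText and the output path are placeholder assumptions), writing Tuple2<Text, IntWritable> records through a plain TextOutputFormat looks like this:

import java.io.IOException;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HadoopOutputFormatSketch {

	/** Registers a Hadoop TextOutputFormat as the sink of a Flink DataSet. */
	public static void writeAsHadoopText(DataSet<Tuple2<Text, IntWritable>> data, String outputPath)
			throws IOException {
		// The Hadoop Job only carries configuration; it is never submitted to a Hadoop cluster.
		Job job = Job.getInstance();

		// Wrap the mapreduce OutputFormat so Flink can drive it like one of its own output formats.
		HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat =
				new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), job);

		// Format-specific settings are applied through the Hadoop job, exactly as in a MapReduce program.
		TextOutputFormat.setOutputPath(job, new Path(outputPath));

		// Register the sink; the surrounding program still has to call env.execute().
		data.output(hadoopOutputFormat);
	}
}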

Example 1   Project: OSTMap   File: FlinkEnvManager.java
/**
 * Creates the output format used to write data from a Flink DataSet to Accumulo.
 *
 * @return a HadoopOutputFormat wrapping Accumulo's AccumuloOutputFormat
 * @throws AccumuloSecurityException
 * @throws IOException
 */
public HadoopOutputFormat getHadoopOF() throws AccumuloSecurityException, IOException {

    if(job == null){
        job = Job.getInstance(new Configuration(), jobName);
    }
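    // configure the Accumulo connector: credentials, ZooKeeper instance, and the default output table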
    AccumuloOutputFormat.setConnectorInfo(job, accumuloUser, new PasswordToken(accumuloPassword));
    ClientConfiguration clientConfig = new ClientConfiguration();
    clientConfig.withInstance(accumuloInstanceName);
    clientConfig.withZkHosts(accumuloZookeeper);
    AccumuloOutputFormat.setZooKeeperInstance(job, clientConfig);
    AccumuloOutputFormat.setDefaultTableName(job, outTable);
    AccumuloFileOutputFormat.setOutputPath(job, new Path("/tmp"));

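    // wrap Accumulo's OutputFormat so Flink can use it as a DataSet sink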
    HadoopOutputFormat<Text, Mutation> hadoopOF =
            new HadoopOutputFormat<>(new AccumuloOutputFormat(), job);
    return hadoopOF;
}
 
Example 2   Project: parquet-flinktacular   File: ParquetAvroExample.java
public static void writeAvro(DataSet<Tuple2<Void, Person>> data, String outputPath) throws IOException {
	// Set up the Hadoop job
	Job job = Job.getInstance();

	// Set up Hadoop Output Format
	HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new AvroParquetOutputFormat(), job);

	FileOutputFormat.setOutputPath(job, new Path(outputPath));

	AvroParquetOutputFormat.setSchema(job, Person.getClassSchema());
	ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
	ParquetOutputFormat.setEnableDictionary(job, true);

	// Output & Execute
	data.output(hadoopOutputFormat);
}
 
public static void writeThrift(DataSet<Tuple2<Void, Person>> data, String outputPath) throws IOException {
	// Set up the Hadoop job
	Job job = Job.getInstance();

	// Set up Hadoop Output Format
	HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ParquetThriftOutputFormat(), job);

	FileOutputFormat.setOutputPath(job, new Path(outputPath));

	ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
	ParquetOutputFormat.setEnableDictionary(job, true);

	ParquetThriftOutputFormat.setThriftClass(job, Person.class);

	// Output & Execute
	data.output(hadoopOutputFormat);
}
 
Example 4   Project: Flink-CEPplus   File: WordCount.java
public static void main(String[] args) throws Exception {
	if (args.length < 2) {
		System.err.println("Usage: WordCount <input path> <result path>");
		return;
	}

	final String inputPath = args[0];
	final String outputPath = args[1];

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	// Set up the Hadoop Input Format
	Job job = Job.getInstance();
	HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
	TextInputFormat.addInputPath(job, new Path(inputPath));

	// Create a Flink job with it
	DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);

	// Tokenize the line and convert from Writable "Text" to String for better handling
	DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());

	// Sum up the words
	DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);

	// Convert String back to Writable "Text" for use with Hadoop Output Format
	DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());

	// Set up Hadoop Output Format
	HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job);
	hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
	hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test
	TextOutputFormat.setOutputPath(job, new Path(outputPath));

	// Output & Execute
	hadoopResult.output(hadoopOutputFormat);
	env.execute("Word Count");
}
 
Example 5   Project: flink   File: WordCount.java
public static void main(String[] args) throws Exception {
	if (args.length < 2) {
		System.err.println("Usage: WordCount <input path> <result path>");
		return;
	}

	final String inputPath = args[0];
	final String outputPath = args[1];

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	// Set up the Hadoop Input Format
	Job job = Job.getInstance();
	HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
	TextInputFormat.addInputPath(job, new Path(inputPath));

	// Create a Flink job with it
	DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);

	// Tokenize the line and convert from Writable "Text" to String for better handling
	DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());

	// Sum up the words
	DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);

	// Convert String back to Writable "Text" for use with Hadoop Output Format
	DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());

	// Set up Hadoop Output Format
	HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job);
	hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
	hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test
	TextOutputFormat.setOutputPath(job, new Path(outputPath));

	// Output & Execute
	hadoopResult.output(hadoopOutputFormat);
	env.execute("Word Count");
}
 
public static void writeProtobuf(DataSet<Tuple2<Void, Person>> data, String outputPath) throws IOException {
	Job job = Job.getInstance();

	// Set up Hadoop Output Format
	HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ProtoParquetOutputFormat(), job);

	FileOutputFormat.setOutputPath(job, new Path(outputPath));

	ProtoParquetOutputFormat.setProtobufClass(job, Person.class);
	ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
	ParquetOutputFormat.setEnableDictionary(job, true);

	// Output & Execute
	data.output(hadoopOutputFormat);
}
 
Example 7   Project: parquet-flinktacular   File: CSVToParquet.java
private static void createLineitems(ExecutionEnvironment env) throws IOException {
    DataSet<Tuple2<Void,LineitemTable>> lineitems = getLineitemDataSet(env).map(new LineitemToParquet());

    Job job = Job.getInstance();
    
    HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ParquetThriftOutputFormat(), job);
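    // the Hadoop job carries all Parquet/Thrift settings: target directory, Thrift record class, Snappy compression, dictionary encoding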

    ParquetThriftOutputFormat.setOutputPath(job, new Path(outputPath + "/lineitems"));
    ParquetThriftOutputFormat.setThriftClass(job, LineitemTable.class);
    ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ParquetThriftOutputFormat.setCompressOutput(job, true);
    ParquetThriftOutputFormat.setEnableDictionary(job, true);

    lineitems.output(hadoopOutputFormat);
}
 
Example 8   Project: parquet-flinktacular   File: CSVToParquet.java
private static void createOrders(ExecutionEnvironment env) throws IOException {
    DataSet<Tuple2<Void,OrderTable>> orders = getOrdersDataSet(env).map(new OrdersToParquet());

    Job job = Job.getInstance();

    HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ParquetThriftOutputFormat(), job);

    ParquetThriftOutputFormat.setOutputPath(job, new Path(outputPath + "/orders"));
    ParquetThriftOutputFormat.setThriftClass(job, OrderTable.class);
    ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ParquetThriftOutputFormat.setCompressOutput(job, true);
    ParquetThriftOutputFormat.setEnableDictionary(job, true);

    orders.output(hadoopOutputFormat);
}
 
Example 9   Project: parquet-flinktacular   File: CSVToParquet.java
private static void createCustomers(ExecutionEnvironment env) throws IOException {
    DataSet<Tuple2<Void,CustomerTable>> customers = getCustomerDataSet(env).map(new CustomerToParquet());

    Job job = Job.getInstance();

    HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ParquetThriftOutputFormat(), job);

    ParquetThriftOutputFormat.setOutputPath(job, new Path(outputPath + "/cust"));
    ParquetThriftOutputFormat.setThriftClass(job, CustomerTable.class);
    ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ParquetThriftOutputFormat.setCompressOutput(job, true);
    ParquetThriftOutputFormat.setEnableDictionary(job, true);

    customers.output(hadoopOutputFormat);
}
 
Example 10   Project: parquet-flinktacular   File: CSVToParquet.java
private static void createDateDim(ExecutionEnvironment env) throws IOException {
	DataSet<Tuple2<Void, DateDimTable>> datedims = getDateDimDataSet(env).map(new DateDimToParquet());

	Job job = Job.getInstance();

	HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ParquetThriftOutputFormat(), job);

	ParquetThriftOutputFormat.setOutputPath(job, new Path(outputPath + "/datedim"));
	ParquetThriftOutputFormat.setThriftClass(job, DateDimTable.class);
	ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
	ParquetThriftOutputFormat.setCompressOutput(job, true);
	ParquetThriftOutputFormat.setEnableDictionary(job, false);

	datedims.output(hadoopOutputFormat);
}
 
Example 11   Project: parquet-flinktacular   File: CSVToParquet.java
private static void createItem(ExecutionEnvironment env) throws IOException {
	DataSet<Tuple2<Void, ItemTable>> items = getItemDataSet(env).map(new ItemToParquet());

	Job job = Job.getInstance();

	HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ParquetThriftOutputFormat(), job);

	ParquetThriftOutputFormat.setOutputPath(job, new Path(outputPath + "/item"));
	ParquetThriftOutputFormat.setThriftClass(job, ItemTable.class);
	ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
	ParquetThriftOutputFormat.setCompressOutput(job, true);
	ParquetThriftOutputFormat.setEnableDictionary(job, false);

	items.output(hadoopOutputFormat);
}
 
Example 12   Project: parquet-flinktacular   File: CSVToParquet.java
private static void createStoreSales(ExecutionEnvironment env) throws IOException {
	DataSet<Tuple2<Void, StoreSalesTable>> storeSales = getStoreSalesDataSet(env).map(new StoreSalesToParquet());

	Job job = Job.getInstance();

	HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new ParquetThriftOutputFormat(), job);

	ParquetThriftOutputFormat.setOutputPath(job, new Path(outputPath + "/storesales"));
	ParquetThriftOutputFormat.setThriftClass(job, StoreSalesTable.class);
	ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
	ParquetThriftOutputFormat.setCompressOutput(job, true);
	ParquetThriftOutputFormat.setEnableDictionary(job, false);

	storeSales.output(hadoopOutputFormat);
}
 
Example 14   Project: Flink-CEPplus   File: HBaseWriteExample.java
public static void main(String[] args) throws Exception {

		if (!parseParameters(args)) {
			return;
		}

		// set up the execution environment
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// get input data
		DataSet<String> text = getTextDataSet(env);

		DataSet<Tuple2<String, Integer>> counts =
				// split up the lines in pairs (2-tuples) containing: (word, 1)
				text.flatMap(new Tokenizer())
				// group by the tuple field "0" and sum up tuple field "1"
				.groupBy(0)
				.sum(1);

		// emit result
		Job job = Job.getInstance();
		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
		// TODO is "mapred.output.dir" really useful?
		job.getConfiguration().set("mapred.output.dir", HBaseFlinkTestConstants.TMP_DIR);
		counts.map(new RichMapFunction <Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
			private transient Tuple2<Text, Mutation> reuse;

			@Override
			public void open(Configuration parameters) throws Exception {
				super.open(parameters);
				reuse = new Tuple2<Text, Mutation>();
			}

			@Override
			public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
				reuse.f0 = new Text(t.f0);
				Put put = new Put(t.f0.getBytes(ConfigConstants.DEFAULT_CHARSET));
				put.add(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
				reuse.f1 = put;
				return reuse;
			}
		}).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));

		// execute program
		env.execute("WordCount (HBase sink) Example");
	}
 
Example 15   Project: flink   File: HBaseWriteExample.java
public static void main(String[] args) throws Exception {

		if (!parseParameters(args)) {
			return;
		}

		// set up the execution environment
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// get input data
		DataSet<String> text = getTextDataSet(env);

		DataSet<Tuple2<String, Integer>> counts =
				// split up the lines in pairs (2-tuples) containing: (word, 1)
				text.flatMap(new Tokenizer())
				// group by the tuple field "0" and sum up tuple field "1"
				.groupBy(0)
				.sum(1);

		// emit result
		Job job = Job.getInstance();
		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
		// TODO is "mapred.output.dir" really useful?
		job.getConfiguration().set("mapred.output.dir", HBaseFlinkTestConstants.TMP_DIR);
		counts.map(new RichMapFunction <Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
			private transient Tuple2<Text, Mutation> reuse;

			@Override
			public void open(Configuration parameters) throws Exception {
				super.open(parameters);
				reuse = new Tuple2<Text, Mutation>();
			}

			@Override
			public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
				reuse.f0 = new Text(t.f0);
				Put put = new Put(t.f0.getBytes(ConfigConstants.DEFAULT_CHARSET));
				put.add(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
				reuse.f1 = put;
				return reuse;
			}
		}).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));

		// execute program
		env.execute("WordCount (HBase sink) Example");
	}
 
@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
    final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
    final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
    final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
    final String segmentIds = optionsHelper.getOptionValue(OPTION_MERGE_SEGMENT_IDS);
    final String dictOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_DICT);
    final String statOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_STAT);
    final String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);

    boolean enableObjectReuse = false;
    if (enableObjectReuseOptValue != null && !enableObjectReuseOptValue.isEmpty()) {
        enableObjectReuse = true;
    }

    final Job job = Job.getInstance();

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (enableObjectReuse) {
        env.getConfig().enableObjectReuse();
    }

    HadoopUtil.deletePath(job.getConfiguration(), new Path(dictOutputPath));

    final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
    final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);

    final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
    final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());

    logger.info("Dictionary output path: {}", dictOutputPath);
    logger.info("Statistics output path: {}", statOutputPath);

    final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
    final int columnLength = tblColRefs.length;

    List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);

    for (int i = 0; i <= columnLength; i++) {
        indexs.add(i);
    }

    DataSource<Integer> indexDS = env.fromCollection(indexs);

    DataSet<Tuple2<Text, Text>> colToDictPathDS = indexDS.map(new MergeDictAndStatsFunction(cubeName,
            metaUrl, segmentId, StringUtil.splitByComma(segmentIds), statOutputPath, tblColRefs, sConf));

    FlinkUtil.setHadoopConfForCuboid(job, null, null);
    HadoopOutputFormat<Text, Text> hadoopOF =
            new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
    SequenceFileOutputFormat.setOutputPath(job, new Path(dictOutputPath));

    colToDictPathDS.output(hadoopOF).setParallelism(1);

    env.execute("Merge dictionary for cube:" + cubeName + ", segment " + segmentId);
}
 
Example 17   Project: kylin   File: FlinkMergingDictionary.java
@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
    final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
    final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
    final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
    final String segmentIds = optionsHelper.getOptionValue(OPTION_MERGE_SEGMENT_IDS);
    final String dictOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_DICT);
    final String statOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_STAT);
    final String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);

    boolean enableObjectReuse = false;
    if (enableObjectReuseOptValue != null && !enableObjectReuseOptValue.isEmpty()) {
        enableObjectReuse = true;
    }

    final Job job = Job.getInstance();

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (enableObjectReuse) {
        env.getConfig().enableObjectReuse();
    }

    HadoopUtil.deletePath(job.getConfiguration(), new Path(dictOutputPath));

    final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
    final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);

    final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
    final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());

    logger.info("Dictionary output path: {}", dictOutputPath);
    logger.info("Statistics output path: {}", statOutputPath);

    final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
    final int columnLength = tblColRefs.length;

    List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);

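    // the loop runs to columnLength inclusive on purpose: the extra index is handled by MergeDictAndStatsFunction as the statistics merge rather than a dictionary column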
    for (int i = 0; i <= columnLength; i++) {
        indexs.add(i);
    }

    DataSource<Integer> indexDS = env.fromCollection(indexs);

    DataSet<Tuple2<Text, Text>> colToDictPathDS = indexDS.map(new MergeDictAndStatsFunction(cubeName,
            metaUrl, segmentId, StringUtil.splitByComma(segmentIds), statOutputPath, tblColRefs, sConf));

    FlinkUtil.setHadoopConfForCuboid(job, null, null);
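    // wrap Hadoop's SequenceFileOutputFormat as the Flink sink; parallelism 1 below yields a single output file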
    HadoopOutputFormat<Text, Text> hadoopOF =
            new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
    SequenceFileOutputFormat.setOutputPath(job, new Path(dictOutputPath));

    colToDictPathDS.output(hadoopOF).setParallelism(1);

    env.execute("Merge dictionary for cube:" + cubeName + ", segment " + segmentId);
}
 