org.apache.flink.api.java.DataSet Source Code Examples

Listed below are example usages of org.apache.flink.api.java.DataSet, collected from open-source projects on GitHub.
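For orientation before the project snippets, here is a minimal, self-contained DataSet program. This is a sketch rather than code from any of the projects below; the class name and the word-count logic are illustrative.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class DataSetSketch {
	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// A DataSet is created from a source, transformed, and materialized via a sink or collect().
		DataSet<String> lines = env.fromElements("to be", "or not to be");

		DataSet<Tuple2<String, Integer>> counts = lines
			.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
				@Override
				public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
					for (String word : line.split(" ")) {
						out.collect(new Tuple2<>(word, 1));
					}
				}
			})
			.groupBy(0)  // group on the word field
			.sum(1);     // sum the per-word counts

		counts.print();  // print() triggers execution of a DataSet program
	}
}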

Example 1   Project: flink   File: CoGroupOperatorTest.java
@Test(expected = InvalidProgramException.class)
public void testCoGroupKeyMixing3() {

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
	DataSet<CustomType> ds2 = env.fromCollection(customTypeData);

	// should not work, incompatible types
	ds1.coGroup(ds2)
	.where(2)
	.equalTo(new KeySelector<CustomType, Long>() {
		@Override
		public Long getKey(CustomType value) {
			return value.myLong;
		}
	});
}
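For contrast, a compatible key pairing type-checks. This is a sketch under the same fixtures, assuming CustomType exposes its String field as myString, as in Flink's operator tests:

	// should work: field 2 of the Tuple5 is a String, matching the KeySelector's key type
	ds1.coGroup(ds2)
	.where(2)
	.equalTo(new KeySelector<CustomType, String>() {
		@Override
		public String getKey(CustomType value) {
			return value.myString;
		}
	});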
 
Example 2   Project: OSTMap   File: TimeFilterTest.java
@Test
public void testTimeFilter() throws Exception {
    TimeFilter tfAll = new TimeFilter(Long.MIN_VALUE, Long.MAX_VALUE);
    TimeFilter tf = new TimeFilter(0, 1000);
    TimeFilter tfNone = new TimeFilter(5, 0);

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Row layout: an 8-byte timestamp followed by a 4-byte suffix.
    ByteBuffer bb = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
    bb.putLong(100).putInt(123);
    Key k1 = new Key(new Text(bb.array()));

    ByteBuffer bb2 = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
    bb2.putLong(2000).putInt(1234);
    Key k2 = new Key(new Text(bb2.array()));

    DataSet<Tuple2<Key, Value>> data = env.fromElements(new Tuple2<>(k1, new Value()),
            new Tuple2<>(k2, new Value()));

    // Only k1 (timestamp 100) falls inside tf's [0, 1000] window; k2 (2000) does not.
    assertEquals(2, data.count());
    assertEquals(2, data.filter(tfAll).count());
    assertEquals(1, data.filter(tf).count());
    assertEquals(0, data.filter(tfNone).count());
}
 
Example 3   Project: flink   File: GraphOperationsITCase.java
@SuppressWarnings("serial")
@Test
public void testFilterVertices() throws Exception {
	/*
	 * Test filterOnVertices:
	 */
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
		TestGraphUtils.getLongLongEdgeData(env), env);

	DataSet<Edge<Long, Long>> data = graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() {
		public boolean filter(Vertex<Long, Long> vertex) throws Exception {
			return (vertex.getValue() > 2);
		}
	}).getEdges();

	List<Edge<Long, Long>> result = data.collect();

	expectedResult = "3,4,34\n" +
		"3,5,35\n" +
		"4,5,45\n";

	compareResultAsTuples(result, expectedResult);
}
 
Example 4   Project: flink   File: GraphMutationsITCase.java
@Test
public void testAddVertexExisting() throws Exception {
	/*
	 * Test addVertex() -- add an existing vertex
	 */
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
			TestGraphUtils.getLongLongEdgeData(env), env);

	graph = graph.addVertex(new Vertex<>(1L, 1L));

	DataSet<Vertex<Long, Long>> data = graph.getVertices();
	List<Vertex<Long, Long>> result = data.collect();

	expectedResult = "1,1\n" +
			"2,2\n" +
			"3,3\n" +
			"4,4\n" +
			"5,5\n";

	compareResultAsTuples(result, expectedResult);
}
 
Example 5   Project: flink   File: DegreesITCase.java
@Test
public void testOutDegreesWithNoOutEdges() throws Exception {
	/*
	 * Test outDegrees() no outgoing edges
	 */
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
		TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);

	DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees();
	List<Tuple2<Long, LongValue>> result = data.collect();

	expectedResult = "1,3\n" +
		"2,1\n" +
		"3,1\n" +
		"4,1\n" +
		"5,0\n";

	compareResultAsTuples(result, expectedResult);
}
 
Example 6   Project: cascading-flink   File: FlinkFlowStep.java
private DataSet<Tuple> translateMerge(List<DataSet<Tuple>> inputs, FlowNode node) {

	DataSet<Tuple> unioned = null;
	TypeInformation<Tuple> type = null;

	int maxDop = -1;

	for (DataSet<Tuple> input : inputs) {
		maxDop = Math.max(maxDop, ((Operator) input).getParallelism());
		if (unioned == null) {
			unioned = input;
			type = input.getType();
		}
		else {
			unioned = unioned.union(input);
		}
	}
	// union() itself cannot carry a parallelism setting in the DataSet API,
	// so an identity map is appended to pin the parallelism of the merged result.
	return unioned.map(new IdMapper())
			.returns(type)
			.setParallelism(maxDop);
}
 
Example 7   Project: flink   File: GraphGeneratorUtils.java
/**
 * Generates {@link Vertex Vertices} with sequential, numerical labels.
 *
 * @param env the Flink execution environment.
 * @param parallelism operator parallelism
 * @param vertexCount number of sequential vertex labels
 * @return {@link DataSet} of sequentially labeled {@link Vertex vertices}
 */
public static DataSet<Vertex<LongValue, NullValue>> vertexSequence(ExecutionEnvironment env, int parallelism, long vertexCount) {
	Preconditions.checkArgument(vertexCount >= 0, "Vertex count must be non-negative");

	if (vertexCount == 0) {
		return env
			.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Vertex<LongValue, NullValue>>(){}))
				.setParallelism(parallelism)
				.name("Empty vertex set");
	} else {
		LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount - 1);

		DataSource<LongValue> vertexLabels = env
			.fromParallelCollection(iterator, LongValue.class)
				.setParallelism(parallelism)
				.name("Vertex indices");

		return vertexLabels
			.map(new CreateVertex())
				.setParallelism(parallelism)
				.name("Vertex sequence");
	}
}
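A typical call, shown as a sketch (the parallelism and vertex count are arbitrary):

	DataSet<Vertex<LongValue, NullValue>> vertices =
		GraphGeneratorUtils.vertexSequence(env, 4, 1000000);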
 
Example 8   Project: Flink-CEPplus   File: PartitionITCase.java
@Test
public void testRangePartitionWithKeyExpression() throws Exception {
	/*
	 * Test range partition with key expression
	 */

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(3);

	DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
	DataSet<Long> uniqLongs = ds
		.partitionByRange("nestedPojo.longNumber").setParallelism(4)
		.mapPartition(new UniqueNestedPojoLongMapper());
	List<Long> result = uniqLongs.collect();

	String expected = "10000\n" +
		"20000\n" +
		"30000\n";

	compareResultAsText(result, expected);
}
 
Example 9   Project: Flink-CEPplus   File: GroupingTest.java
@Test
public void testGroupByKeyExpressions1() {

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	this.customTypeData.add(new CustomType());

	DataSet<CustomType> ds = env.fromCollection(customTypeData);

	// should work
	try {
		ds.groupBy("myInt");
	} catch (Exception e) {
		Assert.fail();
	}
}
 
Example 10   Project: flink   File: Simplify.java
@Override
public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	// Edges
	DataSet<Edge<K, EV>> edges = input
		.getEdges()
		.flatMap(new SymmetrizeAndRemoveSelfLoops<>(clipAndFlip))
			.setParallelism(parallelism)
			.name("Remove self-loops")
		.distinct(0, 1)
			.setCombineHint(CombineHint.NONE)
			.setParallelism(parallelism)
			.name("Remove duplicate edges");

	// Graph
	return Graph.fromDataSet(input.getVertices(), edges, input.getContext());
}
 
Example 11   Project: Flink-CEPplus   File: GraphCreationITCase.java
@Test
public void testValidateWithInvalidIds() throws Exception {
	/*
	 * Test validate() - invalid vertex ids
	 */
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongInvalidVertexData(env);
	DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);

	Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
	Boolean valid = graph.validate(new InvalidVertexIdsValidator<>());

	String res = valid.toString(); //env.fromElements(valid);
	List<String> result = new LinkedList<>();
	result.add(res);

	expectedResult = "false\n";

	compareResultAsText(result, expectedResult);
}
 
Example 12   Project: flink   File: BroadcastUnionITCase.java
@Override
protected void testProgram() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(4);

	DataSet<Long> input = env.generateSequence(1, 10);
	DataSet<Long> bc1 = env.generateSequence(1, 5);
	DataSet<Long> bc2 = env.generateSequence(6, 10);

	List<Long> result = input
			.map(new Mapper())
			.withBroadcastSet(bc1.union(bc2), BC_NAME)
			.reduce(new Reducer())
			.collect();

	// sum(1..10) = 55, and 55 * 55 = 3025 (the Mapper presumably scales each
	// element by the sum of the broadcast union before the Reducer totals them)
	Assert.assertEquals(Long.valueOf(3025), result.get(0));
}
 
Example 13   Project: Flink-CEPplus   File: CrossITCase.java
@Test
public void testCorrectnessOfCrossWithTiny() throws Exception {
	/*
	 * check correctness of crossWithTiny (only correctness of result -> should be the same as with normal cross)
	 */

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
	DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
	DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithTiny(ds2).with(new Tuple5Cross());

	List<Tuple2<Integer, String>> result = crossDs.collect();

	String expected = "0,HalloHallo\n" +
			"1,HalloHallo Welt\n" +
			"2,HalloHallo Welt wie\n" +
			"1,Hallo WeltHallo\n" +
			"2,Hallo WeltHallo Welt\n" +
			"3,Hallo WeltHallo Welt wie\n" +
			"2,Hallo Welt wieHallo\n" +
			"3,Hallo Welt wieHallo Welt\n" +
			"4,Hallo Welt wieHallo Welt wie\n";

	compareResultAsTuples(result, expected);
}
 
Example 14   Project: Flink-CEPplus   File: FirstNITCase.java
@Test
public void testFirstNOnUngroupedDS() throws Exception {
	/*
	 * First-n on ungrouped data set
	 */

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
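	// first(7) selects an arbitrary 7 records; OneMapper turns each into a 1, so the sum is 7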
	DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0);

	List<Tuple1<Integer>> result = seven.collect();

	String expected = "(7)\n";

	compareResultAsText(result, expected);
}
 
Example 15   Project: flink   File: JoinCancelingITCase.java
private void executeTaskWithGenerator(
		JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner,
		int keys, int vals, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple2<Integer, Integer>> input1 = env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));
	DataSet<Tuple2<Integer, Integer>> input2 = env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));

	input1.join(input2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
			.where(0)
			.equalTo(0)
			.with(joiner)
			.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

	env.setParallelism(PARALLELISM);

	runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
}
 
Example 16   Project: flink   File: VertexDegreeTest.java
@Test
public void testWithSimpleGraph() throws Exception {
	String expectedResult =
		"(0,2)\n" +
		"(1,3)\n" +
		"(2,3)\n" +
		"(3,4)\n" +
		"(4,1)\n" +
		"(5,1)";

	// reduceOnTargetId is only a performance hint (which edge endpoint the count
	// is reduced on); both settings must yield identical degrees, as verified here
	DataSet<Vertex<IntValue, LongValue>> degreeOnSourceId = undirectedSimpleGraph
		.run(new VertexDegree<IntValue, NullValue, NullValue>()
			.setReduceOnTargetId(false));

	TestBaseUtils.compareResultAsText(degreeOnSourceId.collect(), expectedResult);

	DataSet<Vertex<IntValue, LongValue>> degreeOnTargetId = undirectedSimpleGraph
		.run(new VertexDegree<IntValue, NullValue, NullValue>()
			.setReduceOnTargetId(true));

	TestBaseUtils.compareResultAsText(degreeOnTargetId.collect(), expectedResult);
}
 
Example 17   Project: flink   File: JoinWithEdgesITCase.java
@Test
public void testOnSourceWithCustom() throws Exception {
	/*
	 * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input values
	 */
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
		TestGraphUtils.getLongLongEdgeData(env), env);

	Graph<Long, Long, Long> res = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env),
		new CustomValueMapper());

	DataSet<Edge<Long, Long>> data = res.getEdges();
	List<Edge<Long, Long>> result = data.collect();

	expectedResult = "1,2,10\n" +
		"1,3,10\n" +
		"2,3,30\n" +
		"3,4,40\n" +
		"3,5,40\n" +
		"4,5,45\n" +
		"5,1,51\n";

	compareResultAsTuples(result, expectedResult);
}
 
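Example 18   Project: flink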
@Test
public void testUnaryFunctionInPlaceForwardedAnnotation() {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	@SuppressWarnings("unchecked")
	DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3L, "test", 42));
	input.map(new IndividualForwardedMapper<Long, String, Integer>()).output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
	Plan plan = env.createProgramPlan();

	GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
	MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();

	SingleInputSemanticProperties semantics = mapper.getSemanticProperties();

	FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
	FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
	assertNotNull(fw1);
	assertNotNull(fw2);
	assertTrue(fw1.contains(0));
	assertTrue(fw2.contains(2));
}
 
Example 19   Project: flink   File: EdgeSourceDegree.java
@Override
public DataSet<Edge<K, Tuple2<EV, LongValue>>> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	// s, d(s)
	DataSet<Vertex<K, LongValue>> vertexDegrees = input
		.run(new VertexDegree<K, VV, EV>()
			.setReduceOnTargetId(reduceOnTargetId.get())
			.setParallelism(parallelism));

	// s, t, d(s)
	return input.getEdges()
		.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
		.where(0)
		.equalTo(0)
		.with(new JoinEdgeWithVertexDegree<>())
			.setParallelism(parallelism)
			.name("Edge source degree");
}
 
Example 20   Project: flink   File: GroupReduceITCase.java
@Test
public void testCorrectnessOfGroupReduceOnTuplesWithKeyExtractor() throws Exception {
	/*
	 * check correctness of groupReduce on tuples with key extractor
	 */

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
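	// the key groups of the 3-tuple data set have sizes 1 through 6; Tuple3GroupReduce sums
	// field 0 per group: 1, 2+3=5, 4+5+6=15, 7+8+9+10=34, 11+..+15=65, 16+..+21=111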
	DataSet<Tuple2<Integer, Long>> reduceDs = ds.
			groupBy(new KeySelector1()).reduceGroup(new Tuple3GroupReduce());

	List<Tuple2<Integer, Long>> result = reduceDs.collect();

	String expected = "1,1\n" +
			"5,2\n" +
			"15,3\n" +
			"34,4\n" +
			"65,5\n" +
			"111,6\n";

	compareResultAsTuples(result, expected);
}
 
Example 21   Project: flink   File: SumMinMaxITCase.java
@Test
public void testSumMaxAndProject() throws Exception {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
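	// field 0 of the 3-tuple data set holds the ids 1..21, so sum(0) = 231; the max of field 1 is 6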
	DataSet<Tuple2<Integer, Long>> sumDs = ds
			.sum(0)
			.andMax(1)
			.project(0, 1);

	List<Tuple2<Integer, Long>> result = sumDs.collect();

	String expected = "231,6\n";

	compareResultAsTuples(result, expected);
}
 
Example 22   Project: flink   File: VertexMetrics.java
@Override
public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
		throws Exception {
	super.run(input);

	DataSet<Vertex<K, Degrees>> vertexDegree = input
		.run(new VertexDegrees<K, VV, EV>()
			.setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
			.setParallelism(parallelism));

	vertexMetricsHelper = new VertexMetricsHelper<>();

	vertexDegree
		.output(vertexMetricsHelper)
			.name("Vertex metrics");

	return this;
}
 
Example 23   Project: flink   File: JavaTableEnvironmentITCase.java
@Test
public void testAsFromPrivateFieldsPojo() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, config());

	List<PrivateSmallPojo> data = new ArrayList<>();
	data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
	data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
	data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));

	Table table = tableEnv
		.fromDataSet(env.fromCollection(data),
			$("department").as("a"),
			$("age").as("b"),
			$("salary").as("c"),
			$("name").as("d"))
		.select($("a"), $("b"), $("c"), $("d"));

	DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
	List<Row> results = ds.collect();
	String expected =
		"Sales,28,4000.0,Peter\n" +
		"Engineering,56,10000.0,Anna\n" +
		"HR,42,6000.0,Lucy\n";
	compareResultAsText(results, expected);
}
 
Example 24   Project: flink   File: MapEdgesITCase.java
@Test
public void testWithTuple1Type() throws Exception {
	/*
	 * Test mapEdges() and change the value type to a Tuple1
	 */
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
		TestGraphUtils.getLongLongEdgeData(env), env);

	DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = graph.mapEdges(new ToTuple1Mapper()).getEdges();
	List<Edge<Long, Tuple1<Long>>> result = mappedEdges.collect();

	expectedResult = "1,2,(12)\n" +
		"1,3,(13)\n" +
		"2,3,(23)\n" +
		"3,4,(34)\n" +
		"3,5,(35)\n" +
		"4,5,(45)\n" +
		"5,1,(51)\n";

	compareResultAsTuples(result, expectedResult);
}
 
Example 25   Project: flink   File: MapVerticesITCase.java
@Test
public void testWithtuple1Value() throws Exception {
	/*
	 * Test mapVertices() and change the value type to a Tuple1
	 */
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
		TestGraphUtils.getLongLongEdgeData(env), env);

	DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new ToTuple1Mapper()).getVertices();
	List<Vertex<Long, Tuple1<Long>>> result = mappedVertices.collect();

	expectedResult = "1,(1)\n" +
		"2,(2)\n" +
		"3,(3)\n" +
		"4,(4)\n" +
		"5,(5)\n";

	compareResultAsTuples(result, expectedResult);
}
 
Example 26   Project: flink   File: HardPlansCompilationTest.java
/**
 * Source -> Map -> Reduce -> Cross -> Reduce -> Cross -> Reduce ->
 * |--------------------------/                  /
 * |--------------------------------------------/
 * 
 * First cross has SameKeyFirst output contract
 */
@Test
public void testTicket158() {
	// construct the plan
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(DEFAULT_PARALLELISM);
	DataSet<Long> set1 = env.generateSequence(0,1);

	set1.map(new IdentityMapper<Long>()).name("Map1")
			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
			.cross(set1).with(new IdentityCrosser<Long>()).withForwardedFieldsFirst("*").name("Cross1")
			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce2")
			.cross(set1).with(new IdentityCrosser<Long>()).name("Cross2")
			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce3")
			.output(new DiscardingOutputFormat<Long>()).name("Sink");

	Plan plan = env.createProgramPlan();
	OptimizedPlan oPlan = compileNoStats(plan);

	JobGraphGenerator jobGen = new JobGraphGenerator();
	jobGen.compileJobGraph(oPlan);
}
 
Example 27   Project: flink   File: WordCountSimplePOJOITCase.java
@Override
protected void testProgram() throws Exception {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSet<String> text = env.readTextFile(textPath);

	DataSet<WC> counts = text
			.flatMap(new Tokenizer())
			.groupBy("word")
			.reduce(new ReduceFunction<WC>() {
				private static final long serialVersionUID = 1L;

				public WC reduce(WC value1, WC value2) {
					return new WC(value1.word, value1.count + value2.count);
				}
			});

	counts.writeAsText(resultPath);

	env.execute("WordCount with custom data types example");
}
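For groupBy("word") to work, WC must follow Flink's POJO rules: a public type with a public no-argument constructor and public (or getter/setter) fields. A minimal sketch with the field names used above; the actual WC class in the test may differ in details:

	public static class WC {
		public String word;
		public int count;

		public WC() {}

		public WC(String word, int count) {
			this.word = word;
			this.count = count;
		}

		@Override
		public String toString() {
			return word + " " + count;
		}
	}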
 
Example 28   Project: flink   File: WritableSavepoint.java
/**
 * Write out a new or updated savepoint.
 * @param path The path to where the savepoint should be written.
 */
public final void write(String path) {
	final Path savepointPath = new Path(path);

	List<BootstrapTransformationWithID<?>> newOperatorTransformations = metadata.getNewOperators();
	DataSet<OperatorState> newOperatorStates = writeOperatorStates(newOperatorTransformations, savepointPath);

	List<OperatorState> existingOperators = metadata.getExistingOperators();

	DataSet<OperatorState> finalOperatorStates = unionOperatorStates(newOperatorStates, existingOperators);

	finalOperatorStates
		.reduceGroup(new MergeOperatorStates(metadata.getMasterStates()))
		.name("reduce(OperatorState)")
		.output(new SavepointOutputFormat(savepointPath))
		.name(path);
}
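A call site, sketched against the State Processor API entry points; the uid, state backend, and path are illustrative:

	// `transformation` stands for a previously built BootstrapTransformation
	Savepoint
		.create(new MemoryStateBackend(), 128)
		.withOperator("my-uid", transformation)
		.write("file:///tmp/new-savepoint");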
 
Example 29   Project: flink   File: JoinWithEdgesITCase.java
@Test
public void testWithEdgesOnTarget() throws Exception {
	/*
	 * Test joinWithEdgesOnTarget with the input DataSet parameter identical
	 * to the edge DataSet
	 */
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
		TestGraphUtils.getLongLongEdgeData(env), env);

	Graph<Long, Long, Long> res = graph.joinWithEdgesOnTarget(graph.getEdges()
		.map(new ProjectTargetAndValueMapper()), new AddValuesMapper());

	DataSet<Edge<Long, Long>> data = res.getEdges();
	List<Edge<Long, Long>> result = data.collect();

	expectedResult = "1,2,24\n" +
		"1,3,26\n" +
		"2,3,36\n" +
		"3,4,68\n" +
		"3,5,70\n" +
		"4,5,80\n" +
		"5,1,102\n";

	compareResultAsTuples(result, expectedResult);
}