下面列出了 org.apache.hadoop.hbase.mapreduce.TableOutputFormat#org.apache.flink.api.java.DataSet 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * CoGroup with mixed key definitions: the first input is keyed by tuple position 2
 * (a {@code String} field) while the second input is keyed by a {@link KeySelector}
 * returning a {@code Long}. The test expects an {@link InvalidProgramException}.
 */
@Test(expected = InvalidProgramException.class)
public void testCoGroupKeyMixing3() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleInput =
        env.fromCollection(emptyTupleData, tupleTypeInfo);
    DataSet<CustomType> customInput = env.fromCollection(customTypeData);

    // Key extractor for the second input; its key type is Long.
    KeySelector<CustomType, Long> longKeySelector = new KeySelector<CustomType, Long>() {
        @Override
        public Long getKey(CustomType value) {
            return value.myLong;
        }
    };

    // should not work, incompatible types
    tupleInput.coGroup(customInput)
        .where(2)
        .equalTo(longKeySelector);
}
/**
 * Applies three {@code TimeFilter} instances to a two-element data set and checks how
 * many elements each of them lets through.
 */
@Test
public void testTimeFilter() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Each row key is the byte image of a long followed by an int.
    ByteBuffer firstRow = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
    firstRow.putLong(100).putInt(123);
    Key firstKey = new Key(new Text(firstRow.array()));

    ByteBuffer secondRow = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
    secondRow.putLong(2000).putInt(1234);
    Key secondKey = new Key(new Text(secondRow.array()));

    DataSet<Tuple2<Key, Value>> data = env.fromElements(
        new Tuple2<>(firstKey, new Value()),
        new Tuple2<>(secondKey, new Value()));

    TimeFilter fullRangeFilter = new TimeFilter(Long.MIN_VALUE, Long.MAX_VALUE);
    TimeFilter lowRangeFilter = new TimeFilter(0, 1000);
    TimeFilter invertedRangeFilter = new TimeFilter(5, 0);

    assertEquals(2, data.count());
    assertEquals(2, data.filter(fullRangeFilter).count());
    assertEquals(1, data.filter(lowRangeFilter).count());
    assertEquals(0, data.filter(invertedRangeFilter).count());
}
/**
 * Test filterOnVertices: keeps only vertices whose value is greater than 2 and checks
 * the edges of the resulting graph.
 */
@SuppressWarnings("serial")
@Test
public void testFilterVertices() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongVertexData(env);
    DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
    Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);

    FilterFunction<Vertex<Long, Long>> valueAboveTwo = new FilterFunction<Vertex<Long, Long>>() {
        @Override
        public boolean filter(Vertex<Long, Long> vertex) throws Exception {
            return vertex.getValue() > 2;
        }
    };

    List<Edge<Long, Long>> result = graph.filterOnVertices(valueAboveTwo)
        .getEdges()
        .collect();

    expectedResult = "3,4,34\n" +
        "3,5,35\n" +
        "4,5,45\n";

    compareResultAsTuples(result, expectedResult);
}
/**
 * Test addVertex() -- add an existing vertex.
 *
 * <p>Adds the vertex {@code (1, 1)} and expects the vertex set to consist of the five
 * vertices listed in the expected result.
 */
@Test
public void testAddVertexExisting() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongVertexData(env);
    Graph<Long, Long, Long> original =
        Graph.fromDataSet(vertices, TestGraphUtils.getLongLongEdgeData(env), env);

    Graph<Long, Long, Long> updated = original.addVertex(new Vertex<>(1L, 1L));
    List<Vertex<Long, Long>> result = updated.getVertices().collect();

    expectedResult = "1,1\n" +
        "2,2\n" +
        "3,3\n" +
        "4,4\n" +
        "5,5\n";

    compareResultAsTuples(result, expectedResult);
}
/**
 * Test outDegrees() on edge data in which a vertex has no outgoing edges; the expected
 * result lists vertex 5 with an out-degree of 0.
 */
@Test
public void testOutDegreesWithNoOutEdges() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Graph<Long, Long, Long> graph = Graph.fromDataSet(
        TestGraphUtils.getLongLongVertexData(env),
        TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env),
        env);

    DataSet<Tuple2<Long, LongValue>> outDegrees = graph.outDegrees();
    List<Tuple2<Long, LongValue>> result = outDegrees.collect();

    expectedResult = "1,3\n" +
        "2,1\n" +
        "3,1\n" +
        "4,1\n" +
        "5,0\n";

    compareResultAsTuples(result, expectedResult);
}
/**
 * Unions all {@code inputs} into a single data set and passes the union through an
 * {@code IdMapper}.
 *
 * <p>The resulting operator declares the type of the first input and runs with the
 * highest parallelism found among the inputs.
 *
 * @param inputs the data sets to merge; must contain at least one element
 * @param node the flow node being translated (not read by this method)
 * @return the mapped union of all inputs
 * @throws IllegalArgumentException if {@code inputs} is {@code null} or empty
 */
private DataSet<Tuple> translateMerge(List<DataSet<Tuple>> inputs, FlowNode node) {
    if (inputs == null || inputs.isEmpty()) {
        // Without this check an empty list would surface as a NullPointerException below.
        throw new IllegalArgumentException("Merge requires at least one input data set");
    }

    DataSet<Tuple> unioned = null;
    TypeInformation<Tuple> type = null;
    int maxDop = -1;

    for (DataSet<Tuple> input : inputs) {
        maxDop = Math.max(maxDop, ((Operator<?, ?>) input).getParallelism());
        if (unioned == null) {
            // The first input seeds the union and determines the declared result type.
            unioned = input;
            type = input.getType();
        } else {
            unioned = unioned.union(input);
        }
    }

    return unioned.map(new IdMapper())
        .returns(type)
        .setParallelism(maxDop);
}
/**
 * Creates a {@link DataSet} of {@link Vertex vertices} labeled {@code 0} through
 * {@code vertexCount - 1}.
 *
 * @param env the Flink execution environment
 * @param parallelism parallelism applied to every operator created here
 * @param vertexCount number of sequential vertex labels; must be non-negative
 * @return {@link DataSet} of sequentially labeled {@link Vertex vertices}
 */
public static DataSet<Vertex<LongValue, NullValue>> vertexSequence(ExecutionEnvironment env, int parallelism, long vertexCount) {
    Preconditions.checkArgument(vertexCount >= 0, "Vertex count must be non-negative");

    // A zero count yields an explicitly typed, empty source.
    if (vertexCount == 0) {
        TypeInformation<Vertex<LongValue, NullValue>> vertexType =
            TypeInformation.of(new TypeHint<Vertex<LongValue, NullValue>>(){});

        return env
            .fromCollection(Collections.emptyList(), vertexType)
                .setParallelism(parallelism)
                .name("Empty vertex set");
    }

    return env
        .fromParallelCollection(new LongValueSequenceIterator(0, vertexCount - 1), LongValue.class)
            .setParallelism(parallelism)
            .name("Vertex indices")
        .map(new CreateVertex())
            .setParallelism(parallelism)
            .name("Vertex sequence");
}
/**
 * Test range partition with key expression.
 *
 * <p>Range-partitions the POJO data set on {@code nestedPojo.longNumber} and compares the
 * output of {@code UniqueNestedPojoLongMapper} with the expected values.
 */
@Test
public void testRangePartitionWithKeyExpression() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);

    DataSet<POJO> pojos = CollectionDataSets.getDuplicatePojoDataSet(env);
    DataSet<POJO> partitioned = pojos
        .partitionByRange("nestedPojo.longNumber")
        .setParallelism(4);

    List<Long> result = partitioned
        .mapPartition(new UniqueNestedPojoLongMapper())
        .collect();

    String expected = "10000\n" +
        "20000\n" +
        "30000\n";

    compareResultAsText(result, expected);
}
/**
 * Checks that grouping a {@code CustomType} data set by the field expression
 * {@code "myInt"} is accepted without an exception.
 */
@Test
public void testGroupByKeyExpressions1() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    this.customTypeData.add(new CustomType());
    DataSet<CustomType> ds = env.fromCollection(customTypeData);

    // should work
    try {
        ds.groupBy("myInt");
    } catch (Exception e) {
        // Keep the original exception as the cause so the failure report shows what went wrong.
        throw new AssertionError("groupBy(\"myInt\") was expected to succeed", e);
    }
}
/**
 * Builds a graph whose edge set is the input edge set passed through
 * {@code SymmetrizeAndRemoveSelfLoops} and then de-duplicated on the first two edge
 * fields. The vertices and context are taken from the input graph unchanged.
 *
 * @param input the graph to transform
 * @return a graph with the input's vertices and the processed edges
 * @throws Exception declared by the overridden method
 */
@Override
public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
        throws Exception {
    // Step 1: flat-map every edge through SymmetrizeAndRemoveSelfLoops.
    DataSet<Edge<K, EV>> withoutSelfLoops = input
        .getEdges()
        .flatMap(new SymmetrizeAndRemoveSelfLoops<>(clipAndFlip))
            .setParallelism(parallelism)
            .name("Remove self-loops");

    // Step 2: keep one edge per distinct combination of fields 0 and 1.
    DataSet<Edge<K, EV>> distinctEdges = withoutSelfLoops
        .distinct(0, 1)
            .setCombineHint(CombineHint.NONE)
            .setParallelism(parallelism)
            .name("Remove duplicate edges");

    // Step 3: reassemble the graph.
    return Graph.fromDataSet(input.getVertices(), distinctEdges, input.getContext());
}
/**
 * Test validate() - invalid vertex ids.
 *
 * <p>Validates a graph built from the "invalid" vertex data with an
 * {@code InvalidVertexIdsValidator} and expects the outcome {@code false}.
 */
@Test
public void testValidateWithInvalidIds() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Vertex<Long, Long>> invalidVertices = TestGraphUtils.getLongLongInvalidVertexData(env);
    DataSet<Edge<Long, Long>> edgeData = TestGraphUtils.getLongLongEdgeData(env);
    Graph<Long, Long, Long> graph = Graph.fromDataSet(invalidVertices, edgeData, env);

    Boolean isValid = graph.validate(new InvalidVertexIdsValidator<>());

    // The comparison helper works on a list, so wrap the single textual outcome.
    List<String> result = new LinkedList<>();
    result.add(isValid.toString());

    expectedResult = "false\n";

    compareResultAsText(result, expectedResult);
}
/**
 * Maps the sequence 1..10 with a broadcast set formed by the union of the sequences 1..5
 * and 6..10, reduces the mapped values and expects 3025 as the first result element.
 */
@Override
protected void testProgram() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    DataSet<Long> numbers = env.generateSequence(1, 10);
    DataSet<Long> lowerHalf = env.generateSequence(1, 5);
    DataSet<Long> upperHalf = env.generateSequence(6, 10);
    DataSet<Long> broadcastValues = lowerHalf.union(upperHalf);

    List<Long> reduced = numbers
        .map(new Mapper())
        .withBroadcastSet(broadcastValues, BC_NAME)
        .reduce(new Reducer())
        .collect();

    Assert.assertEquals(Long.valueOf(3025), reduced.get(0));
}
/**
 * Test outDegrees() when the edge data contains a vertex without outgoing edges; the
 * expected result reports an out-degree of 0 for vertex 5.
 */
@Test
public void testOutDegreesWithNoOutEdges() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Graph<Long, Long, Long> graph = Graph.fromDataSet(
        TestGraphUtils.getLongLongVertexData(env),
        TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env),
        env);

    List<Tuple2<Long, LongValue>> result = graph.outDegrees().collect();

    expectedResult = "1,3\n" +
        "2,1\n" +
        "3,1\n" +
        "4,1\n" +
        "5,0\n";

    compareResultAsTuples(result, expectedResult);
}
/**
 * Check correctness of crossWithTiny (only correctness of result -> should be the same as
 * with normal cross).
 */
@Test
public void testCorrectnessOfCrossWithTiny() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple5<Integer, Long, Integer, String, Long>> left = CollectionDataSets.getSmall5TupleDataSet(env);
    DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);

    DataSet<Tuple2<Integer, String>> crossed = left
        .crossWithTiny(right)
        .with(new Tuple5Cross());

    List<Tuple2<Integer, String>> result = crossed.collect();

    String expected = "0,HalloHallo\n" +
        "1,HalloHallo Welt\n" +
        "2,HalloHallo Welt wie\n" +
        "1,Hallo WeltHallo\n" +
        "2,Hallo WeltHallo Welt\n" +
        "3,Hallo WeltHallo Welt wie\n" +
        "2,Hallo Welt wieHallo\n" +
        "3,Hallo Welt wieHallo Welt\n" +
        "4,Hallo Welt wieHallo Welt wie\n";

    compareResultAsTuples(result, expected);
}
/**
 * First-n on ungrouped data set: takes the first seven elements, maps each through
 * {@code OneMapper} and sums field 0, expecting {@code (7)}.
 */
@Test
public void testFirstNOnUngroupedDS() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
    DataSet<Tuple3<Integer, Long, String>> firstSeven = tuples.first(7);
    DataSet<Tuple1<Integer>> counted = firstSeven.map(new OneMapper()).sum(0);

    List<Tuple1<Integer>> result = counted.collect();

    String expected = "(7)\n";

    compareResultAsText(result, expected);
}
/**
 * Builds a repartition sort-merge join of two generated inputs on field 0, applies the
 * given join function, discards the output and hands the resulting plan to
 * {@code runAndCancelJob}.
 *
 * @param joiner join function applied to matching pairs
 * @param keys first argument passed to each {@code UniformIntTupleGeneratorInputFormat}
 * @param vals second argument passed to each {@code UniformIntTupleGeneratorInputFormat}
 * @param msecsTillCanceling forwarded to {@code runAndCancelJob}
 * @param maxTimeTillCanceled forwarded to {@code runAndCancelJob}
 * @throws Exception if building or running the job fails
 */
private void executeTaskWithGenerator(
        JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner,
        int keys, int vals, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Integer, Integer>> left =
        env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));
    DataSet<Tuple2<Integer, Integer>> right =
        env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));

    left.join(right, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
        .where(0)
        .equalTo(0)
        .with(joiner)
        .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

    env.setParallelism(PARALLELISM);

    runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
}
/**
 * Runs {@code VertexDegree} on the undirected simple graph twice, once with
 * reduce-on-target-id disabled and once enabled, and expects the same degrees both times.
 */
@Test
public void testWithSimpleGraph() throws Exception {
    String expectedResult =
        "(0,2)\n" +
        "(1,3)\n" +
        "(2,3)\n" +
        "(3,4)\n" +
        "(4,1)\n" +
        "(5,1)";

    // Reduce on source id.
    DataSet<Vertex<IntValue, LongValue>> bySourceId = undirectedSimpleGraph
        .run(new VertexDegree<IntValue, NullValue, NullValue>()
            .setReduceOnTargetId(false));
    TestBaseUtils.compareResultAsText(bySourceId.collect(), expectedResult);

    // Reduce on target id.
    DataSet<Vertex<IntValue, LongValue>> byTargetId = undirectedSimpleGraph
        .run(new VertexDegree<IntValue, NullValue, NullValue>()
            .setReduceOnTargetId(true));
    TestBaseUtils.compareResultAsText(byTargetId.collect(), expectedResult);
}
/**
 * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input
 * values.
 */
@Test
public void testOnSourceWithCustom() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Graph<Long, Long, Long> graph = Graph.fromDataSet(
        TestGraphUtils.getLongLongVertexData(env),
        TestGraphUtils.getLongLongEdgeData(env),
        env);

    Graph<Long, Long, Long> joined = graph.joinWithEdgesOnSource(
        TestGraphUtils.getLongCustomTuple2SourceData(env),
        new CustomValueMapper());

    DataSet<Edge<Long, Long>> joinedEdges = joined.getEdges();
    List<Edge<Long, Long>> result = joinedEdges.collect();

    expectedResult = "1,2,10\n" +
        "1,3,10\n" +
        "2,3,30\n" +
        "3,4,40\n" +
        "3,5,40\n" +
        "4,5,45\n" +
        "5,1,51\n";

    compareResultAsTuples(result, expectedResult);
}
/**
 * Builds a plan containing an {@code IndividualForwardedMapper} and checks the semantic
 * properties of the resulting map operator: input field 0 must be forwarded to target
 * field 0 and input field 2 to target field 2.
 */
@Test
public void testUnaryFunctionInPlaceForwardedAnnotation() {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    @SuppressWarnings("unchecked")
    DataSet<Tuple3<Long, String, Integer>> source =
        env.fromElements(new Tuple3<Long, String, Integer>(3L, "test", 42));
    source
        .map(new IndividualForwardedMapper<Long, String, Integer>())
        .output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());

    Plan plan = env.createProgramPlan();

    // The map operator is the direct input of the plan's first sink.
    GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
    MapOperatorBase<?, ?, ?> mapOperator = (MapOperatorBase<?, ?, ?>) sink.getInput();

    SingleInputSemanticProperties properties = mapOperator.getSemanticProperties();
    FieldSet targetsOfField0 = properties.getForwardingTargetFields(0, 0);
    FieldSet targetsOfField2 = properties.getForwardingTargetFields(0, 2);

    assertNotNull(targetsOfField0);
    assertNotNull(targetsOfField2);
    assertTrue(targetsOfField0.contains(0));
    assertTrue(targetsOfField2.contains(2));
}
/**
 * Joins every edge with the degree computed for the vertex whose ID matches the edge's
 * field 0.
 *
 * @param input the graph whose edges are annotated
 * @return the result of joining the edges with the vertex degrees via
 *         {@code JoinEdgeWithVertexDegree}
 * @throws Exception if running the {@code VertexDegree} algorithm fails
 */
@Override
public DataSet<Edge<K, Tuple2<EV, LongValue>>> runInternal(Graph<K, VV, EV> input)
        throws Exception {
    // s, d(s)
    DataSet<Vertex<K, LongValue>> degreeByVertex = input
        .run(new VertexDegree<K, VV, EV>()
            .setReduceOnTargetId(reduceOnTargetId.get())
            .setParallelism(parallelism));

    DataSet<Edge<K, EV>> edges = input.getEdges();

    // s, t, d(s)
    return edges
        .join(degreeByVertex, JoinHint.REPARTITION_HASH_SECOND)
        .where(0)
        .equalTo(0)
        .with(new JoinEdgeWithVertexDegree<>())
            .setParallelism(parallelism)
            .name("Edge source degree");
}
/**
 * Check correctness of groupReduce on tuples with key extractor.
 */
@Test
public void testCorrectnessOfGroupReduceOnTuplesWithKeyExtractor() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
    DataSet<Tuple2<Integer, Long>> reduced = tuples
        .groupBy(new KeySelector1())
        .reduceGroup(new Tuple3GroupReduce());

    List<Tuple2<Integer, Long>> result = reduced.collect();

    String expected = "1,1\n" +
        "5,2\n" +
        "15,3\n" +
        "34,4\n" +
        "65,5\n" +
        "111,6\n";

    compareResultAsTuples(result, expected);
}
/**
 * Aggregates the 3-tuple data set with a sum over field 0 and a max over field 1, projects
 * onto fields 0 and 1 and expects the single tuple {@code 231,6}.
 */
@Test
public void testSumMaxAndProject() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
    DataSet<Tuple2<Integer, Long>> aggregated = tuples
        .sum(0)
        .andMax(1)
        .project(0, 1);

    List<Tuple2<Integer, Long>> result = aggregated.collect();

    String expected = "231,6\n";

    compareResultAsTuples(result, expected);
}
/**
 * Runs {@code VertexDegrees} on the input graph and attaches a fresh
 * {@code VertexMetricsHelper} as the output of the resulting data set.
 *
 * @param input the graph to analyze
 * @return this instance
 * @throws Exception if the superclass call or the degree computation fails
 */
@Override
public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
        throws Exception {
    super.run(input);

    DataSet<Vertex<K, Degrees>> degrees = input
        .run(new VertexDegrees<K, VV, EV>()
            .setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
            .setParallelism(parallelism));

    // The helper is stored in a field and also used as the output of the degree data set.
    vertexMetricsHelper = new VertexMetricsHelper<>();

    degrees
        .output(vertexMetricsHelper)
            .name("Vertex metrics");

    return this;
}
/**
 * Converts a data set of {@code PrivateSmallPojo} into a table while renaming the fields
 * {@code department}, {@code age}, {@code salary} and {@code name} to {@code a},
 * {@code b}, {@code c} and {@code d}, then checks the selected rows.
 */
@Test
public void testAsFromPrivateFieldsPojo() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, config());

    List<PrivateSmallPojo> employees = new ArrayList<>();
    employees.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
    employees.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
    employees.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));

    Table renamed = tableEnv.fromDataSet(
        env.fromCollection(employees),
        $("department").as("a"),
        $("age").as("b"),
        $("salary").as("c"),
        $("name").as("d"));
    Table table = renamed.select($("a"), $("b"), $("c"), $("d"));

    DataSet<Row> rows = tableEnv.toDataSet(table, Row.class);
    List<Row> results = rows.collect();

    String expected =
        "Sales,28,4000.0,Peter\n" +
        "Engineering,56,10000.0,Anna\n" +
        "HR,42,6000.0,Lucy\n";

    compareResultAsText(results, expected);
}
/**
 * Test mapEdges() and change the value type to a Tuple1.
 */
@Test
public void testWithTuple1Type() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Graph<Long, Long, Long> graph = Graph.fromDataSet(
        TestGraphUtils.getLongLongVertexData(env),
        TestGraphUtils.getLongLongEdgeData(env),
        env);

    DataSet<Edge<Long, Tuple1<Long>>> wrappedEdges = graph
        .mapEdges(new ToTuple1Mapper())
        .getEdges();
    List<Edge<Long, Tuple1<Long>>> result = wrappedEdges.collect();

    expectedResult = "1,2,(12)\n" +
        "1,3,(13)\n" +
        "2,3,(23)\n" +
        "3,4,(34)\n" +
        "3,5,(35)\n" +
        "4,5,(45)\n" +
        "5,1,(51)\n";

    compareResultAsTuples(result, expectedResult);
}
/**
 * Test mapVertices() and change the value type to a Tuple1.
 */
@Test
public void testWithtuple1Value() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Graph<Long, Long, Long> graph = Graph.fromDataSet(
        TestGraphUtils.getLongLongVertexData(env),
        TestGraphUtils.getLongLongEdgeData(env),
        env);

    DataSet<Vertex<Long, Tuple1<Long>>> wrappedVertices = graph
        .mapVertices(new ToTuple1Mapper())
        .getVertices();
    List<Vertex<Long, Tuple1<Long>>> result = wrappedVertices.collect();

    expectedResult = "1,(1)\n" +
        "2,(2)\n" +
        "3,(3)\n" +
        "4,(4)\n" +
        "5,(5)\n";

    compareResultAsTuples(result, expectedResult);
}
/**
 * Compiles the following plan down to a job graph.
 *
 * <pre>
 * Source -> Map -> Reduce -> Cross -> Reduce -> Cross -> Reduce -> Sink
 *   |--------------------------/                  /
 *   |--------------------------------------------/
 * </pre>
 *
 * <p>First cross has SameKeyFirst output contract (declared here through
 * {@code withForwardedFieldsFirst("*")}).
 */
@Test
public void testTicket158() {
    // construct the plan
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(DEFAULT_PARALLELISM);
    DataSet<Long> source = env.generateSequence(0, 1);

    DataSet<Long> map1 = source
        .map(new IdentityMapper<Long>()).name("Map1");
    DataSet<Long> reduce1 = map1
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1");
    DataSet<Long> cross1 = reduce1
        .cross(source).with(new IdentityCrosser<Long>()).withForwardedFieldsFirst("*").name("Cross1");
    DataSet<Long> reduce2 = cross1
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce2");
    DataSet<Long> cross2 = reduce2
        .cross(source).with(new IdentityCrosser<Long>()).name("Cross2");
    DataSet<Long> reduce3 = cross2
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce3");
    reduce3
        .output(new DiscardingOutputFormat<Long>()).name("Sink");

    // optimize the plan and generate the job graph from it
    Plan plan = env.createProgramPlan();
    OptimizedPlan optimizedPlan = compileNoStats(plan);

    JobGraphGenerator generator = new JobGraphGenerator();
    generator.compileJobGraph(optimizedPlan);
}
/**
 * Word count with a custom data type: tokenizes the text file at {@code textPath},
 * groups the {@code WC} records by {@code word}, sums their counts and writes the result
 * to {@code resultPath}.
 */
@Override
protected void testProgram() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Merges two records by keeping the first record's word and adding both counts.
    ReduceFunction<WC> sumCounts = new ReduceFunction<WC>() {
        private static final long serialVersionUID = 1L;

        @Override
        public WC reduce(WC value1, WC value2) {
            return new WC(value1.word, value1.count + value2.count);
        }
    };

    DataSet<String> lines = env.readTextFile(textPath);
    DataSet<WC> wordCounts = lines
        .flatMap(new Tokenizer())
        .groupBy("word")
        .reduce(sumCounts);

    wordCounts.writeAsText(resultPath);

    env.execute("WordCount with custom data types example");
}
/**
 * Write out a new or updated savepoint.
 *
 * <p>The states of the new operators are combined with the existing operator states,
 * merged by {@code MergeOperatorStates} and emitted through a
 * {@code SavepointOutputFormat} for the given path.
 *
 * @param path The path to where the savepoint should be written.
 */
public final void write(String path) {
    final Path savepointPath = new Path(path);

    // States of operators added to this savepoint.
    DataSet<OperatorState> newOperatorStates =
        writeOperatorStates(metadata.getNewOperators(), savepointPath);

    // Combine them with the operators that already exist in the metadata.
    DataSet<OperatorState> finalOperatorStates =
        unionOperatorStates(newOperatorStates, metadata.getExistingOperators());

    finalOperatorStates
        .reduceGroup(new MergeOperatorStates(metadata.getMasterStates()))
        .name("reduce(OperatorState)")
        .output(new SavepointOutputFormat(savepointPath))
        .name(path);
}
/**
 * Test joinWithEdgesOnTarget with the input DataSet parameter identical to the edge
 * DataSet.
 */
@Test
public void testWithEdgesOnTarget() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Graph<Long, Long, Long> graph = Graph.fromDataSet(
        TestGraphUtils.getLongLongVertexData(env),
        TestGraphUtils.getLongLongEdgeData(env),
        env);

    // The join input is derived from the graph's own edges.
    Graph<Long, Long, Long> joined = graph.joinWithEdgesOnTarget(
        graph.getEdges().map(new ProjectTargetAndValueMapper()),
        new AddValuesMapper());

    List<Edge<Long, Long>> result = joined.getEdges().collect();

    expectedResult = "1,2,24\n" +
        "1,3,26\n" +
        "2,3,36\n" +
        "3,4,68\n" +
        "3,5,70\n" +
        "4,5,80\n" +
        "5,1,102\n";

    compareResultAsTuples(result, expectedResult);
}