下面列出了org.apache.hadoop.hbase.mapreduce.TableOutputFormat#org.apache.flink.api.java.ExecutionEnvironment 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Compiles a plan of the following shape, in which the source feeds the main
 * chain as well as both cross operators.
 *
 * <pre>
 * Source -> Map -> Reduce -> Cross -> Reduce -> Cross -> Reduce -> Sink
 *   |--------------------------/                  /
 *   |--------------------------------------------/
 * </pre>
 *
 * <p>The first cross has the SameKeyFirst output contract (all fields of its
 * first input are declared as forwarded).
 */
@Test
public void testTicket158() {
    // assemble the program stage by stage
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(DEFAULT_PARALLELISM);
    DataSet<Long> source = env.generateSequence(0,1);

    DataSet<Long> firstReduce = source
        .map(new IdentityMapper<Long>()).name("Map1")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1");

    DataSet<Long> secondReduce = firstReduce
        .cross(source).with(new IdentityCrosser<Long>()).withForwardedFieldsFirst("*").name("Cross1")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce2");

    DataSet<Long> thirdReduce = secondReduce
        .cross(source).with(new IdentityCrosser<Long>()).name("Cross2")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce3");

    thirdReduce.output(new DiscardingOutputFormat<Long>()).name("Sink");

    // optimize the plan and translate it into a job graph; neither step may throw
    Plan program = env.createProgramPlan();
    OptimizedPlan optimized = compileNoStats(program);
    new JobGraphGenerator().compileJobGraph(optimized);
}
/**
 * Gets the sum of all neighbor values for each vertex with an id greater than two,
 * as well as the same sum multiplied by two.
 */
@Test
public void testSumOfAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
        TestGraphUtils.getLongLongVertexData(env),
        TestGraphUtils.getLongLongEdgeData(env),
        env);

    // the reduce function is applied with EdgeDirection.ALL
    DataSet<Tuple2<Long, Long>> neighborSums = inputGraph.groupReduceOnNeighbors(
        new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(),
        EdgeDirection.ALL);
    List<Tuple2<Long, Long>> actual = neighborSums.collect();

    // two records per vertex: the sum and the doubled sum
    expectedResult = "3,12\n" + "3,24\n"
        + "4,8\n" + "4,16\n"
        + "5,8\n" + "5,16";
    compareResultAsTuples(actual, expectedResult);
}
/**
 * Builds a plan in which a single union of two one-element inputs feeds two
 * discarding sinks, then optimizes it and generates the job graph.
 *
 * <p>Any exception raised along the way fails the test. The failure carries the
 * original exception as its cause, so the full stack trace appears in the test
 * report instead of only on stderr.
 */
@Test
public void testUnionReplacement() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> input1 = env.fromElements("test1");
        DataSet<String> input2 = env.fromElements("test2");

        DataSet<String> union = input1.union(input2);

        // the same union is consumed by two sinks
        union.output(new DiscardingOutputFormat<String>());
        union.output(new DiscardingOutputFormat<String>());

        Plan plan = env.createProgramPlan();
        OptimizedPlan oPlan = compileNoStats(plan);
        JobGraphGenerator jobGen = new JobGraphGenerator();
        jobGen.compileJobGraph(oPlan);
    }
    catch (Exception e) {
        // Same failure type and message as fail(e.getMessage()), but with the cause preserved.
        throw new AssertionError(e.getMessage(), e);
    }
}
/**
 * Expects the compiler to fail for a join program whose replicated data source
 * sits behind a map that changes the parallelism.
 */
@Test(expected = CompilerException.class)
public void checkJoinWithReplicatedSourceInputBehindMapChangingparallelism() {
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    env.setParallelism(DEFAULT_PARALLELISM);

    // replicated CSV source
    TupleTypeInfo<Tuple1<String>> csvTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
    TupleCsvInputFormat<Tuple1<String>> csvFormat =
        new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), csvTypeInfo);
    ReplicatingInputFormat<Tuple1<String>, FileInputSplit> replicatingFormat =
        new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(csvFormat);
    DataSet<Tuple1<String>> replicatedSource =
        env.createInput(replicatingFormat, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));

    // regular CSV source
    DataSet<Tuple1<String>> regularSource = env.readCsvFile("/some/otherpath").types(String.class);

    // the map in front of the join runs with a parallelism different from the environment's
    DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> sink = replicatedSource
        .map(new IdMap()).setParallelism(DEFAULT_PARALLELISM+1)
        .join(regularSource).where("*").equalTo("*")
        .writeAsText("/some/newpath");

    Plan program = env.createProgramPlan();

    // hand the plan to the compiler; this call is expected to throw
    OptimizedPlan optimized = compileNoStats(program);
}
/**
 * Tests a filter on a String tuple field, implemented as a rich filter that is
 * given the integer data set as the broadcast set named "ints".
 */
@Test
public void testRichFilterOnStringTupleField() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Integer> broadcastInts = CollectionDataSets.getIntegerDataSet(env);
    DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);

    DataSet<Tuple3<Integer, Long, String>> filtered = tuples
        .filter(new RichFilter1())
        .withBroadcastSet(broadcastInts, "ints");
    List<Tuple3<Integer, Long, String>> actual = filtered.collect();

    String expected = "1,1,Hi\n"
        + "2,2,Hello\n"
        + "3,2,Hello world\n"
        + "4,3,Hello world, how are you?\n";
    compareResultAsTuples(actual, expected);
}
/**
 * Loads table "item1" through JDBC into a collection-based Flink environment,
 * registers it as table "item1", and then runs {@code sql} against it.
 *
 * @param limit passed to {@code BenchMarkUtil.generateFetchSql}; presumably a row limit
 *              for the fetch query (confirm against that helper)
 * @param sql   the query handed to {@code sqlQuery} once the table is registered
 * @return the time spent loading and registering the table, in milliseconds, plus the
 *         value returned by {@code sqlQuery} (presumably the query time in the same
 *         unit; confirm against that helper)
 * @throws Throwable if connecting, fetching, or querying fails
 */
public double runSqlForSingleTable(int limit, String sql) throws Throwable {
    Stopwatch s = Stopwatch.createStarted();
    try (Connection connection = BenchMarkUtil.getDBConnection()) {
        String fetchSql = BenchMarkUtil.generateFetchSql("item1", "i_item_sk", limit);
        // The statement and result set are only needed for the row metadata. Manage them
        // explicitly so they are closed on every path instead of relying on the connection
        // to clean them up. Fully qualified to avoid needing an additional import.
        try (java.sql.Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(fetchSql)) {
            RowTypeInfo rowTypeInfo = typeOfJdbc(resultSet.getMetaData());
            ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
            BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
            DataSet ds = env.createInput(
                JDBCInputFormat.buildJDBCInputFormat()
                    .setDrivername(BenchMarkUtil.DB_DRIVER)
                    .setDBUrl(BenchMarkUtil.DB_CONNECTION_URL)
                    .setQuery(fetchSql)
                    .setRowTypeInfo(rowTypeInfo)
                    .finish()
            );
            // The result is discarded; the call runs the JDBC read inside the timed section.
            ds.collect();
            tEnv.registerDataSet("item1", ds);
            s.stop();
            // microseconds * 0.001 = milliseconds
            return s.elapsed(TimeUnit.MICROSECONDS) * 0.001 + sqlQuery(tEnv, sql);
        }
    }
}
/**
 * CoGroup on two custom type inputs, each keyed with its own key extractor.
 */
@Test
public void testCoGroupOnTwoCustomTypeInputsWithKeyExtractors() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<CustomType> left = CollectionDataSets.getCustomTypeDataSet(env);
    DataSet<CustomType> right = CollectionDataSets.getCustomTypeDataSet(env);

    DataSet<CustomType> coGrouped = left
        .coGroup(right)
        .where(new KeySelector4())
        .equalTo(new KeySelector5())
        .with(new CustomTypeCoGroup());
    List<CustomType> actual = coGrouped.collect();

    String expected = "1,0,test\n"
        + "2,6,test\n"
        + "3,24,test\n"
        + "4,60,test\n"
        + "5,120,test\n"
        + "6,210,test\n";
    compareResultAsText(actual, expected);
}
/**
 * Checks correctness of distinct on Strings when the key is given as
 * Keys.ExpressionKeys.SELECT_ALL_CHAR ("*").
 */
@Test
public void testCorrectnessOfDistinctOnAtomicWithSelectAllChar() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> strings = CollectionDataSets.getStringDataSet(env);
    // union the data set with itself so that every element occurs at least twice
    DataSet<String> doubled = strings.union(strings);
    DataSet<String> distinctStrings = doubled.distinct("*");
    List<String> actual = distinctStrings.collect();

    String expected = "I am fine.\n"
        + "Luke Skywalker\n"
        + "LOL\n"
        + "Hello world, how are you?\n"
        + "Hi\n"
        + "Hello world\n"
        + "Hello\n"
        + "Random comment\n";
    compareResultAsText(actual, expected);
}
/**
 * Writes the mixed POJO data set as text with parallelism 1, locally sorted on three
 * nested fields, and compares the written lines in strict order.
 */
@Test
public void testPojoSortingNestedParallelism1() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<CollectionDataSets.POJO> pojos = CollectionDataSets.getMixedPojoDataSet(env);

    // sort order: f0 ascending, then f1.myInt descending, then longNumber ascending
    pojos
        .writeAsText(resultPath)
        .sortLocalOutput("nestedTupleWithCustom.f0", Order.ASCENDING)
        .sortLocalOutput("nestedTupleWithCustom.f1.myInt", Order.DESCENDING)
        .sortLocalOutput("nestedPojo.longNumber", Order.ASCENDING)
        .setParallelism(1);

    env.execute();

    String expected = "2 First_ (10,105,1000,One) 10200\n"
        + "1 First (10,100,1000,One) 10100\n"
        + "4 First_ (11,106,1000,One) 10300\n"
        + "5 First (11,102,2000,One) 10100\n"
        + "3 First (11,102,3000,One) 10200\n"
        + "6 Second_ (20,200,2000,Two) 10100\n"
        + "8 Third_ (30,300,1000,Three) 10100\n"
        + "7 Third (31,301,2000,Three) 10200\n";
    compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
}
/**
 * Reads the test table through {@code InputFormatForTestTable} with parallelism 4,
 * sums the single integer field of all records, and expects one record holding 360.
 */
@Test
public void testTableInputFormat() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    // adds up field f0 of two records
    ReduceFunction<Tuple1<Integer>> sumReducer = new ReduceFunction<Tuple1<Integer>>() {
        @Override
        public Tuple1<Integer> reduce(Tuple1<Integer> left, Tuple1<Integer> right) throws Exception {
            return Tuple1.of(left.f0 + right.f0);
        }
    };

    DataSet<Tuple1<Integer>> summed = env
        .createInput(new InputFormatForTestTable())
        .reduce(sumReducer);

    List<Tuple1<Integer>> collected = summed.collect();

    assertEquals(1, collected.size());
    assertEquals(360, (int) collected.get(0).f0);
}
/**
 * Applies a generic flat-map function whose result type is supplied through a
 * class type hint ({@code returns(Integer.class)}) and checks the collected output.
 */
@Test
public void testFlatMapWithClassTypeHint() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();

    DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.getSmall3TupleDataSet(env);

    DataSet<Integer> mapped = tuples
        .flatMap(new FlatMapper<Tuple3<Integer, Long, String>, Integer>())
        .returns(Integer.class);
    List<Integer> actual = mapped.collect();

    String expectedResult = "2\n" + "3\n" + "1\n";
    compareResultAsText(actual, expectedResult);
}
/**
 * Join with Tiny: joins the small 3-tuple data set with the 5-tuple data set via
 * {@code joinWithTiny}, matching field 1 of both sides.
 */
@Test
public void testJoinWithTiny() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple3<Integer, Long, String>> threeTuples = CollectionDataSets.getSmall3TupleDataSet(env);
    DataSet<Tuple5<Integer, Long, Integer, String, Long>> fiveTuples = CollectionDataSets.get5TupleDataSet(env);

    DataSet<Tuple2<String, String>> joined = threeTuples
        .joinWithTiny(fiveTuples)
        .where(1)
        .equalTo(1)
        .with(new T3T5FlatJoin());
    List<Tuple2<String, String>> actual = joined.collect();

    String expected = "Hi,Hallo\n"
        + "Hello,Hallo Welt\n"
        + "Hello world,Hallo Welt\n";
    compareResultAsTuples(actual, expected);
}
/**
 * Distributed-cache demo: registers a local file under the name "java-cf", then maps a
 * small data set with a rich function that, when opened, reads the cached file and
 * prints each of its lines. The mapped elements are passed through unchanged and printed.
 */
public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    String filePath = "file:\\D:\\imooc\\新一代大数据计算引擎 Flink从入门到实战-v\\input\\hello.txt";

    // 1. Register a local file
    env.registerCachedFile(filePath, "java-cf");

    DataSource<String> words = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm");

    RichMapFunction<String, String> cacheReadingMapper = new RichMapFunction<String, String>() {
        List<String> list = new ArrayList<>();

        @Override
        public void open(Configuration parameters) throws Exception {
            // look the file up by the name it was registered under
            File cachedFile = getRuntimeContext().getDistributedCache().getFile("java-cf");
            for (String line : FileUtils.readLines(cachedFile)) {
                System.out.println("line: " + line);
            }
        }

        @Override
        public String map(String value) throws Exception {
            return value;
        }
    };

    words.map(cacheReadingMapper).print();
}
/**
 * Tests inDegrees() on a graph that contains a vertex with no ingoing edge.
 */
@Test
public void testInDegreesWithNoInEdge() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
        TestGraphUtils.getLongLongVertexData(env),
        TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env),
        env);

    DataSet<Tuple2<Long, LongValue>> inDegrees = inputGraph.inDegrees();
    List<Tuple2<Long, LongValue>> actual = inDegrees.collect();

    // vertex 1 is expected to report an in-degree of zero
    expectedResult = "1,0\n"
        + "2,1\n"
        + "3,1\n"
        + "4,1\n"
        + "5,3\n";
    compareResultAsTuples(actual, expectedResult);
}
/**
 * Builds a plan with a single non-grouped group-reduce between a sequence source and a
 * discarding sink, then checks that it can be optimized and translated into a job graph.
 *
 * <p>A {@code CompilerException} fails the test. The failure carries the exception as its
 * cause, so the compiler's stack trace appears in the test report instead of only on stderr.
 */
@Test
public void testReduce() {
    // construct the plan
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(DEFAULT_PARALLELISM);
    DataSet<Long> set1 = env.generateSequence(0,1);

    set1.reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
        .output(new DiscardingOutputFormat<Long>()).name("Sink");

    Plan plan = env.createProgramPlan();

    try {
        OptimizedPlan oPlan = compileNoStats(plan);
        JobGraphGenerator jobGen = new JobGraphGenerator();
        jobGen.compileJobGraph(oPlan);
    } catch(CompilerException ce) {
        // Same failure type and message as the previous fail(...), but with the cause preserved.
        throw new AssertionError("The pact compiler is unable to compile this plan correctly", ce);
    }
}
/**
 * Gets all the in-neighbors for each vertex that have a value greater than two.
 */
@Test
public void testAllInNeighborsWithValueGreaterThanTwo() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
        TestGraphUtils.getLongLongVertexData(env),
        TestGraphUtils.getLongLongEdgeData(env),
        env);

    // only incoming edges are handed to the reduce function
    DataSet<Tuple2<Long, Long>> inNeighbors = inputGraph.groupReduceOnEdges(
        new SelectInNeighborsValueGreaterThanTwo(),
        EdgeDirection.IN);
    List<Tuple2<Long, Long>> actual = inNeighbors.collect();

    expectedResult = "3,1\n"
        + "3,2\n"
        + "4,3\n"
        + "5,3\n"
        + "5,4";
    compareResultAsTuples(actual, expectedResult);
}
/**
 * Word count over a custom data type: tokenizes the text file into {@code WC} records,
 * groups them by the nested field "complex.someTest", adds up the counts per group,
 * and writes the result as text.
 */
@Override
protected void testProgram() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> lines = env.readTextFile(textPath);

    // merges two records of the same group by summing their counts
    ReduceFunction<WC> countSummer = new ReduceFunction<WC>() {
        private static final long serialVersionUID = 1L;

        @Override
        public WC reduce(WC first, WC second) {
            return new WC(first.complex.someTest, first.count + second.count);
        }
    };

    DataSet<WC> wordCounts = lines
        .flatMap(new Tokenizer())
        .groupBy("complex.someTest")
        .reduce(countSummer);

    wordCounts.writeAsText(resultPath);

    env.execute("WordCount with custom data types example");
}
/**
 * Prepares the fixture: a mocked {@code PackagedProgram} that returns a trivial plan
 * without any jars, and a configuration with the job manager address set to
 * "localhost", a currently available port, and the default ask timeout.
 */
@Before
public void setUp() throws Exception {
    // trivial plan: the sequence 1..1000 written to a discarding sink
    ExecutionEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment();
    localEnv.generateSequence(1, 1000).output(new DiscardingOutputFormat<Long>());
    Plan trivialPlan = localEnv.createProgramPlan();

    JobWithJars planWithoutJars = new JobWithJars(
        trivialPlan,
        Collections.<URL>emptyList(),
        Collections.<URL>emptyList());

    program = mock(PackagedProgram.class);
    when(program.getPlanWithJars()).thenReturn(planWithoutJars);

    final int jobManagerPort = NetUtils.getAvailablePort();

    config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, "localhost");
    config.setInteger(JobManagerOptions.PORT, jobManagerPort);
    config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());
}
/**
 * Maps a 3-tuple input with {@code IndividualForwardedMapper} and checks the semantic
 * properties of the resulting map operator: field 0 must forward to field 0 and
 * field 2 must forward to field 2.
 */
@Test
public void testUnaryFunctionInPlaceForwardedAnnotation() {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    @SuppressWarnings("unchecked")
    DataSet<Tuple3<Long, String, Integer>> tuples = env.fromElements(new Tuple3<Long, String, Integer>(3L, "test", 42));
    tuples
        .map(new IndividualForwardedMapper<Long, String, Integer>())
        .output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());

    // the map operator is the direct input of the plan's only sink
    Plan program = env.createProgramPlan();
    GenericDataSinkBase<?> onlySink = program.getDataSinks().iterator().next();
    MapOperatorBase<?, ?, ?> mapOperator = (MapOperatorBase<?, ?, ?>) onlySink.getInput();

    SingleInputSemanticProperties properties = mapOperator.getSemanticProperties();
    FieldSet targetsOfField0 = properties.getForwardingTargetFields(0, 0);
    FieldSet targetsOfField2 = properties.getForwardingTargetFields(0, 2);

    assertNotNull(targetsOfField0);
    assertNotNull(targetsOfField2);
    assertTrue(targetsOfField0.contains(0));
    assertTrue(targetsOfField2.contains(2));
}
/**
 * Loads the savepoint at {@code savepointPath}, removes the operator identified by
 * {@code CURRENCY_UID}, adds an operator under {@code MODIFY_UID} that is bootstrapped
 * from the elements 1, 2, 3 via {@code ModifyProcessFunction}, and writes the result
 * to {@code modifyPath}.
 *
 * @param savepointPath location of the savepoint to load
 * @param modifyPath    location the modified savepoint is written to
 * @throws Exception if executing the batch job fails
 */
private void modifySavepoint(String savepointPath, String modifyPath) throws Exception {
    ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Integer> bootstrapData = batchEnv.fromElements(1, 2, 3);
    BootstrapTransformation<Integer> modifyTransformation =
        OperatorTransformation.bootstrapWith(bootstrapData).transform(new ModifyProcessFunction());

    Savepoint.load(batchEnv, savepointPath, backend)
        .removeOperator(CURRENCY_UID)
        .withOperator(MODIFY_UID, modifyTransformation)
        .write(modifyPath);

    batchEnv.execute("Modifying");
}
/**
 * Zips a sequence of 100 numbers via {@code DataSetUtils.zipWithUniqueId}, keeps only
 * field f0 of every resulting pair, and checks that the set of collected values has as
 * many entries as the input sequence.
 */
@Test
public void testZipWithUniqueId() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    long expectedSize = 100L;
    DataSet<Long> sequence = env.generateSequence(1L, expectedSize);

    // projects each pair onto its first field
    MapFunction<Tuple2<Long, Long>, Long> firstField = new MapFunction<Tuple2<Long, Long>, Long>() {
        @Override
        public Long map(Tuple2<Long, Long> pair) throws Exception {
            return pair.f0;
        }
    };
    DataSet<Long> ids = DataSetUtils.zipWithUniqueId(sequence).map(firstField);

    // duplicates would collapse in the set and shrink its size
    Set<Long> distinctIds = new HashSet<>(ids.collect());

    Assert.assertEquals(expectedSize, distinctIds.size());
}
/**
 * Checks correctness of coGroup if the UDF returns the right input objects.
 */
@Test
public void testCorrectnessOfCoGroupIfUDFReturnsRightInputObjects() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple5<Integer, Long, Integer, String, Long>> left = CollectionDataSets.get5TupleDataSet(env);
    DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.get5TupleDataSet(env);

    // both inputs are keyed on field 0
    DataSet<Tuple5<Integer, Long, Integer, String, Long>> coGrouped = left
        .coGroup(right)
        .where(0)
        .equalTo(0)
        .with(new Tuple5ReturnRight());
    List<Tuple5<Integer, Long, Integer, String, Long>> actual = coGrouped.collect();

    String expected = "1,1,0,Hallo,1\n"
        + "2,2,1,Hallo Welt,2\n"
        + "2,3,2,Hallo Welt wie,1\n"
        + "3,4,3,Hallo Welt wie gehts?,2\n"
        + "3,5,4,ABC,2\n"
        + "3,6,5,BCD,3\n";
    compareResultAsTuples(actual, expected);
}
/**
 * Tests getDegrees() on the Long/Long test graph.
 */
@Test
public void testGetDegrees() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
        TestGraphUtils.getLongLongVertexData(env),
        TestGraphUtils.getLongLongEdgeData(env),
        env);

    DataSet<Tuple2<Long, LongValue>> degrees = inputGraph.getDegrees();
    List<Tuple2<Long, LongValue>> actual = degrees.collect();

    // one "vertexId,degree" line per vertex
    expectedResult = "1,3\n"
        + "2,2\n"
        + "3,4\n"
        + "4,2\n"
        + "5,3\n";
    compareResultAsTuples(actual, expectedResult);
}
/**
 * Tests mapEdges() with a mapper that changes the edge value type to String.
 */
@Test
public void testWithStringValue() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
        TestGraphUtils.getLongLongVertexData(env),
        TestGraphUtils.getLongLongEdgeData(env),
        env);

    DataSet<Edge<Long, String>> stringEdges = inputGraph
        .mapEdges(new ToStringMapper())
        .getEdges();
    List<Edge<Long, String>> actual = stringEdges.collect();

    expectedResult = "1,2,string(12)\n"
        + "1,3,string(13)\n"
        + "2,3,string(23)\n"
        + "3,4,string(34)\n"
        + "3,5,string(35)\n"
        + "4,5,string(45)\n"
        + "5,1,string(51)\n";
    compareResultAsTuples(actual, expectedResult);
}
/**
 * Checks that a cross of two 5-tuple data sets accepts a chain of projections that
 * alternates between the first and the second input.
 *
 * <p>Any exception fails the test. The failure carries a message and the original
 * exception as its cause, instead of a bare {@code Assert.fail()} that hides why
 * the projection was rejected.
 */
@Test
public void testCrossProjection24() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);

    // should work
    try {
        ds1.cross(ds2)
            .projectFirst(0, 2)
            .projectSecond(1, 4)
            .projectFirst(1);
    } catch (Exception e) {
        // Same failure type as Assert.fail(), but with the reason preserved.
        throw new AssertionError("Chained cross projection should work", e);
    }
}
/**
 * Tests graph creation with fromTuple2DataSet by checking the vertices of the created graph.
 */
@Test
public void testFromTuple2() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Long, Long>> edgeTuples = TestGraphUtils.getLongLongTuple2Data(env);
    Graph<Long, NullValue, NullValue> createdGraph = Graph.fromTuple2DataSet(edgeTuples, env);

    List<Vertex<Long, NullValue>> actual = createdGraph.getVertices().collect();

    // every vertex is expected to carry a null value
    expectedResult = "1,(null)\n"
        + "2,(null)\n"
        + "3,(null)\n"
        + "4,(null)\n"
        + "6,(null)\n"
        + "10,(null)\n"
        + "20,(null)\n"
        + "30,(null)\n"
        + "40,(null)\n"
        + "60,(null)\n";
    compareResultAsTuples(actual, expectedResult);
}
/**
 * Gets the sum of out-neighbor values for each vertex.
 */
@Test
public void testSumOfOutNeighborsNoValue() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
        TestGraphUtils.getLongLongVertexData(env),
        TestGraphUtils.getLongLongEdgeData(env),
        env);

    // the reduce function is applied with EdgeDirection.OUT
    DataSet<Tuple2<Long, Long>> outNeighborSums = inputGraph.reduceOnNeighbors(
        new SumNeighbors(),
        EdgeDirection.OUT);
    List<Tuple2<Long, Long>> actual = outNeighborSums.collect();

    expectedResult = "1,5\n"
        + "2,3\n"
        + "3,9\n"
        + "4,5\n"
        + "5,1\n";
    compareResultAsTuples(actual, expectedResult);
}
/**
 * Joins a POJO data set with a tuple data set, selecting multiple key fields on both
 * sides using the expression language.
 */
@Test
public void testSelectingMultipleFieldsUsingExpressionLanguage() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<POJO> pojos = CollectionDataSets.getSmallPojoDataSet(env);
    DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> tuples =
        CollectionDataSets.getSmallTuplebasedDataSet(env);

    // three POJO fields are matched pairwise against three tuple fields
    DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joined = pojos
        .join(tuples)
        .where("nestedPojo.longNumber", "number", "str")
        .equalTo("f6", "f0", "f1");

    env.setParallelism(1);
    List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> actual = joined.collect();

    String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n"
        + "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n"
        + "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
    compareResultAsTuples(actual, expected);
}
/**
 * Maps a 3-tuple input with {@code WildcardForwardedMapperWithForwardAnnotation} and
 * checks the semantic properties of the resulting map operator: field 0 must forward
 * to field 0, while fields 1 and 2 must not be reported as forwarded to themselves.
 */
@Test
public void testFunctionForwardedAnnotationPrecedence() {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    @SuppressWarnings("unchecked")
    DataSet<Tuple3<Long, String, Integer>> tuples = env.fromElements(Tuple3.of(3L, "test", 42));
    tuples
        .map(new WildcardForwardedMapperWithForwardAnnotation<Tuple3<Long, String, Integer>>())
        .output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());

    // the map operator is the direct input of the plan's only sink
    Plan program = env.createProgramPlan();
    GenericDataSinkBase<?> onlySink = program.getDataSinks().iterator().next();
    MapOperatorBase<?, ?, ?> mapOperator = (MapOperatorBase<?, ?, ?>) onlySink.getInput();

    SingleInputSemanticProperties properties = mapOperator.getSemanticProperties();
    FieldSet targetsOfField0 = properties.getForwardingTargetFields(0, 0);
    FieldSet targetsOfField1 = properties.getForwardingTargetFields(0, 1);
    FieldSet targetsOfField2 = properties.getForwardingTargetFields(0, 2);

    assertNotNull(targetsOfField0);
    assertNotNull(targetsOfField1);
    assertNotNull(targetsOfField2);
    assertTrue(targetsOfField0.contains(0));
    assertFalse(targetsOfField1.contains(1));
    assertFalse(targetsOfField2.contains(2));
}
/**
 * Checks that a coGroup accepts mixed key definitions: a field position (3) on the
 * tuple input and a {@code KeySelector} returning {@code myLong} on the custom-type input.
 *
 * <p>Any exception fails the test. The failure carries a message and the original
 * exception as its cause, instead of a bare {@code Assert.fail()} that hides why
 * the key definition was rejected.
 */
@Test
public void testCoGroupKeyMixing2() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
    DataSet<CustomType> ds2 = env.fromCollection(customTypeData);

    // should work
    try {
        ds1.coGroup(ds2)
            .where(3)
            .equalTo(
                new KeySelector<CustomType, Long>() {
                    @Override
                    public Long getKey(CustomType value) {
                        return value.myLong;
                    }
                }
            );
    } catch (Exception e) {
        // Same failure type as Assert.fail(), but with the reason preserved.
        throw new AssertionError("coGroup with a field position key and a key selector should work", e);
    }
}