org.openjdk.jmh.annotations.OperationsPerInvocation#org.apache.flink.streaming.api.datastream.DataStreamSource源码实例Demo

下面列出了org.openjdk.jmh.annotations.OperationsPerInvocation#org.apache.flink.streaming.api.datastream.DataStreamSource 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: flink-learning   文件: Main.java
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;

        //下面这些写死的参数可以放在配置文件中,然后通过 parameterTool 获取
        final RMQConnectionConfig connectionConfig = new RMQConnectionConfig
                .Builder().setHost("localhost").setVirtualHost("/")
                .setPort(5672).setUserName("admin").setPassword("admin")
                .build();

        DataStreamSource<String> zhisheng = env.addSource(new RMQSource<>(connectionConfig,
                "zhisheng",
                true,
                new SimpleStringSchema()))
                .setParallelism(1);
        zhisheng.print();

        //如果想保证 exactly-once 或 at-least-once 需要把 checkpoint 开启
//        env.enableCheckpointing(10000);
        env.execute("flink learning connectors rabbitmq");
    }
 
源代码2 项目: flink   文件: StreamExecutionEnvironment.java
/**
 * Creates a new data stream that contains the given elements. The elements must all be of the
 * same type, for example, all of the {@link String} or {@link Integer}.
 *
 * <p>The framework will try and determine the exact type from the elements. In case of generic
 * elements, it may be necessary to manually supply the type information via
 * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
 *
 * <p>Note that this operation will result in a non-parallel data stream source, i.e. a data
 * stream source with a degree of parallelism one.
 *
 * @param data
 * 		The array of elements to create the data stream from.
 * @param <OUT>
 * 		The type of the returned data stream
 * @return The data stream representing the given array of elements
 */
@SafeVarargs
public final <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
	if (data.length == 0) {
		throw new IllegalArgumentException("fromElements needs at least one element as argument");
	}

	TypeInformation<OUT> typeInfo;
	try {
		typeInfo = TypeExtractor.getForObject(data[0]);
	}
	catch (Exception e) {
		throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
				+ "; please specify the TypeInformation manually via "
				+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
	}
	return fromCollection(Arrays.asList(data), typeInfo);
}
 
源代码3 项目: Flink-CEPplus   文件: ElasticsearchSinkTestBase.java
/**
 * Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
 */
public void runInvalidElasticsearchClusterTest() throws Exception {
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());

	Map<String, String> userConfig = new HashMap<>();
	userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
	userConfig.put("cluster.name", "invalid-cluster-name");

	source.addSink(createElasticsearchSinkForNode(
			1,
			"invalid-cluster-name",
			new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"),
			"123.123.123.123")); // incorrect ip address

	try {
		env.execute("Elasticsearch Sink Test");
	} catch (JobExecutionException expectedException) {
		// test passes
		return;
	}

	fail();
}
 
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection);

    CassandraSink.addSink(source)
            .setQuery(INSERT)
            .setClusterBuilder(new ClusterBuilder() {
                @Override
                protected Cluster buildCluster(Cluster.Builder builder) {
                    return builder.addContactPoint("127.0.0.1").build();
                }
            })
            .build();

    env.execute("WriteTupleIntoCassandra");
}
 
/**
 * Creates a new data set that contains the given elements. The framework will determine the type according to the
 * based type user supplied. The elements should be the same or be the subclass to the based type.
 * The sequence of elements must not be empty.
 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
 * degree of parallelism one.
 *
 * @param type
 * 		The based class type in the collection.
 * @param data
 * 		The array of elements to create the data stream from.
 * @param <OUT>
 * 		The type of the returned data stream
 * @return The data stream representing the given array of elements
 */
@SafeVarargs
public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> type, OUT... data) {
	if (data.length == 0) {
		throw new IllegalArgumentException("fromElements needs at least one element as argument");
	}

	TypeInformation<OUT> typeInfo;
	try {
		typeInfo = TypeExtractor.getForClass(type);
	}
	catch (Exception e) {
		throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
				+ "; please specify the TypeInformation manually via "
				+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
	}
	return fromCollection(Arrays.asList(data), typeInfo);
}
 
/**
 * Creates a data stream from the given non-empty collection.
 *
 * <p>Note that this operation will result in a non-parallel data stream source,
 * i.e., a data stream source with parallelism one.
 *
 * @param data
 * 		The collection of elements to create the data stream from
 * @param typeInfo
 * 		The TypeInformation for the produced data stream
 * @param <OUT>
 * 		The type of the returned data stream
 * @return The data stream representing the given collection
 */
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) {
	Preconditions.checkNotNull(data, "Collection must not be null");

	// must not have null elements and mixed elements
	FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());

	SourceFunction<OUT> function;
	try {
		function = new FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data);
	}
	catch (IOException e) {
		throw new RuntimeException(e.getMessage(), e);
	}
	return addSource(function, "Collection Source", typeInfo).setParallelism(1);
}
 
源代码7 项目: flink-benchmarks   文件: InputBenchmark.java
@Benchmark
public void mapRebalanceMapSink(FlinkEnvironmentContext context) throws Exception {

	StreamExecutionEnvironment env = context.env;
	env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
	env.setParallelism(1);

	DataStreamSource<Long> source = env.addSource(new LongSource(RECORDS_PER_INVOCATION));
	source
		.map(new MultiplyByTwo())
		.rebalance()
		.map((Long in) -> in)
		.addSink(new DiscardingSink<>());

	env.execute();
}
 
源代码8 项目: flink   文件: StreamTableEnvironmentImplTest.java
@Test
public void testRetractStreamDoesNotOverwriteTableConfig() {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);

	StreamTableEnvironmentImpl tEnv = getStreamTableEnvironment(env, elements);

	Time minRetention = Time.minutes(1);
	Time maxRetention = Time.minutes(10);
	tEnv.getConfig().setIdleStateRetentionTime(minRetention, maxRetention);
	Table table = tEnv.fromDataStream(elements);
	tEnv.toRetractStream(table, Row.class);

	assertThat(
		tEnv.getConfig().getMinIdleStateRetentionTime(),
		equalTo(minRetention.toMilliseconds()));
	assertThat(
		tEnv.getConfig().getMaxIdleStateRetentionTime(),
		equalTo(maxRetention.toMilliseconds()));
}
 
源代码9 项目: bahir-flink   文件: ActiveMQConnectorITCase.java
private void createProducerTopology(StreamExecutionEnvironment env, AMQSinkConfig<String> config) {
    DataStreamSource<String> stream = env.addSource(new SourceFunction<String>() {
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            for (int i = 0; i < MESSAGES_NUM; i++) {
                ctx.collect("amq-" + i);
            }
        }

        @Override
        public void cancel() {}
    });


    AMQSink<String> sink = new AMQSink<>(config);
    stream.addSink(sink);
}
 
源代码10 项目: flink-simple-tutorial   文件: SlidingWindow.java
public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 构建输入数据
        List<Tuple2<String, Long>> data = new ArrayList<>();
        Tuple2<String, Long> a = new Tuple2<>("first event", 1L);
        Tuple2<String, Long> b = new Tuple2<>("second event", 2L);
        data.add(a);
        data.add(b);
        DataStreamSource<Tuple2<String, Long>> input = env.fromCollection(data);

        // 使用 ProcessTime 滑动窗口, 10s 为一个窗口长度, 每 1s 滑动一次
        input.keyBy(x -> x.f1)
                .timeWindow(Time.seconds(10), Time.seconds(1))
                .reduce(new MyWindowFunction());

        env.execute();
    }
 
源代码11 项目: flink-learning   文件: Main.java
public static void main(String[] args) throws Exception {
    final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
    StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
    Properties props = KafkaConfigUtil.buildKafkaProps(parameterTool);

    DataStreamSource<String> data = env.addSource(new FlinkKafkaConsumer011<>(
            parameterTool.get(METRICS_TOPIC),   //这个 kafka topic 需要和上面的工具类的 topic 一致
            new SimpleStringSchema(),
            props));

    data.map(new MapFunction<String, Object>() {
        @Override
        public Object map(String string) throws Exception {
            writeEventToHbase(string, parameterTool);
            return string;
        }
    }).print();

    env.execute("flink learning connectors hbase");
}
 
源代码12 项目: flink-spector   文件: DataStreamTestEnvironment.java
/**
 * Creates a data stream from the given non-empty collection. The type of the data stream is that of the elements in the collection.
 * <p>
 * <p>The framework will try and determine the exact type from the collection elements. In case of generic
 * elements, it may be necessary to manually supply the type information via {@link #fromCollection(java.util.Collection,
 * org.apache.flink.api.common.typeinfo.TypeInformation)}.</p>
 * <p>
 * <p>Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
 * parallelism one.</p>
 *
 * @param <OUT>        The generic type of the returned data stream.
 * @param data         The collection of elements to startWith the data stream from.
 * @param flushWindows Specifies whether open windows should be flushed on termination.
 * @return The data stream representing the given collection
 */
public <OUT> DataStreamSource<OUT> fromCollectionWithTimestamp(Collection<StreamRecord<OUT>> data, Boolean flushWindows) {
	Preconditions.checkNotNull(data, "Collection must not be null");
	if(data.isEmpty()) {
		throw new IllegalArgumentException("Collection must not be empty");
	}

	StreamRecord<OUT> first = data.iterator().next();
	if(first == null) {
		throw new IllegalArgumentException("Collection must not contain null elements");
	}

	TypeInformation<OUT> typeInfo;
	try {
		typeInfo = TypeExtractor.getForObject(first.getValue());
	}
	catch(Exception e) {
		throw new RuntimeException("Could not startWith TypeInformation for type " + first.getClass()
				+ "; please specify the TypeInformation manually via "
				+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)");
	}
	return fromCollectionWithTimestamp(data, typeInfo, flushWindows);
}
 
源代码13 项目: flink   文件: StreamExecutionEnvironment.java
/**
 * Creates a data stream from the given non-empty collection. The type of the data stream is that of the
 * elements in the collection.
 *
 * <p>The framework will try and determine the exact type from the collection elements. In case of generic
 * elements, it may be necessary to manually supply the type information via
 * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
 *
 * <p>Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with
 * parallelism one.
 *
 * @param data
 * 		The collection of elements to create the data stream from.
 * @param <OUT>
 *     The generic type of the returned data stream.
 * @return
 *     The data stream representing the given collection
 */
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
	Preconditions.checkNotNull(data, "Collection must not be null");
	if (data.isEmpty()) {
		throw new IllegalArgumentException("Collection must not be empty");
	}

	OUT first = data.iterator().next();
	if (first == null) {
		throw new IllegalArgumentException("Collection must not contain null elements");
	}

	TypeInformation<OUT> typeInfo;
	try {
		typeInfo = TypeExtractor.getForObject(first);
	}
	catch (Exception e) {
		throw new RuntimeException("Could not create TypeInformation for type " + first.getClass()
				+ "; please specify the TypeInformation manually via "
				+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
	}
	return fromCollection(data, typeInfo);
}
 
源代码14 项目: flink-learning   文件: Sink2ES6Main.java
public static void main(String[] args) throws Exception {
    final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
    StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
    DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env);

    List<HttpHost> esAddresses = ESSinkUtil.getEsAddresses(parameterTool.get(ELASTICSEARCH_HOSTS));
    int bulkSize = parameterTool.getInt(ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS, 40);
    int sinkParallelism = parameterTool.getInt(STREAM_SINK_PARALLELISM, 5);

    log.info("-----esAddresses = {}, parameterTool = {}, ", esAddresses, parameterTool);

    ESSinkUtil.addSink(esAddresses, bulkSize, sinkParallelism, data,
            (MetricEvent metric, RuntimeContext runtimeContext, RequestIndexer requestIndexer) -> {
                requestIndexer.add(Requests.indexRequest()
                        .index(ZHISHENG + "_" + metric.getName())
                        .type(ZHISHENG)
                        .source(GsonUtil.toJSONBytes(metric), XContentType.JSON));
            },
            parameterTool);
    env.execute("flink learning connectors es6");
}
 
源代码15 项目: flink-learning   文件: KuduSinkTest.java
public static void main(String[] args) throws Exception {
    final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
    StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
    DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env);

    List<KuduColumnInfo> kuduColumnInfos = new ArrayList<>();
    KuduColumnInfo columnInfo1 = KuduColumnInfo.Builder.createLong("zhisheng").rangeKey(true).build();
    KuduColumnInfo columnInfo2 = KuduColumnInfo.Builder.createFloat("zhisheng").rangeKey(true).build();
    KuduColumnInfo columnInfo3 = KuduColumnInfo.Builder.createString("zhisheng").rangeKey(true).build();
    kuduColumnInfos.add(columnInfo1);
    kuduColumnInfos.add(columnInfo2);
    kuduColumnInfos.add(columnInfo3);

    KuduTableInfo zhisheng = new KuduTableInfo.Builder("zhisheng")
            .replicas(1)
            .createIfNotExist(true)
            .columns(kuduColumnInfos)
            .build();

    data.addSink(new KuduSink<>("127.0.0.1", zhisheng, new PojoSerDe<>(MetricEvent.class)).withInsertWriteMode());
}
 
源代码16 项目: flink-learning   文件: CassandraTupleSinkExample.java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection);

    CassandraSink.addSink(source)
            .setQuery(INSERT)
            .setClusterBuilder(new ClusterBuilder() {
                @Override
                protected Cluster buildCluster(Cluster.Builder builder) {
                    return builder.addContactPoint("127.0.0.1").build();
                }
            })
            .build();

    env.execute("WriteTupleIntoCassandra");
}
 
源代码17 项目: flink-learning   文件: CassandraPojoSinkExample.java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<Message> source = env.fromCollection(messages);

    CassandraSink.addSink(source)
            .setClusterBuilder(new ClusterBuilder() {
                @Override
                protected Cluster buildCluster(Cluster.Builder builder) {
                    return builder.addContactPoint("127.0.0.1").build();
                }
            })
            .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
            .build();

    env.execute("Cassandra Sink example");
}
 
源代码18 项目: flink-learning   文件: Sink2ES6Main.java
public static void main(String[] args) throws Exception {
    final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
    StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
    DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env);

    List<HttpHost> esAddresses = ESSinkUtil.getEsAddresses(parameterTool.get(ELASTICSEARCH_HOSTS));
    int bulkSize = parameterTool.getInt(ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS, 40);
    int sinkParallelism = parameterTool.getInt(STREAM_SINK_PARALLELISM, 5);

    log.info("-----esAddresses = {}, parameterTool = {}, ", esAddresses, parameterTool);

    ESSinkUtil.addSink(esAddresses, bulkSize, sinkParallelism, data,
            (MetricEvent metric, RuntimeContext runtimeContext, RequestIndexer requestIndexer) -> {
                requestIndexer.add(Requests.indexRequest()
                        .index(ZHISHENG + "_" + metric.getName())
                        .type(ZHISHENG)
                        .source(GsonUtil.toJSONBytes(metric), XContentType.JSON));
            },
            parameterTool);
    env.execute("flink learning connectors es6");
}
 
源代码19 项目: flink   文件: StreamTableEnvironmentImplTest.java
private StreamTableEnvironmentImpl getStreamTableEnvironment(
		StreamExecutionEnvironment env,
		DataStreamSource<Integer> elements) {
	TableConfig config = new TableConfig();
	CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
	ModuleManager moduleManager = new ModuleManager();
	return new StreamTableEnvironmentImpl(
		catalogManager,
		moduleManager,
		new FunctionCatalog(config, catalogManager, moduleManager),
		config,
		env,
		new TestPlanner(elements.getTransformation()),
		new ExecutorMock(),
		true
	);
}
 
源代码20 项目: flink-learning   文件: Main.java
public static void main(String[] args) throws Exception{
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id", "metric-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //key 反序列化
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("auto.offset.reset", "latest"); //value 反序列化

    DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
            "metric",  //kafka topic
            new SimpleStringSchema(),  // String 序列化
            props)).setParallelism(1);

    dataStreamSource.print(); //把从 kafka 读取到的数据打印在控制台

    env.execute("Flink add data source");
}
 
private DataStream<Long> createAsyncOperator(DataStreamSource<Long> source) {
	switch (outputMode) {
		case ORDERED:
			return AsyncDataStream.orderedWait(
					source,
					new BenchmarkAsyncFunctionExecutor(),
					0,
					TimeUnit.MILLISECONDS);
		case UNORDERED:
			return AsyncDataStream.unorderedWait(
					source,
					new BenchmarkAsyncFunctionExecutor(),
					0,
					TimeUnit.MILLISECONDS);
		default:
			throw new UnsupportedOperationException("Unknown mode");
	}
}
 
public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 构建输入数据
        List<Tuple2<Long, Long>> data = new ArrayList<>();
        Tuple2<Long, Long> a = new Tuple2<>(1L, 1L);
        Tuple2<Long, Long> b = new Tuple2<>(3L, 1L);
        data.add(a);
        data.add(b);
        DataStreamSource<Tuple2<Long, Long>> input = env.fromCollection(data);


        input.keyBy(x -> x.f1)
                .timeWindow(Time.seconds(10), Time.seconds(1))
                // 第一个Function为 ReduceFunction, 取窗口的最小值
                .reduce((r1, r2) -> {
                    return r1.f0 < r2.f0 ? r1 : r2;
                    // 第二个Function为 ProcessWindowFunction, 获取窗口的时间信息
                }, new ProcessWindowFunction<Tuple2<Long, Long>, String, Long, TimeWindow>() {
                    @Override
                    public void process(Long aLong, Context context, Iterable<Tuple2<Long, Long>> elements, Collector<String> out) throws Exception {
                        out.collect("window: " + context.window());
                    }
                }).print();

        env.execute();
    }
 
源代码23 项目: flink-learning   文件: Main.java
public static void main(String[] args) throws Exception {
    final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
    StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
    DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env);

    data.addSink(new FlinkKafkaProducer011<>(
            parameterTool.get("kafka.sink.brokers"),
            parameterTool.get("kafka.sink.topic"),
            new MetricSchema()
    )).name("flink-connectors-kafka")
            .setParallelism(parameterTool.getInt("stream.sink.parallelism"));

    env.execute("flink learning connectors kafka");
}
 
源代码24 项目: bahir-flink   文件: RedisSinkITCase.java
@Test
public void testRedisHashDataTypeWithTTL() throws Exception {
    DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
    RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
            new RedisAdditionalTTLMapper(RedisCommand.HSET));

    source.addSink(redisSink);
    env.execute("Test Redis Hash Data Type");

    assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
    assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));

    jedis.del(REDIS_ADDITIONAL_KEY);
}
 
源代码25 项目: flink-learning   文件: Main.java
public static void main(String[] args) throws Exception {
    final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
    StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
    DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env);


    env.execute("flink learning connectors hdfs");
}
 
源代码26 项目: 163-bigdate-note   文件: StreamingWCJavaApp.java
public static void main(String[] args) throws Exception {
    String input = "file:///D:/imooc/新一代大数据计算引擎 Flink从入门到实战-v/input";
    
    //获取执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    //读取数据
    DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);

    //执行转换操作
    dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] tokens = value.toLowerCase().split(",");
            for (String token : tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }).keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1)
    .print()
    .setParallelism(1);

    env.execute("StreamingWCJavaApp");
}
 
源代码27 项目: flink-benchmarks   文件: InputBenchmark.java
@Benchmark
public void mapSink(FlinkEnvironmentContext context) throws Exception {

	StreamExecutionEnvironment env = context.env;
	env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
	env.setParallelism(1);

	DataStreamSource<Long> source = env.addSource(new LongSource(RECORDS_PER_INVOCATION));
	source
		.map(new MultiplyByTwo())
		.addSink(new DiscardingSink<>());

	env.execute();
}
 
源代码28 项目: 163-bigdate-note   文件: StreamingWCJava02App.java
public static void main(String[] args) throws Exception {
    //获取参数
    int port = 0;
    try {
        ParameterTool tool = ParameterTool.fromArgs(args);
        port = tool.getInt("port");
    } catch (Exception e) {
        System.err.println("Port undefined, use 9999.");
        port = 9999;
    }
    
    //获取执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    //读取数据
    DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);

    //执行转换操作
    dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] tokens = value.toLowerCase().split(",");
            for (String token : tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }).keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1)
    .print()
    .setParallelism(1);

    env.execute("StreamingWCJavaApp");
}
 
源代码29 项目: flink   文件: KafkaShortRetentionTestBase.java
/**
 * Ensure that the consumer is properly failing if "auto.offset.reset" is set to "none".
 */
public void runFailOnAutoOffsetResetNone() throws Exception {
	final String topic = "auto-offset-reset-none-test";
	final int parallelism = 1;

	kafkaServer.createTestTopic(topic, parallelism, 1);

	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(parallelism);
	env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
	env.getConfig().disableSysoutLogging();

	// ----------- add consumer ----------

	Properties customProps = new Properties();
	customProps.putAll(standardProps);
	customProps.putAll(secureProps);
	customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
	FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);

	DataStreamSource<String> consuming = env.addSource(source);
	consuming.addSink(new DiscardingSink<String>());

	try {
		env.execute("Test auto offset reset none");
	} catch (Throwable e) {
		// check if correct exception has been thrown
		if (!e.getCause().getCause().getMessage().contains("Unable to find previous offset")  // kafka 0.8
			&& !e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition") // kafka 0.9
				) {
			throw e;
		}
	}

	kafkaServer.deleteTestTopic(topic);
}
 
源代码30 项目: flink-learning   文件: Main2.java
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<String> source = env.socketTextStream("127.0.0.1", 9000);
    source.addSink(new MySink("6")).setParallelism(5);
    env.execute("xxxx");
}