The following are example usages of org.apache.flink.streaming.api.datastream.DataStreamSource, collected from open-source projects.
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;
//these hard-coded parameters could be moved to a configuration file and read via parameterTool
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost").setVirtualHost("/")
.setPort(5672).setUserName("admin").setPassword("admin")
.build();
DataStreamSource<String> zhisheng = env.addSource(new RMQSource<>(connectionConfig,
"zhisheng",
true,
new SimpleStringSchema()))
.setParallelism(1);
zhisheng.print();
//to guarantee exactly-once or at-least-once semantics, checkpointing must be enabled
// env.enableCheckpointing(10000);
env.execute("flink learning connectors rabbitmq");
}
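If checkpointing is enabled as the comment above suggests, the delivery guarantee can also be pinned down explicitly. A minimal sketch (the interval and mode are illustrative, not from the original):
// enable checkpointing with an explicit exactly-once mode (10s interval is illustrative)
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);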
/**
* Creates a new data stream that contains the given elements. The elements must all be of the
* same type, for example, all {@link String} or all {@link Integer}.
*
* <p>The framework will try to determine the exact type from the elements. In case of generic
* elements, it may be necessary to manually supply the type information via
* {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
*
* <p>Note that this operation will result in a non-parallel data stream source, i.e. a data
* stream source with a degree of parallelism one.
*
* @param data
* The array of elements to create the data stream from.
* @param <OUT>
* The type of the returned data stream
* @return The data stream representing the given array of elements
*/
@SafeVarargs
public final <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
if (data.length == 0) {
throw new IllegalArgumentException("fromElements needs at least one element as argument");
}
TypeInformation<OUT> typeInfo;
try {
typeInfo = TypeExtractor.getForObject(data[0]);
}
catch (Exception e) {
throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
+ "; please specify the TypeInformation manually via "
+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
}
return fromCollection(Arrays.asList(data), typeInfo);
}
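For reference, a typical call to this method looks like the following sketch (values are illustrative):
// the element type String is extracted from the first argument
DataStreamSource<String> names = env.fromElements("alice", "bob", "carol");
names.print();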
/**
* Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
*/
public void runInvalidElasticsearchClusterTest() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
Map<String, String> userConfig = new HashMap<>();
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
userConfig.put("cluster.name", "invalid-cluster-name");
source.addSink(createElasticsearchSinkForNode(
1,
"invalid-cluster-name",
new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"),
"123.123.123.123")); // incorrect ip address
try {
env.execute("Elasticsearch Sink Test");
} catch (JobExecutionException expectedException) {
// test passes
return;
}
fail();
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection);
CassandraSink.addSink(source)
.setQuery(INSERT)
.setClusterBuilder(new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
})
.build();
env.execute("WriteTupleIntoCassandra");
}
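The INSERT statement and collection fields are defined elsewhere in the example's class; hypothetical definitions consistent with the code above might look like:
// hypothetical definitions for the fields used above (keyspace, table and column names are assumptions)
private static final String INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)";
private static final ArrayList<Tuple2<String, Integer>> collection = new ArrayList<>(20);
static {
for (int i = 0; i < 20; i++) {
collection.add(new Tuple2<>("cassandra-" + i, i));
}
}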
/**
* Creates a new data stream that contains the given elements. The framework will determine the type
* from the base class supplied by the user. The elements must be of the base type or a subclass of it.
* The sequence of elements must not be empty.
* Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
* degree of parallelism one.
*
* @param type
* The base class of the elements in the sequence.
* @param data
* The array of elements to create the data stream from.
* @param <OUT>
* The type of the returned data stream
* @return The data stream representing the given array of elements
*/
@SafeVarargs
public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> type, OUT... data) {
if (data.length == 0) {
throw new IllegalArgumentException("fromElements needs at least one element as argument");
}
TypeInformation<OUT> typeInfo;
try {
typeInfo = TypeExtractor.getForClass(type);
}
catch (Exception e) {
throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
+ "; please specify the TypeInformation manually via "
+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
}
return fromCollection(Arrays.asList(data), typeInfo);
}
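A hypothetical call that exercises this base-class overload, mixing subclasses of a common supertype:
// Integer, Long and Double are all subclasses of Number, so Number.class serves as the base type
DataStreamSource<Number> numbers = env.fromElements(Number.class, 1, 2L, 3.0);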
/**
* Creates a data stream from the given non-empty collection.
*
* <p>Note that this operation will result in a non-parallel data stream source,
* i.e., a data stream source with parallelism one.
*
* @param data
* The collection of elements to create the data stream from
* @param typeInfo
* The TypeInformation for the produced data stream
* @param <OUT>
* The type of the returned data stream
* @return The data stream representing the given collection
*/
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) {
Preconditions.checkNotNull(data, "Collection must not be null");
// must not have null elements and mixed elements
FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());
SourceFunction<OUT> function;
try {
function = new FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data);
}
catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
return addSource(function, "Collection Source", typeInfo).setParallelism(1);
}
@Benchmark
public void mapRebalanceMapSink(FlinkEnvironmentContext context) throws Exception {
StreamExecutionEnvironment env = context.env;
env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
env.setParallelism(1);
DataStreamSource<Long> source = env.addSource(new LongSource(RECORDS_PER_INVOCATION));
source
.map(new MultiplyByTwo())
.rebalance()
.map((Long in) -> in)
.addSink(new DiscardingSink<>());
env.execute();
}
@Test
public void testRetractStreamDoesNotOverwriteTableConfig() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);
StreamTableEnvironmentImpl tEnv = getStreamTableEnvironment(env, elements);
Time minRetention = Time.minutes(1);
Time maxRetention = Time.minutes(10);
tEnv.getConfig().setIdleStateRetentionTime(minRetention, maxRetention);
Table table = tEnv.fromDataStream(elements);
tEnv.toRetractStream(table, Row.class);
assertThat(
tEnv.getConfig().getMinIdleStateRetentionTime(),
equalTo(minRetention.toMilliseconds()));
assertThat(
tEnv.getConfig().getMaxIdleStateRetentionTime(),
equalTo(maxRetention.toMilliseconds()));
}
private void createProducerTopology(StreamExecutionEnvironment env, AMQSinkConfig<String> config) {
DataStreamSource<String> stream = env.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
for (int i = 0; i < MESSAGES_NUM; i++) {
ctx.collect("amq-" + i);
}
}
@Override
public void cancel() {}
});
AMQSink<String> sink = new AMQSink<>(config);
stream.addSink(sink);
}
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// build the input data
List<Tuple2<String, Long>> data = new ArrayList<>();
Tuple2<String, Long> a = new Tuple2<>("first event", 1L);
Tuple2<String, Long> b = new Tuple2<>("second event", 2L);
data.add(a);
data.add(b);
DataStreamSource<Tuple2<String, Long>> input = env.fromCollection(data);
// processing-time sliding window: 10s window length, sliding every 1s
input.keyBy(x -> x.f1)
.timeWindow(Time.seconds(10), Time.seconds(1))
.reduce(new MyWindowFunction());
env.execute();
}
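MyWindowFunction is not shown above; since it is passed to reduce(), it must implement ReduceFunction. A minimal sketch:
// hypothetical implementation: keeps the first event observed in each window pane
public static class MyWindowFunction implements ReduceFunction<Tuple2<String, Long>> {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return v1;
}
}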
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
Properties props = KafkaConfigUtil.buildKafkaProps(parameterTool);
DataStreamSource<String> data = env.addSource(new FlinkKafkaConsumer011<>(
parameterTool.get(METRICS_TOPIC), //this Kafka topic must match the one used by the utility class above
new SimpleStringSchema(),
props));
data.map(new MapFunction<String, Object>() {
@Override
public Object map(String string) throws Exception {
writeEventToHbase(string, parameterTool);
return string;
}
}).print();
env.execute("flink learning connectors hbase");
}
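writeEventToHbase is not shown above; a sketch using the plain HBase client API, with assumed table, column family and configuration keys, might look like:
// hypothetical sketch of writeEventToHbase (table name, column family and config keys are assumptions)
private static void writeEventToHbase(String string, ParameterTool parameterTool) throws IOException {
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", parameterTool.get("hbase.zookeeper.quorum", "localhost"));
try (Connection connection = ConnectionFactory.createConnection(config);
Table table = connection.getTable(TableName.valueOf("zhisheng_stream"))) {
Put put = new Put(Bytes.toBytes(String.valueOf(System.currentTimeMillis())));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("data"), Bytes.toBytes(string));
table.put(put);
}
}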
/**
* Creates a data stream from the given non-empty collection. The type of the data stream is that of the elements in the collection.
*
* <p>The framework will try to determine the exact type from the collection elements. In case of generic
* elements, it may be necessary to manually supply the type information via {@link #fromCollection(java.util.Collection,
* org.apache.flink.api.common.typeinfo.TypeInformation)}.</p>
*
* <p>Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with
* parallelism one.</p>
*
* @param <OUT> The generic type of the returned data stream.
* @param data The collection of elements to create the data stream from.
* @param flushWindows Specifies whether open windows should be flushed on termination.
* @return The data stream representing the given collection
*/
public <OUT> DataStreamSource<OUT> fromCollectionWithTimestamp(Collection<StreamRecord<OUT>> data, Boolean flushWindows) {
Preconditions.checkNotNull(data, "Collection must not be null");
if (data.isEmpty()) {
throw new IllegalArgumentException("Collection must not be empty");
}
StreamRecord<OUT> first = data.iterator().next();
if (first == null) {
throw new IllegalArgumentException("Collection must not contain null elements");
}
TypeInformation<OUT> typeInfo;
try {
typeInfo = TypeExtractor.getForObject(first.getValue());
}
catch (Exception e) {
throw new RuntimeException("Could not create TypeInformation for type " + first.getValue().getClass().getName()
+ "; please specify the TypeInformation manually via "
+ "StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)", e);
}
return fromCollectionWithTimestamp(data, typeInfo, flushWindows);
}
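A hypothetical call, wrapping each element in a StreamRecord so it carries an explicit timestamp:
// hypothetical usage: elements carry explicit timestamps via StreamRecord
List<StreamRecord<String>> records = Arrays.asList(
new StreamRecord<>("first", 1L),
new StreamRecord<>("second", 2L));
DataStreamSource<String> withTimestamps = env.fromCollectionWithTimestamp(records, true);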
/**
* Creates a data stream from the given non-empty collection. The type of the data stream is that of the
* elements in the collection.
*
* <p>The framework will try to determine the exact type from the collection elements. In case of generic
* elements, it may be necessary to manually supply the type information via
* {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
*
* <p>Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with
* parallelism one.
*
* @param data
* The collection of elements to create the data stream from.
* @param <OUT>
* The generic type of the returned data stream.
* @return
* The data stream representing the given collection
*/
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
Preconditions.checkNotNull(data, "Collection must not be null");
if (data.isEmpty()) {
throw new IllegalArgumentException("Collection must not be empty");
}
OUT first = data.iterator().next();
if (first == null) {
throw new IllegalArgumentException("Collection must not contain null elements");
}
TypeInformation<OUT> typeInfo;
try {
typeInfo = TypeExtractor.getForObject(first);
}
catch (Exception e) {
throw new RuntimeException("Could not create TypeInformation for type " + first.getClass()
+ "; please specify the TypeInformation manually via "
+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
}
return fromCollection(data, typeInfo);
}
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env);
List<HttpHost> esAddresses = ESSinkUtil.getEsAddresses(parameterTool.get(ELASTICSEARCH_HOSTS));
int bulkSize = parameterTool.getInt(ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS, 40);
int sinkParallelism = parameterTool.getInt(STREAM_SINK_PARALLELISM, 5);
log.info("-----esAddresses = {}, parameterTool = {}", esAddresses, parameterTool);
ESSinkUtil.addSink(esAddresses, bulkSize, sinkParallelism, data,
(MetricEvent metric, RuntimeContext runtimeContext, RequestIndexer requestIndexer) -> {
requestIndexer.add(Requests.indexRequest()
.index(ZHISHENG + "_" + metric.getName())
.type(ZHISHENG)
.source(GsonUtil.toJSONBytes(metric), XContentType.JSON));
},
parameterTool);
env.execute("flink learning connectors es6");
}
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env);
List<KuduColumnInfo> kuduColumnInfos = new ArrayList<>();
//note: the same column name is reused below as a placeholder; a real table needs distinct column names
KuduColumnInfo columnInfo1 = KuduColumnInfo.Builder.createLong("zhisheng").rangeKey(true).build();
KuduColumnInfo columnInfo2 = KuduColumnInfo.Builder.createFloat("zhisheng").rangeKey(true).build();
KuduColumnInfo columnInfo3 = KuduColumnInfo.Builder.createString("zhisheng").rangeKey(true).build();
kuduColumnInfos.add(columnInfo1);
kuduColumnInfos.add(columnInfo2);
kuduColumnInfos.add(columnInfo3);
KuduTableInfo zhisheng = new KuduTableInfo.Builder("zhisheng")
.replicas(1)
.createIfNotExist(true)
.columns(kuduColumnInfos)
.build();
data.addSink(new KuduSink<>("127.0.0.1", zhisheng, new PojoSerDe<>(MetricEvent.class)).withInsertWriteMode());
env.execute("flink learning connectors kudu");
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Message> source = env.fromCollection(messages);
CassandraSink.addSink(source)
.setClusterBuilder(new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
})
.setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
.build();
env.execute("Cassandra Sink example");
}
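The Message POJO is not shown; since this sink goes through the DataStax object mapper, a sketch might carry mapping annotations like the following (keyspace, table and column names are assumptions):
// hypothetical POJO mapped by the DataStax object mapper
@Table(keyspace = "test", name = "message")
public static class Message implements Serializable {
@Column(name = "body")
private String message;
public Message() {}
public Message(String message) { this.message = message; }
public String getMessage() { return message; }
public void setMessage(String message) { this.message = message; }
}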
private StreamTableEnvironmentImpl getStreamTableEnvironment(
StreamExecutionEnvironment env,
DataStreamSource<Integer> elements) {
TableConfig config = new TableConfig();
CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
ModuleManager moduleManager = new ModuleManager();
return new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
new FunctionCatalog(config, catalogManager, moduleManager),
config,
env,
new TestPlanner(elements.getTransformation()),
new ExecutorMock(),
true
);
}
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest"); //value 反序列化
DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
"metric", //kafka topic
new SimpleStringSchema(), // String 序列化
props)).setParallelism(1);
dataStreamSource.print(); //把从 kafka 读取到的数据打印在控制台
env.execute("Flink add data source");
}
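The start position can also be set on the consumer itself instead of relying on the auto.offset.reset property; a sketch:
// hypothetical alternative: control the start position explicitly on the consumer
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>("metric", new SimpleStringSchema(), props);
consumer.setStartFromLatest(); // or setStartFromEarliest() / setStartFromGroupOffsets()
DataStreamSource<String> stream = env.addSource(consumer);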
private DataStream<Long> createAsyncOperator(DataStreamSource<Long> source) {
switch (outputMode) {
case ORDERED:
return AsyncDataStream.orderedWait(
source,
new BenchmarkAsyncFunctionExecutor(),
0,
TimeUnit.MILLISECONDS);
case UNORDERED:
return AsyncDataStream.unorderedWait(
source,
new BenchmarkAsyncFunctionExecutor(),
0,
TimeUnit.MILLISECONDS);
default:
throw new UnsupportedOperationException("Unknown mode");
}
}
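BenchmarkAsyncFunctionExecutor is not shown above; any AsyncFunction works with orderedWait/unorderedWait. A minimal sketch that completes each request on a thread pool:
// hypothetical async function: completes each request on a shared thread pool
public static class BenchmarkAsyncFunctionExecutor extends RichAsyncFunction<Long, Long> {
private transient ExecutorService executor;
@Override
public void open(Configuration parameters) {
executor = Executors.newFixedThreadPool(4);
}
@Override
public void asyncInvoke(Long input, ResultFuture<Long> resultFuture) {
executor.submit(() -> resultFuture.complete(Collections.singletonList(input * 2)));
}
@Override
public void close() {
executor.shutdown();
}
}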
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// build the input data
List<Tuple2<Long, Long>> data = new ArrayList<>();
Tuple2<Long, Long> a = new Tuple2<>(1L, 1L);
Tuple2<Long, Long> b = new Tuple2<>(3L, 1L);
data.add(a);
data.add(b);
DataStreamSource<Tuple2<Long, Long>> input = env.fromCollection(data);
input.keyBy(x -> x.f1)
.timeWindow(Time.seconds(10), Time.seconds(1))
// the first argument is a ReduceFunction that keeps the window minimum
.reduce((r1, r2) -> {
return r1.f0 < r2.f0 ? r1 : r2;
// the second argument is a ProcessWindowFunction that exposes the window's time metadata
}, new ProcessWindowFunction<Tuple2<Long, Long>, String, Long, TimeWindow>() {
@Override
public void process(Long aLong, Context context, Iterable<Tuple2<Long, Long>> elements, Collector<String> out) throws Exception {
out.collect("window: " + context.window());
}
}).print();
env.execute();
}
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env);
data.addSink(new FlinkKafkaProducer011<>(
parameterTool.get("kafka.sink.brokers"),
parameterTool.get("kafka.sink.topic"),
new MetricSchema()
)).name("flink-connectors-kafka")
.setParallelism(parameterTool.getInt("stream.sink.parallelism"));
env.execute("flink learning connectors kafka");
}
@Test
public void testRedisHashDataTypeWithTTL() throws Exception {
DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
new RedisAdditionalTTLMapper(RedisCommand.HSET));
source.addSink(redisSink);
env.execute("Test Redis Hash Data Type");
assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
jedis.del(REDIS_ADDITIONAL_KEY);
}
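RedisAdditionalTTLMapper is project-specific; a sketch consistent with the assertions above, using bahir's RedisMapper (the TTL-carrying RedisCommandDescription constructor is an assumption about the connector version):
// hypothetical mapper: HSET into a fixed hash (REDIS_ADDITIONAL_KEY) with a TTL in seconds
public static class RedisAdditionalTTLMapper implements RedisMapper<Tuple2<String, String>> {
private final RedisCommand redisCommand;
public RedisAdditionalTTLMapper(RedisCommand redisCommand) {
this.redisCommand = redisCommand;
}
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY, REDIS_TTL_IN_SECS);
}
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
}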
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env);
env.execute("flink learning connectors hdfs");
}
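This example builds the Kafka source but never attaches an HDFS sink, so the job does nothing. With the legacy flink-connector-filesystem of that era, a sketch to place before env.execute (path and bucket format are assumptions):
// hypothetical HDFS sink using the legacy BucketingSink connector (path is an assumption)
BucketingSink<MetricEvent> sink = new BucketingSink<>("hdfs://localhost:9000/flink/metrics");
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH")); // one bucket per hour
sink.setBatchSize(1024 * 1024 * 64); // roll part files at 64 MB
data.addSink(sink);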
public static void main(String[] args) throws Exception {
String input = "file:///D:/imooc/新一代大数据计算引擎 Flink从入门到实战-v/input";
//获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取数据
DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
//执行转换操作
dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] tokens = value.toLowerCase().split(",");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}).keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.print()
.setParallelism(1);
env.execute("StreamingWCJavaApp");
}
@Benchmark
public void mapSink(FlinkEnvironmentContext context) throws Exception {
StreamExecutionEnvironment env = context.env;
env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
env.setParallelism(1);
DataStreamSource<Long> source = env.addSource(new LongSource(RECORDS_PER_INVOCATION));
source
.map(new MultiplyByTwo())
.addSink(new DiscardingSink<>());
env.execute();
}
public static void main(String[] args) throws Exception {
//parse the port from the program arguments, falling back to 9999
int port = 0;
try {
ParameterTool tool = ParameterTool.fromArgs(args);
port = tool.getInt("port");
} catch (Exception e) {
System.err.println("Port undefined, use 9999.");
port = 9999;
}
//obtain the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//read data from the socket, using the parsed port rather than a hard-coded one
DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", port);
//apply the transformations
dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] tokens = value.toLowerCase().split(",");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}).keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.print()
.setParallelism(1);
env.execute("StreamingWCJavaApp");
}
/**
* Ensure that the consumer is properly failing if "auto.offset.reset" is set to "none".
*/
public void runFailOnAutoOffsetResetNone() throws Exception {
final String topic = "auto-offset-reset-none-test";
final int parallelism = 1;
kafkaServer.createTestTopic(topic, parallelism, 1);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
env.getConfig().disableSysoutLogging();
// ----------- add consumer ----------
Properties customProps = new Properties();
customProps.putAll(standardProps);
customProps.putAll(secureProps);
customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
DataStreamSource<String> consuming = env.addSource(source);
consuming.addSink(new DiscardingSink<String>());
try {
env.execute("Test auto offset reset none");
} catch (Throwable e) {
// check if correct exception has been thrown
if (!e.getCause().getCause().getMessage().contains("Unable to find previous offset") // kafka 0.8
&& !e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition") // kafka 0.9
) {
throw e;
}
}
kafkaServer.deleteTestTopic(topic);
}
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("127.0.0.1", 9000);
source.addSink(new MySink("6")).setParallelism(5);
env.execute("xxxx");
}
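MySink is not shown; a minimal sketch of a custom sink that takes a tag in its constructor:
// hypothetical custom sink: prints each record together with the tag passed to the constructor
public static class MySink extends RichSinkFunction<String> {
private final String tag;
public MySink(String tag) {
this.tag = tag;
}
@Override
public void invoke(String value, Context context) {
System.out.println("MySink-" + tag + " received: " + value);
}
}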