The following are Java code examples of org.apache.flink.streaming.api.functions.source.SourceFunction, collected from open-source projects. Each snippet shows the interface being implemented, constructed, or wired into a streaming job.
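Before the project snippets, here is a minimal, self-contained sketch of the SourceFunction contract they all rely on (the class and field names are illustrative, not taken from any project below): run() emits records until cancel() flips a volatile flag, and any state that must stay consistent with checkpoints is updated under the checkpoint lock.
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class CounterSource implements SourceFunction<Long> {
// volatile so the cancel() call from another thread is visible inside run()
private volatile boolean isRunning = true;
private long counter = 0L;
@Override
public void run(SourceContext<Long> ctx) throws Exception {
while (isRunning) {
// emit and advance the counter atomically with respect to checkpoints
synchronized (ctx.getCheckpointLock()) {
ctx.collect(counter++);
}
}
}
@Override
public void cancel() {
isRunning = false;
}
}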
@Test
public void exampleUsage() {
JsonNode ingressDefinition =
loadAsJsonFromClassResource(
getClass().getClassLoader(), "routable-protobuf-kafka-ingress.yaml");
JsonIngressSpec<?> spec =
new JsonIngressSpec<>(
ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE,
new IngressIdentifier<>(Message.class, "foo", "bar"),
ingressDefinition);
RoutableProtobufKafkaSourceProvider provider = new RoutableProtobufKafkaSourceProvider();
SourceFunction<?> source = provider.forSpec(spec);
assertThat(source, instanceOf(FlinkKafkaConsumer.class));
}
/**
* Constructor.
*
* @param streams list of streams to fetch data from
* @param sourceContext source context
* @param runtimeContext runtime context
* @param configProps config properties
* @param deserializationSchema deserialization schema
* @param shardAssigner shard assigner
*/
public DynamoDBStreamsDataFetcher(List<String> streams,
SourceFunction.SourceContext<T> sourceContext,
RuntimeContext runtimeContext,
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema,
KinesisShardAssigner shardAssigner) {
super(streams,
sourceContext,
sourceContext.getCheckpointLock(),
runtimeContext,
configProps,
deserializationSchema,
shardAssigner,
null,
null,
new AtomicReference<>(),
new ArrayList<>(),
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
// use DynamoDBStreamsProxy instead of the default Kinesis proxy
DynamoDBStreamsProxy::create);
}
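A fetcher like this is normally created inside the connector rather than by user code. As a hedged sketch, assuming the flink-connector-kinesis DynamoDB streams consumer (FlinkKinesisConsumer's sibling class FlinkDynamoDBStreamsConsumer) is on the classpath, the user-facing side might look like this; the stream ARN and region are placeholders:
Properties config = new Properties();
config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
// the consumer creates a DynamoDBStreamsDataFetcher like the one above internally
FlinkDynamoDBStreamsConsumer<String> consumer = new FlinkDynamoDBStreamsConsumer<>(
"arn:aws:dynamodb:us-east-1:000000000000:table/MyTable/stream/2020-01-01T00:00:00.000",
new SimpleStringSchema(),
config);
env.addSource(consumer).print();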
private DecoratedSource sourceFromSpec(IngressIdentifier<?> key, IngressSpec<?> spec) {
SourceProvider provider = universe.sources().get(spec.type());
if (provider == null) {
throw new IllegalStateException(
"Unable to find a source translation for ingress of type "
+ spec.type()
+ ", which is bound for key "
+ key);
}
SourceFunction<?> source = provider.forSpec(spec);
if (source == null) {
throw new NullPointerException(
"A source provider for type " + spec.type() + " has produced a NULL source.");
}
return DecoratedSource.of(spec, source);
}
/**
* Creates a data stream from the given non-empty collection.
*
* <p>Note that this operation will result in a non-parallel data stream source,
* i.e., a data stream source with parallelism one.
*
* @param data
* The collection of elements to create the data stream from
* @param typeInfo
* The TypeInformation for the produced data stream
* @param <OUT>
* The type of the returned data stream
* @return The data stream representing the given collection
*/
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) {
Preconditions.checkNotNull(data, "Collection must not be null");
// the collection must not contain null elements, and all elements must be compatible with the declared type
FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());
SourceFunction<OUT> function;
try {
function = new FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data);
}
catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
return addSource(function, "Collection Source", typeInfo).setParallelism(1);
}
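A hypothetical call site for the method above, supplying explicit TypeInformation:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// explicit type information sidesteps erasure when the element type is generic
DataStreamSource<String> words =
env.fromCollection(Arrays.asList("hello", "world"), TypeInformation.of(String.class));
words.print();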
/**
* Creates a Kinesis Data Fetcher.
*
* @param streams the streams to subscribe to
* @param sourceContext context of the source function
* @param runtimeContext this subtask's runtime context
* @param configProps the consumer configuration properties
* @param deserializationSchema deserialization schema
* @param shardAssigner shard assigner that maps discovered shards to subtasks
* @param periodicWatermarkAssigner optional assigner for periodic watermarks
* @param watermarkTracker optional tracker for global watermark progress
*/
public KinesisDataFetcher(List<String> streams,
SourceFunction.SourceContext<T> sourceContext,
RuntimeContext runtimeContext,
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema,
KinesisShardAssigner shardAssigner,
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
WatermarkTracker watermarkTracker) {
this(streams,
sourceContext,
sourceContext.getCheckpointLock(),
runtimeContext,
configProps,
deserializationSchema,
shardAssigner,
periodicWatermarkAssigner,
watermarkTracker,
new AtomicReference<>(),
new ArrayList<>(),
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
KinesisProxy::create);
}
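End users typically reach this constructor through FlinkKinesisConsumer. A minimal, hedged usage sketch (stream name and region are placeholders):
Properties consumerConfig = new Properties();
consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
// the consumer instantiates a KinesisDataFetcher like the one above when the job runs
DataStream<String> kinesisStream = env.addSource(
new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), consumerConfig));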
public void doTestPropagationFromCheckpointConfig(boolean failTaskOnCheckpointErrors) throws Exception {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.setParallelism(1);
streamExecutionEnvironment.getCheckpointConfig().setCheckpointInterval(1000);
streamExecutionEnvironment.getCheckpointConfig().setFailOnCheckpointingErrors(failTaskOnCheckpointErrors);
streamExecutionEnvironment.addSource(new SourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
}
@Override
public void cancel() {
}
}).addSink(new DiscardingSink<>());
StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph();
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
SerializedValue<ExecutionConfig> serializedExecutionConfig = jobGraph.getSerializedExecutionConfig();
ExecutionConfig executionConfig =
serializedExecutionConfig.deserializeValue(Thread.currentThread().getContextClassLoader());
Assert.assertEquals(failTaskOnCheckpointErrors, executionConfig.isFailTaskOnCheckpointError());
}
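Hypothetical @Test entry points for the helper above, covering both flag values:
@Test
public void testFailingOnCheckpointErrorsEnabled() throws Exception {
doTestPropagationFromCheckpointConfig(true);
}
@Test
public void testFailingOnCheckpointErrorsDisabled() throws Exception {
doTestPropagationFromCheckpointConfig(false);
}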
public PulsarRowFetcher(
SourceFunction.SourceContext<Row> sourceContext,
Map<String, MessageId> seedTopicsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<Row>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<Row>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
StreamingRuntimeContext runtimeContext,
ClientConfigurationData clientConf,
Map<String, Object> readerConf,
int pollTimeoutMs,
DeserializationSchema<Row> deserializer,
PulsarMetadataReader metadataReader) throws Exception {
super(sourceContext, seedTopicsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated,
processingTimeProvider, autoWatermarkInterval, userCodeClassLoader, runtimeContext,
clientConf, readerConf, pollTimeoutMs, deserializer, metadataReader);
}
public TestingFetcher(
SourceFunction.SourceContext<T> sourceContext,
Map<String, MessageId> seedTopicsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval) throws Exception {
super(
sourceContext,
seedTopicsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
processingTimeProvider,
autoWatermarkInterval,
TestingFetcher.class.getClassLoader(),
null,
null,
null,
0,
null,
null);
}
public static void main(String[] args) throws Exception {
// create the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
env.setParallelism(1);
env.addSource(new SourceFunction<Long>() {
// volatile flag so cancel() can stop the emit loop cleanly
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> sourceContext) throws Exception {
while (isRunning) {
sourceContext.collect(System.currentTimeMillis());
}
}
@Override
public void cancel() {
isRunning = false;
}
})
.map((MapFunction<Long, Long>) aLong -> aLong / 1000).setParallelism(3)
.print();
env.execute("zhisheng RestartStrategy example");
}
static SourceFunction<Event> createEventSource(ParameterTool pt) {
return new SequenceGeneratorSource(
pt.getInt(
SEQUENCE_GENERATOR_SRC_KEYSPACE.key(),
SEQUENCE_GENERATOR_SRC_KEYSPACE.defaultValue()),
pt.getInt(
SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.key(),
SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.defaultValue()),
pt.getLong(
SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()),
pt.getLong(
SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.key(),
SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.defaultValue()),
pt.getLong(
SEQUENCE_GENERATOR_SRC_SLEEP_TIME.key(),
SEQUENCE_GENERATOR_SRC_SLEEP_TIME.defaultValue()),
pt.getLong(
SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue()));
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
// allow at most three failures within a two-minute window, with a 10-second delay between restarts; beyond that the job fails permanently
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(2), Time.seconds(10)));
env.addSource(new SourceFunction<Long>() {
// volatile flag so cancel() can stop the emit loop cleanly
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> sourceContext) throws Exception {
while (isRunning) {
// deliberately emit null; unboxing it in the map below throws NPE, triggering restarts
sourceContext.collect(null);
}
}
@Override
public void cancel() {
isRunning = false;
}
})
.map((MapFunction<Long, Long>) aLong -> aLong / 1)
.print();
env.execute("zhisheng failureRate Restart Strategy example");
}
public static void main(String[] args) throws Exception {
// create the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
// restart with a 5-second delay between attempts, at most three attempts; the job stops if it still cannot recover
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
env.addSource(new SourceFunction<Long>() {
// volatile flag so cancel() can stop the emit loop cleanly
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> sourceContext) throws Exception {
while (isRunning) {
// deliberately emit null; unboxing it in the map below throws NPE, triggering restarts
sourceContext.collect(null);
}
}
@Override
public void cancel() {
isRunning = false;
}
})
.map((MapFunction<Long, Long>) aLong -> aLong / 1)
.print();
env.execute("zhisheng fixedDelay Restart Strategy example");
}
public static void main(String[] args) throws Exception {
// create the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
// restart with a 5-second delay between attempts, at most three attempts; the job stops if it still cannot recover
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
env.addSource(new SourceFunction<Long>() {
// volatile flag so cancel() can stop the emit loop cleanly
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> sourceContext) throws Exception {
while (isRunning) {
sourceContext.collect(System.currentTimeMillis());
}
}
@Override
public void cancel() {
isRunning = false;
}
})
// deliberate division by zero: every record throws ArithmeticException, exercising the restart strategy
.map((MapFunction<Long, Long>) aLong -> aLong / 0)
.print();
env.execute("zhisheng RestartStrategy example");
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data for Flink")
.requestBatchCount(5)
.buildConfig();
SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);
DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
@Override
public String map(NiFiDataPacket value) throws Exception {
return new String(value.getContent(), Charset.defaultCharset());
}
});
dataStream.print();
env.execute();
}
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// use a small in-memory array as the input source
String[] elementInput = new String[]{"hello Flink, 17788900", "Second Line, 17788923"};
DataStream<String> text = env.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
for (String s : elementInput) {
// split each record into payload and timestamp
String[] inp = s.split(",");
// trim() is required: the raw field carries a leading space that would break parsing
long timestamp = Long.parseLong(inp[1].trim());
// attach the parsed value as the event-time timestamp
ctx.collectWithTimestamp(s, timestamp);
// emit a watermark, allowing a maximum out-of-orderness of 2 ms
ctx.emitWatermark(new Watermark(timestamp - 2));
}
// emit a final watermark so that all remaining windows fire
ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
}
@Override
public void cancel() {
}
});
text.print();
env.execute();
}
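The timestamps and watermarks emitted above only matter once an event-time operator consumes them. A hedged sketch of a follow-up, replacing text.print() in the example above, that counts elements per 10 ms tumbling event-time window:
// event time must be enabled before the window is built
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
text
.timeWindowAll(Time.milliseconds(10))
.apply(new AllWindowFunction<String, String, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<String> values, Collector<String> out) {
int count = 0;
for (String ignored : values) {
count++;
}
out.collect(window + " -> " + count + " element(s)");
}
})
.print();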
private void processRecord(SourceFunction.SourceContext<OUT> ctx, ReaderRunner<OUT, CURSOR> readerRunner) {
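// records must be emitted under the checkpoint lock so that emission and checkpointing cannot interleave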
synchronized (ctx.getCheckpointLock()) {
Tuple3<OUT, Long, Long> record = readerRunner.pollRecord();
if (record != null) {
ctx.collectWithTimestamp(record.f0, record.f1);
tpsMetric.markEvent();
if (record.f2 > 0) {
partitionLatency.update(record.f2);
}
}
}
}
@Test
public void testSources() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
}
@Override
public void cancel() {
}
};
DataStreamSource<Integer> src1 = env.addSource(srcFun);
src1.addSink(new DiscardingSink<Integer>());
assertEquals(srcFun, getFunctionFromDataSource(src1));
List<Long> list = Arrays.asList(0L, 1L, 2L);
DataStreamSource<Long> src2 = env.generateSequence(0, 2);
assertTrue(getFunctionFromDataSource(src2) instanceof StatefulSequenceSource);
DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
assertTrue(getFunctionFromDataSource(src3) instanceof FromElementsFunction);
DataStreamSource<Long> src4 = env.fromCollection(list);
assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction);
}
public static DecoratedSource of(IngressSpec<?> spec, SourceFunction<?> source) {
IngressIdentifier<?> identifier = spec.id();
String name = String.format("%s-%s-ingress", identifier.namespace(), identifier.name());
String uid =
String.format(
"%s-%s-%s-%s-ingress",
spec.type().namespace(), spec.type().type(), identifier.namespace(), identifier.name());
return new DecoratedSource(name, uid, source);
}
private static <T> AbstractStreamOperatorTestHarness<T> createTestHarness(
SourceFunction<T> source, int numSubtasks, int subtaskIndex) throws Exception {
AbstractStreamOperatorTestHarness<T> testHarness =
new AbstractStreamOperatorTestHarness<>(
new StreamSource<>(source), maxParallelism, numSubtasks, subtaskIndex);
testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
return testHarness;
}
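A hypothetical use of the helper above, wrapping Flink's built-in StatefulSequenceSource with two subtasks:
AbstractStreamOperatorTestHarness<Long> harness =
createTestHarness(new StatefulSequenceSource(0L, 10L), 2, 0);
harness.open();
// ... drive the source and inspect its emitted output or snapshots via the harness ...
harness.close();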
@Before
public void setUp() throws Exception {
jobExecuteThread = new CheckedThread() {
@Override
public void go() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
sync.block();
}
@Override
public void cancel() {
sync.releaseBlocker();
}
}).addSink(new PrintSinkFunction());
env.execute();
}
};
jobExecuteThread.start();
sync.awaitBlocker();
}
@VisibleForTesting
protected KinesisDataFetcher(List<String> streams,
SourceFunction.SourceContext<T> sourceContext,
Object checkpointLock,
RuntimeContext runtimeContext,
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema,
KinesisShardAssigner shardAssigner,
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
WatermarkTracker watermarkTracker,
AtomicReference<Throwable> error,
List<KinesisStreamShardState> subscribedShardsState,
HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
FlinkKinesisProxyFactory kinesisProxyFactory) {
this.streams = checkNotNull(streams);
this.configProps = checkNotNull(configProps);
this.sourceContext = checkNotNull(sourceContext);
this.checkpointLock = checkNotNull(checkpointLock);
this.runtimeContext = checkNotNull(runtimeContext);
this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
this.deserializationSchema = checkNotNull(deserializationSchema);
this.shardAssigner = checkNotNull(shardAssigner);
this.periodicWatermarkAssigner = periodicWatermarkAssigner;
this.watermarkTracker = watermarkTracker;
this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
this.kinesis = kinesisProxyFactory.create(configProps);
this.consumerMetricGroup = runtimeContext.getMetricGroup()
.addGroup(KinesisConsumerMetricConstants.KINESIS_CONSUMER_METRICS_GROUP);
this.error = checkNotNull(error);
this.subscribedShardsState = checkNotNull(subscribedShardsState);
this.subscribedStreamsToLastDiscoveredShardIds = checkNotNull(subscribedStreamsToLastDiscoveredShardIds);
this.shardConsumersExecutor =
createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
this.recordEmitter = createRecordEmitter(configProps);
}
/** This method is exposed for tests that need to mock the KinesisDataFetcher in the consumer. */
protected KinesisDataFetcher<T> createFetcher(
List<String> streams,
SourceFunction.SourceContext<T> sourceContext,
RuntimeContext runtimeContext,
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema) {
return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner, watermarkTracker);
}
@Override
protected KinesisDataFetcher<T> createFetcher(
List<String> streams,
SourceFunction.SourceContext<T> sourceContext,
RuntimeContext runtimeContext,
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema) {
return fetcher;
}
public TestFetcher(
List<String> streams,
SourceFunction.SourceContext<T> sourceContext,
RuntimeContext runtimeContext,
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema,
HashMap<StreamShardMetadata, SequenceNumber> testStateSnapshot,
List<StreamShardHandle> testInitialDiscoveryShards) {
super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null, null);
this.testStateSnapshot = testStateSnapshot;
this.testInitialDiscoveryShards = testInitialDiscoveryShards;
}
private void runTest(
SourceFunction<SessionEvent<Integer, TestEventPayload>> dataSource,
WindowFunction<SessionEvent<Integer, TestEventPayload>,
String, Tuple, TimeWindow> windowFunction) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
WindowedStream<SessionEvent<Integer, TestEventPayload>, Tuple, TimeWindow> windowedStream =
env.addSource(dataSource).keyBy("sessionKey")
.window(EventTimeSessionWindows.withGap(Time.milliseconds(MAX_SESSION_EVENT_GAP_MS)));
if (ALLOWED_LATENESS_MS != Long.MAX_VALUE) {
windowedStream = windowedStream.allowedLateness(Time.milliseconds(ALLOWED_LATENESS_MS));
}
if (PURGE_WINDOW_ON_FIRE) {
windowedStream = windowedStream.trigger(PurgingTrigger.of(EventTimeTrigger.create()));
}
windowedStream.apply(windowFunction).print();
JobExecutionResult result = env.execute();
// Check that overall event counts match our expectations. Remember that each late event arriving
// within the allowed lateness fires its window again!
Assert.assertEquals(
(LATE_EVENTS_PER_SESSION + 1) * NUMBER_OF_SESSIONS * EVENTS_PER_SESSION,
(long) result.getAccumulatorResult(SESSION_COUNTER_ON_TIME_KEY));
Assert.assertEquals(
NUMBER_OF_SESSIONS * (LATE_EVENTS_PER_SESSION * (LATE_EVENTS_PER_SESSION + 1) / 2),
(long) result.getAccumulatorResult(SESSION_COUNTER_LATE_KEY));
}
private static <T extends Serializable> List<T> runNonRichSourceFunction(SourceFunction<T> sourceFunction) {
final List<T> outputs = new ArrayList<>();
try {
SourceFunction.SourceContext<T> ctx = new CollectingSourceContext<T>(new Object(), outputs);
sourceFunction.run(ctx);
} catch (Exception e) {
throw new RuntimeException("Cannot invoke source.", e);
}
return outputs;
}
public static <T extends Serializable> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
if (sourceFunction instanceof RichFunction) {
return runRichSourceFunction(sourceFunction);
}
else {
return runNonRichSourceFunction(sourceFunction);
}
}
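A hypothetical invocation of the dispatcher above, feeding it Flink's FromElementsFunction as a finite, non-rich source (the surrounding test method is assumed to declare throws Exception):
SourceFunction<Long> source =
new FromElementsFunction<>(LongSerializer.INSTANCE, 1L, 2L, 3L);
List<Long> collected = runSourceFunction(source);
// collected now holds [1, 2, 3]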
public TestableKinesisDataFetcherForShardConsumerException(final List<String> fakeStreams,
final SourceFunction.SourceContext<T> sourceContext,
final Properties fakeConfiguration,
final KinesisDeserializationSchema<T> deserializationSchema,
final int fakeTotalCountOfSubtasks,
final int fakeIndexOfThisSubtask,
final AtomicReference<Throwable> thrownErrorUnderTest,
final LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest,
final HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
final KinesisProxyInterface fakeKinesis) {
super(fakeStreams, sourceContext, fakeConfiguration, deserializationSchema, fakeTotalCountOfSubtasks,
fakeIndexOfThisSubtask, thrownErrorUnderTest, subscribedShardsStateUnderTest,
subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, fakeKinesis);
}