下面列出了org.openjdk.jmh.annotations.OperationsPerInvocation#org.apache.flink.api.common.ExecutionConfig 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Checks that {@link ListStateDescriptor} instances sharing a name are equal and produce the
 * same hash code, whether they were created from a class or from a serializer, after being
 * copied via {@code CommonTestUtils.createCopySerializable}, and after
 * {@code initializeSerializerUnlessSet} has been called on them.
 */
@Test
public void testHashCodeEquals() throws Exception {
    final String descriptorName = "testName";

    final ListStateDescriptor<String> fromClass =
        new ListStateDescriptor<>(descriptorName, String.class);
    final ListStateDescriptor<String> fromClassAgain =
        new ListStateDescriptor<>(descriptorName, String.class);
    final ListStateDescriptor<String> fromSerializer =
        new ListStateDescriptor<>(descriptorName, StringSerializer.INSTANCE);

    // hashCode() has to work for descriptors with initialized and uninitialized serializers
    assertEquals(fromClass.hashCode(), fromClassAgain.hashCode());
    assertEquals(fromClass.hashCode(), fromSerializer.hashCode());

    assertEquals(fromClass, fromClassAgain);
    assertEquals(fromClass, fromSerializer);

    // a copy of the descriptor must be equal to the descriptor it was copied from
    final ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(fromClass);
    assertEquals(fromClass, copy);

    // equality must survive initializing the serializer on either side
    copy.initializeSerializerUnlessSet(new ExecutionConfig());
    assertEquals(fromClass, copy);

    fromClass.initializeSerializerUnlessSet(new ExecutionConfig());
    assertEquals(fromClass, fromClassAgain);
}
/**
 * Creates the fixture shared by driver tests: an I/O manager, an optional memory manager,
 * empty input/comparator/sorter lists, a dummy owner task and a fresh task configuration.
 *
 * <p>The memory manager is sized to {@code memory + maxNumSorters * perSortMemory}; when that
 * total is zero no memory manager is created and the field is set to {@code null}.
 *
 * @param executionConfig the execution config kept by this fixture
 * @param memory the memory budget added on top of the sorter memory
 * @param maxNumSorters the number of sorters to reserve memory for
 * @param perSortMemory the memory reserved per sorter
 * @throws IllegalArgumentException if any of the numeric arguments is negative
 */
protected DriverTestBase(ExecutionConfig executionConfig, long memory, int maxNumSorters, long perSortMemory) {
    final boolean anyNegative = memory < 0 || maxNumSorters < 0 || perSortMemory < 0;
    if (anyNegative) {
        throw new IllegalArgumentException();
    }

    // Every argument is known to be non-negative here, so the sum needs no clamping.
    final long totalMem = memory + (maxNumSorters * perSortMemory);

    this.perSortMem = perSortMemory;
    // NOTE(review): evaluates to NaN when totalMem is 0 - confirm no caller relies on the fraction then.
    this.perSortFractionMem = ((double) perSortMemory) / totalMem;
    this.ioManager = new IOManagerAsync();

    // A memory manager is only built when there is memory to manage.
    if (totalMem > 0) {
        this.memManager = MemoryManagerBuilder.newBuilder().setMemorySize(totalMem).build();
    } else {
        this.memManager = null;
    }

    this.inputs = new ArrayList<>();
    this.comparators = new ArrayList<>();
    this.sorters = new ArrayList<>();

    this.owner = new DummyInvokable();
    this.taskConfig = new TaskConfig(new Configuration());
    this.executionConfig = executionConfig;
    this.taskManageInfo = new TestingTaskManagerRuntimeInfo();
}
/**
 * Builds a {@link HeapKeyedStateBackend} for tests, covering the key groups 0 to 15 and
 * using a default {@link ExecutionConfig}, a mocked {@link TaskKvStateRegistry} and disabled
 * local recovery.
 *
 * @param keySerializer serializer for the keys of the backend
 * @param stateHandles state handles handed to the builder
 * @param <K> the key type
 * @return the backend produced by the builder
 * @throws Exception if building the backend fails
 */
public <K> HeapKeyedStateBackend<K> createKeyedBackend(
        TypeSerializer<K> keySerializer,
        Collection<KeyedStateHandle> stateHandles) throws Exception {

    final KeyGroupRange range = new KeyGroupRange(0, 15);
    final int keyGroupCount = range.getNumberOfKeyGroups();
    final ExecutionConfig config = new ExecutionConfig();

    final TaskKvStateRegistry kvStateRegistry = mock(TaskKvStateRegistry.class);
    final ClassLoader userClassLoader = HeapStateBackendTestBase.class.getClassLoader();

    final HeapKeyedStateBackendBuilder<K> builder = new HeapKeyedStateBackendBuilder<>(
        kvStateRegistry,
        keySerializer,
        userClassLoader,
        keyGroupCount,
        range,
        config,
        TtlTimeProvider.DEFAULT,
        stateHandles,
        AbstractStateBackend.getCompressionDecorator(config),
        TestLocalRecoveryConfig.disabled(),
        new HeapPriorityQueueSetFactory(range, keyGroupCount, 128),
        async,
        new CloseableRegistry());

    return builder.build();
}
/**
 * Runs the cross function over every pair of elements from the two inputs.
 *
 * <p>Each input element is copied with its serializer before it is handed to the function,
 * and each function result is copied with the output serializer before it is stored.
 *
 * @param inputData1 elements of the first input (outer loop)
 * @param inputData2 elements of the second input (inner loop)
 * @param ctx runtime context set on the user function
 * @param executionConfig config used to create the serializers
 * @return the copied results, in iteration order
 * @throws Exception if opening, invoking or closing the function fails
 */
@Override
protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
    final CrossFunction<IN1, IN2, OUT> crosser = this.userFunction.getUserCodeObject();

    FunctionUtils.setFunctionRuntimeContext(crosser, ctx);
    FunctionUtils.openFunction(crosser, this.parameters);

    final List<OUT> crossed = new ArrayList<>(inputData1.size() * inputData2.size());

    final TypeSerializer<IN1> firstSerializer =
        getOperatorInfo().getFirstInputType().createSerializer(executionConfig);
    final TypeSerializer<IN2> secondSerializer =
        getOperatorInfo().getSecondInputType().createSerializer(executionConfig);
    final TypeSerializer<OUT> resultSerializer =
        getOperatorInfo().getOutputType().createSerializer(executionConfig);

    for (IN1 left : inputData1) {
        for (IN2 right : inputData2) {
            // the function always receives fresh copies of both inputs
            final IN1 leftCopy = firstSerializer.copy(left);
            final IN2 rightCopy = secondSerializer.copy(right);
            crossed.add(resultSerializer.copy(crosser.cross(leftCopy, rightCopy)));
        }
    }

    FunctionUtils.closeFunction(crosser);
    return crossed;
}
/**
 * Supplies the parameter sets for this parameterized test: one {@link ExecutionConfig} with
 * object reuse disabled, followed by one with object reuse enabled.
 *
 * @return two single-element parameter arrays, in that order
 */
@Parameterized.Parameters
public static Collection<Object[]> getConfigurations() {
    final ExecutionConfig reuseEnabled = new ExecutionConfig();
    reuseEnabled.enableObjectReuse();

    final ExecutionConfig reuseDisabled = new ExecutionConfig();
    reuseDisabled.disableObjectReuse();

    final LinkedList<Object[]> parameterSets = new LinkedList<>();
    parameterSets.add(new Object[] {reuseDisabled});
    parameterSets.add(new Object[] {reuseEnabled});
    return parameterSets;
}
/**
 * Creates an {@link ExecutionGraph} named "Test job" with a fresh job id, an empty
 * configuration and a default {@link ExecutionConfig}, and starts it.
 *
 * @param restartStrategy restart strategy passed to the graph
 * @param slotProvider slot provider passed to the graph
 * @return the started execution graph
 * @throws IOException if the execution config cannot be wrapped in a {@link SerializedValue}
 */
private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException {
    final SerializedValue<ExecutionConfig> serializedConfig =
        new SerializedValue<>(new ExecutionConfig());

    final ExecutionGraph graph = new ExecutionGraph(
        TestingUtils.defaultExecutor(),
        TestingUtils.defaultExecutor(),
        new JobID(),
        "Test job",
        new Configuration(),
        serializedConfig,
        AkkaUtils.getDefaultTimeout(),
        restartStrategy,
        slotProvider);

    graph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
    return graph;
}
/**
 * Builds a two-vertex job graph ("source" feeding "sink" through a pointwise, pipelined
 * connection) whose vertices share one slot sharing group, scheduled eagerly and configured
 * with a fixed-delay restart strategy allowing a single restart.
 *
 * @param delay delay passed to the fixed-delay restart strategy
 * @param parallelism parallelism applied to both vertices
 * @return the configured job graph
 * @throws IOException if setting the execution config on the job graph fails
 */
@Nonnull
private JobGraph createJobGraph(long delay, int parallelism) throws IOException {
    final SlotSharingGroup sharedSlots = new SlotSharingGroup();

    final JobVertex sourceVertex = new JobVertex("source");
    sourceVertex.setInvokableClass(OneTimeFailingInvokable.class);
    sourceVertex.setParallelism(parallelism);
    sourceVertex.setSlotSharingGroup(sharedSlots);

    final JobVertex sinkVertex = new JobVertex("sink");
    sinkVertex.setInvokableClass(NoOpInvokable.class);
    sinkVertex.setParallelism(parallelism);
    sinkVertex.setSlotSharingGroup(sharedSlots);

    sinkVertex.connectNewDataSetAsInput(
        sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

    final JobGraph graph = new JobGraph(sourceVertex, sinkVertex);
    graph.setScheduleMode(ScheduleMode.EAGER);

    final ExecutionConfig config = new ExecutionConfig();
    config.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, delay));
    graph.setExecutionConfig(config);

    return graph;
}
/**
 * Runs the full {@link SerializerTestInstance} suite for the tuple type of the given
 * instances.
 *
 * <p>The type information is extracted from the first instance. For {@link Tuple0} the
 * length handed to the test instance is always 1, whatever the caller passed.
 *
 * <p>Any exception is turned into an {@link AssertionError} that keeps the original
 * exception as its cause, so the test report shows the real stack trace.
 *
 * @param length the length passed to the serializer test instance
 * @param instances the tuples to test with; the first one determines the tuple type
 * @param <T> the tuple type under test
 */
private <T extends Tuple> void runTests(int length, T... instances) {
    try {
        @SuppressWarnings("unchecked")
        final TupleTypeInfo<T> tupleTypeInfo = (TupleTypeInfo<T>) TypeExtractor.getForObject(instances[0]);
        final TypeSerializer<T> serializer = tupleTypeInfo.createSerializer(new ExecutionConfig());
        final Class<T> tupleClass = tupleTypeInfo.getTypeClass();

        // use a separate local instead of overwriting the parameter
        final int effectiveLength = (tupleClass == Tuple0.class) ? 1 : length;

        final SerializerTestInstance<T> test =
            new SerializerTestInstance<>(serializer, tupleClass, effectiveLength, instances);
        test.testAll();
    }
    catch (Exception e) {
        // still an AssertionError, as with Assert.fail, but with the cause preserved
        throw new AssertionError(e.getMessage(), e);
    }
}
/**
 * Returns the bytes of a serialized {@link KryoSerializerSnapshot} that contains a Kryo
 * registration of a class that does not exist on the current classpath.
 *
 * <p>The context class loader of the current thread is switched to the class loader of the
 * externally created object while the snapshot is written, and is always restored afterwards.
 *
 * @return a copy of the buffer holding the versioned snapshot
 * @throws IOException if creating the external object or writing the snapshot fails
 */
private static byte[] unLoadableSnapshotBytes() throws IOException {
    final Thread current = Thread.currentThread();
    final ClassLoader previousContextLoader = current.getContextClassLoader();

    final ClassLoaderUtils.ObjectAndClassLoader<Serializable> foreign =
        ClassLoaderUtils.createSerializableObjectFromNewClassLoader();

    try {
        current.setContextClassLoader(foreign.getClassLoader());

        final ExecutionConfig config = new ExecutionConfig();
        config.registerKryoType(foreign.getObject().getClass());

        final KryoSerializer<Animal> serializer = new KryoSerializer<>(Animal.class, config);
        final TypeSerializerSnapshot<Animal> snapshot = serializer.snapshotConfiguration();

        final DataOutputSerializer target = new DataOutputSerializer(4096);
        TypeSerializerSnapshot.writeVersionedSnapshot(target, snapshot);
        return target.getCopyOfBuffer();
    }
    finally {
        current.setContextClassLoader(previousContextLoader);
    }
}
/**
 * Creates a comparator for the given input type.
 *
 * <p>Composite types get a comparator over all given keys and sort directions; atomic types
 * get a comparator that uses only the first sort direction.
 *
 * @param executionConfig config passed to the comparator factory methods
 * @param inputType the type to create a comparator for
 * @param inputKeys key positions, used for composite types only
 * @param inputSortDirections sort directions; atomic types read index 0 only
 * @param <T> the compared type
 * @return the comparator created by the type information
 * @throws InvalidProgramException if the type is neither composite nor atomic
 */
@SuppressWarnings("unchecked")
private <T> TypeComparator<T> getTypeComparator(ExecutionConfig executionConfig, TypeInformation<T> inputType, int[] inputKeys, boolean[] inputSortDirections) {
    if (inputType instanceof CompositeType) {
        final CompositeType<T> composite = (CompositeType<T>) inputType;
        return composite.createComparator(inputKeys, inputSortDirections, 0, executionConfig);
    }

    if (inputType instanceof AtomicType) {
        final AtomicType<T> atomic = (AtomicType<T>) inputType;
        return atomic.createComparator(inputSortDirections[0], executionConfig);
    }

    throw new InvalidProgramException("Input type of coGroup must be one of composite types or atomic types.");
}
/**
 * Creates a new {@link PojoSerializer}.
 *
 * <p>Besides storing its arguments, the constructor makes the first
 * {@code fieldSerializers.length} fields accessible, remembers the context class loader of the
 * calling thread and derives the registered subclass tags and serializers from the execution
 * config.
 *
 * @param clazz the POJO class; must not be null
 * @param fieldSerializers one serializer per field; must not be null
 * @param fields the fields of the POJO; must not be null
 * @param executionConfig the execution config; must not be null
 */
@SuppressWarnings("unchecked")
public PojoSerializer(
        Class<T> clazz,
        TypeSerializer<?>[] fieldSerializers,
        Field[] fields,
        ExecutionConfig executionConfig) {

    this.clazz = checkNotNull(clazz);
    this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
    this.fields = checkNotNull(fields);
    this.numFields = fieldSerializers.length;
    this.executionConfig = checkNotNull(executionConfig);

    int fieldIndex = 0;
    while (fieldIndex < numFields) {
        this.fields[fieldIndex].setAccessible(true);
        fieldIndex++;
    }

    this.cl = Thread.currentThread().getContextClassLoader();

    // Only classes that are real sub-classes of clazz (and not clazz itself) are of interest.
    final LinkedHashSet<Class<?>> subclasses =
        getRegisteredSubclassesFromExecutionConfig(clazz, executionConfig);

    this.registeredClasses = createRegisteredSubclassTags(subclasses);
    this.registeredSerializers = createRegisteredSubclassSerializers(subclasses, executionConfig);

    this.subclassSerializerCache = new HashMap<>();
}
/**
* The main constructor for creating a FlinkKafkaProducer.
*
* @param defaultTopicId The default topic to write data to
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
*/
public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
requireNonNull(defaultTopicId, "TopicID not set");
requireNonNull(serializationSchema, "serializationSchema not set");
requireNonNull(producerConfig, "producerConfig not set");
ClosureCleaner.clean(customPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
ClosureCleaner.ensureSerializable(serializationSchema);
this.defaultTopicId = defaultTopicId;
this.schema = serializationSchema;
this.producerConfig = producerConfig;
this.flinkKafkaPartitioner = customPartitioner;
// set the producer configuration properties for kafka record key value serializers.
if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
} else {
LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
}
if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
} else {
LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
}
// eagerly ensure that bootstrap servers are set.
if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
}
this.topicPartitionsMap = new HashMap<>();
}
/**
 * Runs a {@link ReduceDriver} with the {@code SORTED_REDUCE} strategy on an empty input and
 * verifies that nothing is collected.
 *
 * <p>Any exception is rethrown as an {@link AssertionError} that carries the original
 * exception as its cause, so the test report shows the real stack trace.
 */
@Test
public void testReduceDriverImmutableEmpty() {
    try {
        TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
            new TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>>();

        // the data is only used to derive the type information; the driver input stays empty
        List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
        @SuppressWarnings("unchecked")
        TupleTypeInfo<Tuple2<String, Integer>> typeInfo =
            (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
        MutableObjectIterator<Tuple2<String, Integer>> input = EmptyMutableObjectIterator.get();
        context.setDriverStrategy(DriverStrategy.SORTED_REDUCE);

        TypeComparator<Tuple2<String, Integer>> comparator =
            typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
        GatheringCollector<Tuple2<String, Integer>> result =
            new GatheringCollector<Tuple2<String, Integer>>(typeInfo.createSerializer(new ExecutionConfig()));

        context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
        context.setComparator1(comparator);
        context.setCollector(result);

        ReduceDriver<Tuple2<String, Integer>> driver = new ReduceDriver<Tuple2<String, Integer>>();
        driver.setup(context);
        driver.prepare();
        driver.run();

        Assert.assertEquals(0, result.getList().size());
    }
    catch (Exception e) {
        // still an AssertionError, as with Assert.fail, but with the cause preserved
        throw new AssertionError(e.getMessage(), e);
    }
}
/**
 * Checks that {@link MapStateDescriptor} instances sharing a name are equal and produce the
 * same hash code, whether they were created from classes or from serializers, after being
 * copied via {@code CommonTestUtils.createCopySerializable}, and after
 * {@code initializeSerializerUnlessSet} has been called on them.
 */
@Test
public void testHashCodeEquals() throws Exception {
    final String descriptorName = "testName";

    final MapStateDescriptor<String, String> fromClasses =
        new MapStateDescriptor<>(descriptorName, String.class, String.class);
    final MapStateDescriptor<String, String> fromClassesAgain =
        new MapStateDescriptor<>(descriptorName, String.class, String.class);
    final MapStateDescriptor<String, String> fromSerializers =
        new MapStateDescriptor<>(descriptorName, StringSerializer.INSTANCE, StringSerializer.INSTANCE);

    // hashCode() has to work for descriptors with initialized and uninitialized serializers
    assertEquals(fromClasses.hashCode(), fromClassesAgain.hashCode());
    assertEquals(fromClasses.hashCode(), fromSerializers.hashCode());

    assertEquals(fromClasses, fromClassesAgain);
    assertEquals(fromClasses, fromSerializers);

    // a copy of the descriptor must be equal to the descriptor it was copied from
    final MapStateDescriptor<String, String> copy = CommonTestUtils.createCopySerializable(fromClasses);
    assertEquals(fromClasses, copy);

    // equality must survive initializing the serializer on either side
    copy.initializeSerializerUnlessSet(new ExecutionConfig());
    assertEquals(fromClasses, copy);

    fromClasses.initializeSerializerUnlessSet(new ExecutionConfig());
    assertEquals(fromClasses, fromClassesAgain);
}
/**
 * Registers the given type, and the types reachable through its non-static, non-transient
 * declared fields, as Kryo types on the execution config.
 *
 * <p>{@code null}, primitive types and {@code Object} are ignored. Array types are not
 * registered themselves; only their component type is processed. Types already contained in
 * {@code alreadySeen} are skipped, which also stops recursion on self-referencing types.
 *
 * @param type the type to register; may be null
 * @param config the execution config receiving the registrations
 * @param alreadySeen the types visited so far; updated by this method
 */
public static void recursivelyRegisterType(Class<?> type, ExecutionConfig config, Set<Class<?>> alreadySeen) {
    // primitives (and null / Object) are neither registered nor remembered
    final boolean nothingToRegister = type == null || type.isPrimitive() || type == Object.class;
    if (nothingToRegister) {
        return;
    }

    // a type that was seen before is not visited again; this ends recursive type cycles
    final boolean firstVisit = alreadySeen.add(type);
    if (!firstVisit) {
        return;
    }

    if (type.isArray()) {
        recursivelyRegisterType(type.getComponentType(), config, alreadySeen);
        return;
    }

    config.registerKryoType(type);
    // add serializers for Avro type if necessary
    AvroUtils.getAvroUtils().addAvroSerializersIfRequired(config, type);

    for (Field declared : type.getDeclaredFields()) {
        final int modifiers = declared.getModifiers();
        if (!Modifier.isStatic(modifiers) && !Modifier.isTransient(modifiers)) {
            final Type genericFieldType = declared.getGenericType();
            recursivelyRegisterGenericType(genericFieldType, config, alreadySeen);
        }
    }
}
/**
 * Verifies that a {@link WatermarkStrategy} whose timestamp assigner is provided through a
 * supplier lambda can be closure-cleaned and still creates an assigner that returns 42.
 */
@Test
public void testClassTimestampAssignerUsingSupplier() {
    final WatermarkStrategy<Object> strategy = WatermarkStrategy
        .forMonotonousTimestamps()
        .withTimestampAssigner((context) -> new TestTimestampAssigner());

    // the closure has to be cleanable through the WatermarkStrategy
    ClosureCleaner.clean(strategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);

    final TimestampAssigner<Object> assigner =
        strategy.createTimestampAssigner(assignerContext());

    assertThat(assigner.extractTimestamp(null, 13L), is(42L));
}
/**
 * Forwards the input type to the writer template, but only if the template implements
 * {@link InputTypeConfigurable}; otherwise the call does nothing.
 *
 * @param type the input type information
 * @param executionConfig the execution config forwarded together with the type
 */
@Override
@SuppressWarnings("unchecked")
public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
    if (!(this.writerTemplate instanceof InputTypeConfigurable)) {
        return;
    }

    final InputTypeConfigurable configurableWriter = (InputTypeConfigurable) writerTemplate;
    configurableWriter.setInputType(type, executionConfig);
}
/**
 * Verifies that the execution config is applied when a folding state is requested from the
 * {@link StreamingRuntimeContext}: the serializer of the captured descriptor must be a
 * {@link KryoSerializer} that has {@code Path} registered.
 */
@Test
public void testFoldingStateInstantiation() throws Exception {
    final ExecutionConfig executionConfig = new ExecutionConfig();
    executionConfig.registerKryoType(Path.class);

    final AtomicReference<Object> capturedDescriptor = new AtomicReference<>();

    final StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(
        createDescriptorCapturingMockOp(capturedDescriptor, executionConfig),
        createMockEnvironment(),
        Collections.<String, Accumulator<?, ?>>emptyMap());

    @SuppressWarnings("unchecked")
    final FoldFunction<String, TaskInfo> foldFunction =
        (FoldFunction<String, TaskInfo>) mock(FoldFunction.class);

    final FoldingStateDescriptor<String, TaskInfo> requested =
        new FoldingStateDescriptor<>("name", null, foldFunction, TaskInfo.class);

    runtimeContext.getFoldingState(requested);

    final FoldingStateDescriptor<?, ?> intercepted =
        (FoldingStateDescriptor<?, ?>) capturedDescriptor.get();
    final TypeSerializer<?> stateSerializer = intercepted.getSerializer();

    // the Path class must really be registered, i.e., the execution config was applied
    assertTrue(stateSerializer instanceof KryoSerializer);

    final KryoSerializer<?> kryoSerializer = (KryoSerializer<?>) stateSerializer;
    assertTrue(kryoSerializer.getKryo().getRegistration(Path.class).getId() > 0);
}
/**
 * Passes the input type on to the writer template when that template is an
 * {@link InputTypeConfigurable}; does nothing for any other template.
 *
 * @param type the input type information
 * @param executionConfig the execution config handed over along with the type
 */
@Override
@SuppressWarnings("unchecked")
public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
    final boolean templateIsConfigurable = this.writerTemplate instanceof InputTypeConfigurable;
    if (templateIsConfigurable) {
        final InputTypeConfigurable target = (InputTypeConfigurable) writerTemplate;
        target.setInputType(type, executionConfig);
    }
}
/**
 * Runs a {@link ReduceCombineDriver} with the {@code SORTED_PARTIAL_REDUCE} strategy on an
 * empty input and verifies that nothing is collected.
 *
 * <p>Any exception is rethrown as an {@link AssertionError} that carries the original
 * exception as its cause, so the test report shows the real stack trace.
 */
@Test
public void testImmutableEmpty() {
    try {
        TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
            new TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>>(1024 * 1024);
        context.getTaskConfig().setRelativeMemoryDriver(0.5);

        // the data is only used to derive the type information; the driver input stays empty
        List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
        Collections.shuffle(data);

        @SuppressWarnings("unchecked")
        TupleTypeInfo<Tuple2<String, Integer>> typeInfo =
            (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
        MutableObjectIterator<Tuple2<String, Integer>> input = EmptyMutableObjectIterator.get();

        context.setDriverStrategy(DriverStrategy.SORTED_PARTIAL_REDUCE);
        TypeComparator<Tuple2<String, Integer>> comparator =
            typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());

        GatheringCollector<Tuple2<String, Integer>> result =
            new GatheringCollector<Tuple2<String, Integer>>(typeInfo.createSerializer(new ExecutionConfig()));

        context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
        context.setComparator1(comparator);
        context.setCollector(result);

        ReduceCombineDriver<Tuple2<String, Integer>> driver = new ReduceCombineDriver<Tuple2<String, Integer>>();
        driver.setup(context);
        driver.prepare();
        driver.run();

        Assert.assertEquals(0, result.getList().size());
    }
    catch (Exception e) {
        // still an AssertionError, as with Assert.fail, but with the cause preserved
        throw new AssertionError(e.getMessage(), e);
    }
}
/**
 * Creates a materialized collect-stream result with an explicit overcommit threshold.
 *
 * <p>A non-positive {@code maxRowCount} is stored as {@code Integer.MAX_VALUE}. The initial
 * capacity of the materialized table and of the row position cache is computed from the
 * {@code maxRowCount} argument as passed in, not from the stored value.
 *
 * @param outputType type information passed to the super constructor
 * @param config execution config passed to the super constructor
 * @param gatewayAddress gateway address passed to the super constructor
 * @param gatewayPort gateway port passed to the super constructor
 * @param maxRowCount maximum row count; values {@code <= 0} are replaced by {@code Integer.MAX_VALUE}
 * @param overcommitThreshold threshold stored as-is in this result
 */
@VisibleForTesting
public MaterializedCollectStreamResult(
        TypeInformation<Row> outputType,
        ExecutionConfig config,
        InetAddress gatewayAddress,
        int gatewayPort,
        int maxRowCount,
        int overcommitThreshold) {
    super(outputType, config, gatewayAddress, gatewayPort);

    this.maxRowCount = (maxRowCount <= 0) ? Integer.MAX_VALUE : maxRowCount;
    this.overcommitThreshold = overcommitThreshold;

    // prepare for materialization; presizing avoids frequent resizing
    final int presizedCapacity = computeMaterializedTableCapacity(maxRowCount);
    materializedTable = new ArrayList<>(presizedCapacity);
    rowPositionCache = new HashMap<>(presizedCapacity);
    snapshot = new ArrayList<>();

    validRowPosition = 0;
    isLastSnapshot = false;
    pageCount = 0;
}
/**
 * Verifies that broadcast variables set on a {@link RuntimeUDFContext} are reported as
 * present and can be read repeatedly with unchanged contents.
 *
 * <p>Any exception is rethrown as an {@link AssertionError} that carries the original
 * exception as its cause, so the test report shows the real stack trace.
 */
@Test
public void testBroadcastVariableSimple() {
    try {
        RuntimeUDFContext ctx = new RuntimeUDFContext(
            taskInfo, getClass().getClassLoader(), new ExecutionConfig(),
            new HashMap<>(),
            new HashMap<>(),
            new UnregisteredMetricsGroup());

        ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4));
        ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0));

        assertTrue(ctx.hasBroadcastVariable("name1"));
        assertTrue(ctx.hasBroadcastVariable("name2"));

        // first access
        List<Integer> list1 = ctx.getBroadcastVariable("name1");
        List<Double> list2 = ctx.getBroadcastVariable("name2");
        assertEquals(Arrays.asList(1, 2, 3, 4), list1);
        assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list2);

        // second access must return the same contents
        List<Integer> list3 = ctx.getBroadcastVariable("name1");
        List<Double> list4 = ctx.getBroadcastVariable("name2");
        assertEquals(Arrays.asList(1, 2, 3, 4), list3);
        assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list4);

        // and so must a third access
        List<Integer> list5 = ctx.getBroadcastVariable("name1");
        List<Double> list6 = ctx.getBroadcastVariable("name2");
        assertEquals(Arrays.asList(1, 2, 3, 4), list5);
        assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list6);
    }
    catch (Exception e) {
        // still an AssertionError, as with fail(), but with the cause preserved
        throw new AssertionError(e.getMessage(), e);
    }
}
/**
 * Feeds four records and one watermark through a {@link StreamProject} operator that
 * projects the fields {@code {4, 4, 3}} of a {@code Tuple5} and compares the harness output
 * with the expected projected records and the forwarded watermark.
 */
@Test
public void testProject() throws Exception {
    final TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inputType =
        TypeExtractor.getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));

    final int[] projectedFields = new int[]{4, 4, 3};

    final TupleSerializer<Tuple3<Integer, Integer, String>> outputSerializer =
        new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection.extractFieldTypes(projectedFields, inputType))
            .createSerializer(new ExecutionConfig());

    @SuppressWarnings("unchecked")
    final StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> projectOperator =
        new StreamProject<>(projectedFields, outputSerializer);

    final OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> harness =
        new OneInputStreamOperatorTestHarness<>(projectOperator);

    final long baseTime = 0L;
    final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

    harness.open();

    // three records, a watermark, then one more record
    harness.processElement(new StreamRecord<>(new Tuple5<>(2, "a", 3, "b", 4), baseTime + 1));
    harness.processElement(new StreamRecord<>(new Tuple5<>(2, "s", 3, "c", 2), baseTime + 2));
    harness.processElement(new StreamRecord<>(new Tuple5<>(2, "a", 3, "c", 2), baseTime + 3));
    harness.processWatermark(new Watermark(baseTime + 2));
    harness.processElement(new StreamRecord<>(new Tuple5<>(2, "a", 3, "a", 7), baseTime + 4));

    // every record is projected to (f4, f4, f3); the watermark is forwarded unchanged
    expected.add(new StreamRecord<>(new Tuple3<>(4, 4, "b"), baseTime + 1));
    expected.add(new StreamRecord<>(new Tuple3<>(2, 2, "c"), baseTime + 2));
    expected.add(new StreamRecord<>(new Tuple3<>(2, 2, "c"), baseTime + 3));
    expected.add(new Watermark(baseTime + 2));
    expected.add(new StreamRecord<>(new Tuple3<>(7, 7, "a"), baseTime + 4));

    TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
}
ReadOnlyContextImpl(
final ExecutionConfig executionConfig,
final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
final TimerService timerService) {
function.super();
this.config = Preconditions.checkNotNull(executionConfig);
this.states = Preconditions.checkNotNull(broadcastStates);
this.timerService = Preconditions.checkNotNull(timerService);
}
/**
 * Hands the output type to the user function if its class implements
 * {@link OutputTypeConfigurable}.
 *
 * @param userFunction the function to configure
 * @param outTypeInfo the output type information; must not be null
 * @param executionConfig the execution config; must not be null
 * @param <T> the output type
 * @return {@code true} if the output type was set on the function, {@code false} otherwise
 */
@SuppressWarnings("unchecked")
private static <T> boolean trySetOutputType(
        Function userFunction,
        TypeInformation<T> outTypeInfo,
        ExecutionConfig executionConfig) {

    Preconditions.checkNotNull(outTypeInfo);
    Preconditions.checkNotNull(executionConfig);

    final boolean configurable =
        OutputTypeConfigurable.class.isAssignableFrom(userFunction.getClass());
    if (!configurable) {
        return false;
    }

    final OutputTypeConfigurable<T> target = (OutputTypeConfigurable<T>) userFunction;
    target.setOutputType(outTypeInfo, executionConfig);
    return true;
}
/**
 * Creates a single-vertex job graph whose execution config uses a fixed-delay restart
 * strategy with {@code Integer.MAX_VALUE} restart attempts and no delay.
 *
 * @return the configured job graph
 * @throws IOException if setting the execution config on the job graph fails
 */
private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
    final JobGraph singleVertexGraph = createSingleVertexJobGraph();

    final ExecutionConfig config = new ExecutionConfig();
    config.setRestartStrategy(
        RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));

    singleVertexGraph.setExecutionConfig(config);
    return singleVertexGraph;
}
/**
 * Creates a result. Might start threads or open sockets, so every created result must be
 * closed.
 *
 * <p>In streaming mode a changelog result is created for changelog or tableau mode and a
 * materialized stream result otherwise. In batch mode a materialized batch result is created
 * for table or tableau mode; any other mode is rejected.
 *
 * @param env the environment providing the execution and deployment settings
 * @param schema the schema of the result
 * @param config the execution config passed to the result
 * @param classLoader the class loader passed to the result
 * @param <T> the result type
 * @return the newly created result
 * @throws SqlExecutionException for batch execution outside of table or tableau mode
 */
public <T> DynamicResult<T> createResult(
        Environment env,
        TableSchema schema,
        ExecutionConfig config,
        ClassLoader classLoader) {

    if (!env.getExecution().inStreamingMode()) {
        // Batch Execution
        final boolean tableLike =
            env.getExecution().isTableMode() || env.getExecution().isTableauMode();
        if (!tableLike) {
            throw new SqlExecutionException(
                "Results of batch queries can only be served in table or tableau mode.");
        }
        return new MaterializedCollectBatchResult<>(schema, config, classLoader);
    }

    // Streaming: determine gateway address (and port if possible)
    final InetAddress address = getGatewayAddress(env.getDeployment());
    final int port = getGatewayPort(env.getDeployment());

    final boolean changelogLike =
        env.getExecution().isChangelogMode() || env.getExecution().isTableauMode();
    if (changelogLike) {
        return new ChangelogCollectStreamResult<>(
            schema,
            config,
            address,
            port,
            classLoader);
    }

    return new MaterializedCollectStreamResult<>(
        schema,
        config,
        address,
        port,
        env.getExecution().getMaxTableResultRows(),
        classLoader);
}
ReadOnlyContextImpl(
final ExecutionConfig executionConfig,
final BroadcastProcessFunction<IN1, IN2, OUT> function,
final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
final ProcessingTimeService timerService) {
function.super();
this.config = Preconditions.checkNotNull(executionConfig);
this.states = Preconditions.checkNotNull(broadcastStates);
this.timerService = Preconditions.checkNotNull(timerService);
}
/**
 * Ensures that the given operator produces some output when a single element is pushed in
 * and both the watermark and the processing time are then advanced to
 * {@code Long.MAX_VALUE}.
 *
 * @param operator the operator under test
 * @param keySelector key selector used by the keyed test harness
 * @param keyType type information of the key
 * @param element the single element to process
 * @throws Exception if the harness or the operator fails
 */
private static <K, IN, OUT> void processElementAndEnsureOutput(
        OneInputStreamOperator<IN, OUT> operator,
        KeySelector<IN, K> keySelector,
        TypeInformation<K> keyType,
        IN element) throws Exception {

    final KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> harness =
        new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType);

    if (operator instanceof OutputTypeConfigurable) {
        // A dummy type is enough because window functions only need the ExecutionConfig.
        // This is only needed for Fold, which is going to be removed soon.
        ((OutputTypeConfigurable) operator).setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
    }

    harness.open();

    harness.setProcessingTime(0);
    harness.processWatermark(Long.MIN_VALUE);

    harness.processElement(new StreamRecord<>(element, 0));

    // advance both clocks to the maximum to provoke processing-time and event-time triggers
    harness.setProcessingTime(Long.MAX_VALUE);
    harness.processWatermark(Long.MAX_VALUE);

    // the two watermarks are always emitted, so an output element makes it at least three
    final int emitted = harness.getOutput().size();
    assertTrue(emitted >= 3);

    harness.close();
}
/**
 * Checks equality and hash code consistency of {@link MapStateDescriptor}: descriptors with
 * the same name must be equal whether they were built from classes or from serializers,
 * after being copied via {@code CommonTestUtils.createCopySerializable}, and after
 * {@code initializeSerializerUnlessSet} has been called.
 */
@Test
public void testHashCodeEquals() throws Exception {
    final String stateName = "testName";

    final MapStateDescriptor<String, String> reference =
        new MapStateDescriptor<>(stateName, String.class, String.class);
    final MapStateDescriptor<String, String> twin =
        new MapStateDescriptor<>(stateName, String.class, String.class);
    final MapStateDescriptor<String, String> withSerializers =
        new MapStateDescriptor<>(stateName, StringSerializer.INSTANCE, StringSerializer.INSTANCE);

    // hashCode() has to work for descriptors with initialized and uninitialized serializers
    final int referenceHash = reference.hashCode();
    assertEquals(referenceHash, twin.hashCode());
    assertEquals(reference.hashCode(), withSerializers.hashCode());

    assertEquals(reference, twin);
    assertEquals(reference, withSerializers);

    // the copied descriptor must equal its source
    final MapStateDescriptor<String, String> copied = CommonTestUtils.createCopySerializable(reference);
    assertEquals(reference, copied);

    // initializing the copy's serializer must not break equality
    copied.initializeSerializerUnlessSet(new ExecutionConfig());
    assertEquals(reference, copied);

    // nor must initializing the reference's serializer
    reference.initializeSerializerUnlessSet(new ExecutionConfig());
    assertEquals(reference, twin);
}