org.springframework.validation.beanvalidation.CustomValidatorBean#org.springframework.cloud.stream.config.BindingServiceProperties源码实例Demo

下面列出了 org.springframework.validation.beanvalidation.CustomValidatorBean#org.springframework.cloud.stream.config.BindingServiceProperties 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Creates the function processor. The binding service properties, binding
 * information catalogue, extended binding properties, Serde resolver and cleanup
 * configuration are forwarded to the superclass constructor; every collaborator
 * except the cleanup configuration is additionally stored on this instance.
 *
 * @param bindingServiceProperties instance of {@link BindingServiceProperties}
 * @param kafkaStreamsExtendedBindingProperties instance of {@link KafkaStreamsExtendedBindingProperties}
 * @param keyValueSerdeResolver instance of {@link KeyValueSerdeResolver}
 * @param kafkaStreamsBindingInformationCatalogue instance of {@link KafkaStreamsBindingInformationCatalogue}
 * @param kafkaStreamsMessageConversionDelegate instance of {@link KafkaStreamsMessageConversionDelegate}
 * @param cleanupConfig instance of {@link CleanupConfig}; only passed to the superclass
 * @param streamFunctionProperties instance of {@link StreamFunctionProperties}
 * @param kafkaStreamsBinderConfigurationProperties instance of {@link KafkaStreamsBinderConfigurationProperties}
 * @param customizer instance of {@link StreamsBuilderFactoryBeanCustomizer}
 * @param environment instance of {@link ConfigurableEnvironment}
 */
public KafkaStreamsFunctionProcessor(
		BindingServiceProperties bindingServiceProperties,
		KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
		KeyValueSerdeResolver keyValueSerdeResolver,
		KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
		KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
		CleanupConfig cleanupConfig,
		StreamFunctionProperties streamFunctionProperties,
		KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
		StreamsBuilderFactoryBeanCustomizer customizer,
		ConfigurableEnvironment environment) {
	super(bindingServiceProperties,
			kafkaStreamsBindingInformationCatalogue,
			kafkaStreamsExtendedBindingProperties,
			keyValueSerdeResolver,
			cleanupConfig);

	// Property holders.
	this.bindingServiceProperties = bindingServiceProperties;
	this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
	this.kafkaStreamsBinderConfigurationProperties = kafkaStreamsBinderConfigurationProperties;
	this.streamFunctionProperties = streamFunctionProperties;

	// Remaining collaborators.
	this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
	this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
	this.keyValueSerdeResolver = keyValueSerdeResolver;
	this.customizer = customizer;
	this.environment = environment;
}
 
/**
 * Builds one {@link BinderConfiguration} per binder declared in the given
 * properties, keyed by the declared binder name.
 *
 * @param properties source of the declared binders
 * @return a new map from binder name to the configuration derived from its
 * type, environment, inherit-environment flag and default-candidate flag
 */
private static Map<String, BinderConfiguration> getBinderConfigurations(
		BindingServiceProperties properties) {

	Map<String, BinderConfiguration> result = new HashMap<>();
	properties.getBinders().forEach((binderName, declared) -> result.put(binderName,
			new BinderConfiguration(declared.getType(),
					declared.getEnvironment(),
					declared.isInheritEnvironment(),
					declared.isDefaultCandidate())));
	return result;
}
 
/**
 * Declares the {@link KafkaStreamsStreamListenerSetupMethodOrchestrator} bean.
 * The cleanup configuration and the factory-bean customizer are resolved through
 * {@code ObjectProvider#getIfUnique()}, so either may be passed on as
 * {@code null} when no unique candidate exists (per Spring's
 * {@code ObjectProvider} contract).
 *
 * @param bindingServiceProperties instance of {@link BindingServiceProperties}
 * @param kafkaStreamsExtendedBindingProperties instance of {@link KafkaStreamsExtendedBindingProperties}
 * @param keyValueSerdeResolver instance of {@link KeyValueSerdeResolver}
 * @param kafkaStreamsBindingInformationCatalogue instance of {@link KafkaStreamsBindingInformationCatalogue}
 * @param kafkaStreamListenerParameterAdapter instance of {@link KStreamStreamListenerParameterAdapter}
 * @param streamListenerResultAdapters collection of {@link StreamListenerResultAdapter}
 * @param cleanupConfig provider of an optional {@link CleanupConfig}
 * @param customizerProvider provider of an optional {@link StreamsBuilderFactoryBeanCustomizer}
 * @param environment instance of {@link ConfigurableEnvironment}
 * @return the newly created orchestrator
 */
@Bean
public KafkaStreamsStreamListenerSetupMethodOrchestrator kafkaStreamsStreamListenerSetupMethodOrchestrator(
		BindingServiceProperties bindingServiceProperties,
		KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
		KeyValueSerdeResolver keyValueSerdeResolver,
		KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
		KStreamStreamListenerParameterAdapter kafkaStreamListenerParameterAdapter,
		Collection<StreamListenerResultAdapter> streamListenerResultAdapters,
		ObjectProvider<CleanupConfig> cleanupConfig,
		ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizerProvider, ConfigurableEnvironment environment) {
	// Resolve the optional beans up front, in the same order as they are passed on.
	CleanupConfig uniqueCleanupConfig = cleanupConfig.getIfUnique();
	StreamsBuilderFactoryBeanCustomizer uniqueCustomizer = customizerProvider.getIfUnique();

	return new KafkaStreamsStreamListenerSetupMethodOrchestrator(
			bindingServiceProperties,
			kafkaStreamsExtendedBindingProperties,
			keyValueSerdeResolver,
			kafkaStreamsBindingInformationCatalogue,
			kafkaStreamListenerParameterAdapter,
			streamListenerResultAdapters,
			uniqueCleanupConfig,
			uniqueCustomizer,
			environment);
}
 
/**
 * Declares the {@link KafkaStreamsFunctionProcessor} bean, guarded by
 * {@link FunctionDetectorCondition}. The cleanup configuration and the
 * factory-bean customizer are resolved through
 * {@code ObjectProvider#getIfUnique()}, so either may be passed on as
 * {@code null} when no unique candidate exists (per Spring's
 * {@code ObjectProvider} contract).
 *
 * @param bindingServiceProperties instance of {@link BindingServiceProperties}
 * @param kafkaStreamsExtendedBindingProperties instance of {@link KafkaStreamsExtendedBindingProperties}
 * @param keyValueSerdeResolver instance of {@link KeyValueSerdeResolver}
 * @param kafkaStreamsBindingInformationCatalogue instance of {@link KafkaStreamsBindingInformationCatalogue}
 * @param kafkaStreamsMessageConversionDelegate instance of {@link KafkaStreamsMessageConversionDelegate}
 * @param cleanupConfig provider of an optional {@link CleanupConfig}
 * @param streamFunctionProperties instance of {@link StreamFunctionProperties}
 * @param kafkaStreamsBinderConfigurationProperties the bean qualified as {@code binderConfigurationProperties}
 * @param customizerProvider provider of an optional {@link StreamsBuilderFactoryBeanCustomizer}
 * @param environment instance of {@link ConfigurableEnvironment}
 * @return the newly created function processor
 */
@Bean
@Conditional(FunctionDetectorCondition.class)
public KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties,
																KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
																KeyValueSerdeResolver keyValueSerdeResolver,
																KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
																KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
																ObjectProvider<CleanupConfig> cleanupConfig,
																StreamFunctionProperties streamFunctionProperties,
																@Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
																ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizerProvider, ConfigurableEnvironment environment) {
	// Resolve the optional beans up front, in the same order as they are passed on.
	CleanupConfig uniqueCleanupConfig = cleanupConfig.getIfUnique();
	StreamsBuilderFactoryBeanCustomizer uniqueCustomizer = customizerProvider.getIfUnique();

	return new KafkaStreamsFunctionProcessor(
			bindingServiceProperties,
			kafkaStreamsExtendedBindingProperties,
			keyValueSerdeResolver,
			kafkaStreamsBindingInformationCatalogue,
			kafkaStreamsMessageConversionDelegate,
			uniqueCleanupConfig,
			streamFunctionProperties,
			kafkaStreamsBinderConfigurationProperties,
			uniqueCustomizer,
			environment);
}
 
/**
 * Creates the orchestrator. The binding service properties, binding information
 * catalogue, extended binding properties, Serde resolver and cleanup
 * configuration are forwarded to the superclass constructor; every collaborator
 * except the cleanup configuration is additionally stored on this instance.
 *
 * @param bindingServiceProperties instance of {@link BindingServiceProperties}
 * @param extendedBindingProperties instance of {@link KafkaStreamsExtendedBindingProperties}
 * @param keyValueSerdeResolver instance of {@link KeyValueSerdeResolver}
 * @param bindingInformationCatalogue instance of {@link KafkaStreamsBindingInformationCatalogue}
 * @param streamListenerParameterAdapter instance of {@link StreamListenerParameterAdapter}
 * @param listenerResultAdapters collection of {@link StreamListenerResultAdapter}
 * @param cleanupConfig instance of {@link CleanupConfig}; only passed to the superclass
 * @param customizer instance of {@link StreamsBuilderFactoryBeanCustomizer}
 * @param environment instance of {@link ConfigurableEnvironment}
 */
KafkaStreamsStreamListenerSetupMethodOrchestrator(
		BindingServiceProperties bindingServiceProperties,
		KafkaStreamsExtendedBindingProperties extendedBindingProperties,
		KeyValueSerdeResolver keyValueSerdeResolver,
		KafkaStreamsBindingInformationCatalogue bindingInformationCatalogue,
		StreamListenerParameterAdapter streamListenerParameterAdapter,
		Collection<StreamListenerResultAdapter> listenerResultAdapters,
		CleanupConfig cleanupConfig,
		StreamsBuilderFactoryBeanCustomizer customizer,
		ConfigurableEnvironment environment) {
	super(bindingServiceProperties,
			bindingInformationCatalogue,
			extendedBindingProperties,
			keyValueSerdeResolver,
			cleanupConfig);

	// Property holders and binding metadata.
	this.bindingServiceProperties = bindingServiceProperties;
	this.kafkaStreamsExtendedBindingProperties = extendedBindingProperties;
	this.kafkaStreamsBindingInformationCatalogue = bindingInformationCatalogue;
	this.keyValueSerdeResolver = keyValueSerdeResolver;

	// Listener adapters.
	this.streamListenerParameterAdapter = streamListenerParameterAdapter;
	this.streamListenerResultAdapters = listenerResultAdapters;

	this.customizer = customizer;
	this.environment = environment;
}
 
源代码6 项目: spring-cloud-stream   文件: StreamBridge.java
/**
 * Stores the given collaborators and creates the channel cache. The cache is a
 * {@link LinkedHashMap} that evicts its eldest entry whenever its size exceeds
 * {@code bindingServiceProperties.getDynamicDestinationCacheSize()}; the limit
 * is read again on every eviction check.
 *
 * @param functionCatalog instance of {@link FunctionCatalog}
 * @param functionRegistry instance of {@link FunctionRegistry}
 * @param bindingServiceProperties instance of {@link BindingServiceProperties}
 * @param applicationContext instance of {@link ConfigurableApplicationContext}
 */
@SuppressWarnings("serial")
StreamBridge(FunctionCatalog functionCatalog, FunctionRegistry functionRegistry,
		BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext applicationContext) {
	this.applicationContext = applicationContext;
	this.bindingServiceProperties = bindingServiceProperties;
	this.functionCatalog = functionCatalog;
	this.functionRegistry = functionRegistry;
	this.channelCache = new LinkedHashMap<String, SubscribableChannel>() {
		@Override
		protected boolean removeEldestEntry(Map.Entry<String, SubscribableChannel> eldest) {
			// Still within the configured limit: keep every entry.
			if (size() <= bindingServiceProperties.getDynamicDestinationCacheSize()) {
				return false;
			}
			if (logger.isDebugEnabled()) {
				logger.debug("Removing message channel from cache " + eldest.getKey());
			}
			return true;
		}
	};
}
 
/**
 * Starts a context with a {@code partitionKeyExtractorName} configured for the
 * {@code fooDestination} producer binding, sends a message through the input
 * destination, and checks both that the property is exposed via
 * {@link BindingServiceProperties} and that the payload arrives on the output
 * destination as bytes.
 */
@Test
public void testEmptyConfiguration() {
	TestChannelBinderConfiguration.applicationContextRunner(SampleConfiguration.class)
			.withPropertyValues(
					"spring.jmx.enabled=false",
					"spring.cloud.stream.bindings.fooDestination.producer.partitionKeyExtractorName=keyExtractor")
			.run(context -> {
				InputDestination input = context.getBean(InputDestination.class);
				input.send(new GenericMessage<String>("fooDestination"));

				BindingServiceProperties serviceProperties = context.getBean(BindingServiceProperties.class);
				// AssertJ convention: the value under test goes into assertThat(...),
				// the expectation into isEqualTo(...), so failure messages read correctly.
				assertThat(serviceProperties.getProducerProperties("fooDestination").getPartitionKeyExtractorName())
						.isEqualTo("keyExtractor");

				OutputDestination output = context.getBean(OutputDestination.class);
				// Wait up to 1000 (presumably milliseconds) for the outbound message.
				assertThat(output.receive(1000).getPayload()).isEqualTo("fooDestination".getBytes());
			});
}
 
/**
 * Configures an output channel whose binding declares the content type
 * {@code application/json}, sends a {@code Foo} payload carrying the
 * content-type header {@code bad/ct}, and expects the message to be received
 * with its payload still being a {@code Foo}.
 */
public void testConfigureOutputChannelWithBadContentType() {
	BindingProperties jsonBinding = new BindingProperties();
	jsonBinding.setContentType("application/json");
	BindingServiceProperties serviceProperties = new BindingServiceProperties();
	serviceProperties.setBindings(Collections.singletonMap("foo", jsonBinding));

	CompositeMessageConverterFactory factory = new CompositeMessageConverterFactory(
			Collections.<MessageConverter>emptyList(), null);
	MessageConverterConfigurer configurer = new MessageConverterConfigurer(
			serviceProperties, factory.getMessageConverterForAllRegistered());

	QueueChannel outputChannel = new QueueChannel();
	configurer.configureOutputChannel(outputChannel, "foo");

	GenericMessage<Foo> outbound = new GenericMessage<Foo>(new Foo(),
			Collections.<String, Object>singletonMap(MessageHeaders.CONTENT_TYPE, "bad/ct"));
	outputChannel.send(outbound);

	Message<?> delivered = outputChannel.receive(0);
	assertThat(delivered).isNotNull();
	assertThat(delivered.getPayload()).isInstanceOf(Foo.class);
}
 
源代码9 项目: spring-cloud-stream   文件: BindingServiceTests.java
/**
 * Binding a producer whose partition count is {@code 0} must fail with an
 * {@link IllegalStateException} mentioning that the partition count should be
 * greater than zero.
 */
@Test
public void testProducerPropertiesValidation() {
	final String outputChannelName = "output";

	// Producer configuration that is expected to be rejected.
	ProducerProperties invalidProducer = new ProducerProperties();
	invalidProducer.setPartitionCount(0);

	BindingProperties outputBinding = new BindingProperties();
	outputBinding.setDestination("foo");
	outputBinding.setProducer(invalidProducer);

	Map<String, BindingProperties> bindings = new HashMap<>();
	bindings.put(outputChannelName, outputBinding);
	BindingServiceProperties serviceProperties = new BindingServiceProperties();
	serviceProperties.setBindings(bindings);

	DefaultBinderFactory mockBinderFactory = createMockBinderFactory();
	BindingService bindingService = new BindingService(serviceProperties, mockBinderFactory);
	MessageChannel channel = new DirectChannel();

	try {
		bindingService.bindProducer(channel, outputChannelName);
		fail("Producer properties should be validated.");
	}
	catch (IllegalStateException expected) {
		assertThat(expected)
				.hasMessageContaining("Partition count should be greater than zero.");
	}
}
 
源代码10 项目: spring-cloud-stream   文件: BindingServiceTests.java
/**
 * Binding a consumer whose concurrency is {@code 0} must fail with an
 * {@link IllegalStateException} mentioning that the concurrency should be
 * greater than zero.
 */
@Test
public void testConsumerPropertiesValidation() {
	final String inputChannelName = "input";

	// Consumer configuration that is expected to be rejected.
	ConsumerProperties invalidConsumer = new ConsumerProperties();
	invalidConsumer.setConcurrency(0);

	BindingProperties inputBinding = new BindingProperties();
	inputBinding.setDestination("foo");
	inputBinding.setConsumer(invalidConsumer);

	Map<String, BindingProperties> bindings = new HashMap<>();
	bindings.put(inputChannelName, inputBinding);
	BindingServiceProperties serviceProperties = new BindingServiceProperties();
	serviceProperties.setBindings(bindings);

	DefaultBinderFactory mockBinderFactory = createMockBinderFactory();
	BindingService bindingService = new BindingService(serviceProperties, mockBinderFactory);
	MessageChannel channel = new DirectChannel();

	try {
		bindingService.bindConsumer(channel, inputChannelName);
		fail("Consumer properties should be validated.");
	}
	catch (IllegalStateException expected) {
		assertThat(expected)
				.hasMessageContaining("Concurrency should be greater than zero.");
	}
}
 
源代码11 项目: spring-cloud-stream   文件: BindingServiceTests.java
/**
 * The input binding names the binder {@code mock} and binds without error; the
 * output binding names {@code mockError}, and binding it must fail with an
 * {@link IllegalStateException} reporting the unknown binder configuration.
 */
@Test
public void testUnknownBinderOnBindingFailure() {
	HashMap<String, String> settings = new HashMap<>();
	// Input side.
	settings.put("spring.cloud.stream.bindings.input.destination", "fooInput");
	settings.put("spring.cloud.stream.bindings.input.binder", "mock");
	// Output side: the binder name the test expects to be rejected.
	settings.put("spring.cloud.stream.bindings.output.destination", "fooOutput");
	settings.put("spring.cloud.stream.bindings.output.binder", "mockError");

	BindingServiceProperties serviceProperties = createBindingServiceProperties(settings);
	BindingService service = new BindingService(serviceProperties, createMockBinderFactory());

	service.bindConsumer(new DirectChannel(), "input");
	try {
		service.bindProducer(new DirectChannel(), "output");
		fail("Expected 'Unknown binder configuration'");
	}
	catch (IllegalStateException expected) {
		assertThat(expected).hasMessageContaining("Unknown binder configuration: mockError");
	}
}
 
源代码12 项目: spring-cloud-stream   文件: BindingServiceTests.java
/**
 * Declares two binders ({@code mock1} of type {@code mock} and {@code kafka1}
 * of type {@code kafka}) with {@code mock1} as the default binder, then binds a
 * consumer and a producer. The test passes when neither binding call throws.
 */
@Test
public void testUnrecognizedBinderAllowedIfNotUsed() {
	HashMap<String, String> settings = new HashMap<>();
	settings.put("spring.cloud.stream.bindings.input.destination", "fooInput");
	settings.put("spring.cloud.stream.bindings.output.destination", "fooOutput");
	settings.put("spring.cloud.stream.defaultBinder", "mock1");
	settings.put("spring.cloud.stream.binders.mock1.type", "mock");
	settings.put("spring.cloud.stream.binders.kafka1.type", "kafka");

	BindingServiceProperties serviceProperties = createBindingServiceProperties(settings);
	BindingServiceConfiguration configuration = new BindingServiceConfiguration();
	BinderFactory factory = configuration.binderFactory(createMockBinderTypeRegistry(), serviceProperties);
	BindingService service = new BindingService(serviceProperties, factory);

	service.bindConsumer(new DirectChannel(), "input");
	service.bindProducer(new DirectChannel(), "output");
}
 
源代码13 项目: spring-cloud-stream   文件: BindingServiceTests.java
/**
 * Declares two binders ({@code mock1} of type {@code mock} and {@code kafka1}
 * of type {@code kafka}). The input binding names {@code mock1} and binds
 * without error; binding the output must fail with an
 * {@link IllegalArgumentException} reporting that binder type {@code kafka} is
 * not defined.
 */
@Test
public void testUnrecognizedBinderDisallowedIfUsed() {
	HashMap<String, String> properties = new HashMap<>();
	properties.put("spring.cloud.stream.bindings.input.destination", "fooInput");
	properties.put("spring.cloud.stream.bindings.input.binder", "mock1");
	properties.put("spring.cloud.stream.bindings.output.destination", "fooOutput");
	// NOTE(review): this sets ".output.type" whereas the input side uses ".binder" — confirm this is intended.
	properties.put("spring.cloud.stream.bindings.output.type", "kafka1");
	properties.put("spring.cloud.stream.binders.mock1.type", "mock");
	properties.put("spring.cloud.stream.binders.kafka1.type", "kafka");
	BindingServiceProperties bindingServiceProperties = createBindingServiceProperties(
			properties);
	BinderFactory binderFactory = new BindingServiceConfiguration()
			.binderFactory(createMockBinderTypeRegistry(), bindingServiceProperties);
	BindingService bindingService = new BindingService(bindingServiceProperties,
			binderFactory);
	bindingService.bindConsumer(new DirectChannel(), "input");
	try {
		bindingService.bindProducer(new DirectChannel(), "output");
		// The failure hint now matches the exception asserted below; it previously
		// named a different error ("Unknown binder configuration").
		fail("Expected 'Binder type kafka is not defined'");
	}
	catch (IllegalArgumentException e) {
		assertThat(e).hasMessageContaining("Binder type kafka is not defined");
	}
}
 
/**
 * Starts a non-web application context from the complete test binder
 * configuration plus {@code InterceptorConfiguration}, then looks up the beans
 * the tests work with.
 *
 * @throws Exception declared by the signature; nothing in this body throws a checked exception explicitly
 */
@SuppressWarnings("unchecked")
@Before
public void setupContext() throws Exception {

	SpringApplicationBuilder applicationBuilder = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					BinderAwareChannelResolverTests.InterceptorConfiguration.class));
	this.context = applicationBuilder.web(WebApplicationType.NONE).run();

	// Collaborators pulled from the running context.
	this.resolver = this.context.getBean(BinderAwareChannelResolver.class);
	this.binder = this.context.getBean(Binder.class);
	this.bindingServiceProperties = this.context.getBean(BindingServiceProperties.class);
	this.bindingTargetFactory = this.context.getBean(SubscribableChannelBindingTargetFactory.class);
}
 
/**
 * Creates the factory, passing {@code KStream.class} to the superclass
 * constructor and storing the given collaborators.
 *
 * @param bindingServiceProperties instance of {@link BindingServiceProperties}
 * @param KafkaStreamsBindingInformationCatalogue instance of {@link KafkaStreamsBindingInformationCatalogue}
 * @param encodingDecodingBindAdviceHandler instance of {@link EncodingDecodingBindAdviceHandler}
 */
KStreamBoundElementFactory(
		BindingServiceProperties bindingServiceProperties,
		KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
		EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler) {
	super(KStream.class);

	this.encodingDecodingBindAdviceHandler = encodingDecodingBindAdviceHandler;
	this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
	this.bindingServiceProperties = bindingServiceProperties;
}
 
/**
 * Stores the collaborators shared by the Kafka Streams binder processors.
 *
 * @param bindingServiceProperties instance of {@link BindingServiceProperties}
 * @param kafkaStreamsBindingInformationCatalogue instance of {@link KafkaStreamsBindingInformationCatalogue}
 * @param kafkaStreamsExtendedBindingProperties instance of {@link KafkaStreamsExtendedBindingProperties}
 * @param keyValueSerdeResolver instance of {@link KeyValueSerdeResolver}
 * @param cleanupConfig instance of {@link CleanupConfig}
 */
public AbstractKafkaStreamsBinderProcessor(
		BindingServiceProperties bindingServiceProperties,
		KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
		KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
		KeyValueSerdeResolver keyValueSerdeResolver,
		CleanupConfig cleanupConfig) {
	// Property holders.
	this.bindingServiceProperties = bindingServiceProperties;
	this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;

	// Remaining collaborators.
	this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
	this.keyValueSerdeResolver = keyValueSerdeResolver;
	this.cleanupConfig = cleanupConfig;
}
 
/**
 * Declares the {@link KafkaStreamsBinderConfigurationProperties} bean and, as a
 * side effect, registers one additional properties singleton per declared
 * Kafka Streams binder.
 * <p>
 * For every declared binder whose type equals {@code KSTREAM_BINDER_TYPE},
 * {@code KTABLE_BINDER_TYPE} or {@code GLOBALKTABLE_BINDER_TYPE}, the binder's
 * own properties are flattened into a map and added to the front of the
 * environment's property sources. A fresh properties instance is then bound from
 * the {@code spring.cloud.stream.kafka.streams.binder} prefix and registered in
 * the bean factory as {@code <binderName>-KafkaStreamsBinderConfigurationProperties}.
 *
 * @param kafkaProperties passed to every {@link KafkaStreamsBinderConfigurationProperties} created here
 * @param environment environment that receives the per-binder property sources
 * @param properties source of the declared binders
 * @param context supplies the bean factory used for the conversion service and singleton registration
 * @return a new {@link KafkaStreamsBinderConfigurationProperties} built from {@code kafkaProperties}
 * @throws Exception declared by the signature; presumably raised by the reflective constructor lookup — TODO confirm
 */
@Bean
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.streams.binder")
public KafkaStreamsBinderConfigurationProperties binderConfigurationProperties(
		KafkaProperties kafkaProperties, ConfigurableEnvironment environment,
		BindingServiceProperties properties, ConfigurableApplicationContext context) throws Exception {
	final Map<String, BinderConfiguration> binderConfigurations = getBinderConfigurations(
			properties);
	for (Map.Entry<String, BinderConfiguration> entry : binderConfigurations
			.entrySet()) {
		final BinderConfiguration binderConfiguration = entry.getValue();
		final String binderType = binderConfiguration.getBinderType();
		// Only the Kafka Streams binder types are handled; binders without a type are skipped.
		if (binderType != null && (binderType.equals(KSTREAM_BINDER_TYPE)
				|| binderType.equals(KTABLE_BINDER_TYPE)
				|| binderType.equals(GLOBALKTABLE_BINDER_TYPE))) {
			Map<String, Object> binderProperties = new HashMap<>();
			// Collect this binder's properties into binderProperties via the flatten helper.
			this.flatten(null, binderConfiguration.getProperties(), binderProperties);
			// Added first, so these values are consulted before the existing property sources.
			// NOTE(review): the source is never removed, so it stays in place for later iterations — confirm intended.
			environment.getPropertySources().addFirst(
					new MapPropertySource(entry.getKey() + "-kafkaStreamsBinderEnv", binderProperties));

			// The Binder is created after the property source was added, so it sees the new values.
			Binder binder = new Binder(ConfigurationPropertySources.get(environment),
					new PropertySourcesPlaceholdersResolver(environment),
					IntegrationUtils.getConversionService(context.getBeanFactory()), null);
			// Instantiate the properties class reflectively through its KafkaProperties constructor.
			final Constructor<KafkaStreamsBinderConfigurationProperties> kafkaStreamsBinderConfigurationPropertiesConstructor =
					ReflectionUtils.accessibleConstructor(KafkaStreamsBinderConfigurationProperties.class, KafkaProperties.class);
			final KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties =
					BeanUtils.instantiateClass(kafkaStreamsBinderConfigurationPropertiesConstructor, kafkaProperties);
			final BindResult<KafkaStreamsBinderConfigurationProperties> bind = binder.bind("spring.cloud.stream.kafka.streams.binder", Bindable.ofInstance(kafkaStreamsBinderConfigurationProperties));
			// NOTE(review): bind.get() presumably throws when nothing was bound under the prefix — confirm.
			context.getBeanFactory().registerSingleton(
					entry.getKey() + "-KafkaStreamsBinderConfigurationProperties",
					bind.get());
		}
	}
	// The returned bean is a separate instance from the per-binder singletons registered above.
	return new KafkaStreamsBinderConfigurationProperties(kafkaProperties);
}
 
/**
 * Declares the {@link KStreamBoundElementFactory} bean.
 *
 * @param bindingServiceProperties instance of {@link BindingServiceProperties}
 * @param KafkaStreamsBindingInformationCatalogue instance of {@link KafkaStreamsBindingInformationCatalogue}
 * @param encodingDecodingBindAdviceHandler instance of {@link EncodingDecodingBindAdviceHandler}
 * @return the newly created factory
 */
@Bean
public KStreamBoundElementFactory kStreamBoundElementFactory(
		BindingServiceProperties bindingServiceProperties,
		KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
		EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler) {
	KStreamBoundElementFactory boundElementFactory = new KStreamBoundElementFactory(
			bindingServiceProperties,
			KafkaStreamsBindingInformationCatalogue,
			encodingDecodingBindAdviceHandler);
	return boundElementFactory;
}
 
/**
 * Declares the function initializer bean. A {@link FunctionToDestinationBinder}
 * is returned when the context contains a bean named {@code output} or contains
 * no beans annotated with {@link EnableBinding}; otherwise {@code null} is
 * returned.
 *
 * @param functionCatalog instance of {@link FunctionCatalog}
 * @param functionInspector instance of {@link FunctionInspector}; not referenced in the body
 * @param functionProperties instance of {@link StreamFunctionProperties}
 * @param bindableProxyFactories optional array of {@link BindableProxyFactory}; not referenced in the body
 * @param serviceProperties instance of {@link BindingServiceProperties}
 * @param applicationContext instance of {@link ConfigurableApplicationContext}
 * @param bindingHolder instance of {@link FunctionBindingRegistrar}; not referenced in the body
 * @param streamBridge instance of {@link StreamBridge}
 * @return the initializer, or {@code null} when the condition above is not met
 */
@Bean
public InitializingBean functionInitializer(FunctionCatalog functionCatalog, FunctionInspector functionInspector,
		StreamFunctionProperties functionProperties, @Nullable BindableProxyFactory[] bindableProxyFactories,
		BindingServiceProperties serviceProperties, ConfigurableApplicationContext applicationContext,
		FunctionBindingRegistrar bindingHolder, StreamBridge streamBridge) {

	// No "output" bean and at least one @EnableBinding bean: no initializer is created.
	if (!applicationContext.containsBean("output")
			&& !ObjectUtils.isEmpty(applicationContext.getBeanNamesForAnnotation(EnableBinding.class))) {
		return null;
	}

	return new FunctionToDestinationBinder(functionCatalog, functionProperties,
			serviceProperties, streamBridge);
}
 
/**
 * Stores the collaborators handed in by the caller.
 *
 * @param functionCatalog instance of {@link FunctionCatalog}
 * @param functionProperties instance of {@link StreamFunctionProperties}
 * @param serviceProperties instance of {@link BindingServiceProperties}
 * @param streamBridge instance of {@link StreamBridge}
 */
FunctionToDestinationBinder(
		FunctionCatalog functionCatalog,
		StreamFunctionProperties functionProperties,
		BindingServiceProperties serviceProperties,
		StreamBridge streamBridge) {
	this.streamBridge = streamBridge;
	this.serviceProperties = serviceProperties;
	this.functionProperties = functionProperties;
	this.functionCatalog = functionCatalog;
}
 
源代码21 项目: spring-cloud-stream   文件: BindingService.java
public BindingService(BindingServiceProperties bindingServiceProperties,
		BinderFactory binderFactory, TaskScheduler taskScheduler) {
	this.bindingServiceProperties = bindingServiceProperties;
	this.binderFactory = binderFactory;
	this.validator = new CustomValidatorBean();
	this.validator.afterPropertiesSet();
	this.taskScheduler = taskScheduler;
}
 
/**
 * Creates the configurer. The composite message converter must not be
 * {@code null}. The {@code headers} field of {@link MessageHeaders} is looked up
 * reflectively and made accessible.
 *
 * @param bindingServiceProperties instance of {@link BindingServiceProperties}
 * @param compositeMessageConverter instance of {@link CompositeMessageConverter}; must not be {@code null}
 * @param streamFunctionProperties instance of {@link StreamFunctionProperties}
 */
public MessageConverterConfigurer(BindingServiceProperties bindingServiceProperties,
		CompositeMessageConverter compositeMessageConverter, StreamFunctionProperties streamFunctionProperties) {
	Assert.notNull(compositeMessageConverter,
			"The message converter factory cannot be null");

	this.compositeMessageConverter = compositeMessageConverter;
	this.bindingServiceProperties = bindingServiceProperties;
	this.streamFunctionProperties = streamFunctionProperties;

	// Reflective handle on MessageHeaders' "headers" field.
	this.headersField = ReflectionUtils.findField(MessageHeaders.class, "headers");
	this.headersField.setAccessible(true);
}
 
/**
 * (Currently ignored.) Registers a converter for {@code foo/bar} that accepts
 * every class but always converts to {@code null}, configures an output channel
 * bound to {@code foo/bar}, and expects sending a {@code Foo} payload to fail
 * with a {@link MessageConversionException} whose message ends with the
 * configured output type.
 */
@Test
@Ignore
public void testConfigureOutputChannelCannotConvert() {
	BindingProperties fooBarBinding = new BindingProperties();
	fooBarBinding.setContentType("foo/bar");
	BindingServiceProperties serviceProperties = new BindingServiceProperties();
	serviceProperties.setBindings(Collections.singletonMap("foo", fooBarBinding));

	// Converter that claims support for everything but never produces a result.
	MessageConverter nullProducingConverter = new AbstractMessageConverter(
			new MimeType("foo", "bar")) {

		@Override
		protected boolean supports(Class<?> clazz) {
			return true;
		}

		@Override
		protected Object convertToInternal(Object payload, MessageHeaders headers,
				Object conversionHint) {
			return null;
		}

	};

	CompositeMessageConverterFactory factory = new CompositeMessageConverterFactory(
			Collections.<MessageConverter>singletonList(nullProducingConverter), null);
	MessageConverterConfigurer configurer = new MessageConverterConfigurer(
			serviceProperties, factory.getMessageConverterForAllRegistered());

	QueueChannel outputChannel = new QueueChannel();
	configurer.configureOutputChannel(outputChannel, "foo");

	try {
		GenericMessage<Foo> outbound = new GenericMessage<Foo>(new Foo(),
				Collections.<String, Object>singletonMap(MessageHeaders.CONTENT_TYPE, "bad/ct"));
		outputChannel.send(outbound);
		fail("Expected MessageConversionException: " + outputChannel.receive(0));
	}
	catch (MessageConversionException expected) {
		assertThat(expected.getMessage())
				.endsWith("to the configured output type: 'foo/bar'");
	}
}
 
源代码24 项目: spring-cloud-stream   文件: BindingServiceTests.java
/**
 * Binds a consumer for a binding that declares only a destination and checks
 * that the mock binder is invoked with a {@code null} group, that the single
 * binding it returns is handed back, and that unbinding the consumers unbinds
 * that binding.
 *
 * @throws Exception if destroying the binder factory fails
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testDefaultGroup() throws Exception {
	final String inputChannelName = "input";

	BindingProperties inputBinding = new BindingProperties();
	inputBinding.setDestination("foo");
	Map<String, BindingProperties> bindingsByName = new HashMap<>();
	bindingsByName.put(inputChannelName, inputBinding);
	BindingServiceProperties serviceProperties = new BindingServiceProperties();
	serviceProperties.setBindings(bindingsByName);

	DefaultBinderFactory binderFactory = createMockBinderFactory();
	Binder mockBinder = binderFactory.getBinder("mock", MessageChannel.class);
	BindingService service = new BindingService(serviceProperties, binderFactory);
	MessageChannel inputChannel = new DirectChannel();

	// Stub the binder call expected for a binding without an explicit group.
	Binding<MessageChannel> mockBinding = Mockito.mock(Binding.class);
	when(mockBinder.bindConsumer(eq("foo"), isNull(), same(inputChannel),
			any(ConsumerProperties.class))).thenReturn(mockBinding);

	Collection<Binding<MessageChannel>> created = service.bindConsumer(inputChannel, inputChannelName);
	assertThat(created).hasSize(1);
	Binding<MessageChannel> onlyBinding = created.iterator().next();
	assertThat(onlyBinding).isSameAs(mockBinding);

	service.unbindConsumers(inputChannelName);
	verify(mockBinder).bindConsumer(eq("foo"), isNull(), same(inputChannel),
			any(ConsumerProperties.class));
	verify(onlyBinding).unbind();

	binderFactory.destroy();
}
 
源代码25 项目: spring-cloud-stream   文件: BindingServiceTests.java
/**
 * Binds a consumer for a binding that declares the group {@code fooGroup} and
 * checks that the mock binder is invoked with that group, that the single
 * binding it returns is handed back, and that unbinding the consumers unbinds
 * that binding.
 *
 * @throws Exception if destroying the binder factory fails
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testExplicitGroup() throws Exception {
	final String inputChannelName = "input";

	BindingProperties props = new BindingProperties();
	props.setDestination("foo");
	props.setGroup("fooGroup");
	Map<String, BindingProperties> bindingsByName = new HashMap<>();
	bindingsByName.put(inputChannelName, props);
	BindingServiceProperties serviceProperties = new BindingServiceProperties();
	serviceProperties.setBindings(bindingsByName);

	DefaultBinderFactory binderFactory = createMockBinderFactory();
	Binder mockBinder = binderFactory.getBinder("mock", MessageChannel.class);
	BindingService service = new BindingService(serviceProperties, binderFactory);
	MessageChannel inputChannel = new DirectChannel();

	// Stub the binder call expected for the explicitly configured group.
	Binding<MessageChannel> mockBinding = Mockito.mock(Binding.class);
	when(mockBinder.bindConsumer(eq("foo"), eq("fooGroup"), same(inputChannel),
			any(ConsumerProperties.class))).thenReturn(mockBinding);

	Collection<Binding<MessageChannel>> created = service.bindConsumer(inputChannel, inputChannelName);
	assertThat(created).hasSize(1);
	Binding<MessageChannel> onlyBinding = created.iterator().next();
	assertThat(onlyBinding).isSameAs(mockBinding);

	service.unbindConsumers(inputChannelName);
	verify(mockBinder).bindConsumer(eq("foo"), eq(props.getGroup()), same(inputChannel),
			any(ConsumerProperties.class));
	verify(onlyBinding).unbind();

	binderFactory.destroy();
}
 
源代码26 项目: spring-cloud-stream   文件: BindingServiceTests.java
/**
 * Creates a {@link BindingServiceProperties} instance and binds the given
 * key/value pairs onto it under the {@code spring.cloud.stream} prefix.
 *
 * @param properties fully qualified property names mapped to their values
 * @return the populated properties instance
 */
private BindingServiceProperties createBindingServiceProperties(
		HashMap<String, String> properties) {
	BindingServiceProperties target = new BindingServiceProperties();
	// Fully qualified names are kept, as in the rest of this snippet.
	org.springframework.boot.context.properties.bind.Binder bootBinder =
			new org.springframework.boot.context.properties.bind.Binder(
					new MapConfigurationPropertySource(properties));
	bootBinder.bind("spring.cloud.stream",
			org.springframework.boot.context.properties.bind.Bindable.ofInstance(target));
	return target;
}
 
/**
 * Polls a pollable source whose delegate produces the bytes of {@code "foo"}
 * for a binding configured as {@code text/plain}, and expects the handler to
 * receive the payload as the {@link String} {@code "foo"}. The poll is repeated
 * once to exercise the cached path.
 */
@Test
public void testConvertSimpler() {
	TestChannelBinder binder = createBinder();
	MessageConverterConfigurer converterConfigurer = this.context
			.getBean(MessageConverterConfigurer.class);
	BindingServiceProperties serviceProperties = this.context
			.getBean(BindingServiceProperties.class);

	BindingProperties textBinding = new BindingProperties();
	textBinding.setContentType("text/plain");
	serviceProperties.setBindings(Collections.singletonMap("foo", textBinding));

	binder.setMessageSourceDelegate(() -> new GenericMessage<>("foo".getBytes()));
	DefaultPollableMessageSource source = new DefaultPollableMessageSource(
			this.messageConverter);
	converterConfigurer.configurePolledMessageSource(source, "foo");

	ExtendedConsumerProperties<Object> consumerProperties = new ExtendedConsumerProperties<>(
			null);
	consumerProperties.setMaxAttempts(1);
	consumerProperties.setBackOffInitialInterval(0);
	binder.bindPollableConsumer("foo", "bar", source, consumerProperties);

	final AtomicReference<Object> seenPayload = new AtomicReference<>();

	boolean firstPoll = source.poll(
			received -> seenPayload.set(received.getPayload()),
			new ParameterizedTypeReference<String>() {
			});
	assertThat(firstPoll).isTrue();
	assertThat(seenPayload.get()).isInstanceOf(String.class);
	assertThat(seenPayload.get()).isEqualTo("foo");

	// test the cache for coverage
	boolean secondPoll = source.poll(
			received -> seenPayload.set(received.getPayload()),
			new ParameterizedTypeReference<String>() {
			});
	assertThat(secondPoll).isTrue();
	assertThat(seenPayload.get()).isInstanceOf(String.class);
	assertThat(seenPayload.get()).isEqualTo("foo");
}
 
源代码28 项目: spring-cloud-stream   文件: AbstractBinderTests.java
/**
 * Builds a {@link MessageConverterConfigurer} for a single binding. The binding
 * is registered under the given channel name in a fresh
 * {@link BindingServiceProperties}, which is initialized with the application
 * context and a {@link DefaultConversionService} before the configurer is
 * created.
 *
 * @param channelName name under which the binding properties are registered
 * @param bindingProperties properties of that binding
 * @return the configurer, with its bean factory taken from the application context
 * @throws Exception if initializing the binding service properties fails
 */
private MessageConverterConfigurer createConverterConfigurer(String channelName,
		BindingProperties bindingProperties) throws Exception {
	BindingServiceProperties serviceProperties = new BindingServiceProperties();
	serviceProperties.getBindings().put(channelName, bindingProperties);
	serviceProperties.setApplicationContext(applicationContext);
	serviceProperties.setConversionService(new DefaultConversionService());
	serviceProperties.afterPropertiesSet();

	CompositeMessageConverterFactory converterFactory = new CompositeMessageConverterFactory(null, null);
	MessageConverterConfigurer configurer = new MessageConverterConfigurer(
			serviceProperties, converterFactory.getMessageConverterForAllRegistered());
	configurer.setBeanFactory(applicationContext.getBeanFactory());
	return configurer;
}
 
/**
 * Runs {@code BusAutoConfiguration#init()} against a mocked
 * {@link BindingServiceProperties} whose bindings are the given map, and returns
 * the {@link BusProperties} instance that was passed to the configuration.
 *
 * @param properties bindings the mocked service properties should report
 * @return the bus properties after initialization
 */
private BusProperties setupBusAutoConfig(
		HashMap<String, BindingProperties> properties) {
	BindingServiceProperties mockedServiceProperties = mock(BindingServiceProperties.class);
	when(mockedServiceProperties.getBindings()).thenReturn(properties);

	BusProperties busProperties = new BusProperties();
	ServiceMatcher serviceMatcher = mock(ServiceMatcher.class);
	BusAutoConfiguration autoConfiguration = new BusAutoConfiguration(
			serviceMatcher, mockedServiceProperties, busProperties);
	autoConfiguration.init();
	return busProperties;
}
 
/**
 * Creates the factory, passing {@code KTable.class} to the superclass
 * constructor and storing the given collaborators.
 *
 * @param bindingServiceProperties instance of {@link BindingServiceProperties}
 * @param encodingDecodingBindAdviceHandler instance of {@link EncodingDecodingBindAdviceHandler}
 */
KTableBoundElementFactory(
		BindingServiceProperties bindingServiceProperties,
		EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler) {
	super(KTable.class);

	this.encodingDecodingBindAdviceHandler = encodingDecodingBindAdviceHandler;
	this.bindingServiceProperties = bindingServiceProperties;
}