下面列出了org.springframework.validation.beanvalidation.CustomValidatorBean#org.springframework.cloud.stream.config.BindingServiceProperties 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
CleanupConfig cleanupConfig,
StreamFunctionProperties streamFunctionProperties,
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
StreamsBuilderFactoryBeanCustomizer customizer, ConfigurableEnvironment environment) {
super(bindingServiceProperties, kafkaStreamsBindingInformationCatalogue, kafkaStreamsExtendedBindingProperties,
keyValueSerdeResolver, cleanupConfig);
this.bindingServiceProperties = bindingServiceProperties;
this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
this.keyValueSerdeResolver = keyValueSerdeResolver;
this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
this.streamFunctionProperties = streamFunctionProperties;
this.kafkaStreamsBinderConfigurationProperties = kafkaStreamsBinderConfigurationProperties;
this.customizer = customizer;
this.environment = environment;
}
private static Map<String, BinderConfiguration> getBinderConfigurations(
BindingServiceProperties properties) {
Map<String, BinderConfiguration> binderConfigurations = new HashMap<>();
Map<String, BinderProperties> declaredBinders = properties.getBinders();
for (Map.Entry<String, BinderProperties> binderEntry : declaredBinders
.entrySet()) {
BinderProperties binderProperties = binderEntry.getValue();
binderConfigurations.put(binderEntry.getKey(),
new BinderConfiguration(binderProperties.getType(),
binderProperties.getEnvironment(),
binderProperties.isInheritEnvironment(),
binderProperties.isDefaultCandidate()));
}
return binderConfigurations;
}
@Bean
public KafkaStreamsStreamListenerSetupMethodOrchestrator kafkaStreamsStreamListenerSetupMethodOrchestrator(
BindingServiceProperties bindingServiceProperties,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
KStreamStreamListenerParameterAdapter kafkaStreamListenerParameterAdapter,
Collection<StreamListenerResultAdapter> streamListenerResultAdapters,
ObjectProvider<CleanupConfig> cleanupConfig,
ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizerProvider, ConfigurableEnvironment environment) {
return new KafkaStreamsStreamListenerSetupMethodOrchestrator(
bindingServiceProperties, kafkaStreamsExtendedBindingProperties,
keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue,
kafkaStreamListenerParameterAdapter, streamListenerResultAdapters,
cleanupConfig.getIfUnique(), customizerProvider.getIfUnique(), environment);
}
@Bean
@Conditional(FunctionDetectorCondition.class)
public KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
ObjectProvider<CleanupConfig> cleanupConfig,
StreamFunctionProperties streamFunctionProperties,
@Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizerProvider, ConfigurableEnvironment environment) {
return new KafkaStreamsFunctionProcessor(bindingServiceProperties, kafkaStreamsExtendedBindingProperties,
keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue, kafkaStreamsMessageConversionDelegate,
cleanupConfig.getIfUnique(), streamFunctionProperties, kafkaStreamsBinderConfigurationProperties,
customizerProvider.getIfUnique(), environment);
}
KafkaStreamsStreamListenerSetupMethodOrchestrator(
BindingServiceProperties bindingServiceProperties,
KafkaStreamsExtendedBindingProperties extendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsBindingInformationCatalogue bindingInformationCatalogue,
StreamListenerParameterAdapter streamListenerParameterAdapter,
Collection<StreamListenerResultAdapter> listenerResultAdapters,
CleanupConfig cleanupConfig,
StreamsBuilderFactoryBeanCustomizer customizer,
ConfigurableEnvironment environment) {
super(bindingServiceProperties, bindingInformationCatalogue, extendedBindingProperties, keyValueSerdeResolver, cleanupConfig);
this.bindingServiceProperties = bindingServiceProperties;
this.kafkaStreamsExtendedBindingProperties = extendedBindingProperties;
this.keyValueSerdeResolver = keyValueSerdeResolver;
this.kafkaStreamsBindingInformationCatalogue = bindingInformationCatalogue;
this.streamListenerParameterAdapter = streamListenerParameterAdapter;
this.streamListenerResultAdapters = listenerResultAdapters;
this.customizer = customizer;
this.environment = environment;
}
/**
*
* @param functionCatalog instance of {@link FunctionCatalog}
* @param functionRegistry instance of {@link FunctionRegistry}
* @param bindingServiceProperties instance of {@link BindingServiceProperties}
* @param applicationContext instance of {@link ConfigurableApplicationContext}
*/
@SuppressWarnings("serial")
StreamBridge(FunctionCatalog functionCatalog, FunctionRegistry functionRegistry,
BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext applicationContext) {
this.functionCatalog = functionCatalog;
this.functionRegistry = functionRegistry;
this.applicationContext = applicationContext;
this.bindingServiceProperties = bindingServiceProperties;
this.channelCache = new LinkedHashMap<String, SubscribableChannel>() {
@Override
protected boolean removeEldestEntry(Map.Entry<String, SubscribableChannel> eldest) {
boolean remove = size() > bindingServiceProperties.getDynamicDestinationCacheSize();
if (remove && logger.isDebugEnabled()) {
logger.debug("Removing message channel from cache " + eldest.getKey());
}
return remove;
}
};
}
@Test
public void testEmptyConfiguration() {
TestChannelBinderConfiguration.applicationContextRunner(SampleConfiguration.class)
.withPropertyValues(
"spring.jmx.enabled=false",
"spring.cloud.stream.bindings.fooDestination.producer.partitionKeyExtractorName=keyExtractor")
.run(context -> {
InputDestination input = context.getBean(InputDestination.class);
input.send(new GenericMessage<String>("fooDestination"));
BindingServiceProperties serviceProperties = context.getBean(BindingServiceProperties.class);
assertThat("keyExtractor").isEqualTo(
serviceProperties.getProducerProperties("fooDestination").getPartitionKeyExtractorName());
OutputDestination output = context.getBean(OutputDestination.class);
assertThat(output.receive(1000).getPayload()).isEqualTo("fooDestination".getBytes());
});
}
public void testConfigureOutputChannelWithBadContentType() {
BindingServiceProperties props = new BindingServiceProperties();
BindingProperties bindingProps = new BindingProperties();
bindingProps.setContentType("application/json");
props.setBindings(Collections.singletonMap("foo", bindingProps));
CompositeMessageConverterFactory converterFactory = new CompositeMessageConverterFactory(
Collections.<MessageConverter>emptyList(), null);
MessageConverterConfigurer configurer = new MessageConverterConfigurer(props,
converterFactory.getMessageConverterForAllRegistered());
QueueChannel out = new QueueChannel();
configurer.configureOutputChannel(out, "foo");
out.send(new GenericMessage<Foo>(new Foo(), Collections
.<String, Object>singletonMap(MessageHeaders.CONTENT_TYPE, "bad/ct")));
Message<?> received = out.receive(0);
assertThat(received).isNotNull();
assertThat(received.getPayload()).isInstanceOf(Foo.class);
}
@Test
public void testProducerPropertiesValidation() {
BindingServiceProperties serviceProperties = new BindingServiceProperties();
Map<String, BindingProperties> bindingProperties = new HashMap<>();
BindingProperties props = new BindingProperties();
ProducerProperties producerProperties = new ProducerProperties();
producerProperties.setPartitionCount(0);
props.setDestination("foo");
props.setProducer(producerProperties);
final String outputChannelName = "output";
bindingProperties.put(outputChannelName, props);
serviceProperties.setBindings(bindingProperties);
DefaultBinderFactory binderFactory = createMockBinderFactory();
BindingService service = new BindingService(serviceProperties, binderFactory);
MessageChannel outputChannel = new DirectChannel();
try {
service.bindProducer(outputChannel, outputChannelName);
fail("Producer properties should be validated.");
}
catch (IllegalStateException e) {
assertThat(e)
.hasMessageContaining("Partition count should be greater than zero.");
}
}
@Test
public void testConsumerPropertiesValidation() {
BindingServiceProperties serviceProperties = new BindingServiceProperties();
Map<String, BindingProperties> bindingProperties = new HashMap<>();
BindingProperties props = new BindingProperties();
ConsumerProperties consumerProperties = new ConsumerProperties();
consumerProperties.setConcurrency(0);
props.setDestination("foo");
props.setConsumer(consumerProperties);
final String inputChannelName = "input";
bindingProperties.put(inputChannelName, props);
serviceProperties.setBindings(bindingProperties);
DefaultBinderFactory binderFactory = createMockBinderFactory();
BindingService service = new BindingService(serviceProperties, binderFactory);
MessageChannel inputChannel = new DirectChannel();
try {
service.bindConsumer(inputChannel, inputChannelName);
fail("Consumer properties should be validated.");
}
catch (IllegalStateException e) {
assertThat(e)
.hasMessageContaining("Concurrency should be greater than zero.");
}
}
@Test
public void testUnknownBinderOnBindingFailure() {
HashMap<String, String> properties = new HashMap<>();
properties.put("spring.cloud.stream.bindings.input.destination", "fooInput");
properties.put("spring.cloud.stream.bindings.input.binder", "mock");
properties.put("spring.cloud.stream.bindings.output.destination", "fooOutput");
properties.put("spring.cloud.stream.bindings.output.binder", "mockError");
BindingServiceProperties bindingServiceProperties = createBindingServiceProperties(
properties);
BindingService bindingService = new BindingService(bindingServiceProperties,
createMockBinderFactory());
bindingService.bindConsumer(new DirectChannel(), "input");
try {
bindingService.bindProducer(new DirectChannel(), "output");
fail("Expected 'Unknown binder configuration'");
}
catch (IllegalStateException e) {
assertThat(e).hasMessageContaining("Unknown binder configuration: mockError");
}
}
@Test
public void testUnrecognizedBinderAllowedIfNotUsed() {
HashMap<String, String> properties = new HashMap<>();
properties.put("spring.cloud.stream.bindings.input.destination", "fooInput");
properties.put("spring.cloud.stream.bindings.output.destination", "fooOutput");
properties.put("spring.cloud.stream.defaultBinder", "mock1");
properties.put("spring.cloud.stream.binders.mock1.type", "mock");
properties.put("spring.cloud.stream.binders.kafka1.type", "kafka");
BindingServiceProperties bindingServiceProperties = createBindingServiceProperties(
properties);
BinderFactory binderFactory = new BindingServiceConfiguration()
.binderFactory(createMockBinderTypeRegistry(), bindingServiceProperties);
BindingService bindingService = new BindingService(bindingServiceProperties,
binderFactory);
bindingService.bindConsumer(new DirectChannel(), "input");
bindingService.bindProducer(new DirectChannel(), "output");
}
@Test
public void testUnrecognizedBinderDisallowedIfUsed() {
HashMap<String, String> properties = new HashMap<>();
properties.put("spring.cloud.stream.bindings.input.destination", "fooInput");
properties.put("spring.cloud.stream.bindings.input.binder", "mock1");
properties.put("spring.cloud.stream.bindings.output.destination", "fooOutput");
properties.put("spring.cloud.stream.bindings.output.type", "kafka1");
properties.put("spring.cloud.stream.binders.mock1.type", "mock");
properties.put("spring.cloud.stream.binders.kafka1.type", "kafka");
BindingServiceProperties bindingServiceProperties = createBindingServiceProperties(
properties);
BinderFactory binderFactory = new BindingServiceConfiguration()
.binderFactory(createMockBinderTypeRegistry(), bindingServiceProperties);
BindingService bindingService = new BindingService(bindingServiceProperties,
binderFactory);
bindingService.bindConsumer(new DirectChannel(), "input");
try {
bindingService.bindProducer(new DirectChannel(), "output");
fail("Expected 'Unknown binder configuration'");
}
catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("Binder type kafka is not defined");
}
}
@SuppressWarnings("unchecked")
@Before
public void setupContext() throws Exception {
this.context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
BinderAwareChannelResolverTests.InterceptorConfiguration.class))
.web(WebApplicationType.NONE).run();
this.resolver = this.context.getBean(BinderAwareChannelResolver.class);
this.binder = this.context.getBean(Binder.class);
this.bindingServiceProperties = this.context
.getBean(BindingServiceProperties.class);
this.bindingTargetFactory = this.context
.getBean(SubscribableChannelBindingTargetFactory.class);
}
KStreamBoundElementFactory(BindingServiceProperties bindingServiceProperties,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler) {
super(KStream.class);
this.bindingServiceProperties = bindingServiceProperties;
this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
this.encodingDecodingBindAdviceHandler = encodingDecodingBindAdviceHandler;
}
public AbstractKafkaStreamsBinderProcessor(BindingServiceProperties bindingServiceProperties,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver, CleanupConfig cleanupConfig) {
this.bindingServiceProperties = bindingServiceProperties;
this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
this.keyValueSerdeResolver = keyValueSerdeResolver;
this.cleanupConfig = cleanupConfig;
}
@Bean
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.streams.binder")
public KafkaStreamsBinderConfigurationProperties binderConfigurationProperties(
KafkaProperties kafkaProperties, ConfigurableEnvironment environment,
BindingServiceProperties properties, ConfigurableApplicationContext context) throws Exception {
final Map<String, BinderConfiguration> binderConfigurations = getBinderConfigurations(
properties);
for (Map.Entry<String, BinderConfiguration> entry : binderConfigurations
.entrySet()) {
final BinderConfiguration binderConfiguration = entry.getValue();
final String binderType = binderConfiguration.getBinderType();
if (binderType != null && (binderType.equals(KSTREAM_BINDER_TYPE)
|| binderType.equals(KTABLE_BINDER_TYPE)
|| binderType.equals(GLOBALKTABLE_BINDER_TYPE))) {
Map<String, Object> binderProperties = new HashMap<>();
this.flatten(null, binderConfiguration.getProperties(), binderProperties);
environment.getPropertySources().addFirst(
new MapPropertySource(entry.getKey() + "-kafkaStreamsBinderEnv", binderProperties));
Binder binder = new Binder(ConfigurationPropertySources.get(environment),
new PropertySourcesPlaceholdersResolver(environment),
IntegrationUtils.getConversionService(context.getBeanFactory()), null);
final Constructor<KafkaStreamsBinderConfigurationProperties> kafkaStreamsBinderConfigurationPropertiesConstructor =
ReflectionUtils.accessibleConstructor(KafkaStreamsBinderConfigurationProperties.class, KafkaProperties.class);
final KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties =
BeanUtils.instantiateClass(kafkaStreamsBinderConfigurationPropertiesConstructor, kafkaProperties);
final BindResult<KafkaStreamsBinderConfigurationProperties> bind = binder.bind("spring.cloud.stream.kafka.streams.binder", Bindable.ofInstance(kafkaStreamsBinderConfigurationProperties));
context.getBeanFactory().registerSingleton(
entry.getKey() + "-KafkaStreamsBinderConfigurationProperties",
bind.get());
}
}
return new KafkaStreamsBinderConfigurationProperties(kafkaProperties);
}
@Bean
public KStreamBoundElementFactory kStreamBoundElementFactory(
BindingServiceProperties bindingServiceProperties,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler) {
return new KStreamBoundElementFactory(bindingServiceProperties,
KafkaStreamsBindingInformationCatalogue, encodingDecodingBindAdviceHandler);
}
@Bean
public InitializingBean functionInitializer(FunctionCatalog functionCatalog, FunctionInspector functionInspector,
StreamFunctionProperties functionProperties, @Nullable BindableProxyFactory[] bindableProxyFactories,
BindingServiceProperties serviceProperties, ConfigurableApplicationContext applicationContext,
FunctionBindingRegistrar bindingHolder, StreamBridge streamBridge) {
boolean shouldCreateInitializer = applicationContext.containsBean("output")
|| ObjectUtils.isEmpty(applicationContext.getBeanNamesForAnnotation(EnableBinding.class));
return shouldCreateInitializer
? new FunctionToDestinationBinder(functionCatalog, functionProperties,
serviceProperties, streamBridge)
: null;
}
FunctionToDestinationBinder(FunctionCatalog functionCatalog, StreamFunctionProperties functionProperties,
BindingServiceProperties serviceProperties, StreamBridge streamBridge) {
this.functionCatalog = functionCatalog;
this.functionProperties = functionProperties;
this.serviceProperties = serviceProperties;
this.streamBridge = streamBridge;
}
public BindingService(BindingServiceProperties bindingServiceProperties,
BinderFactory binderFactory, TaskScheduler taskScheduler) {
this.bindingServiceProperties = bindingServiceProperties;
this.binderFactory = binderFactory;
this.validator = new CustomValidatorBean();
this.validator.afterPropertiesSet();
this.taskScheduler = taskScheduler;
}
public MessageConverterConfigurer(BindingServiceProperties bindingServiceProperties,
CompositeMessageConverter compositeMessageConverter, StreamFunctionProperties streamFunctionProperties) {
Assert.notNull(compositeMessageConverter,
"The message converter factory cannot be null");
this.bindingServiceProperties = bindingServiceProperties;
this.compositeMessageConverter = compositeMessageConverter;
this.headersField = ReflectionUtils.findField(MessageHeaders.class, "headers");
this.headersField.setAccessible(true);
this.streamFunctionProperties = streamFunctionProperties;
}
@Test
@Ignore
public void testConfigureOutputChannelCannotConvert() {
BindingServiceProperties props = new BindingServiceProperties();
BindingProperties bindingProps = new BindingProperties();
bindingProps.setContentType("foo/bar");
props.setBindings(Collections.singletonMap("foo", bindingProps));
MessageConverter converter = new AbstractMessageConverter(
new MimeType("foo", "bar")) {
@Override
protected boolean supports(Class<?> clazz) {
return true;
}
@Override
protected Object convertToInternal(Object payload, MessageHeaders headers,
Object conversionHint) {
return null;
}
};
CompositeMessageConverterFactory converterFactory = new CompositeMessageConverterFactory(
Collections.<MessageConverter>singletonList(converter), null);
MessageConverterConfigurer configurer = new MessageConverterConfigurer(props,
converterFactory.getMessageConverterForAllRegistered());
QueueChannel out = new QueueChannel();
configurer.configureOutputChannel(out, "foo");
try {
out.send(new GenericMessage<Foo>(new Foo(),
Collections.<String, Object>singletonMap(MessageHeaders.CONTENT_TYPE,
"bad/ct")));
fail("Expected MessageConversionException: " + out.receive(0));
}
catch (MessageConversionException e) {
assertThat(e.getMessage())
.endsWith("to the configured output type: 'foo/bar'");
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testDefaultGroup() throws Exception {
BindingServiceProperties properties = new BindingServiceProperties();
Map<String, BindingProperties> bindingProperties = new HashMap<>();
BindingProperties props = new BindingProperties();
props.setDestination("foo");
final String inputChannelName = "input";
bindingProperties.put(inputChannelName, props);
properties.setBindings(bindingProperties);
DefaultBinderFactory binderFactory = createMockBinderFactory();
Binder binder = binderFactory.getBinder("mock", MessageChannel.class);
BindingService service = new BindingService(properties, binderFactory);
MessageChannel inputChannel = new DirectChannel();
Binding<MessageChannel> mockBinding = Mockito.mock(Binding.class);
when(binder.bindConsumer(eq("foo"), isNull(), same(inputChannel),
any(ConsumerProperties.class))).thenReturn(mockBinding);
Collection<Binding<MessageChannel>> bindings = service.bindConsumer(inputChannel,
inputChannelName);
assertThat(bindings).hasSize(1);
Binding<MessageChannel> binding = bindings.iterator().next();
assertThat(binding).isSameAs(mockBinding);
service.unbindConsumers(inputChannelName);
verify(binder).bindConsumer(eq("foo"), isNull(), same(inputChannel),
any(ConsumerProperties.class));
verify(binding).unbind();
binderFactory.destroy();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testExplicitGroup() throws Exception {
BindingServiceProperties properties = new BindingServiceProperties();
Map<String, BindingProperties> bindingProperties = new HashMap<>();
BindingProperties props = new BindingProperties();
props.setDestination("foo");
props.setGroup("fooGroup");
final String inputChannelName = "input";
bindingProperties.put(inputChannelName, props);
properties.setBindings(bindingProperties);
DefaultBinderFactory binderFactory = createMockBinderFactory();
Binder binder = binderFactory.getBinder("mock", MessageChannel.class);
BindingService service = new BindingService(properties, binderFactory);
MessageChannel inputChannel = new DirectChannel();
Binding<MessageChannel> mockBinding = Mockito.mock(Binding.class);
when(binder.bindConsumer(eq("foo"), eq("fooGroup"), same(inputChannel),
any(ConsumerProperties.class))).thenReturn(mockBinding);
Collection<Binding<MessageChannel>> bindings = service.bindConsumer(inputChannel,
inputChannelName);
assertThat(bindings).hasSize(1);
Binding<MessageChannel> binding = bindings.iterator().next();
assertThat(binding).isSameAs(mockBinding);
service.unbindConsumers(inputChannelName);
verify(binder).bindConsumer(eq("foo"), eq(props.getGroup()), same(inputChannel),
any(ConsumerProperties.class));
verify(binding).unbind();
binderFactory.destroy();
}
private BindingServiceProperties createBindingServiceProperties(
HashMap<String, String> properties) {
BindingServiceProperties bindingServiceProperties = new BindingServiceProperties();
org.springframework.boot.context.properties.bind.Binder propertiesBinder;
propertiesBinder = new org.springframework.boot.context.properties.bind.Binder(
new MapConfigurationPropertySource(properties));
propertiesBinder.bind("spring.cloud.stream",
org.springframework.boot.context.properties.bind.Bindable
.ofInstance(bindingServiceProperties));
return bindingServiceProperties;
}
@Test
public void testConvertSimpler() {
TestChannelBinder binder = createBinder();
MessageConverterConfigurer configurer = this.context
.getBean(MessageConverterConfigurer.class);
BindingServiceProperties bsps = this.context
.getBean(BindingServiceProperties.class);
BindingProperties props = new BindingProperties();
props.setContentType("text/plain");
bsps.setBindings(Collections.singletonMap("foo", props));
binder.setMessageSourceDelegate(() -> new GenericMessage<>("foo".getBytes()));
DefaultPollableMessageSource pollableSource = new DefaultPollableMessageSource(
this.messageConverter);
configurer.configurePolledMessageSource(pollableSource, "foo");
ExtendedConsumerProperties<Object> properties = new ExtendedConsumerProperties<>(
null);
properties.setMaxAttempts(1);
properties.setBackOffInitialInterval(0);
binder.bindPollableConsumer("foo", "bar", pollableSource, properties);
final AtomicReference<Object> payload = new AtomicReference<>();
assertThat(pollableSource.poll(received -> {
payload.set(received.getPayload());
}, new ParameterizedTypeReference<String>() {
})).isTrue();
assertThat(payload.get()).isInstanceOf(String.class);
assertThat(payload.get()).isEqualTo("foo");
// test the cache for coverage
assertThat(pollableSource.poll(received -> {
payload.set(received.getPayload());
}, new ParameterizedTypeReference<String>() {
})).isTrue();
assertThat(payload.get()).isInstanceOf(String.class);
assertThat(payload.get()).isEqualTo("foo");
}
private MessageConverterConfigurer createConverterConfigurer(String channelName,
BindingProperties bindingProperties) throws Exception {
BindingServiceProperties bindingServiceProperties = new BindingServiceProperties();
bindingServiceProperties.getBindings().put(channelName, bindingProperties);
bindingServiceProperties.setApplicationContext(applicationContext);
bindingServiceProperties.setConversionService(new DefaultConversionService());
bindingServiceProperties.afterPropertiesSet();
MessageConverterConfigurer messageConverterConfigurer = new MessageConverterConfigurer(
bindingServiceProperties,
new CompositeMessageConverterFactory(null, null).getMessageConverterForAllRegistered());
messageConverterConfigurer.setBeanFactory(applicationContext.getBeanFactory());
return messageConverterConfigurer;
}
private BusProperties setupBusAutoConfig(
HashMap<String, BindingProperties> properties) {
BindingServiceProperties serviceProperties = mock(BindingServiceProperties.class);
when(serviceProperties.getBindings()).thenReturn(properties);
BusProperties bus = new BusProperties();
BusAutoConfiguration configuration = new BusAutoConfiguration(
mock(ServiceMatcher.class), serviceProperties, bus);
configuration.init();
return bus;
}
KTableBoundElementFactory(BindingServiceProperties bindingServiceProperties,
EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler) {
super(KTable.class);
this.bindingServiceProperties = bindingServiceProperties;
this.encodingDecodingBindAdviceHandler = encodingDecodingBindAdviceHandler;
}