下面列出了org.springframework.boot.test.context.FilteredClassLoader#org.springframework.cloud.stream.binder.Binding 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Binds a consumer-side {@link KStream} target.
 * <p>
 * The delegate stream is obtained from the advice of the proxy's first advisor and
 * registered, together with the extension consumer properties, in the binding
 * information catalogue. If {@code group} has no text, the application id from the
 * extension properties is used in its place.
 *
 * @param name the binding name
 * @param group the consumer group; may be empty
 * @param inputTarget the stream being bound; it is cast to {@code Advised}
 * @param properties the extended consumer properties
 * @return a {@link DefaultBinding} for {@code inputTarget} with a {@code null} lifecycle
 */
@Override
protected Binding<KStream<Object, Object>> doBindConsumer(String name, String group,
        KStream<Object, Object> inputTarget,
        ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties) {
    Advised proxy = (Advised) inputTarget;
    KStreamBoundElementFactory.KStreamWrapperHandler wrapperHandler =
            (KStreamBoundElementFactory.KStreamWrapperHandler) proxy.getAdvisors()[0].getAdvice();
    KStream<Object, Object> delegate = wrapperHandler.getDelegate();
    this.kafkaStreamsBindingInformationCatalogue.registerConsumerProperties(delegate,
            properties.getExtension());

    // Fall back to the application id when the caller did not provide a group.
    String effectiveGroup = StringUtils.hasText(group)
            ? group
            : properties.getExtension().getApplicationId();

    KafkaStreamsBinderUtils.prepareConsumerBinding(name, effectiveGroup,
            getApplicationContext(), this.kafkaTopicProvisioner,
            this.binderConfigurationProperties, properties);
    return new DefaultBinding<>(name, effectiveGroup, inputTarget, null);
}
/**
 * Binds a producer through the given binder, optionally deferring the binding when it
 * fails.
 * <p>
 * When no task scheduler is configured or the binding retry interval is not positive,
 * a failure from the binder propagates to the caller. Otherwise a {@link LateBinding}
 * placeholder is returned and the binding attempt is rescheduled.
 *
 * @param <T> the bind target type
 * @param output the producer bind target
 * @param bindingTarget the destination name
 * @param binder the binder to use
 * @param producerProperties the producer properties
 * @return the binding created by the binder, or a {@link LateBinding} placeholder
 */
public <T> Binding<T> doBindProducer(T output, String bindingTarget,
        Binder<T, ?, ProducerProperties> binder,
        ProducerProperties producerProperties) {
    boolean retryEnabled = this.taskScheduler != null
            && this.bindingServiceProperties.getBindingRetryInterval() > 0;
    try {
        return binder.bindProducer(bindingTarget, output, producerProperties);
    }
    catch (RuntimeException e) {
        if (!retryEnabled) {
            // Without retry support the failure is simply surfaced to the caller.
            throw e;
        }
        String error = (e.getCause() == null) ? e.toString() : e.getCause().getMessage();
        LateBinding<T> late = new LateBinding<T>(bindingTarget, error,
                producerProperties, false);
        rescheduleProducerBinding(output, bindingTarget, binder, producerProperties,
                late, e);
        return late;
    }
}
/**
 * Binds every bindable input holder through the given binding service.
 *
 * @param bindingService the service used to bind each input target
 * @return all bindings returned by the service for the bindable inputs
 */
@Override
public Collection<Binding<Object>> createAndBindInputs(BindingService bindingService) {
    if (log.isDebugEnabled()) {
        log.debug(String.format("Binding inputs for %s:%s", this.namespace, this.type));
    }
    List<Binding<Object>> bound = new ArrayList<>();
    for (Map.Entry<String, BoundTargetHolder> entry : this.inputHolders.entrySet()) {
        BoundTargetHolder holder = entry.getValue();
        if (!holder.isBindable()) {
            // Holders that are not bindable are skipped.
            continue;
        }
        String inputName = entry.getKey();
        if (log.isDebugEnabled()) {
            log.debug(String.format("Binding %s:%s:%s", this.namespace, this.type,
                    inputName));
        }
        bound.addAll(bindingService.bindConsumer(holder.getBoundTarget(), inputName));
    }
    return bound;
}
/**
 * Verifies that enabling {@code sync} on the producer extension properties is
 * reflected in the {@code sync} field of the bound producer message handler.
 * <p>
 * The previous version built an assertion object with {@code withFailMessage(...)}
 * but never invoked an actual assertion, so the test could not fail.
 */
@Test
@SuppressWarnings("unchecked")
public void testSyncProducerMetadata() throws Exception {
    Binder binder = getBinder(createConfigurationProperties());
    DirectChannel output = new DirectChannel();
    String testTopicName = UUID.randomUUID().toString();
    ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
    properties.getExtension().setSync(true);
    Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName,
            output, properties);
    try {
        DirectFieldAccessor accessor = new DirectFieldAccessor(
                extractEndpoint(producerBinding));
        KafkaProducerMessageHandler wrappedInstance = (KafkaProducerMessageHandler) accessor
                .getWrappedInstance();
        // withFailMessage only configures the message; isEqualTo performs the check.
        assertThat(new DirectFieldAccessor(wrappedInstance).getPropertyValue("sync"))
                .withFailMessage("Kafka Sync Producer should have been enabled.")
                .isEqualTo(Boolean.TRUE);
    }
    finally {
        // Release the binding even if the assertion fails.
        producerBinding.unbind();
    }
}
/**
 * Verifies that a send-timeout expression set on the producer extension properties is
 * present in the {@code sendTimeoutExpression} field of the bound producer message
 * handler.
 * <p>
 * The previous version wrapped an {@code equals} result in {@code assertThat(...)}
 * without calling any assertion method, so the test could not fail.
 */
@Test
@SuppressWarnings("unchecked")
public void testSendTimeoutExpressionProducerMetadata() throws Exception {
    Binder binder = getBinder(createConfigurationProperties());
    DirectChannel output = new DirectChannel();
    String testTopicName = UUID.randomUUID().toString();
    ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
    properties.getExtension().setSync(true);
    SpelExpressionParser parser = new SpelExpressionParser();
    Expression sendTimeoutExpression = parser.parseExpression("5000");
    properties.getExtension().setSendTimeoutExpression(sendTimeoutExpression);
    Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName,
            output, properties);
    try {
        DirectFieldAccessor accessor = new DirectFieldAccessor(
                extractEndpoint(producerBinding));
        KafkaProducerMessageHandler wrappedInstance = (KafkaProducerMessageHandler) accessor
                .getWrappedInstance();
        // Compare the handler's field with the configured expression via a real assertion.
        assertThat(new DirectFieldAccessor(wrappedInstance)
                .getPropertyValue("sendTimeoutExpression"))
                .isEqualTo(sendTimeoutExpression);
    }
    finally {
        // Release the binding even if the assertion fails.
        producerBinding.unbind();
    }
}
/**
 * Binds a consumer to a topic created up front while automatic topic creation is
 * switched off on the binder configuration; the test passes when binding and
 * unbinding complete without an exception.
 */
@Test
@SuppressWarnings("unchecked")
public void testAutoConfigureTopicsDisabledSucceedsIfTopicExisting() throws Throwable {
    KafkaBinderConfigurationProperties binderConfig = createConfigurationProperties();
    String existingTopic = "existing" + System.currentTimeMillis();
    invokeCreateTopic(existingTopic, 5, 1);
    binderConfig.setAutoCreateTopics(false);

    Binder binder = getBinder(binderConfig);
    ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = createConsumerProperties();
    DirectChannel inputChannel = createBindableChannel("input",
            createConsumerBindingProperties(consumerProps));

    Binding<MessageChannel> consumerBinding = binder.bindConsumer(existingTopic, "test",
            inputChannel, consumerProps);
    consumerBinding.unbind();
}
/**
 * Binds a pollable consumer through the given binder, optionally deferring the binding
 * when it fails.
 * <p>
 * When no task scheduler is configured or the binding retry interval is not positive,
 * a failure propagates to the caller. Otherwise a {@link LateBinding} placeholder is
 * returned and the binding attempt is rescheduled.
 *
 * @param <T> the bind target type
 * @param input the consumer bind target; it is cast to {@code PollableSource}
 * @param inputName the input name used to look up the group
 * @param binder the binder to use; it is cast to {@code PollableConsumerBinder}
 * @param consumerProperties the consumer properties
 * @param target the destination name
 * @return the binding created by the binder, or a {@link LateBinding} placeholder
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T> Binding<T> doBindPollableConsumer(T input, String inputName,
        Binder<T, ConsumerProperties, ?> binder,
        ConsumerProperties consumerProperties, String target) {
    boolean retryEnabled = this.taskScheduler != null
            && this.bindingServiceProperties.getBindingRetryInterval() > 0;
    try {
        PollableConsumerBinder pollableBinder = (PollableConsumerBinder) binder;
        return pollableBinder.bindPollableConsumer(target,
                this.bindingServiceProperties.getGroup(inputName),
                (PollableSource) input, consumerProperties);
    }
    catch (RuntimeException e) {
        if (!retryEnabled) {
            // Without retry support the failure is simply surfaced to the caller.
            throw e;
        }
        String error = (e.getCause() == null) ? e.toString() : e.getCause().getMessage();
        LateBinding<T> late = new LateBinding<T>(target, error, consumerProperties, true);
        reschedulePollableConsumerBinding(input, inputName, binder, consumerProperties,
                target, late, e);
        return late;
    }
}
/**
 * With auto-add-partitions disabled, binds a consumer configured as instance index 2
 * of 3 to a pre-created topic and then asserts that the reported partition size of the
 * topic is still 1.
 */
@Test
@SuppressWarnings("unchecked")
public void testAutoAddPartitionsDisabledSucceedsIfTopicUnderPartitionedAndAutoRebalanceEnabled()
        throws Throwable {
    KafkaBinderConfigurationProperties binderConfig = createConfigurationProperties();
    String topic = "existing" + System.currentTimeMillis();
    invokeCreateTopic(topic, 1, 1);
    binderConfig.setAutoAddPartitions(false);
    Binder binder = getBinder(binderConfig);

    GenericApplicationContext applicationContext = new GenericApplicationContext();
    applicationContext.refresh();

    ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = createConsumerProperties();
    DirectChannel inputChannel = createBindableChannel("input",
            createConsumerBindingProperties(consumerProps));
    // This consumer must consume from partition 2.
    consumerProps.setInstanceCount(3);
    consumerProps.setInstanceIndex(2);

    Binding<?> consumerBinding = binder.bindConsumer(topic, "test", inputChannel,
            consumerProps);
    consumerBinding.unbind();

    assertThat(invokePartitionSize(topic)).isEqualTo(1);
}
/**
 * With auto-add-partitions enabled, binds and unbinds a consumer on a pre-created
 * topic and asserts that the reported partition size of the topic is still 6.
 */
@Test
@SuppressWarnings("unchecked")
public void testPartitionCountNotReduced() throws Throwable {
    String topic = "existing" + System.currentTimeMillis();
    KafkaBinderConfigurationProperties binderConfig = createConfigurationProperties();
    invokeCreateTopic(topic, 6, 1);
    binderConfig.setAutoAddPartitions(true);
    Binder binder = getBinder(binderConfig);

    GenericApplicationContext applicationContext = new GenericApplicationContext();
    applicationContext.refresh();

    ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = createConsumerProperties();
    DirectChannel inputChannel = createBindableChannel("input",
            createConsumerBindingProperties(consumerProps));

    Binding<?> consumerBinding = binder.bindConsumer(topic, "test", inputChannel,
            consumerProps);
    consumerBinding.unbind();

    assertThat(partitionSize(topic)).isEqualTo(6);
}
/**
 * Binds a consumer with a {@code null} group to {@code amq.topic} with exchange
 * declaration disabled and group-only queue naming enabled, then checks that the
 * queue name starts with {@code "anonymous."} and that the listener container runs
 * until the binding is unbound.
 */
@Test
public void testAnonWithBuiltInExchange() throws Exception {
    RabbitTestBinder binder = getBinder();
    ExtendedConsumerProperties<RabbitConsumerProperties> consumerProps = createConsumerProperties();
    consumerProps.getExtension().setDeclareExchange(false);
    consumerProps.getExtension().setQueueNameGroupOnly(true);

    DirectChannel inputChannel = createBindableChannel("input", new BindingProperties());
    Binding<MessageChannel> consumerBinding = binder.bindConsumer("amq.topic", null,
            inputChannel, consumerProps);

    Lifecycle endpoint = extractEndpoint(consumerBinding);
    SimpleMessageListenerContainer listenerContainer = TestUtils.getPropertyValue(
            endpoint, "messageListenerContainer", SimpleMessageListenerContainer.class);

    assertThat(listenerContainer.getQueueNames()[0]).startsWith("anonymous.");
    assertThat(listenerContainer.isRunning()).isTrue();

    consumerBinding.unbind();
    assertThat(listenerContainer.isRunning()).isFalse();
}
/**
 * Binds a pollable consumer, sends one message to {@code pollable.group} and polls
 * the source (one initial attempt plus up to 100 retries) until a message with the
 * expected payload has been handled.
 */
@Test
public void testPolledConsumer() throws Exception {
    RabbitTestBinder binder = getBinder();
    PollableSource<MessageHandler> source = new DefaultPollableMessageSource(
            this.messageConverter);
    Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer(
            "pollable", "group", source, createConsumerProperties());

    RabbitTemplate template = new RabbitTemplate(this.rabbitAvailableRule.getResource());
    template.convertAndSend("pollable.group", "testPollable");

    // The same payload check is applied to whichever poll receives the message.
    MessageHandler payloadCheck = m -> assertThat(m.getPayload()).isEqualTo("testPollable");

    boolean polled = source.poll(payloadCheck);
    for (int retry = 0; retry < 100 && !polled; retry++) {
        polled = source.poll(payloadCheck);
    }

    assertThat(polled).isTrue();
    binding.unbind();
}
/**
 * Binds a consumer to a stream name that is generated for this test, then describes
 * the stream and checks its name, shard count (instance count times concurrency) and
 * {@code ACTIVE} status, as well as the {@code AT_TIMESTAMP} shard offset configured
 * through the shard iterator type property.
 */
@Test
public void testAutoCreateStreamForNonExistingStream() throws Exception {
    KinesisTestBinder binder = getBinder();
    DirectChannel channel = createBindableChannel("output", new BindingProperties());
    ExtendedConsumerProperties<KinesisConsumerProperties> consumerProps = createConsumerProperties();
    Date timestamp = new Date();
    consumerProps.getExtension().setShardIteratorType(
            ShardIteratorType.AT_TIMESTAMP.name() + ":" + timestamp.getTime());
    String streamName = "nonexisting" + System.currentTimeMillis();

    Binding<?> binding = binder.bindConsumer(streamName, "test", channel, consumerProps);
    binding.unbind();

    DescribeStreamResult described = AMAZON_KINESIS.describeStream(streamName);
    String actualName = described.getStreamDescription().getStreamName();
    int actualShards = described.getStreamDescription().getShards().size();
    String actualStatus = described.getStreamDescription().getStreamStatus();

    assertThat(actualName).isEqualTo(streamName);
    assertThat(actualShards)
            .isEqualTo(consumerProps.getInstanceCount() * consumerProps.getConcurrency());
    assertThat(actualStatus).isEqualTo(StreamStatus.ACTIVE.toString());

    // The binding object is still inspected after unbind to read the configured offset.
    KinesisShardOffset initialOffset = TestUtils.getPropertyValue(binding,
            "lifecycle.streamInitialSequence", KinesisShardOffset.class);
    assertThat(initialOffset.getIteratorType()).isEqualTo(ShardIteratorType.AT_TIMESTAMP);
    assertThat(initialOffset.getTimestamp()).isEqualTo(timestamp);
}
/**
 * Removes the producer binding registered under the given output name and, if one
 * exists, stops and then unbinds it. A warning is logged when no binding is found.
 *
 * @param outputName the name of the output whose binding should be released
 */
public void unbindProducers(String outputName) {
    Binding<?> removed = this.producerBindings.remove(outputName);
    if (removed == null) {
        if (this.log.isWarnEnabled()) {
            this.log.warn("Trying to unbind '" + outputName + "', but no binding found.");
        }
        return;
    }
    // Stop first, then unbind.
    removed.stop();
    removed.unbind();
}
/**
 * Binds an input channel whose binding properties only define the destination
 * {@code "foo"} and verifies that the mock binder is asked to bind the consumer with a
 * {@code null} group, that the returned binding is the one produced by the binder,
 * and that unbinding the consumers unbinds that binding.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testDefaultGroup() throws Exception {
    final String inputChannelName = "input";

    BindingProperties inputProps = new BindingProperties();
    inputProps.setDestination("foo");
    Map<String, BindingProperties> bindingsByName = new HashMap<>();
    bindingsByName.put(inputChannelName, inputProps);
    BindingServiceProperties serviceProperties = new BindingServiceProperties();
    serviceProperties.setBindings(bindingsByName);

    DefaultBinderFactory binderFactory = createMockBinderFactory();
    Binder binder = binderFactory.getBinder("mock", MessageChannel.class);
    BindingService service = new BindingService(serviceProperties, binderFactory);

    MessageChannel inputChannel = new DirectChannel();
    Binding<MessageChannel> mockBinding = Mockito.mock(Binding.class);
    when(binder.bindConsumer(eq("foo"), isNull(), same(inputChannel),
            any(ConsumerProperties.class))).thenReturn(mockBinding);

    Collection<Binding<MessageChannel>> created = service.bindConsumer(inputChannel,
            inputChannelName);
    assertThat(created).hasSize(1);
    Binding<MessageChannel> onlyBinding = created.iterator().next();
    assertThat(onlyBinding).isSameAs(mockBinding);

    service.unbindConsumers(inputChannelName);

    verify(binder).bindConsumer(eq("foo"), isNull(), same(inputChannel),
            any(ConsumerProperties.class));
    verify(onlyBinding).unbind();
    binderFactory.destroy();
}
/**
 * Binds an outbound {@link KStream}.
 * <p>
 * The producer destination is provisioned, the key and value serdes are resolved and
 * logged, and the stream is handed to {@code to(...)}. When native encoding is not
 * used, the value serde is {@code Serdes.ByteArray()}.
 *
 * @param name the binding name, also passed to the provisioner and to {@code to(...)}
 * @param outboundBindTarget the outbound stream
 * @param properties the extended producer properties
 * @return a {@link DefaultBinding} for {@code outboundBindTarget} with a {@code null}
 * group and lifecycle
 */
@Override
@SuppressWarnings("unchecked")
protected Binding<KStream<Object, Object>> doBindProducer(String name,
        KStream<Object, Object> outboundBindTarget,
        ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
    ExtendedProducerProperties<KafkaProducerProperties> provisioningProperties =
            (ExtendedProducerProperties) properties;
    this.kafkaTopicProvisioner.provisionProducerDestination(name, provisioningProperties);

    Serde<?> keySerde = this.keyValueSerdeResolver.getOuboundKeySerde(
            properties.getExtension(),
            kafkaStreamsBindingInformationCatalogue.getOutboundKStreamResolvable());
    LOG.info("Key Serde used for (outbound) " + name + ": " + keySerde.getClass().getName());

    // Without native encoding the value is handled as a byte array.
    Serde<?> valueSerde = properties.isUseNativeEncoding()
            ? this.keyValueSerdeResolver.getOutboundValueSerde(properties,
                    properties.getExtension(),
                    kafkaStreamsBindingInformationCatalogue.getOutboundKStreamResolvable())
            : Serdes.ByteArray();
    LOG.info("Value Serde used for (outbound) " + name + ": " + valueSerde.getClass().getName());

    to(properties.isUseNativeEncoding(), name, outboundBindTarget,
            (Serde<Object>) keySerde, (Serde<Object>) valueSerde,
            properties.getExtension());
    return new DefaultBinding<>(name, null, outboundBindTarget, null);
}
/**
 * Always rejects the request: this implementation does not support producer bindings
 * for {@link GlobalKTable}.
 *
 * @param name the binding name (unused)
 * @param outboundBindTarget the outbound target (unused)
 * @param properties the producer properties (unused)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
protected Binding<GlobalKTable<Object, Object>> doBindProducer(String name,
        GlobalKTable<Object, Object> outboundBindTarget,
        ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
    final String reason = "No producer level binding is allowed for GlobalKTable";
    throw new UnsupportedOperationException(reason);
}
/**
 * Collects the input bindings of every registered input binding lifecycle by reading
 * each lifecycle's {@code inputBindings} field directly.
 *
 * @return a new list containing all input bindings found
 */
@SuppressWarnings("unchecked")
private List<Binding<?>> gatherInputBindings() {
    List<Binding<?>> collected = new ArrayList<>();
    for (InputBindingLifecycle lifecycle : this.inputBindingLifecycles) {
        DirectFieldAccessor lifecycleAccessor = new DirectFieldAccessor(lifecycle);
        Object fieldValue = lifecycleAccessor.getPropertyValue("inputBindings");
        collected.addAll((Collection<Binding<?>>) fieldValue);
    }
    return collected;
}
/**
 * Runs the test configuration with {@code io.micrometer.core} filtered from the class
 * loader and verifies that no {@code MeterRegistry} or {@code MeterBinder} beans exist,
 * and that the bean names of the consumer and producer binding lifecycles carry the
 * {@code setByCustomizer:} prefix.
 */
@Test
public void testKafkaBinderMetricsWhenNoMicrometer() {
    new ApplicationContextRunner()
            .withUserConfiguration(KafkaMetricsTestConfig.class)
            .withClassLoader(new FilteredClassLoader("io.micrometer.core"))
            .run(context -> {
                assertThat(context.getBeanNamesForType(MeterRegistry.class)).isEmpty();
                assertThat(context.getBeanNamesForType(MeterBinder.class)).isEmpty();

                DirectFieldAccessor serviceAccessor = new DirectFieldAccessor(
                        context.getBean(BindingService.class));

                @SuppressWarnings("unchecked")
                Map<String, List<Binding<MessageChannel>>> consumerBindings =
                        (Map<String, List<Binding<MessageChannel>>>) serviceAccessor
                                .getPropertyValue("consumerBindings");

                // Both checks on the "input" binding read from the same binding object.
                DirectFieldAccessor inputAccessor = new DirectFieldAccessor(
                        consumerBindings.get("input").get(0));
                assertThat(inputAccessor
                        .getPropertyValue("lifecycle.messageListenerContainer.beanName"))
                        .isEqualTo("setByCustomizer:input");
                assertThat(inputAccessor.getPropertyValue("lifecycle.beanName"))
                        .isEqualTo("setByCustomizer:input");

                DirectFieldAccessor sourceAccessor = new DirectFieldAccessor(
                        consumerBindings.get("source").get(0));
                assertThat(sourceAccessor.getPropertyValue("lifecycle.beanName"))
                        .isEqualTo("setByCustomizer:source");

                @SuppressWarnings("unchecked")
                Map<String, Binding<MessageChannel>> producerBindings =
                        (Map<String, Binding<MessageChannel>>) serviceAccessor
                                .getPropertyValue("producerBindings");

                DirectFieldAccessor outputAccessor = new DirectFieldAccessor(
                        producerBindings.get("output"));
                assertThat(outputAccessor.getPropertyValue("lifecycle.beanName"))
                        .isEqualTo("setByCustomizer:output");
            });
}
/**
 * Collects the output bindings of every registered output binding lifecycle by reading
 * each lifecycle's {@code outputBindings} field directly.
 *
 * @return a new list containing all output bindings found
 */
@SuppressWarnings("unchecked")
private List<Binding<?>> gatherOutputBindings() {
    List<Binding<?>> collected = new ArrayList<>();
    for (OutputBindingLifecycle lifecycle : this.outputBindingsLifecycles) {
        DirectFieldAccessor lifecycleAccessor = new DirectFieldAccessor(lifecycle);
        Object fieldValue = lifecycleAccessor.getPropertyValue("outputBindings");
        collected.addAll((Collection<Binding<?>>) fieldValue);
    }
    return collected;
}
/**
 * Sends a message carrying a {@code KafkaHeaders.PARTITION_ID} header of 5 through a
 * producer binding whose partition count is 6, and verifies that the consumer receives
 * the payload with {@code KafkaHeaders.RECEIVED_PARTITION_ID} equal to 5.
 */
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testPartitionedNative() throws Exception {
    Binder binder = getBinder();

    ExtendedProducerProperties<KafkaProducerProperties> producerProps = createProducerProperties();
    producerProps.setPartitionCount(6);
    DirectChannel outputChannel = createBindableChannel("output",
            createProducerBindingProperties(producerProps));
    outputChannel.setBeanName("test.output");
    Binding<MessageChannel> producerBinding = binder.bindProducer("partNative.raw.0",
            outputChannel, producerProps);

    ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = createConsumerProperties();
    QueueChannel inputChannel = new QueueChannel();
    inputChannel.setBeanName("test.inputNative");
    Binding<MessageChannel> consumerBinding = binder.bindConsumer("partNative.raw.0",
            "test", inputChannel, consumerProps);

    outputChannel.send(new GenericMessage<>("foo".getBytes(),
            Collections.singletonMap(KafkaHeaders.PARTITION_ID, 5)));

    Message<?> received = receive(inputChannel);
    assertThat(received).isNotNull();
    assertThat(received.getPayload()).isEqualTo("foo".getBytes());
    assertThat(received.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(5);

    consumerBinding.unbind();
    producerBinding.unbind();
}
/**
 * Removes the consumer bindings registered under the given input name and stops and
 * then unbinds each of them. A warning is logged when there is nothing to unbind.
 *
 * @param inputName the name of the input whose bindings should be released
 */
public void unbindConsumers(String inputName) {
    List<Binding<?>> removed = this.consumerBindings.remove(inputName);
    // CollectionUtils.isEmpty also covers the null case.
    if (CollectionUtils.isEmpty(removed)) {
        if (this.log.isWarnEnabled()) {
            this.log.warn("Trying to unbind '" + inputName + "', but no binding found.");
        }
        return;
    }
    for (Binding<?> each : removed) {
        // Stop first, then unbind.
        each.stop();
        each.unbind();
    }
}
/**
 * Configures a binder minimum partition count of 10 and a producer partition count of
 * 10, sends a 2048-byte payload through a producer/consumer pair bound to the same
 * destination, and verifies the payload round trip as well as a reported partition
 * size of 10 for the destination.
 */
@Test
@SuppressWarnings("unchecked")
public void testCustomPartitionCountOverridesDefaultIfLarger() throws Exception {
    byte[] payload = new byte[2048];
    Arrays.fill(payload, (byte) 65);

    KafkaBinderConfigurationProperties binderConfig = createConfigurationProperties();
    binderConfig.setMinPartitionCount(10);
    Binder binder = getBinder(binderConfig);

    QueueChannel inputChannel = new QueueChannel();
    ExtendedProducerProperties<KafkaProducerProperties> producerProps = createProducerProperties();
    producerProps.setPartitionCount(10);
    producerProps.setPartitionKeyExpression(new LiteralExpression("foo"));
    DirectChannel outputChannel = createBindableChannel("output",
            createProducerBindingProperties(producerProps));
    ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = createConsumerProperties();

    // One destination name shared by the producer, the consumer and the final check.
    long uniqueBindingId = System.currentTimeMillis();
    String destination = "foo" + uniqueBindingId + ".0";

    Binding<MessageChannel> producerBinding = binder.bindProducer(destination,
            outputChannel, producerProps);
    Binding<MessageChannel> consumerBinding = binder.bindConsumer(destination, null,
            inputChannel, consumerProps);

    Message<?> outbound = org.springframework.integration.support.MessageBuilder
            .withPayload(payload).build();
    // Let the consumer actually bind to the producer before sending a msg
    binderBindUnbindLatency();
    outputChannel.send(outbound);

    Message<?> inbound = receive(inputChannel);
    assertThat(inbound).isNotNull();
    assertThat((byte[]) inbound.getPayload()).containsExactly(payload);
    assertThat(partitionSize(destination)).isEqualTo(10);

    producerBinding.unbind();
    consumerBinding.unbind();
}
/**
 * Binds the given output to its configured destination.
 * <p>
 * The binder is looked up by output name and target class. For an {@code Advised}
 * proxy the first proxied interface whose name does not contain
 * {@code "org.springframework"} is used as the target class, falling back to the
 * runtime class of {@code output}. If the binder is an {@code ExtendedPropertiesBinder},
 * the producer properties are copied into an {@code ExtendedProducerProperties}
 * carrying the binder's extension. The properties are validated before binding.
 *
 * @param <T> the bind target type
 * @param output the producer bind target
 * @param outputName the name of the output
 * @param cache whether the resulting binding is stored in the producer bindings map
 * @return the resulting binding
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public <T> Binding<T> bindProducer(T output, String outputName, boolean cache) {
    String bindingTarget = this.bindingServiceProperties.getBindingDestination(outputName);

    Class<?> outputClass = output.getClass();
    if (output instanceof Advised) {
        // Prefer the first proxied interface that is not a Spring-internal one.
        for (Class<?> candidate : ((Advised) output).getProxiedInterfaces()) {
            if (!candidate.getName().contains("org.springframework")) {
                outputClass = candidate;
                break;
            }
        }
    }

    Binder<T, ?, ProducerProperties> binder =
            (Binder<T, ?, ProducerProperties>) getBinder(outputName, outputClass);
    ProducerProperties producerProperties =
            this.bindingServiceProperties.getProducerProperties(outputName);

    if (binder instanceof ExtendedPropertiesBinder) {
        ExtendedPropertiesBinder extendedBinder = (ExtendedPropertiesBinder) binder;
        Object extension = extendedBinder.getExtendedProducerProperties(outputName);
        ExtendedProducerProperties extended = new ExtendedProducerProperties<>(extension);
        BeanUtils.copyProperties(producerProperties, extended);
        producerProperties = extended;
    }
    validate(producerProperties);

    Binding<T> binding = doBindProducer(output, bindingTarget, binder, producerProperties);
    if (cache) {
        this.producerBindings.put(outputName, binding);
    }
    return binding;
}
/**
 * Records the consumer resources for the given name, group and properties via
 * {@code captureConsumerResources} and then delegates to the superclass binding.
 *
 * @param name the destination name
 * @param group the consumer group
 * @param inboundBindTarget the pollable source to bind
 * @param properties the extended consumer properties
 * @return the binding returned by the superclass
 */
@Override
public Binding<PollableSource<MessageHandler>> bindPollableConsumer(String name,
        String group, PollableSource<MessageHandler> inboundBindTarget,
        ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
    captureConsumerResources(name, group, properties);
    Binding<PollableSource<MessageHandler>> binding = super.bindPollableConsumer(name,
            group, inboundBindTarget, properties);
    return binding;
}
/**
 * Creates and binds the outputs of the given bindable and adds any resulting bindings
 * to this instance's output bindings.
 *
 * @param bindable the bindable whose outputs are bound
 */
@Override
void doStartWithBindable(Bindable bindable) {
    Collection<Binding<Object>> created = bindable.createAndBindOutputs(this.bindingService);
    if (CollectionUtils.isEmpty(created)) {
        // Nothing was bound, so there is nothing to remember.
        return;
    }
    this.outputBindings.addAll(created);
}
/**
 * With auto-add-partitions and auto-rebalance both disabled, expects binding a
 * consumer configured as instance index 2 of 3 to a pre-created topic to fail with a
 * {@code ProvisioningException} reporting 3 expected partitions but 1 found.
 */
@Test
@SuppressWarnings("unchecked")
public void testAutoAddPartitionsDisabledFailsIfTopicUnderPartitionedAndAutoRebalanceDisabled()
        throws Throwable {
    KafkaBinderConfigurationProperties binderConfig = createConfigurationProperties();
    String topic = "existing" + System.currentTimeMillis();
    invokeCreateTopic(topic, 1, 1);
    binderConfig.setAutoAddPartitions(false);
    Binder binder = getBinder(binderConfig);

    GenericApplicationContext applicationContext = new GenericApplicationContext();
    applicationContext.refresh();

    ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = createConsumerProperties();
    DirectChannel channel = createBindableChannel("output",
            createConsumerBindingProperties(consumerProps));
    // This consumer must consume from partition 2.
    consumerProps.setInstanceCount(3);
    consumerProps.setInstanceIndex(2);
    consumerProps.getExtension().setAutoRebalanceEnabled(false);

    expectedProvisioningException.expect(ProvisioningException.class);
    expectedProvisioningException.expectMessage(
            "The number of expected partitions was: 3, but 1 has been found instead");

    Binding<?> consumerBinding = binder.bindConsumer(topic, "test", channel, consumerProps);
    if (consumerBinding != null) {
        consumerBinding.unbind();
    }
}
/**
 * Supplies the actual binding. If this binding has already been marked as unbound,
 * the supplied delegate is unbound immediately instead of being stored.
 *
 * @param delegate the binding to delegate to
 */
public synchronized void setDelegate(Binding<T> delegate) {
    if (!this.unbound) {
        this.delegate = delegate;
        return;
    }
    // Already unbound: release the late-arriving delegate right away.
    delegate.unbind();
}
/**
 * Starts the application with
 * {@code spring.cloud.stream.bindings.input.consumer.auto-startup=false} and verifies
 * that the first consumer binding registered for {@code input} is not running.
 * <p>
 * The application context is closed when the test finishes; previously it was left
 * open.
 */
@SuppressWarnings("unchecked")
@Test
public void testBindingAutostartup() throws Exception {
    ApplicationContext context = new SpringApplicationBuilder(FooConfiguration.class)
            .web(WebApplicationType.NONE).run("--spring.jmx.enabled=false",
                    "--spring.cloud.stream.bindings.input.consumer.auto-startup=false");
    try {
        BindingService bindingService = context.getBean(BindingService.class);
        // consumerBindings is read reflectively from BindingService.
        Field cbField = ReflectionUtils.findField(BindingService.class,
                "consumerBindings");
        cbField.setAccessible(true);
        Map<String, Object> cbMap = (Map<String, Object>) cbField.get(bindingService);
        Binding<?> inputBinding = ((List<Binding<?>>) cbMap.get("input")).get(0);
        assertThat(inputBinding.isRunning()).isFalse();
    }
    finally {
        // The variable is typed as ApplicationContext, which declares no close();
        // go through AutoCloseable so no additional import is required.
        if (context instanceof AutoCloseable) {
            ((AutoCloseable) context).close();
        }
    }
}
/**
 * Creates a consumer from the consumer factory used by the given binding. The factory
 * is reached by reading the fields {@code lifecycle}, {@code messageListenerContainer}
 * and {@code consumerFactory} in turn.
 *
 * @param binding the binding to inspect
 * @return the consumer returned by the factory's {@code createConsumer()}
 */
private KafkaConsumer getKafkaConsumer(Binding binding) {
    KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter)
            new DirectFieldAccessor(binding).getPropertyValue("lifecycle");
    ConcurrentMessageListenerContainer listenerContainer = (ConcurrentMessageListenerContainer)
            new DirectFieldAccessor(adapter).getPropertyValue("messageListenerContainer");
    DefaultKafkaConsumerFactory factory = (DefaultKafkaConsumerFactory)
            new DirectFieldAccessor(listenerContainer).getPropertyValue("consumerFactory");
    return (KafkaConsumer) factory.createConsumer();
}
/**
 * Records the consumer resources for the given name, group and properties via
 * {@code captureConsumerResources} and then delegates to the superclass binding.
 *
 * @param name the destination name
 * @param group the consumer group
 * @param moduleInputChannel the channel to bind
 * @param properties the extended consumer properties
 * @return the binding returned by the superclass
 */
@Override
public Binding<MessageChannel> bindConsumer(String name, String group,
        MessageChannel moduleInputChannel,
        ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
    captureConsumerResources(name, group, properties);
    Binding<MessageChannel> binding = super.bindConsumer(name, group,
            moduleInputChannel, properties);
    return binding;
}