org.springframework.boot.test.context.FilteredClassLoader#org.springframework.cloud.stream.binder.Binding源码实例Demo

下面列出了org.springframework.boot.test.context.FilteredClassLoader#org.springframework.cloud.stream.binder.Binding 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@Override
protected Binding<KStream<Object, Object>> doBindConsumer(String name, String group,
		KStream<Object, Object> inputTarget,
		ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties) {

	KStream<Object, Object> delegate = ((KStreamBoundElementFactory.KStreamWrapperHandler)
			((Advised) inputTarget).getAdvisors()[0].getAdvice()).getDelegate();

	this.kafkaStreamsBindingInformationCatalogue.registerConsumerProperties(delegate, properties.getExtension());

	if (!StringUtils.hasText(group)) {
		group = properties.getExtension().getApplicationId();
	}
	KafkaStreamsBinderUtils.prepareConsumerBinding(name, group,
			getApplicationContext(), this.kafkaTopicProvisioner,
			this.binderConfigurationProperties, properties);

	return new DefaultBinding<>(name, group, inputTarget, null);
}
 
源代码2 项目: spring-cloud-stream   文件: BindingService.java
public <T> Binding<T> doBindProducer(T output, String bindingTarget,
		Binder<T, ?, ProducerProperties> binder,
		ProducerProperties producerProperties) {
	if (this.taskScheduler == null
			|| this.bindingServiceProperties.getBindingRetryInterval() <= 0) {
		return binder.bindProducer(bindingTarget, output, producerProperties);
	}
	else {
		try {
			return binder.bindProducer(bindingTarget, output, producerProperties);
		}
		catch (RuntimeException e) {
			LateBinding<T> late = new LateBinding<T>(bindingTarget,
					e.getCause() == null ? e.toString() : e.getCause().getMessage(), producerProperties, false);
			rescheduleProducerBinding(output, bindingTarget, binder,
					producerProperties, late, e);
			return late;
		}
	}
}
 
@Override
public Collection<Binding<Object>> createAndBindInputs(
	BindingService bindingService) {
	List<Binding<Object>> bindings = new ArrayList<>();
	if (log.isDebugEnabled()) {
		log.debug(
			String.format("Binding inputs for %s:%s", this.namespace, this.type));
	}
	for (Map.Entry<String, BoundTargetHolder> boundTargetHolderEntry : this.inputHolders
		.entrySet()) {
		String inputTargetName = boundTargetHolderEntry.getKey();
		BoundTargetHolder boundTargetHolder = boundTargetHolderEntry.getValue();
		if (boundTargetHolder.isBindable()) {
			if (log.isDebugEnabled()) {
				log.debug(String.format("Binding %s:%s:%s", this.namespace, this.type,
					inputTargetName));
			}
			bindings.addAll(bindingService.bindConsumer(
				boundTargetHolder.getBoundTarget(), inputTargetName));
		}
	}
	return bindings;
}
 
@Test
@SuppressWarnings("unchecked")
public void testSyncProducerMetadata() throws Exception {
	Binder binder = getBinder(createConfigurationProperties());
	DirectChannel output = new DirectChannel();
	String testTopicName = UUID.randomUUID().toString();
	ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
	properties.getExtension().setSync(true);
	Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName,
			output, properties);
	DirectFieldAccessor accessor = new DirectFieldAccessor(
			extractEndpoint(producerBinding));
	KafkaProducerMessageHandler wrappedInstance = (KafkaProducerMessageHandler) accessor
			.getWrappedInstance();
	assertThat(new DirectFieldAccessor(wrappedInstance).getPropertyValue("sync")
			.equals(Boolean.TRUE))
					.withFailMessage("Kafka Sync Producer should have been enabled.");
	producerBinding.unbind();
}
 
@Test
@SuppressWarnings("unchecked")
public void testSendTimeoutExpressionProducerMetadata() throws Exception {
	Binder binder = getBinder(createConfigurationProperties());
	DirectChannel output = new DirectChannel();
	String testTopicName = UUID.randomUUID().toString();
	ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
	properties.getExtension().setSync(true);
	SpelExpressionParser parser = new SpelExpressionParser();
	Expression sendTimeoutExpression = parser.parseExpression("5000");
	properties.getExtension().setSendTimeoutExpression(sendTimeoutExpression);
	Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName,
			output, properties);
	DirectFieldAccessor accessor = new DirectFieldAccessor(
			extractEndpoint(producerBinding));
	KafkaProducerMessageHandler wrappedInstance = (KafkaProducerMessageHandler) accessor
			.getWrappedInstance();
	assertThat(new DirectFieldAccessor(wrappedInstance).getPropertyValue("sendTimeoutExpression")
			.equals(sendTimeoutExpression));
	producerBinding.unbind();
}
 
@Test
@SuppressWarnings("unchecked")
public void testAutoConfigureTopicsDisabledSucceedsIfTopicExisting()
		throws Throwable {
	KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();

	String testTopicName = "existing" + System.currentTimeMillis();
	invokeCreateTopic(testTopicName, 5, 1);
	configurationProperties.setAutoCreateTopics(false);
	Binder binder = getBinder(configurationProperties);

	ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();

	DirectChannel input = createBindableChannel("input",
			createConsumerBindingProperties(consumerProperties));
	Binding<MessageChannel> binding = binder.bindConsumer(testTopicName, "test",
			input, consumerProperties);
	binding.unbind();
}
 
源代码7 项目: spring-cloud-stream   文件: BindingService.java
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T> Binding<T> doBindPollableConsumer(T input, String inputName,
		Binder<T, ConsumerProperties, ?> binder,
		ConsumerProperties consumerProperties, String target) {
	if (this.taskScheduler == null
			|| this.bindingServiceProperties.getBindingRetryInterval() <= 0) {
		return ((PollableConsumerBinder) binder).bindPollableConsumer(target,
				this.bindingServiceProperties.getGroup(inputName),
				(PollableSource) input, consumerProperties);
	}
	else {
		try {
			return ((PollableConsumerBinder) binder).bindPollableConsumer(target,
					this.bindingServiceProperties.getGroup(inputName),
					(PollableSource) input, consumerProperties);
		}
		catch (RuntimeException e) {
			LateBinding<T> late = new LateBinding<T>(target,
					e.getCause() == null ? e.toString() : e.getCause().getMessage(), consumerProperties, true);
			reschedulePollableConsumerBinding(input, inputName, binder,
					consumerProperties, target, late, e);
			return late;
		}
	}
}
 
@Test
@SuppressWarnings("unchecked")
public void testAutoAddPartitionsDisabledSucceedsIfTopicUnderPartitionedAndAutoRebalanceEnabled()
		throws Throwable {
	KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();

	String testTopicName = "existing" + System.currentTimeMillis();
	invokeCreateTopic(testTopicName, 1, 1);
	configurationProperties.setAutoAddPartitions(false);
	Binder binder = getBinder(configurationProperties);
	GenericApplicationContext context = new GenericApplicationContext();
	context.refresh();

	ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();

	DirectChannel input = createBindableChannel("input",
			createConsumerBindingProperties(consumerProperties));

	// this consumer must consume from partition 2
	consumerProperties.setInstanceCount(3);
	consumerProperties.setInstanceIndex(2);
	Binding binding = binder.bindConsumer(testTopicName, "test", input,
			consumerProperties);
	binding.unbind();
	assertThat(invokePartitionSize(testTopicName)).isEqualTo(1);
}
 
@Test
@SuppressWarnings("unchecked")
public void testPartitionCountNotReduced() throws Throwable {
	String testTopicName = "existing" + System.currentTimeMillis();

	KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();

	invokeCreateTopic(testTopicName, 6, 1);
	configurationProperties.setAutoAddPartitions(true);
	Binder binder = getBinder(configurationProperties);
	GenericApplicationContext context = new GenericApplicationContext();
	context.refresh();

	ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
	DirectChannel input = createBindableChannel("input",
			createConsumerBindingProperties(consumerProperties));

	Binding<?> binding = binder.bindConsumer(testTopicName, "test", input,
			consumerProperties);
	binding.unbind();

	assertThat(partitionSize(testTopicName)).isEqualTo(6);
}
 
@Test
public void testAnonWithBuiltInExchange() throws Exception {
	RabbitTestBinder binder = getBinder();
	ExtendedConsumerProperties<RabbitConsumerProperties> properties = createConsumerProperties();
	properties.getExtension().setDeclareExchange(false);
	properties.getExtension().setQueueNameGroupOnly(true);

	Binding<MessageChannel> consumerBinding = binder.bindConsumer("amq.topic", null,
			createBindableChannel("input", new BindingProperties()), properties);
	Lifecycle endpoint = extractEndpoint(consumerBinding);
	SimpleMessageListenerContainer container = TestUtils.getPropertyValue(endpoint,
			"messageListenerContainer", SimpleMessageListenerContainer.class);
	String queueName = container.getQueueNames()[0];
	assertThat(queueName).startsWith("anonymous.");
	assertThat(container.isRunning()).isTrue();
	consumerBinding.unbind();
	assertThat(container.isRunning()).isFalse();
}
 
@Test
public void testPolledConsumer() throws Exception {
	RabbitTestBinder binder = getBinder();
	PollableSource<MessageHandler> inboundBindTarget = new DefaultPollableMessageSource(
			this.messageConverter);
	Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer(
			"pollable", "group", inboundBindTarget, createConsumerProperties());
	RabbitTemplate template = new RabbitTemplate(
			this.rabbitAvailableRule.getResource());
	template.convertAndSend("pollable.group", "testPollable");
	boolean polled = inboundBindTarget.poll(m -> {
		assertThat(m.getPayload()).isEqualTo("testPollable");
	});
	int n = 0;
	while (n++ < 100 && !polled) {
		polled = inboundBindTarget.poll(m -> {
			assertThat(m.getPayload()).isEqualTo("testPollable");
		});
	}
	assertThat(polled).isTrue();
	binding.unbind();
}
 
@Test
public void testAutoCreateStreamForNonExistingStream() throws Exception {
	KinesisTestBinder binder = getBinder();
	DirectChannel output = createBindableChannel("output", new BindingProperties());
	ExtendedConsumerProperties<KinesisConsumerProperties> consumerProperties = createConsumerProperties();
	Date testDate = new Date();
	consumerProperties.getExtension().setShardIteratorType(
			ShardIteratorType.AT_TIMESTAMP.name() + ":" + testDate.getTime());
	String testStreamName = "nonexisting" + System.currentTimeMillis();
	Binding<?> binding = binder.bindConsumer(testStreamName, "test", output,
			consumerProperties);
	binding.unbind();

	DescribeStreamResult streamResult = AMAZON_KINESIS.describeStream(testStreamName);
	String createdStreamName = streamResult.getStreamDescription().getStreamName();
	int createdShards = streamResult.getStreamDescription().getShards().size();
	String createdStreamStatus = streamResult.getStreamDescription()
			.getStreamStatus();

	assertThat(createdStreamName).isEqualTo(testStreamName);
	assertThat(createdShards).isEqualTo(consumerProperties.getInstanceCount()
			* consumerProperties.getConcurrency());
	assertThat(createdStreamStatus).isEqualTo(StreamStatus.ACTIVE.toString());

	KinesisShardOffset shardOffset = TestUtils.getPropertyValue(binding,
			"lifecycle.streamInitialSequence", KinesisShardOffset.class);
	assertThat(shardOffset.getIteratorType())
			.isEqualTo(ShardIteratorType.AT_TIMESTAMP);
	assertThat(shardOffset.getTimestamp()).isEqualTo(testDate);
}
 
源代码13 项目: spring-cloud-stream   文件: BindingService.java
public void unbindProducers(String outputName) {
	Binding<?> binding = this.producerBindings.remove(outputName);

	if (binding != null) {
		binding.stop();
		//then
		binding.unbind();
	}
	else if (this.log.isWarnEnabled()) {
		this.log.warn("Trying to unbind '" + outputName + "', but no binding found.");
	}
}
 
源代码14 项目: spring-cloud-stream   文件: BindingServiceTests.java
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testDefaultGroup() throws Exception {
	BindingServiceProperties properties = new BindingServiceProperties();
	Map<String, BindingProperties> bindingProperties = new HashMap<>();
	BindingProperties props = new BindingProperties();
	props.setDestination("foo");
	final String inputChannelName = "input";
	bindingProperties.put(inputChannelName, props);
	properties.setBindings(bindingProperties);
	DefaultBinderFactory binderFactory = createMockBinderFactory();
	Binder binder = binderFactory.getBinder("mock", MessageChannel.class);
	BindingService service = new BindingService(properties, binderFactory);
	MessageChannel inputChannel = new DirectChannel();
	Binding<MessageChannel> mockBinding = Mockito.mock(Binding.class);
	when(binder.bindConsumer(eq("foo"), isNull(), same(inputChannel),
			any(ConsumerProperties.class))).thenReturn(mockBinding);
	Collection<Binding<MessageChannel>> bindings = service.bindConsumer(inputChannel,
			inputChannelName);
	assertThat(bindings).hasSize(1);
	Binding<MessageChannel> binding = bindings.iterator().next();
	assertThat(binding).isSameAs(mockBinding);
	service.unbindConsumers(inputChannelName);
	verify(binder).bindConsumer(eq("foo"), isNull(), same(inputChannel),
			any(ConsumerProperties.class));
	verify(binding).unbind();
	binderFactory.destroy();
}
 
@Override
@SuppressWarnings("unchecked")
protected Binding<KStream<Object, Object>> doBindProducer(String name,
		KStream<Object, Object> outboundBindTarget,
		ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {

	ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties =
			(ExtendedProducerProperties) properties;

	this.kafkaTopicProvisioner.provisionProducerDestination(name, extendedProducerProperties);
	Serde<?> keySerde = this.keyValueSerdeResolver
			.getOuboundKeySerde(properties.getExtension(), kafkaStreamsBindingInformationCatalogue.getOutboundKStreamResolvable());
	LOG.info("Key Serde used for (outbound) " + name + ": " + keySerde.getClass().getName());

	Serde<?> valueSerde;
	if (properties.isUseNativeEncoding()) {
		valueSerde = this.keyValueSerdeResolver.getOutboundValueSerde(properties,
				properties.getExtension(), kafkaStreamsBindingInformationCatalogue.getOutboundKStreamResolvable());
	}
	else {
		valueSerde = Serdes.ByteArray();
	}
	LOG.info("Value Serde used for (outbound) " + name + ": " + valueSerde.getClass().getName());

	to(properties.isUseNativeEncoding(), name, outboundBindTarget,
			(Serde<Object>) keySerde, (Serde<Object>) valueSerde, properties.getExtension());
	return new DefaultBinding<>(name, null, outboundBindTarget, null);
}
 
@Override
protected Binding<GlobalKTable<Object, Object>> doBindProducer(String name,
		GlobalKTable<Object, Object> outboundBindTarget,
		ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
	throw new UnsupportedOperationException(
			"No producer level binding is allowed for GlobalKTable");
}
 
源代码17 项目: spring-cloud-stream   文件: BindingsEndpoint.java
@SuppressWarnings("unchecked")
private List<Binding<?>> gatherInputBindings() {
	List<Binding<?>> inputBindings = new ArrayList<>();
	for (InputBindingLifecycle inputBindingLifecycle : this.inputBindingLifecycles) {
		Collection<Binding<?>> lifecycleInputBindings = (Collection<Binding<?>>) new DirectFieldAccessor(
				inputBindingLifecycle).getPropertyValue("inputBindings");
		inputBindings.addAll(lifecycleInputBindings);
	}
	return inputBindings;
}
 
@Test
public void testKafkaBinderMetricsWhenNoMicrometer() {
	new ApplicationContextRunner().withUserConfiguration(KafkaMetricsTestConfig.class)
			.withClassLoader(new FilteredClassLoader("io.micrometer.core"))
			.run(context -> {
				assertThat(context.getBeanNamesForType(MeterRegistry.class))
						.isEmpty();
				assertThat(context.getBeanNamesForType(MeterBinder.class)).isEmpty();

				DirectFieldAccessor channelBindingServiceAccessor = new DirectFieldAccessor(
						context.getBean(BindingService.class));
				@SuppressWarnings("unchecked")
				Map<String, List<Binding<MessageChannel>>> consumerBindings =
					(Map<String, List<Binding<MessageChannel>>>) channelBindingServiceAccessor
						.getPropertyValue("consumerBindings");
				assertThat(new DirectFieldAccessor(
						consumerBindings.get("input").get(0)).getPropertyValue(
								"lifecycle.messageListenerContainer.beanName"))
										.isEqualTo("setByCustomizer:input");
				assertThat(new DirectFieldAccessor(
						consumerBindings.get("input").get(0)).getPropertyValue(
								"lifecycle.beanName"))
										.isEqualTo("setByCustomizer:input");
				assertThat(new DirectFieldAccessor(
						consumerBindings.get("source").get(0)).getPropertyValue(
								"lifecycle.beanName"))
										.isEqualTo("setByCustomizer:source");

				@SuppressWarnings("unchecked")
				Map<String, Binding<MessageChannel>> producerBindings =
					(Map<String, Binding<MessageChannel>>) channelBindingServiceAccessor
						.getPropertyValue("producerBindings");

				assertThat(new DirectFieldAccessor(
						producerBindings.get("output")).getPropertyValue(
						"lifecycle.beanName"))
						.isEqualTo("setByCustomizer:output");
			});
}
 
源代码19 项目: spring-cloud-stream   文件: BindingsEndpoint.java
@SuppressWarnings("unchecked")
private List<Binding<?>> gatherOutputBindings() {
	List<Binding<?>> outputBindings = new ArrayList<>();
	for (OutputBindingLifecycle inputBindingLifecycle : this.outputBindingsLifecycles) {
		Collection<Binding<?>> lifecycleInputBindings = (Collection<Binding<?>>) new DirectFieldAccessor(
				inputBindingLifecycle).getPropertyValue("outputBindings");
		outputBindings.addAll(lifecycleInputBindings);
	}
	return outputBindings;
}
 
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testPartitionedNative() throws Exception {
	Binder binder = getBinder();
	ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
	properties.setPartitionCount(6);

	DirectChannel output = createBindableChannel("output",
			createProducerBindingProperties(properties));
	output.setBeanName("test.output");
	Binding<MessageChannel> outputBinding = binder.bindProducer("partNative.raw.0",
			output, properties);

	ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
	QueueChannel input0 = new QueueChannel();
	input0.setBeanName("test.inputNative");
	Binding<MessageChannel> inputBinding = binder.bindConsumer("partNative.raw.0",
			"test", input0, consumerProperties);

	output.send(new GenericMessage<>("foo".getBytes(),
			Collections.singletonMap(KafkaHeaders.PARTITION_ID, 5)));

	Message<?> received = receive(input0);
	assertThat(received).isNotNull();

	assertThat(received.getPayload()).isEqualTo("foo".getBytes());
	assertThat(received.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
			.isEqualTo(5);

	inputBinding.unbind();
	outputBinding.unbind();
}
 
源代码21 项目: spring-cloud-stream   文件: BindingService.java
public void unbindConsumers(String inputName) {
	List<Binding<?>> bindings = this.consumerBindings.remove(inputName);
	if (bindings != null && !CollectionUtils.isEmpty(bindings)) {
		for (Binding<?> binding : bindings) {
			binding.stop();
			//then
			binding.unbind();
		}
	}
	else if (this.log.isWarnEnabled()) {
		this.log.warn("Trying to unbind '" + inputName + "', but no binding found.");
	}
}
 
@Test
@SuppressWarnings("unchecked")
public void testCustomPartitionCountOverridesDefaultIfLarger() throws Exception {
	byte[] testPayload = new byte[2048];
	Arrays.fill(testPayload, (byte) 65);
	KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
	binderConfiguration.setMinPartitionCount(10);
	Binder binder = getBinder(binderConfiguration);
	QueueChannel moduleInputChannel = new QueueChannel();
	ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
	producerProperties.setPartitionCount(10);
	producerProperties.setPartitionKeyExpression(new LiteralExpression("foo"));

	DirectChannel moduleOutputChannel = createBindableChannel("output",
			createProducerBindingProperties(producerProperties));

	ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
	long uniqueBindingId = System.currentTimeMillis();
	Binding<MessageChannel> producerBinding = binder.bindProducer(
			"foo" + uniqueBindingId + ".0", moduleOutputChannel, producerProperties);
	Binding<MessageChannel> consumerBinding = binder.bindConsumer(
			"foo" + uniqueBindingId + ".0", null, moduleInputChannel,
			consumerProperties);
	Message<?> message = org.springframework.integration.support.MessageBuilder
			.withPayload(testPayload).build();
	// Let the consumer actually bind to the producer before sending a msg
	binderBindUnbindLatency();
	moduleOutputChannel.send(message);
	Message<?> inbound = receive(moduleInputChannel);
	assertThat(inbound).isNotNull();
	assertThat((byte[]) inbound.getPayload()).containsExactly(testPayload);

	assertThat(partitionSize("foo" + uniqueBindingId + ".0")).isEqualTo(10);
	producerBinding.unbind();
	consumerBinding.unbind();
}
 
源代码23 项目: spring-cloud-stream   文件: BindingService.java
@SuppressWarnings({ "unchecked", "rawtypes" })
public <T> Binding<T> bindProducer(T output, String outputName, boolean cache) {
	String bindingTarget = this.bindingServiceProperties
			.getBindingDestination(outputName);
	Class<?> outputClass = output.getClass();
	if (output instanceof Advised) {
		outputClass = Stream.of(((Advised) output).getProxiedInterfaces()).filter(c -> !c.getName().contains("org.springframework")).findFirst()
				.orElse(outputClass);
	}
	Binder<T, ?, ProducerProperties> binder = (Binder<T, ?, ProducerProperties>) getBinder(
			outputName, outputClass);
	ProducerProperties producerProperties = this.bindingServiceProperties
			.getProducerProperties(outputName);
	if (binder instanceof ExtendedPropertiesBinder) {
		Object extension = ((ExtendedPropertiesBinder) binder)
				.getExtendedProducerProperties(outputName);
		ExtendedProducerProperties extendedProducerProperties = new ExtendedProducerProperties<>(
				extension);
		BeanUtils.copyProperties(producerProperties, extendedProducerProperties);

		producerProperties = extendedProducerProperties;
	}
	validate(producerProperties);
	Binding<T> binding = doBindProducer(output, bindingTarget, binder,
			producerProperties);
	if (cache) {
		this.producerBindings.put(outputName, binding);
	}
	return binding;
}
 
@Override
public Binding<PollableSource<MessageHandler>> bindPollableConsumer(String name,
		String group, PollableSource<MessageHandler> inboundBindTarget,
		ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
	captureConsumerResources(name, group, properties);
	return super.bindPollableConsumer(name, group, inboundBindTarget, properties);
}
 
@Override
void doStartWithBindable(Bindable bindable) {
	Collection<Binding<Object>> bindableBindings = bindable
			.createAndBindOutputs(this.bindingService);
	if (!CollectionUtils.isEmpty(bindableBindings)) {
		this.outputBindings.addAll(bindableBindings);
	}
}
 
@Test
@SuppressWarnings("unchecked")
public void testAutoAddPartitionsDisabledFailsIfTopicUnderPartitionedAndAutoRebalanceDisabled()
		throws Throwable {
	KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();

	String testTopicName = "existing" + System.currentTimeMillis();
	invokeCreateTopic(testTopicName, 1, 1);
	configurationProperties.setAutoAddPartitions(false);
	Binder binder = getBinder(configurationProperties);
	GenericApplicationContext context = new GenericApplicationContext();
	context.refresh();

	ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
	DirectChannel output = createBindableChannel("output",
			createConsumerBindingProperties(consumerProperties));
	// this consumer must consume from partition 2
	consumerProperties.setInstanceCount(3);
	consumerProperties.setInstanceIndex(2);
	consumerProperties.getExtension().setAutoRebalanceEnabled(false);
	expectedProvisioningException.expect(ProvisioningException.class);
	expectedProvisioningException.expectMessage(
			"The number of expected partitions was: 3, but 1 has been found instead");
	Binding binding = binder.bindConsumer(testTopicName, "test", output,
			consumerProperties);
	if (binding != null) {
		binding.unbind();
	}
}
 
源代码27 项目: spring-cloud-stream   文件: BindingService.java
public synchronized void setDelegate(Binding<T> delegate) {
	if (this.unbound) {
		delegate.unbind();
	}
	else {
		this.delegate = delegate;
	}
}
 
源代码28 项目: spring-cloud-stream   文件: BindingServiceTests.java
@SuppressWarnings("unchecked")
@Test
public void testBindingAutostartup() throws Exception {
	ApplicationContext context = new SpringApplicationBuilder(FooConfiguration.class)
			.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false",
					"--spring.cloud.stream.bindings.input.consumer.auto-startup=false");
	BindingService bindingService = context.getBean(BindingService.class);

	Field cbField = ReflectionUtils.findField(BindingService.class,
			"consumerBindings");
	cbField.setAccessible(true);
	Map<String, Object> cbMap = (Map<String, Object>) cbField.get(bindingService);
	Binding<?> inputBinding = ((List<Binding<?>>) cbMap.get("input")).get(0);
	assertThat(inputBinding.isRunning()).isFalse();
}
 
private KafkaConsumer getKafkaConsumer(Binding binding) {
	DirectFieldAccessor bindingAccessor = new DirectFieldAccessor(binding);
	KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter) bindingAccessor
			.getPropertyValue("lifecycle");
	DirectFieldAccessor adapterAccessor = new DirectFieldAccessor(adapter);
	ConcurrentMessageListenerContainer messageListenerContainer = (ConcurrentMessageListenerContainer) adapterAccessor
			.getPropertyValue("messageListenerContainer");
	DirectFieldAccessor containerAccessor = new DirectFieldAccessor(
			messageListenerContainer);
	DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) containerAccessor
			.getPropertyValue("consumerFactory");
	return (KafkaConsumer) consumerFactory.createConsumer();
}
 
@Override
public Binding<MessageChannel> bindConsumer(String name, String group,
		MessageChannel moduleInputChannel,
		ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
	captureConsumerResources(name, group, properties);
	return super.bindConsumer(name, group, moduleInputChannel, properties);
}