下面列出了如何使用 org.springframework.boot.WebApplicationType 的 API 示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Builds a context with a parent (no web environment) and checks that the
 * {@code bootstrap.foo} entry put into {@code PropertySourceConfiguration.MAP} is
 * readable from the resulting context, that the context and its parent use
 * different environment instances, and that both environments contain the
 * {@code -testBootstrap} property source.
 */
@Test
public void environmentEnrichedInParentContext() {
    PropertySourceConfiguration.MAP.put("bootstrap.foo", "bar");
    this.context = new SpringApplicationBuilder()
            .sources(BareConfiguration.class)
            .child(BareConfiguration.class)
            .web(WebApplicationType.NONE)
            .run();

    then(this.context.getEnvironment().getProperty("bootstrap.foo")).isEqualTo("bar");
    then(this.context.getParent().getEnvironment()).isNotSameAs(this.context.getEnvironment());

    // The same composed property-source name is looked up in both environments.
    String bootstrapSourceName = PropertySourceBootstrapConfiguration.BOOTSTRAP_PROPERTY_SOURCE_NAME
            + "-testBootstrap";
    then(this.context.getEnvironment().getPropertySources().contains(bootstrapSourceName)).isTrue();

    ConfigurableEnvironment parentEnvironment =
            (ConfigurableEnvironment) this.context.getParent().getEnvironment();
    then(parentEnvironment.getPropertySources().contains(bootstrapSourceName)).isTrue();
}
/**
 * Sends a JSON payload through {@code TypelessMessageToPojoStreamListener} with the
 * input binding's content type set to {@code application/json}, then checks that the
 * output message carries the JSON content type header and the same JSON text.
 */
@Test
public void typelessMessageToPojoInboundContentTypeBindingJson() {
    ApplicationContext context = new SpringApplicationBuilder(
            TypelessMessageToPojoStreamListener.class).web(WebApplicationType.NONE)
                    .run("--spring.cloud.stream.bindings.input.contentType=application/json",
                            "--spring.jmx.enabled=false");
    InputDestination source = context.getBean(InputDestination.class);
    OutputDestination target = context.getBean(OutputDestination.class);
    String jsonPayload = "{\"name\":\"oleg\"}";
    // Encode explicitly as UTF-8 so the bytes sent match the UTF-8 decoding used in
    // the assertion below, independent of the platform default charset.
    source.send(new GenericMessage<>(jsonPayload.getBytes(StandardCharsets.UTF_8)));
    Message<byte[]> outputMessage = target.receive();
    assertThat(outputMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE))
            .isEqualTo(MimeTypeUtils.APPLICATION_JSON);
    assertThat(new String(outputMessage.getPayload(), StandardCharsets.UTF_8))
            .isEqualTo(jsonPayload);
}
/**
 * Starts {@code AnotherKStreamApplication} without a web environment, wiring two
 * input/output binding pairs to the embedded Kafka broker.
 *
 * <p>Side effect: sets the {@code logging.level.org.apache.kafka} system property
 * to {@code OFF} before the application is created.
 *
 * @return the running application context; the caller is responsible for closing it
 */
private ConfigurableApplicationContext multipleStream() {
    System.setProperty("logging.level.org.apache.kafka", "OFF");

    SpringApplication application = new SpringApplication(AnotherKStreamApplication.class);
    application.setWebApplicationType(WebApplicationType.NONE);

    String[] launchArgs = {
            "--server.port=0",
            "--spring.jmx.enabled=false",
            "--spring.cloud.stream.bindings.input.destination=in",
            "--spring.cloud.stream.bindings.output.destination=out",
            "--spring.cloud.stream.bindings.input2.destination=in2",
            "--spring.cloud.stream.bindings.output2.destination=out2",
            "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
            "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
            "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
            "--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=ApplicationHealthTest-xyz",
            "--spring.cloud.stream.kafka.streams.bindings.input2.consumer.applicationId=ApplicationHealthTest2-xyz",
            // The broker list is only known at runtime, from the embedded Kafka instance.
            "--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString()
    };
    return application.run(launchArgs);
}
/**
 * Runs the {@code func} function definition from {@code NoEnableBindingConfiguration}
 * on the test binder and checks that a {@code "Hello"} payload sent to the input
 * destination arrives unchanged at the output destination.
 */
@Test
public void testSimpleFunctionWithNativeProperty() {
    try (ConfigurableApplicationContext ctx = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(NoEnableBindingConfiguration.class))
                    .web(WebApplicationType.NONE)
                    .run("--spring.jmx.enabled=false", "--spring.cloud.function.definition=func")) {
        InputDestination input = ctx.getBean(InputDestination.class);
        OutputDestination output = ctx.getBean(OutputDestination.class);

        Message<byte[]> request = MessageBuilder.withPayload("Hello".getBytes()).build();
        input.send(request);

        Message<byte[]> reply = output.receive();
        assertThat(reply.getPayload()).isEqualTo("Hello".getBytes());
    }
}
/**
 * Runs {@code funcReactive} with a partition key expression of {@code payload.id}
 * and a partition count of 5, sends one message, and checks that the single output
 * message carries {@code scst_partition == 3} and that nothing else is emitted.
 */
@Test
public void partitionOnOutputPayloadTestReactive() {
    // Start from a clean slate: the definition is supplied via run() arguments below.
    System.clearProperty("spring.cloud.function.definition");
    try (ConfigurableApplicationContext ctx = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(PojoFunctionConfiguration.class))
                    .web(WebApplicationType.NONE)
                    .run("--spring.cloud.function.definition=funcReactive",
                            "--spring.cloud.stream.bindings.funcReactive-out-0.producer.partitionKeyExpression=payload.id",
                            "--spring.cloud.stream.bindings.funcReactive-out-0.producer.partitionCount=5",
                            "--spring.jmx.enabled=false")) {
        InputDestination input = ctx.getBean(InputDestination.class);
        OutputDestination output = ctx.getBean(OutputDestination.class);

        Message<byte[]> request = MessageBuilder.withPayload("Jim Lahey".getBytes()).build();
        input.send(request, "funcReactive-in-0");

        Message<byte[]> partitioned = output.receive(100, "funcReactive-out-0");
        assertThat(partitioned.getHeaders().get("scst_partition")).isEqualTo(3);

        // No further message is expected.
        assertThat(output.receive(100)).isNull();
    }
}
/**
 * Configures a default git repository plus a {@code test1} repository matched by the
 * pattern {@code *test1*}, then checks that looking up {@code test1-svc} /
 * {@code staging} / {@code master} yields an environment with two property sources.
 *
 * @throws IOException declared because preparing the local repositories may fail
 */
@Test
public void mappingRepo() throws IOException {
    String defaultUri = ConfigServerTestUtils.prepareLocalRepo("config-repo");
    String test1Uri = ConfigServerTestUtils.prepareLocalRepo("test1-config-repo");

    Map<String, Object> test1Mapping = new LinkedHashMap<>();
    test1Mapping.put("spring.cloud.config.server.git.repos[test1].pattern", "*test1*");
    test1Mapping.put("spring.cloud.config.server.git.repos[test1].uri", test1Uri);

    this.context = new SpringApplicationBuilder(TestConfiguration.class)
            .web(WebApplicationType.NONE)
            .properties("spring.cloud.config.server.git.uri:" + defaultUri)
            .properties(test1Mapping)
            .run();

    EnvironmentRepository repo = this.context.getBean(EnvironmentRepository.class);
    Environment found = repo.findOne("test1-svc", "staging", "master");
    assertThat(found.getPropertySources().size()).isEqualTo(2);
}
/**
 * Sends a JSON payload through {@code TypelessToPojoStreamListener} with the input
 * binding's content type set to {@code application/json}, then checks that the
 * output message carries the JSON content type header and the same JSON text.
 */
@Test
public void typelessToPojoInboundContentTypeBindingJson() {
    ApplicationContext context = new SpringApplicationBuilder(
            TypelessToPojoStreamListener.class).web(WebApplicationType.NONE).run(
                    "--spring.cloud.stream.bindings.input.contentType=application/json",
                    "--spring.jmx.enabled=false");
    InputDestination source = context.getBean(InputDestination.class);
    OutputDestination target = context.getBean(OutputDestination.class);
    String jsonPayload = "{\"name\":\"oleg\"}";
    // Encode explicitly as UTF-8 so the bytes sent match the UTF-8 decoding used in
    // the assertion below, independent of the platform default charset.
    source.send(new GenericMessage<>(jsonPayload.getBytes(StandardCharsets.UTF_8)));
    Message<byte[]> outputMessage = target.receive();
    assertThat(outputMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE))
            .isEqualTo(MimeTypeUtils.APPLICATION_JSON);
    assertThat(new String(outputMessage.getPayload(), StandardCharsets.UTF_8))
            .isEqualTo(jsonPayload);
}
/**
 * Runs {@code singleInputMultipleOutputs}, sends the numbers 0..9 to its single
 * input, and checks that the two outputs alternately deliver {@code "EVEN: n"}
 * (on {@code -out-0}) and {@code "ODD: n"} (on {@code -out-1}) for n = 0..9.
 */
@Test
public void testSingleInputMultiOutput() {
    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(
                    ReactiveFunctionConfiguration.class))
                            .web(WebApplicationType.NONE)
                            .run("--spring.jmx.enabled=false",
                                    "--spring.cloud.function.definition=singleInputMultipleOutputs")) {
        // A single lookup is enough; the original fetched InputDestination twice and
        // discarded the first result.
        InputDestination inputDestination = context.getBean(InputDestination.class);
        OutputDestination outputDestination = context.getBean(OutputDestination.class);

        for (int i = 0; i < 10; i++) {
            inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build(),
                    "singleInputMultipleOutputs-in-0");
        }

        // counter advances once per received message, so even values are checked
        // against out-0 and odd values against out-1.
        int counter = 0;
        for (int i = 0; i < 5; i++) {
            Message<byte[]> even = outputDestination.receive(0, "singleInputMultipleOutputs-out-0");
            assertThat(even.getPayload()).isEqualTo(("EVEN: " + counter++).getBytes());
            Message<byte[]> odd = outputDestination.receive(0, "singleInputMultipleOutputs-out-1");
            assertThat(odd.getPayload()).isEqualTo(("ODD: " + counter++).getBytes());
        }
    }
}
/**
 * Starts {@code FunctionalConsumerConfiguration} on the test binder, checks that no
 * {@code funcConsumer-out-0} bean exists, sends {@code "Hello"} to the input, and
 * expects the {@code consumer} system property to equal {@code "Hello"} afterwards.
 */
@Test
public void testReactiveFunctionWithOutputAsMonoVoid() {
    System.clearProperty("spring.cloud.function.definition");
    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(FunctionalConsumerConfiguration.class))
                    .web(WebApplicationType.NONE).run("--spring.jmx.enabled=false")) {
        assertThat(context.containsBean("funcConsumer-out-0")).isFalse();
        InputDestination inputDestination = context.getBean(InputDestination.class);
        Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
        try {
            inputDestination.send(inputMessage);
            // NOTE(review): the property is presumably set by the consumer under
            // test; that code is not visible here.
            assertThat(System.getProperty("consumer")).isEqualTo("Hello");
        }
        finally {
            // Always reset the JVM-wide property so a failed assertion cannot leak
            // state into other tests.
            System.clearProperty("consumer");
        }
    }
}
/**
 * Configures the {@code input} binding with
 * {@code retry-template-name=retryTemplateTwo} and checks that the binder's
 * non-public {@code buildRetryTemplate(ConsumerProperties)} method, invoked via
 * reflection, returns the {@code retryTemplateTwo} bean.
 *
 * @throws Exception if the reflective lookup or invocation fails
 */
@SuppressWarnings("rawtypes")
@Test
public void testSpecificCustomRetryTemplate() throws Exception {
    ApplicationContext ctx = new SpringApplicationBuilder(SpecificCustomRetryTemplateConfiguration.class)
            .web(WebApplicationType.NONE)
            .run("--spring.jmx.enabled=false",
                    "--spring.cloud.stream.bindings.input.consumer.retry-template-name=retryTemplateTwo");

    RetryTemplate expectedTemplate = ctx.getBean("retryTemplateTwo", RetryTemplate.class);
    ConsumerProperties inputConsumerProperties = ctx.getBean(BindingServiceProperties.class)
            .getConsumerProperties("input");
    AbstractBinder binder = ctx.getBean(AbstractBinder.class);

    // The method is looked up with getDeclaredMethod and made accessible, so it is
    // reached reflectively rather than through the binder's public API.
    Method buildRetryTemplate = AbstractBinder.class.getDeclaredMethod("buildRetryTemplate",
            ConsumerProperties.class);
    buildRetryTemplate.setAccessible(true);
    RetryTemplate actualTemplate = (RetryTemplate) buildRetryTemplate.invoke(binder, inputConsumerProperties);

    assertThat(actualTemplate).isEqualTo(expectedTemplate);
}
/**
 * Boots {@code SimpleApplication} with its {@code input} binding routed through a
 * Kafka binder named {@code inbound} and checks that the {@code MeterRegistry}
 * exposes a {@code spring.cloud.stream.binder.kafka.offset} gauge tagged with the
 * binding's group and topic.
 */
@Test
public void testMetricsWorkWithMultiBinders() {
    // try-with-resources closes the context even when an assertion fails; the
    // original only closed it on the success path.
    try (ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
            SimpleApplication.class).web(WebApplicationType.NONE).run(
                    "--spring.cloud.stream.bindings.input.destination=foo",
                    "--spring.cloud.stream.bindings.input.binder=inbound",
                    "--spring.cloud.stream.bindings.input.group=testGroupabc",
                    "--spring.cloud.stream.binders.inbound.type=kafka",
                    "--spring.cloud.stream.binders.inbound.environment"
                            + ".spring.cloud.stream.kafka.binder.brokers" + "="
                            + embeddedKafka.getEmbeddedKafka().getBrokersAsString())) {
        final MeterRegistry meterRegistry = applicationContext.getBean(MeterRegistry.class);
        assertThat(meterRegistry).isNotNull();
        assertThat(meterRegistry.get("spring.cloud.stream.binder.kafka.offset")
                .tag("group", "testGroupabc")
                .tag("topic", "foo").gauge().value()).isNotNull();
    }
}
/**
 * Tries to start {@code EncryptionBootstrapConfiguration} with a key-store location
 * of {@code classpath:/server.jks1} and expects startup to fail with an exception
 * whose root cause is an {@code IllegalStateException}.
 */
@Test
public void nonExistentKeystoreLocationShouldNotBeAllowed() {
    try {
        SpringApplicationBuilder builder = new SpringApplicationBuilder(EncryptionBootstrapConfiguration.class)
                .web(WebApplicationType.NONE)
                .properties(
                        "encrypt.key-store.location:classpath:/server.jks1",
                        "encrypt.key-store.password:letmein",
                        "encrypt.key-store.alias:mytestkey",
                        "encrypt.key-store.secret:changeme");
        builder.run();

        // Reaching this line means startup unexpectedly succeeded; force a failure.
        then(false)
                .as("Should not create an application context with invalid keystore location")
                .isTrue();
    }
    catch (Exception startupFailure) {
        then(startupFailure).hasRootCauseInstanceOf(IllegalStateException.class);
    }
}
/**
 * Sends a JSON-formatted string through {@code StringToPojoStreamListener} with the
 * input binding's content type set to {@code text/plain}, then checks that the
 * output message carries the JSON content type header and the same JSON text.
 */
@Test
public void stringToPojoInboundContentTypeBinding() {
    ApplicationContext context = new SpringApplicationBuilder(
            StringToPojoStreamListener.class).web(WebApplicationType.NONE).run(
                    "--spring.cloud.stream.bindings.input.contentType=text/plain",
                    "--spring.jmx.enabled=false");
    InputDestination source = context.getBean(InputDestination.class);
    OutputDestination target = context.getBean(OutputDestination.class);
    String jsonPayload = "{\"name\":\"oleg\"}";
    // Encode explicitly as UTF-8 so the bytes sent match the UTF-8 decoding used in
    // the assertion below, independent of the platform default charset.
    source.send(new GenericMessage<>(jsonPayload.getBytes(StandardCharsets.UTF_8)));
    Message<byte[]> outputMessage = target.receive();
    assertThat(outputMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE))
            .isEqualTo(MimeTypeUtils.APPLICATION_JSON);
    assertThat(new String(outputMessage.getPayload(), StandardCharsets.UTF_8))
            .isEqualTo(jsonPayload);
}
/**
 * Runs {@code IntegrationTestApp} as a non-web Spring application, closes the
 * context once {@code run} returns, and terminates the JVM with exit code 0.
 * If an exception is thrown, it is logged together with the names of all live
 * threads and the JVM exits with code 1.
 *
 * @param args command-line arguments forwarded to {@code SpringApplication.run}
 */
public static void main(String[] args) {
    long startedAt = System.currentTimeMillis();

    SpringApplication application = new SpringApplication(IntegrationTestApp.class);
    application.setWebApplicationType(WebApplicationType.NONE);
    application.setDefaultProperties(ImmutableMap.of("spring.main.allow-bean-definition-overriding", "true"));

    try {
        ConfigurableApplicationContext runningContext = application.run(args);
        LOG.info("Closing Spring test context.");
        runningContext.close();
        LOG.info("Tests have been done successfully: {} milliseconds", System.currentTimeMillis() - startedAt);
        System.exit(0);
    } catch (Exception e) {
        LOG.error("Exception occurred during closing Spring Context: {}", e.getMessage(), e);
        // List every live thread name to help diagnose what was still running.
        for (Thread liveThread : Thread.getAllStackTraces().keySet()) {
            LOG.info("Running threads: {}", liveThread.getName());
        }
        System.exit(1);
    }
}
/**
 * Declares {@code supplier} and {@code foo} as stream sources, sends one message to
 * each output through {@code StreamBridge}, and checks what arrives: {@code "b"} on
 * {@code foo-out-0}, then {@code "hello"} followed by {@code "blah"} on
 * {@code supplier-out-0}.
 */
@Test
public void testSendingMessageToOutputOfExistingSupplier() {
    try (ConfigurableApplicationContext ctx = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(TestConfiguration.class))
                    .web(WebApplicationType.NONE)
                    .run("--spring.cloud.stream.source=supplier;foo", "--spring.jmx.enabled=false")) {
        StreamBridge streamBridge = ctx.getBean(StreamBridge.class);
        streamBridge.send("supplier-out-0", "blah");
        streamBridge.send("foo-out-0", "b");

        OutputDestination output = ctx.getBean(OutputDestination.class);

        Message<byte[]> fromFoo = output.receive(100, "foo-out-0");
        assertThat(new String(fromFoo.getPayload())).isEqualTo("b");

        // NOTE(review): "hello" is presumably emitted by the supplier bean itself,
        // ahead of the bridged "blah" -- the supplier is not visible here.
        Message<byte[]> firstFromSupplier = output.receive(100, "supplier-out-0");
        assertThat(new String(firstFromSupplier.getPayload())).isEqualTo("hello");

        Message<byte[]> secondFromSupplier = output.receive(100, "supplier-out-0");
        assertThat(new String(secondFromSupplier.getPayload())).isEqualTo("blah");
    }
}
/**
 * Sends a JSON person payload to {@code echoPerson} with the content type header
 * supplied as a UTF-8 byte array rather than a string, and checks that the same
 * JSON bytes are received on the output.
 */
@Test
public void contentTypeAsByteArrayTest() {
    System.clearProperty("spring.cloud.function.definition");
    try (ConfigurableApplicationContext ctx = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(PojoFunctionConfiguration.class))
                    .web(WebApplicationType.NONE)
                    .run("--spring.cloud.function.definition=echoPerson", "--spring.jmx.enabled=false")) {
        InputDestination input = ctx.getBean(InputDestination.class);
        OutputDestination output = ctx.getBean(OutputDestination.class);

        String personJson = "{\"name\":\"Jim Lahey\",\"id\":420}";
        // The header value is deliberately a byte[] -- that is what this test exercises.
        byte[] contentTypeHeader = "application/json".getBytes(StandardCharsets.UTF_8);
        Message<byte[]> request = MessageBuilder.withPayload(personJson.getBytes())
                .setHeader(MessageHeaders.CONTENT_TYPE, contentTypeHeader)
                .build();
        input.send(request, "echoPerson-in-0");

        Message<byte[]> echoed = output.receive(100, "echoPerson-out-0");
        assertThat(echoed.getPayload()).isEqualTo(personJson.getBytes());
    }
}
/**
 * Starts {@code SingleFunctionConfiguration} with the {@code input} binding's
 * content type set to {@code text/plain}, sends two messages, and checks that both
 * payloads come back unchanged and in order.
 */
@Test
public void testWithContextTypeApplicationProperty() {
    System.clearProperty("spring.cloud.function.definition");
    try (ConfigurableApplicationContext ctx = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(SingleFunctionConfiguration.class))
                    .web(WebApplicationType.NONE)
                    .run("--spring.jmx.enabled=false",
                            "--spring.cloud.stream.bindings.input.content-type=text/plain",
                            "--debug")) {
        InputDestination input = ctx.getBean(InputDestination.class);
        OutputDestination output = ctx.getBean(OutputDestination.class);

        Message<byte[]> first = MessageBuilder.withPayload("Hello".getBytes()).build();
        Message<byte[]> second = MessageBuilder.withPayload("Hello Again".getBytes()).build();
        input.send(first);
        input.send(second);

        Message<byte[]> firstReply = output.receive();
        assertThat(firstReply.getPayload()).isEqualTo("Hello".getBytes());

        Message<byte[]> secondReply = output.receive();
        assertThat(secondReply.getPayload()).isEqualTo("Hello Again".getBytes());
    }
}
/**
 * Runs {@code multiInputSingleOutput} with custom content types
 * ({@code string/person} on input 0, {@code string/employee} on input 1), sends one
 * message to each input, and checks that the single output delivers the upper-cased
 * payloads {@code "RICKY"} and then {@code "BOBBY"}.
 */
@Test
public void testMultiInputSingleOutputWithCustomContentType() {
    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(
                    ContentTypeConfiguration.class))
                            .web(WebApplicationType.NONE)
                            .run("--spring.jmx.enabled=false",
                                    "--spring.cloud.function.definition=multiInputSingleOutput",
                                    "--spring.cloud.stream.bindings.multiInputSingleOutput-in-0.content-type=string/person",
                                    "--spring.cloud.stream.bindings.multiInputSingleOutput-in-1.content-type=string/employee")) {
        // A single lookup is enough; the original fetched InputDestination twice and
        // discarded the first result.
        InputDestination inputDestination = context.getBean(InputDestination.class);
        OutputDestination outputDestination = context.getBean(OutputDestination.class);

        // Named after the content type configured for the input each one is sent to.
        Message<byte[]> personInputMessage = MessageBuilder.withPayload("ricky".getBytes()).build();
        Message<byte[]> employeeInputMessage = MessageBuilder.withPayload("bobby".getBytes()).build();
        inputDestination.send(personInputMessage, "multiInputSingleOutput-in-0");
        inputDestination.send(employeeInputMessage, "multiInputSingleOutput-in-1");

        Message<byte[]> outputMessage = outputDestination.receive(1000, "multiInputSingleOutput-out-0");
        assertThat(outputMessage.getPayload()).isEqualTo("RICKY".getBytes());
        outputMessage = outputDestination.receive(1000, "multiInputSingleOutput-out-0");
        assertThat(outputMessage.getPayload()).isEqualTo("BOBBY".getBytes());
    }
}
/**
 * Stores a {@code MongoPropertySource} containing a nested map
 * ({@code outer -> {inner: value}}) in the {@code testapp} collection and checks
 * that the repository returns one property source named {@code testapp-default}
 * in which the nested entry is exposed under the key {@code outer.inner}.
 */
@Test
public void nestedPropertySource() {
    // Arrange: start the context against the "testdb" database.
    Map<String, Object> settings = new HashMap<>();
    settings.put("spring.data.mongodb.database", "testdb");
    this.context = new SpringApplicationBuilder(TestConfiguration.class)
            .web(WebApplicationType.NONE)
            .properties(settings)
            .run();

    // Arrange: reset the collection and store a source with one nested entry.
    MongoTemplate mongo = this.context.getBean(MongoTemplate.class);
    mongo.dropCollection("testapp");
    MongoPropertySource stored = new MongoPropertySource();
    Map<String, String> nested = new HashMap<>();
    nested.put("inner", "value");
    stored.getSource().put("outer", nested);
    mongo.save(stored, "testapp");

    // Act
    EnvironmentRepository repo = this.context.getBean(EnvironmentRepository.class);
    Environment found = repo.findOne("testapp", "default", null);

    // Assert
    assertEquals("testapp-default", found.getPropertySources().get(0).getName());
    assertEquals(1, found.getPropertySources().size());
    assertEquals(true, found.getPropertySources().get(0).getSource().containsKey("outer.inner"));
    assertEquals("value", found.getPropertySources().get(0).getSource().get("outer.inner"));
}
/**
 * Starts {@code NoEnableBindingConfiguration} with the function definition
 * {@code func} and checks that a {@code "Hello"} payload passes from the input
 * destination to the output destination unchanged.
 */
@Test
public void testSimpleFunctionWithStreamProperty() {
    try (ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(NoEnableBindingConfiguration.class))
                    .web(WebApplicationType.NONE)
                    .run("--spring.jmx.enabled=false",
                            "--spring.cloud.function.definition=func")) {
        InputDestination in = applicationContext.getBean(InputDestination.class);
        OutputDestination out = applicationContext.getBean(OutputDestination.class);

        Message<byte[]> hello = MessageBuilder.withPayload("Hello".getBytes()).build();
        in.send(hello);

        Message<byte[]> received = out.receive();
        assertThat(received.getPayload()).isEqualTo("Hello".getBytes());
    }
}
/**
 * Starts the {@code simpleStreamSupplier} definition and checks that the output
 * destination delivers the payloads {@code "1"} through {@code "6"} in order, each
 * awaited for up to 2000 ms.
 */
@Test
public void testFiniteFluxSupplierSimple() {
    try (ConfigurableApplicationContext ctx = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(FunctionsConfiguration.class,
                    SimpleFluxSupplierConfiguration.class))
                            .web(WebApplicationType.NONE)
                            .run("--spring.cloud.stream.function.definition=simpleStreamSupplier",
                                    "--spring.jmx.enabled=false")) {
        OutputDestination output = ctx.getBean(OutputDestination.class);
        for (int expected = 1; expected <= 6; expected++) {
            assertThat(new String(output.receive(2000).getPayload())).isEqualTo(String.valueOf(expected));
        }
        // NOTE: a check that the "supplierInitializer" bean is non-null was disabled
        // in the original and is intentionally not performed here.
    }
}
/**
 * Runs {@code reactivePersonSupplier} with a partition key expression of
 * {@code payload.id} and a partition count of 5, and checks that the single message
 * it emits carries {@code scst_partition == 1} and that nothing else follows.
 */
@Test
public void partitionOnOutputPayloadWithReactiveSupplierTest() {
    System.clearProperty("spring.cloud.function.definition");
    try (ConfigurableApplicationContext ctx = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(PojoFunctionConfiguration.class))
                    .web(WebApplicationType.NONE)
                    .run("--spring.cloud.function.definition=reactivePersonSupplier",
                            "--spring.cloud.stream.bindings.reactivePersonSupplier-out-0.producer.partitionKeyExpression=payload.id",
                            "--spring.cloud.stream.bindings.reactivePersonSupplier-out-0.producer.partitionCount=5",
                            "--spring.jmx.enabled=false")) {
        OutputDestination output = ctx.getBean(OutputDestination.class);

        Message<byte[]> supplied = output.receive(1000, "reactivePersonSupplier-out-0");
        assertThat(supplied.getHeaders().get("scst_partition")).isEqualTo(1);

        // No further message is expected.
        assertThat(output.receive(100)).isNull();
    }
}
/**
 * Puts {@code spring.profiles.include=bar,baz} into
 * {@code PropertySourceConfiguration.MAP}, starts the application with the
 * {@code foo} profile, and checks that the environment accepts both {@code baz}
 * and {@code bar}.
 */
@Test
public void includeProfileFromBootstrapPropertySource() {
    PropertySourceConfiguration.MAP.put("spring.profiles.include", "bar,baz");

    SpringApplicationBuilder builder = new SpringApplicationBuilder()
            .web(WebApplicationType.NONE)
            .profiles("foo")
            .sources(BareConfiguration.class);
    this.context = builder.run();

    then(this.context.getEnvironment().acceptsProfiles("baz")).isTrue();
    then(this.context.getEnvironment().acceptsProfiles("bar")).isTrue();
}
/**
 * Sets the {@code expected.fail} system property to {@code true} and expects
 * application startup to fail with a message containing {@code "Planned"}.
 */
@Test
public void failsOnPropertySource() {
    // NOTE(review): this system property is not cleared in this method; presumably
    // a teardown elsewhere resets it -- confirm.
    System.setProperty("expected.fail", "true");
    this.expected.expectMessage("Planned");

    SpringApplicationBuilder builder = new SpringApplicationBuilder()
            .web(WebApplicationType.NONE)
            .sources(BareConfiguration.class);
    this.context = builder.run();
}
/**
 * Starts an empty context, activates the {@code local} profile afterwards, and
 * checks that a refresh through {@code RefreshEndpoint} reports the key
 * {@code added} among the changed keys.
 *
 * @throws Exception declared by the original test signature
 */
@Test
public void keysComputedWhenAdded() throws Exception {
    this.context = new SpringApplicationBuilder(Empty.class)
            .web(WebApplicationType.NONE).bannerMode(Mode.OFF)
            .properties("spring.cloud.bootstrap.name:none").run();
    RefreshScope scope = new RefreshScope();
    scope.setApplicationContext(this.context);
    this.context.getEnvironment().setActiveProfiles("local");
    ContextRefresher contextRefresher = new ContextRefresher(this.context, scope);
    RefreshEndpoint endpoint = new RefreshEndpoint(contextRefresher);
    Collection<String> keys = endpoint.refresh();
    // The description must be set BEFORE the assertion; placed after isTrue() it is
    // never used, so a failure would not show the actual keys.
    then(keys.contains("added")).as("Wrong keys: " + keys).isTrue();
}
/**
 * Starts the context with a symmetric {@code encrypt.key} and a
 * {@code {cipher}}-prefixed {@code foo.password}, and checks that the bound
 * {@code PasswordProperties} bean exposes the password {@code "test"}.
 */
@Test
public void symmetricConfigurationProperties() {
    // try-with-resources closes the context when the test ends; the original never
    // closed it.
    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
            TestConfiguration.class).web(WebApplicationType.NONE).properties(
                    "encrypt.key:pie",
                    "foo.password:{cipher}bf29452295df354e6153c5b31b03ef23c70e55fba24299aa85c63438f1c43c95")
                    .run()) {
        then(context.getBean(PasswordProperties.class).getPassword()).isEqualTo("test");
    }
}
/**
 * Builds a two-level context hierarchy: a root context created by
 * {@code initializeGemini} from {@code coreBean}, and a servlet web application
 * context (banner off) on top of it, sourced from {@code Api},
 * {@code Autoconfiguration} and {@code apiBean}.
 *
 * @param coreBean classes passed to {@code initializeGemini} for the root context
 * @param apiBean additional source classes for the web application context
 * @return the running web application context, whose parent is the root context
 */
public static ConfigurableApplicationContext initializeFullIntegrationWebApp(Set<Class> coreBean, Set<Class> apiBean) {
    Class[] coreSources = coreBean.toArray(new Class[0]);
    ConfigurableApplicationContext rootContext = initializeGemini(coreSources);

    Class[] apiSources = apiBean.toArray(new Class[0]);
    return new SpringApplicationBuilder()
            .parent(rootContext)
            .sources(Api.class, Autoconfiguration.class)
            .sources(apiSources)
            .web(WebApplicationType.SERVLET)
            .bannerMode(Banner.Mode.OFF)
            .run();
}
/**
 * Runs {@code WordCountProcessorApplication} against the embedded Kafka broker with
 * a two-partition output topic and the {@code streamPartitioner} bean configured as
 * the producer's stream partitioner. Sends {@code "foo"} and {@code "bar"} to
 * {@code words-2} and checks that the resulting counts land on partitions 0 and 1
 * of {@code counts-2} respectively.
 *
 * @throws Exception declared by the original test signature
 */
@Test
public void testKstreamWordCountFunctionWithCustomProducerStreamPartitioner() throws Exception {
    SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
    app.setWebApplicationType(WebApplicationType.NONE);
    try (ConfigurableApplicationContext context = app.run(
            "--server.port=0",
            "--spring.jmx.enabled=false",
            "--spring.cloud.stream.bindings.process-in-0.destination=words-2",
            "--spring.cloud.stream.bindings.process-out-0.destination=counts-2",
            "--spring.cloud.stream.bindings.process-out-0.producer.partitionCount=2",
            "--spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName" +
                    "=streamPartitioner",
            "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
            "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
                    "=org.apache.kafka.common.serialization.Serdes$StringSerde",
            "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" +
                    "=org.apache.kafka.common.serialization.Serdes$StringSerde",
            "--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        try {
            KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
            template.setDefaultTopic("words-2");

            // Assert on the actual values (not on pre-computed booleans) so that a
            // failure reports the record content and partition that were received.
            template.sendDefault("foo");
            ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "counts-2");
            assertThat(cr.value()).contains("\"word\":\"foo\",\"count\":1");
            assertThat(cr.partition()).isEqualTo(0);

            template.sendDefault("bar");
            cr = KafkaTestUtils.getSingleRecord(consumer, "counts-2");
            assertThat(cr.value()).contains("\"word\":\"bar\",\"count\":1");
            assertThat(cr.partition()).isEqualTo(1);
        }
        finally {
            pf.destroy();
        }
    }
}
/**
 * Sends a JSON payload as raw bytes through
 * {@code ByteArrayToByteArrayStreamListener} and checks that the output payload,
 * decoded as UTF-8, equals the original JSON text.
 */
@Test
public void byteArrayToByteArray() {
    ApplicationContext context = new SpringApplicationBuilder(
            ByteArrayToByteArrayStreamListener.class).web(WebApplicationType.NONE)
                    .run("--spring.jmx.enabled=false");
    InputDestination source = context.getBean(InputDestination.class);
    OutputDestination target = context.getBean(OutputDestination.class);
    String jsonPayload = "{\"name\":\"oleg\"}";
    // Encode explicitly as UTF-8 so the bytes sent match the UTF-8 decoding used in
    // the assertion below, independent of the platform default charset.
    source.send(new GenericMessage<>(jsonPayload.getBytes(StandardCharsets.UTF_8)));
    Message<byte[]> outputMessage = target.receive();
    assertThat(new String(outputMessage.getPayload(), StandardCharsets.UTF_8))
            .isEqualTo(jsonPayload);
}
/**
 * Starts a {@code ZookeeperServer} in a temporary folder, boots the application
 * with Zookeeper discovery enabled, and checks that a
 * {@code ServiceDefinitionToZookeeperRegistration} converter is registered.
 *
 * <p>Note: despite "Consul" in the method name, the assertions concern the
 * Zookeeper registration converter.
 *
 * @throws Exception if the temporary folder, the server or the context fails
 */
@Test
public void testServiceDefinitionToConsulRegistration() throws Exception {
    final ZookeeperServer server = new ZookeeperServer(temporaryFolder.newFolder(testName.getMethodName()));
    // The outer try/finally guarantees the server is shut down even when context
    // startup itself throws; the original only entered its try block after run().
    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestConfiguration.class)
            .web(WebApplicationType.NONE)
            .run(
                    "--debug=false",
                    "--spring.main.banner-mode=OFF",
                    "--spring.application.name=" + UUID.randomUUID(),
                    "--ribbon.enabled=false",
                    "--ribbon.eureka.enabled=false",
                    "--management.endpoint.enabled=false",
                    "--spring.cloud.zookeeper.enabled=true",
                    "--spring.cloud.zookeeper.connect-string=" + server.connectString(),
                    "--spring.cloud.zookeeper.config.enabled=false",
                    "--spring.cloud.zookeeper.discovery.enabled=true",
                    "--spring.cloud.service-registry.auto-registration.enabled=false"
            )) {
        Map<String, Converter> converters = context.getBeansOfType(Converter.class);
        assertThat(converters).isNotNull();
        assertThat(converters.values().stream().anyMatch(ServiceDefinitionToZookeeperRegistration.class::isInstance)).isTrue();
    } finally {
        // Runs after the context has been closed by try-with-resources, preserving
        // the original order: context first, then Zookeeper.
        server.shutdown();
    }
}