The following lists example code for org.springframework.boot.SpringApplication#setWebApplicationType(). You can follow the link to view the source on GitHub, or leave a comment on the right.
// Verifies that the word-count processor starts and processes records when the
// Kafka Streams application-id is supplied through the DEFAULT consumer
// properties (…streams.default.consumer.application-id) instead of per-binding.
@Test
public void testKstreamWordCountWithApplicationIdSpecifiedAtDefaultConsumer()
throws Exception {
SpringApplication app = new SpringApplication(
WordCountProcessorApplication.class);
// Integration test only — no embedded web server needed.
app.setWebApplicationType(WebApplicationType.NONE);
// try-with-resources closes the application context once validation is done.
try (ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=words",
"--spring.cloud.stream.bindings.output.destination=counts",
"--spring.cloud.stream.kafka.streams.default.consumer.application-id=testKstreamWordCountWithApplicationIdSpecifiedAtDefaultConsumer",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
// Point the binder at the embedded (in-process) Kafka broker.
"--spring.cloud.stream.kafka.binder.brokers="
+ embeddedKafka.getBrokersAsString())) {
receiveAndValidate("words", "counts");
}
}
/**
 * Entry point for the integration-test runner. Boots the Spring context,
 * closes it, logs elapsed time, and exits with 0 on success or 1 on failure.
 */
public static void main(String[] args) {
    long start = System.currentTimeMillis();
    SpringApplication springApp = new SpringApplication(IntegrationTestApp.class);
    // Headless test run — no embedded web server.
    springApp.setWebApplicationType(WebApplicationType.NONE);
    springApp.setDefaultProperties(ImmutableMap.of("spring.main.allow-bean-definition-overriding", "true"));
    // try-with-resources guarantees the context is closed even if run() or
    // close() throws, replacing the unprotected explicit close() call.
    try (ConfigurableApplicationContext context = springApp.run(args)) {
        LOG.info("Closing Spring test context.");
    } catch (Exception e) {
        // The failure can happen during startup as well as during close, so the
        // message no longer claims it happened "during closing".
        LOG.error("Exception occurred while running or closing the Spring context: {}", e.getMessage(), e);
        // Dump live threads to help diagnose what kept the JVM busy.
        Thread.getAllStackTraces().keySet()
                .forEach(t -> LOG.info("Running threads: {}", t.getName()));
        System.exit(1);
    }
    LOG.info("Tests have been done successfully: {} milliseconds", System.currentTimeMillis() - start);
    System.exit(0);
}
// Boots the click-count application (non-web) and attaches a String/Long
// consumer to "output-topic-1" on the embedded broker before delegating to
// runTest(..) for the actual stream-to-table verification.
@Test
public void testStreamToTable() {
    SpringApplication app = new SpringApplication(CountClicksPerRegionApplication.class);
    app.setWebApplicationType(WebApplicationType.NONE);
    // Consumer config: explicit deserializers, read from the earliest offset.
    Map<String, Object> props = KafkaTestUtils.consumerProps("group-1",
            "false", embeddedKafka);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    DefaultKafkaConsumerFactory<String, Long> factory = new DefaultKafkaConsumerFactory<>(props);
    Consumer<String, Long> consumer = factory.createConsumer();
    embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "output-topic-1");
    runTest(app, consumer);
}
public static void main(String[] args) {
final SpringApplication application = new SpringApplication(KafkaStreamsApp.class);
// This app does not otherwise need a web/REST interface, but Cloud Foundry's
// default health check probes the HTTP port, so a servlet container is
// started anyway so the health check can succeed.
// https://docs.cloudfoundry.org/devguide/deploy-apps/healthchecks.html#understand-healthchecks
application.setWebApplicationType(WebApplicationType.SERVLET);
application.run(args);
}
/**
 * Builds a non-web {@code FunctionalSpringApplication} around the configured
 * source class.
 *
 * @return the prepared application, ready to be run by the caller
 */
private SpringApplication springApplication() {
    SpringApplication application =
            new org.springframework.cloud.function.context.FunctionalSpringApplication(
                    this.configurationClass);
    application.setWebApplicationType(WebApplicationType.NONE);
    return application;
}
/**
 * Starts the RPC server (non-web) and then parks the main thread forever so
 * the server stays alive.
 */
public static void main(String[] args) {
    SpringApplication application = new SpringApplication(RpcServerTest.class);
    application.setWebApplicationType(WebApplicationType.NONE);
    application.run(args);
    // Keep the server running: block the main thread on a monitor that is
    // never notified.
    synchronized (RpcServerTest.class) {
        try {
            RpcServerTest.class.wait();
        } catch (InterruptedException e) {
            // Don't swallow the interrupt (the original caught and ignored
            // Throwable) — restore the interrupt status and let main return.
            Thread.currentThread().interrupt();
        }
    }
}
// Round-trip test: boots the application against the embedded Kafka broker
// with the "kafka" profile active and validates board events produced to and
// consumed from the "board-events" topic.
@Test
public void testFind() throws Exception {
log.debug( "testFind : --spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString() );
log.debug( "testFind : --spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString() );
SpringApplication app = new SpringApplication( Application.class );
// No embedded web server needed for this test.
app.setWebApplicationType( WebApplicationType.NONE );
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.cloud.service-registry.auto-registration.enabled=false",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=board-events",
"--spring.cloud.stream.bindings.output.destination=board-events",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString(),
"--spring.profiles.active=kafka",
"--spring.jackson.serialization.write_dates_as_timestamps=false",
"--logger.level.io.pivotal.dmfrey=DEBUG");
// Ensure the context is closed even when validation fails.
try {
receiveAndValidateBoard( context );
} finally {
context.close();
}
}
/**
 * Verifies the word-count processor with String input bound to two
 * destinations ("words1,words2") and JSON (POJO) output on "counts".
 */
@Test
public void testKstreamWordCountWithStringInputAndPojoOuput() throws Exception {
    SpringApplication app = new SpringApplication(
            WordCountProcessorApplication.class);
    app.setWebApplicationType(WebApplicationType.NONE);
    // try-with-resources replaces the manual try/finally close() so the
    // context is always released once validation completes.
    try (ConfigurableApplicationContext context = app.run("--server.port=0",
            "--spring.jmx.enabled=false",
            "--spring.cloud.stream.bindings.input.destination=words1,words2",
            "--spring.cloud.stream.bindings.output.destination=counts",
            "--spring.cloud.stream.bindings.output.contentType=application/json",
            "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
            "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
                    + "=org.apache.kafka.common.serialization.Serdes$StringSerde",
            "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
                    + "=org.apache.kafka.common.serialization.Serdes$StringSerde",
            "--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
            "--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
            "--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId"
                    + "=WordCountProcessorApplication-xyz",
            "--spring.cloud.stream.kafka.streams.binder.brokers="
                    + embeddedKafka.getBrokersAsString())) {
        receiveAndValidate();
    }
}
/**
 * Verifies that state store builder beans declared in the application are
 * wired into the Kafka Streams topology and receive records.
 */
@Test
public void testKstreamStateStoreBuilderBeansDefinedInApplication() throws Exception {
    SpringApplication app = new SpringApplication(StateStoreBeanApplication.class);
    app.setWebApplicationType(WebApplicationType.NONE);
    // try-with-resources closes the context; the original also had a no-op
    // catch (Exception e) { throw e; } which has been removed.
    try (ConfigurableApplicationContext context = app.run("--server.port=0",
            "--spring.jmx.enabled=false",
            "--spring.cloud.stream.bindings.input3.destination=foobar",
            "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
            "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
                    + "=org.apache.kafka.common.serialization.Serdes$StringSerde",
            "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
                    + "=org.apache.kafka.common.serialization.Serdes$StringSerde",
            "--spring.cloud.stream.kafka.streams.bindings.input3.consumer.applicationId"
                    + "=KafkaStreamsStateStoreIntegrationTests-xyzabc-123",
            "--spring.cloud.stream.kafka.streams.binder.brokers="
                    + embeddedKafka.getBrokersAsString())) {
        // Give the streams topology a moment to start before validating.
        Thread.sleep(2000);
        receiveAndValidateFoo(context, StateStoreBeanApplication.class);
    }
}
/**
 * Verifies the product-count application's Kafka Streams state store against
 * the embedded broker.
 */
@Test
public void testKstreamStateStore() throws Exception {
    SpringApplication app = new SpringApplication(ProductCountApplication.class);
    app.setWebApplicationType(WebApplicationType.NONE);
    // try-with-resources closes the context; the original also had a no-op
    // catch (Exception e) { throw e; } which has been removed.
    try (ConfigurableApplicationContext context = app.run("--server.port=0",
            "--spring.jmx.enabled=false",
            "--spring.cloud.stream.bindings.input.destination=foobar",
            "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
            "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
                    + "=org.apache.kafka.common.serialization.Serdes$StringSerde",
            "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
                    + "=org.apache.kafka.common.serialization.Serdes$StringSerde",
            "--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId"
                    + "=KafkaStreamsStateStoreIntegrationTests-abc",
            "--spring.cloud.stream.kafka.streams.binder.brokers="
                    + embeddedKafka.getBrokersAsString())) {
        // Give the streams topology a moment to start before validating.
        Thread.sleep(2000);
        receiveAndValidateFoo(context, ProductCountApplication.class);
    }
}
// Boots a reactive test client on a random port before each test, pointing
// its admin-client URL at the WireMock stub so registration calls can be
// asserted against the mock.
@BeforeEach
@Override
public void setUp() throws Exception {
super.setUp();
SpringApplication application = new SpringApplication(TestClientApplication.class);
application.setWebApplicationType(WebApplicationType.REACTIVE);
instance = application.run("--spring.application.name=Test-Client", "--server.port=0",
"--management.endpoints.web.base-path=/mgmt", "--endpoints.health.enabled=true",
"--spring.boot.admin.client.url=" + wireMock.url("/"));
}
/**
 * Entry point for the Scoold server: forces TRACE logging, loads config, and
 * starts a servlet-based web application with the configured profile.
 */
public static void main(String[] args) {
    // Raise log verbosity for this run before anything else happens.
    ((ch.qos.logback.classic.Logger) logger).setLevel(ch.qos.logback.classic.Level.TRACE);
    SpringApplication application = new SpringApplication(ScooldServer.class);
    initConfig();
    application.setAdditionalProfiles(Config.ENVIRONMENT);
    application.setWebApplicationType(WebApplicationType.SERVLET);
    application.run(args);
}
/**
 * Initializing method used when ParaServer runs as an executable JAR (or WAR)
 * from the command line: {@code java -jar para.jar}.
 *
 * @param args command line arguments (same as in {@code void main(String[] args)})
 * @param sources the application classes that will be scanned
 */
public static void runAsJAR(String[] args, Class<?>... sources) {
    // Entry point when packaged as a JAR.
    SpringApplication application = new SpringApplication(sources);
    application.setAdditionalProfiles(Config.ENVIRONMENT);
    application.setWebApplicationType(WebApplicationType.SERVLET);
    application.setBannerMode(Banner.Mode.OFF);
    // Optionally write a PID file so external tooling can manage the process.
    if (Config.getConfigBoolean("pidfile_enabled", true)) {
        application.addListeners(new ApplicationPidFileWriter(Config.PARA + "_" + getServerPort() + ".pid"));
    }
    initialize(getCoreModules());
    application.run(args);
}
/**
 * Entry point for the famous-match application; runs without an embedded
 * web server.
 */
public static void main(String[] args) {
    SpringApplication app = new SpringApplication(FamousMatchApplication.class);
    app.setWebApplicationType(WebApplicationType.NONE);
    app.run(args);
}
/**
 * Entry point for the WebFlux template application; starts a reactive
 * (non-servlet) web server.
 */
public static void main(String[] args) {
    SpringApplication application = new SpringApplication(WebfluxTemplateApplication.class);
    application.setWebApplicationType(WebApplicationType.REACTIVE);
    application.run(args);
}
/**
 * Verifies that a Serde supplied as an application bean ({@code FooSerde}) is
 * resolved by the {@code KeyValueSerdeResolver} for both the inbound and
 * outbound sides of the {@code process} function.
 */
@Test
public void testKstreamWordCountFunction() throws NoSuchMethodException {
    SpringApplication app = new SpringApplication(SerdeProvidedAsBeanApp.class);
    app.setWebApplicationType(WebApplicationType.NONE);
    try (ConfigurableApplicationContext context = app.run(
            "--server.port=0",
            "--spring.jmx.enabled=false",
            "--spring.cloud.stream.bindings.process-in-0.destination=purchases",
            "--spring.cloud.stream.bindings.process-out-0.destination=coffee",
            "--spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=process-id-0",
            "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
            "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
                    "=org.apache.kafka.common.serialization.Serdes$StringSerde",
            "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" +
                    "=org.apache.kafka.common.serialization.Serdes$StringSerde",
            "--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
        // Resolve the generic in/out types of the process() function.
        final Method method = SerdeProvidedAsBeanApp.class.getMethod("process");
        ResolvableType resolvableType = ResolvableType.forMethodReturnType(method, SerdeProvidedAsBeanApp.class);
        final KeyValueSerdeResolver keyValueSerdeResolver = context.getBean(KeyValueSerdeResolver.class);
        final BindingServiceProperties bindingServiceProperties = context.getBean(BindingServiceProperties.class);
        final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class);
        // Inbound side: the resolved value Serde must be the FooSerde bean.
        // NOTE(review): extended properties are looked up under "input"/"output"
        // while the bindings use "process-in-0"/"process-out-0" — confirm the
        // lookup names against the binder's naming scheme.
        final ConsumerProperties consumerProperties = bindingServiceProperties.getBindingProperties("process-in-0").getConsumer();
        final KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties = kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties("input");
        final Serde<?> inboundValueSerde = keyValueSerdeResolver.getInboundValueSerde(consumerProperties, kafkaStreamsConsumerProperties, resolvableType.getGeneric(0));
        Assert.isTrue(inboundValueSerde instanceof FooSerde, "Inbound Value Serde is not matched");
        // Outbound side: same expectation for the producer value Serde.
        final ProducerProperties producerProperties = bindingServiceProperties.getBindingProperties("process-out-0").getProducer();
        final KafkaStreamsProducerProperties kafkaStreamsProducerProperties = kafkaStreamsExtendedBindingProperties.getExtendedProducerProperties("output");
        final Serde<?> outboundValueSerde = keyValueSerdeResolver.getOutboundValueSerde(producerProperties, kafkaStreamsProducerProperties, resolvableType.getGeneric(1));
        Assert.isTrue(outboundValueSerde instanceof FooSerde, "Outbound Value Serde is not matched");
        // The original repeated getExtendedConsumerProperties("input") and
        // getExtendedProducerProperties("output") with the results discarded;
        // those duplicate no-op calls were removed.
    }
}
/**
 * Entry point for the releaser application; runs as a plain (non-web)
 * Spring Boot process.
 */
public static void main(String[] args) {
    SpringApplication app = new SpringApplication(ReleaserApplication.class);
    app.setWebApplicationType(WebApplicationType.NONE);
    app.run(args);
}
// Runs three Kafka Streams functions (process, analyze, anotherProcess) in one
// application and asserts that per-function configuration — client.id and
// num.stream.threads (via per-binding concurrency) — reaches each function's
// StreamsBuilderFactoryBean independently.
@Test
public void testMultiFunctionsInSameApp() throws InterruptedException {
SpringApplication app = new SpringApplication(MultipleFunctionsInSameApp.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.definition=process;analyze;anotherProcess",
"--spring.cloud.stream.bindings.process-in-0.destination=purchases",
"--spring.cloud.stream.bindings.process-out-0.destination=coffee",
"--spring.cloud.stream.bindings.process-out-1.destination=electronics",
"--spring.cloud.stream.bindings.analyze-in-0.destination=coffee",
"--spring.cloud.stream.bindings.analyze-in-1.destination=electronics",
"--spring.cloud.stream.kafka.streams.binder.functions.analyze.applicationId=analyze-id-0",
"--spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=process-id-0",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.bindings.process-in-0.consumer.concurrency=2",
"--spring.cloud.stream.bindings.analyze-in-0.consumer.concurrency=1",
"--spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads=3",
"--spring.cloud.stream.kafka.streams.binder.functions.process.configuration.client.id=process-client",
"--spring.cloud.stream.kafka.streams.binder.functions.analyze.configuration.client.id=analyze-client",
"--spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.configuration.client.id=anotherProcess-client",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
receiveAndValidate("purchases", "coffee", "electronics");
// The "&" prefix retrieves the FactoryBean itself, not the StreamsBuilder it produces.
StreamsBuilderFactoryBean processStreamsBuilderFactoryBean = context
.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
StreamsBuilderFactoryBean analyzeStreamsBuilderFactoryBean = context
.getBean("&stream-builder-analyze", StreamsBuilderFactoryBean.class);
StreamsBuilderFactoryBean anotherProcessStreamsBuilderFactoryBean = context
.getBean("&stream-builder-anotherProcess", StreamsBuilderFactoryBean.class);
final Properties processStreamsConfiguration = processStreamsBuilderFactoryBean.getStreamsConfiguration();
final Properties analyzeStreamsConfiguration = analyzeStreamsBuilderFactoryBean.getStreamsConfiguration();
final Properties anotherProcessStreamsConfiguration = anotherProcessStreamsBuilderFactoryBean.getStreamsConfiguration();
assertThat(processStreamsConfiguration.getProperty("client.id")).isEqualTo("process-client");
assertThat(analyzeStreamsConfiguration.getProperty("client.id")).isEqualTo("analyze-client");
// Per-binding consumer.concurrency overrides the binder-wide num.stream.threads (3).
Integer concurrency = (Integer) processStreamsConfiguration.get(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
assertThat(concurrency).isEqualTo(2);
concurrency = (Integer) analyzeStreamsConfiguration.get(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
assertThat(concurrency).isEqualTo(1);
// anotherProcess has no concurrency override, so it keeps the binder-wide
// value — which arrives as the String "3", not an Integer.
assertThat(anotherProcessStreamsConfiguration.get(StreamsConfig.NUM_STREAM_THREADS_CONFIG)).isEqualTo("3");
}
}
// Round-trips a Sensor record through the binder with a per-record
// "application/avro" content type, message conversion enabled on both sides
// (useNativeDecoding/useNativeEncoding=false), and verifies that the payload
// deserialized from the output topic equals the one sent.
@Test
@Ignore
public void testPerRecordAvroConentTypeAndVerifySerialization() throws Exception {
SpringApplication app = new SpringApplication(SensorCountAvroApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.consumer.useNativeDecoding=false",
"--spring.cloud.stream.bindings.output.producer.useNativeEncoding=false",
"--spring.cloud.stream.bindings.input.destination=sensors",
"--spring.cloud.stream.bindings.output.destination=received-sensors",
"--spring.cloud.stream.bindings.output.contentType=application/avro",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=per-record-avro-contentType-test",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.brokers="
+ embeddedKafka.getBrokersAsString())) {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
// Use a custom avro test serializer
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
TestAvroSerializer.class);
DefaultKafkaProducerFactory<Integer, Sensor> pf = new DefaultKafkaProducerFactory<>(
senderProps);
try {
KafkaTemplate<Integer, Sensor> template = new KafkaTemplate<>(pf, true);
// Build a Sensor with randomized readings; the id suffix tags the schema version.
Random random = new Random();
Sensor sensor = new Sensor();
sensor.setId(UUID.randomUUID().toString() + "-v1");
sensor.setAcceleration(random.nextFloat() * 10);
sensor.setVelocity(random.nextFloat() * 100);
sensor.setTemperature(random.nextFloat() * 50);
// Send with avro content type set.
Message<?> message = MessageBuilder.withPayload(sensor)
.setHeader("contentType", "application/avro").build();
template.setDefaultTopic("sensors");
template.send(message);
// Serialized byte[] ^^ is received by the binding process and deserialzed
// it using avro converter.
// Then finally, the data will be output to a return topic as byte[]
// (using the same avro converter).
// Receive the byte[] from return topic
ConsumerRecord<String, byte[]> cr = KafkaTestUtils
.getSingleRecord(consumer, "received-sensors");
final byte[] value = cr.value();
// Convert the byte[] received back to avro object and verify that it is
// the same as the one we sent ^^.
AvroSchemaMessageConverter avroSchemaMessageConverter = new AvroSchemaMessageConverter();
Message<?> receivedMessage = MessageBuilder.withPayload(value)
.setHeader("contentType",
MimeTypeUtils.parseMimeType("application/avro"))
.build();
Sensor messageConverted = (Sensor) avroSchemaMessageConverter
.fromMessage(receivedMessage, Sensor.class);
assertThat(messageConverted).isEqualTo(sensor);
}
finally {
// Release producer resources even if the assertions above fail.
pf.destroy();
}
}
}
/**
 * Entry point for the push server; runs as a plain (non-web) Spring Boot
 * process.
 */
public static void main(String[] args) {
    SpringApplication application = new SpringApplication(PushServerApp.class);
    application.setWebApplicationType(WebApplicationType.NONE);
    application.run(args);
}