下面列出了 org.springframework.boot.actuate.info.InfoContributor#org.apache.camel.CamelContext 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Routes {@code direct:start} to a {@code netty:tcp} endpoint configured with
 * {@code textline=true} and {@code ssl=true}, sends "Kermit" and expects the
 * reply "Hello Kermit".
 */
@Test
public void testNettySecureTcpSocket() throws Exception {
    CamelContext context = new DefaultCamelContext(new JndiBeanRepository());
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // "#sslContextParameters" is a bean reference; presumably resolved through
            // the JNDI-backed repository handed to the context above.
            String nettyUri = "netty:tcp://" + SOCKET_HOST + ":" + SOCKET_PORT
                    + "?textline=true&ssl=true&sslContextParameters=#sslContextParameters";
            from("direct:start").to(nettyUri);
        }
    });

    context.start();
    try {
        String reply = context.createProducerTemplate().requestBody("direct:start", "Kermit", String.class);
        Assert.assertEquals("Hello Kermit", reply);
    } finally {
        context.close();
    }
}
/**
 * Sends "Hello Nagios" with the {@code NagiosConstants.LEVEL} header set to
 * "WARNING", then checks that the mock endpoint saw the body and that the
 * Mockito-verified sender was asked to send the matching payload.
 */
@Test
public void testSendToNagiosWarnAsText() throws Exception {
    CamelContext context = createCamelContext();
    MessagePayload warnPayload = new MessagePayload("localhost", Level.WARNING, context.getName(), "Hello Nagios");

    MockEndpoint resultMock = context.getEndpoint("mock:result", MockEndpoint.class);
    resultMock.expectedMessageCount(1);
    resultMock.expectedBodiesReceived("Hello Nagios");

    context.start();
    try {
        ProducerTemplate producer = context.createProducerTemplate();
        producer.sendBodyAndHeader("direct:start", "Hello Nagios", NagiosConstants.LEVEL, "WARNING");

        resultMock.assertIsSatisfied();
        Mockito.verify(nagiosPassiveCheckSender).send(warnPayload);
    } finally {
        context.close();
    }
}
/**
 * Starts a process by sending a map body to {@code direct:start}, continues it
 * via {@code direct:receive}, and then checks that the process ended, that
 * {@code service1} received "ala", and that the first exchange seen by
 * {@code service2} carries the expected {@code var1}/{@code var2} entries.
 */
@Deployment(resources = { "process/example.bpmn20.xml" })
public void testRunProcess() throws Exception {
    CamelContext ctx = applicationContext.getBean(CamelContext.class);
    ProducerTemplate tpl = ctx.createProducerTemplate();
    service1.expectedBodiesReceived("ala");

    Exchange exchange = ctx.getEndpoint("direct:start").createExchange();
    exchange.getIn().setBody(Collections.singletonMap("var1", "ala"));
    tpl.send("direct:start", exchange);

    // Read the process instance id through the same constant that is used to pass it
    // on below, instead of repeating the property name as a string literal.
    String instanceId = (String) exchange.getProperty(ActivitiProducer.PROCESS_ID_PROPERTY);
    tpl.sendBodyAndProperty("direct:receive", null, ActivitiProducer.PROCESS_ID_PROPERTY, instanceId);

    assertProcessEnded(instanceId);
    service1.assertIsSatisfied();

    Map<?, ?> m = service2.getExchanges().get(0).getIn().getBody(Map.class);
    assertEquals("ala", m.get("var1"));
    assertEquals("var2", m.get("var2"));
}
/**
 * Registers the MicroProfile Metrics route policy factory and event notifiers
 * on the given context, each one only when its {@code config} flag is set.
 *
 * @param camelContext the context to customize
 */
@Override
public void customize(CamelContext camelContext) {
    if (config.enableRoutePolicy) {
        camelContext.addRoutePolicyFactory(new MicroProfileMetricsRoutePolicyFactory());
    }

    // All notifiers are attached to the context's management strategy.
    final ManagementStrategy strategy = camelContext.getManagementStrategy();

    if (config.enableExchangeEventNotifier) {
        strategy.addEventNotifier(new MicroProfileMetricsExchangeEventNotifier());
    }

    if (config.enableRouteEventNotifier) {
        strategy.addEventNotifier(new MicroProfileMetricsRouteEventNotifier());
    }

    if (config.enableCamelContextEventNotifier) {
        strategy.addEventNotifier(new MicroProfileMetricsCamelContextEventNotifier());
    }
}
/**
 * Deploys {@code SIMPLE_WAR}, calls its {@code /simple} endpoint through a
 * {@code cxf://} producer using the "cxfuser"/"cxfpassword" credentials, and
 * expects the reply "Hello Kermit". The deployment is always undeployed.
 */
@Test
public void testEndpointRouteWithValidCredentials() throws Exception {
    deployer.deploy(SIMPLE_WAR);
    try {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start").to("cxf://" + getEndpointAddress("/simple", "cxfuser", "cxfpassword"));
            }
        });

        context.start();
        try {
            ProducerTemplate template = context.createProducerTemplate();
            String reply = template.requestBody("direct:start", "Kermit", String.class);
            Assert.assertEquals("Hello Kermit", reply);
        } finally {
            context.close();
        }
    } finally {
        deployer.undeploy(SIMPLE_WAR);
    }
}
/**
 * Sends a {@code Long} body to {@code direct:startEmptyBodyAsString} and checks
 * that the historic variable {@code camelBody} holds its string form once the
 * process has ended.
 */
@Deployment(resources = { "process/empty.bpmn20.xml" })
public void testObjectAsStringVariable() throws Exception {
    CamelContext ctx = applicationContext.getBean(CamelContext.class);
    ProducerTemplate tpl = ctx.createProducerTemplate();

    // Long.valueOf replaces the deprecated Long(long) constructor.
    Object expectedObj = Long.valueOf(99L);

    Exchange exchange = ctx.getEndpoint("direct:startEmptyBodyAsString").createExchange();
    exchange.getIn().setBody(expectedObj);
    tpl.send("direct:startEmptyBodyAsString", exchange);

    String instanceId = (String) exchange.getProperty("PROCESS_ID_PROPERTY");
    assertProcessEnded(instanceId);

    // Local is not named "var" to avoid confusion with the Java 10+ reserved type name.
    HistoricVariableInstance camelBodyVariable = processEngine.getHistoryService()
            .createHistoricVariableInstanceQuery()
            .variableName("camelBody")
            .singleResult();
    assertNotNull(camelBodyVariable);
    assertEquals(expectedObj.toString(), camelBodyVariable.getValue().toString());
}
/**
 * Maps the values 1, 2 and 3 through the function returned by
 * {@code crs.to("bean:hello")} and expects the resulting message bodies to be
 * "Hello 1", "Hello 2" and "Hello 3" within two seconds.
 */
@Test
public void testToFunctionWithExchange() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
        Set<String> values = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch latch = new CountDownLatch(3);
        Function<Object, Publisher<Exchange>> fun = crs.to("bean:hello");

        Flux.just(1, 2, 3)
            .flatMap(fun)
            // getMessage() replaces the deprecated getOut(), which creates an empty
            // OUT message when none is present.
            .map(Exchange::getMessage)
            .map(message -> message.getBody(String.class))
            .doOnNext(values::add)
            .doOnNext(res -> latch.countDown())
            .subscribe();

        Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
        Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
    } finally {
        camelctx.close();
    }
}
/**
 * Sends "Hello" to the SFTP endpoint with the {@code CamelFileName} header set
 * to "test.txt" and checks that {@code test.txt} appears under
 * {@code FTP_ROOT_DIR}. The directory is deleted afterwards.
 */
@Test
public void testSendFile() throws Exception {
    File uploaded = resolvePath(FTP_ROOT_DIR).resolve("test.txt").toFile();

    CamelContext context = new DefaultCamelContext();
    context.start();
    try {
        Endpoint sftp = context.getEndpoint(getSftpEndpointUri());

        // The file must not exist before the send and must exist after it.
        Assert.assertFalse(uploaded.exists());
        context.createProducerTemplate().sendBodyAndHeader(sftp, "Hello", "CamelFileName", "test.txt");
        Assert.assertTrue(uploaded.exists());
    } finally {
        context.close();
        FileUtils.deleteDirectory(resolvePath(FTP_ROOT_DIR));
    }
}
/**
 * Marshals a test calendar with the {@code ical} data format and expects
 * {@code mock:result} to receive the calendar's {@code toString()} form.
 */
@Test
public void testMarshal() throws Exception {
    CamelContext context = new DefaultCamelContext();
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:marshal")
                .marshal("ical")
                .to("mock:result");
        }
    });

    context.start();
    try {
        ProducerTemplate template = context.createProducerTemplate();
        Calendar testCalendar = createTestCalendar();

        MockEndpoint resultMock = context.getEndpoint("mock:result", MockEndpoint.class);
        resultMock.expectedBodiesReceived(testCalendar.toString());

        template.sendBody("direct:marshal", testCalendar);
        resultMock.assertIsSatisfied();
    } finally {
        context.close();
    }
}
/**
 * Sets the body with the Simple expression {@code ${random(500)}} and checks
 * that it converts to an {@code Integer} within the expected range.
 */
@Test
public void testRandomConversion() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
            .setBody().simple("${random(500)}");
        }
    });

    camelctx.start();
    try {
        ProducerTemplate producer = camelctx.createProducerTemplate();
        Integer result = producer.requestBody("direct:start", null, Integer.class);
        Assert.assertNotNull(result);
        // Per the Simple language docs, random(500) yields 0 (inclusive) to 500
        // (exclusive), so 0 is a legitimate value. The earlier "0 < result" check
        // failed spuriously whenever 0 was drawn.
        Assert.assertTrue("Expected 0 <= result < 500 but was " + result, result >= 0 && result < 500);
    } finally {
        camelctx.close();
    }
}
/**
 * Starts a Camel context whose timer route fires every 2000 ms and writes
 * "Current time is ..." to {@code stream:out}, lets it run for five seconds,
 * then stops the context.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the context cannot be configured, started or stopped,
 *                   or if the sleep is interrupted
 */
public static void main(String[] args) throws Exception {
    // create CamelContext
    CamelContext context = new DefaultCamelContext();

    // add our route to the CamelContext
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("timer://myTimer?period=2000")
            .setBody().simple("Current time is ${header.firedTime}")
            .to("stream:out");
        }
    });

    // start the route and let it do its work
    context.start();
    try {
        Thread.sleep(5000);
    } finally {
        // stop the CamelContext even if the sleep is interrupted
        context.stop();
    }
}
/**
 * Requests the Docker version through the {@code docker:version} endpoint,
 * addressed by host and port, and checks that a non-empty version comes back.
 * The test is skipped when {@code DOCKER_HOST} is not set.
 */
@Test
public void testDockerComponentForHostnameAndPort() throws Exception {
    // Assume.assumeNotNull(Object...) has no message parameter, so a leading message
    // string would just be one more object to null-check. assumeTrue(String, boolean)
    // reports the message when the assumption fails.
    Assume.assumeTrue("DOCKER_HOST environment variable is not set", System.getenv("DOCKER_HOST") != null);

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
            .toF("docker:version?host=%s&port=%d", TestUtils.getDockerHost(), TestUtils.getDockerPort());
        }
    });

    camelctx.start();
    try {
        ProducerTemplate template = camelctx.createProducerTemplate();
        Version dockerVersion = template.requestBody("direct:start", null, Version.class);
        // The message describes the failure, not the expectation.
        Assert.assertNotNull("Docker version was null", dockerVersion);
        Assert.assertFalse("Docker version was empty", dockerVersion.getVersion().isEmpty());
    } finally {
        camelctx.close();
    }
}
/**
 * Starts a Camel context with a Jetty consumer on {@code url}. Each request
 * body is printed; a body containing "Kabom" raises an exception, any other
 * body gets the reply "OK". The context is left running.
 */
public void server() throws Exception {
    CamelContext context = new DefaultCamelContext();
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            Processor handler = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    String payload = exchange.getIn().getBody(String.class);
                    System.out.println("Received message: " + payload);

                    boolean rejected = payload != null && payload.contains("Kabom");
                    if (rejected) {
                        throw new Exception("ILLEGAL DATA");
                    }

                    exchange.getOut().setBody("OK");
                }
            };

            from("jetty:" + url).process(handler);
        }
    });
    context.start();
}
/**
 * Consumes from the {@code elsql:dbusers} endpoint backed by the ExampleDS
 * data source and expects one message at {@code mock:end} whose map body has
 * {@code NAME} equal to "SA".
 */
@Test
public void testElSQLConsumer() throws Exception {
    CamelContext context = new DefaultCamelContext(new JndiBeanRepository());
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("elsql:dbusers:users.elsql?dataSource=java:jboss/datasources/ExampleDS").to("mock:end");
        }
    });

    MockEndpoint endMock = context.getEndpoint("mock:end", MockEndpoint.class);
    endMock.expectedMessageCount(1);

    context.start();
    try {
        endMock.assertIsSatisfied();

        List<Exchange> received = endMock.getReceivedExchanges();
        Exchange first = received.get(0);
        Assert.assertEquals("SA", first.getIn().getBody(Map.class).get("NAME"));
    } finally {
        context.close();
    }
}
/**
 * Collects the configured customizer IDs into a sorted set.
 *
 * <p>The comma-separated list is read from the environment variable named by
 * {@code Constants.ENV_CAMEL_K_CUSTOMIZERS}. When that is empty, the property
 * {@code Constants.PROPERTY_CAMEL_K_CUSTOMIZER} from the context's properties
 * component is used instead.
 *
 * @param context the context whose properties component is the fallback source
 * @return the customizer IDs, possibly empty
 */
public static Set<String> lookupCustomizerIDs(CamelContext context) {
    String ids = System.getenv().getOrDefault(Constants.ENV_CAMEL_K_CUSTOMIZERS, "");

    if (ObjectHelper.isEmpty(ids)) {
        // TODO: getPropertiesComponent().resolveProperty() throws exception instead
        //       of returning an empty optional
        ids = context.getPropertiesComponent()
            .loadProperties(Constants.PROPERTY_CAMEL_K_CUSTOMIZER::equals)
            .getProperty(Constants.PROPERTY_CAMEL_K_CUSTOMIZER, "");
    }

    Set<String> result = new TreeSet<>();
    if (!ObjectHelper.isNotEmpty(ids)) {
        return result;
    }

    // A limit of -1 keeps trailing empty tokens, matching the split used so far.
    String[] tokens = ids.split(",", -1);
    for (String token : tokens) {
        result.add(token);
    }
    return result;
}
public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming,
boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork,
boolean parallelAggregate) {
notNull(camelContext, "camelContext");
this.camelContext = camelContext;
this.processors = processors;
this.aggregationStrategy = aggregationStrategy;
this.executorService = executorService;
this.shutdownExecutorService = shutdownExecutorService;
this.streaming = streaming;
this.stopOnException = stopOnException;
// must enable parallel if executor service is provided
this.parallelProcessing = parallelProcessing || executorService != null;
this.timeout = timeout;
this.onPrepare = onPrepare;
this.shareUnitOfWork = shareUnitOfWork;
this.parallelAggregate = parallelAggregate;
}
/**
 * Creates the context with JMX enabled, fixes the management naming strategy's
 * host name to "localhost" and its domain name to "org.apache.camel", and
 * tells the management agent to include the host name.
 */
@Override
protected CamelContext createCamelContext() throws Exception {
    enableJMX();

    CamelContext context = super.createCamelContext();

    // Force hostname to be "localhost" for testing purposes
    final DefaultManagementNamingStrategy namingStrategy =
            (DefaultManagementNamingStrategy) context.getManagementStrategy().getManagementNamingStrategy();
    namingStrategy.setHostName("localhost");
    namingStrategy.setDomainName("org.apache.camel");

    // setup the ManagementAgent to include the hostname
    context.getManagementStrategy()
           .getManagementAgent()
           .setIncludeHostName(true);

    return context;
}
/**
 * Resolves {@code resource} to an object according to its prefix.
 *
 * <ul>
 *   <li>{@code classpath:} loads a routes definition from the resource stream
 *       using the context's XML routes definition loader</li>
 *   <li>{@code class:} resolves the class and instantiates it through the
 *       context's injector</li>
 *   <li>{@code bean:} looks the name up in the context's registry</li>
 * </ul>
 *
 * @param context  the context used for resolving, loading and lookup
 * @param resource the prefixed resource descriptor
 * @return the resolved object, never {@code null}
 * @throws IllegalArgumentException if the classpath branch fails with any
 *         exception (wrapped as cause), or if nothing could be resolved
 */
private static Object mandatoryLoadResource(CamelContext context, String resource) {
    final Object loaded;

    if (resource.startsWith("classpath:")) {
        try (InputStream stream = ResourceHelper.resolveMandatoryResourceAsInputStream(context, resource)) {
            ExtendedCamelContext extended = context.adapt(ExtendedCamelContext.class);
            XMLRoutesDefinitionLoader routesLoader = extended.getXMLRoutesDefinitionLoader();
            loaded = routesLoader.loadRoutesDefinition(context, stream);
        } catch (Exception cause) {
            throw new IllegalArgumentException(cause);
        }
    } else if (resource.startsWith("class:")) {
        String className = resource.substring("class:".length());
        Class<?> resolved = context.getClassResolver().resolveClass(className);
        loaded = context.getInjector().newInstance(resolved);
    } else if (resource.startsWith("bean:")) {
        String beanName = resource.substring("bean:".length());
        loaded = context.getRegistry().lookupByName(beanName);
    } else {
        // Unknown prefix: falls through to the failure below.
        loaded = null;
    }

    if (loaded == null) {
        throw new IllegalArgumentException("Unable to resolve resource: " + resource);
    }
    return loaded;
}
/**
 * Applies a single named option to a {@code QuteEndpoint}.
 *
 * @param camelContext the context passed to {@code property(...)} for each value
 * @param obj          the target; must be a {@code QuteEndpoint}
 * @param name         the option name
 * @param value        the raw option value
 * @param ignoreCase   when {@code true}, {@code name} is lower-cased before matching
 * @return {@code true} if the option was recognized and set, {@code false} otherwise
 */
@Override
public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
    QuteEndpoint target = (QuteEndpoint) obj;
    // Lower-case with Locale.ROOT so matching does not depend on the JVM's default
    // locale (for example, the Turkish locale maps 'I' to a dotless 'ı').
    final String key = ignoreCase ? name.toLowerCase(java.util.Locale.ROOT) : name;
    switch (key) {
    case "allowcontextmapall":
    case "allowContextMapAll": target.setAllowContextMapAll(property(camelContext, boolean.class, value)); return true;
    case "allowtemplatefromheader":
    case "allowTemplateFromHeader": target.setAllowTemplateFromHeader(property(camelContext, boolean.class, value)); return true;
    case "basicpropertybinding":
    case "basicPropertyBinding": target.setBasicPropertyBinding(property(camelContext, boolean.class, value)); return true;
    case "contentcache":
    case "contentCache": target.setContentCache(property(camelContext, boolean.class, value)); return true;
    case "encoding": target.setEncoding(property(camelContext, java.lang.String.class, value)); return true;
    case "lazystartproducer":
    case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
    case "synchronous": target.setSynchronous(property(camelContext, boolean.class, value)); return true;
    default: return false;
    }
}
/**
 * Builds a context backed by a {@code SimpleRegistry} holding the audit data
 * source, a transaction manager and a {@code PROPAGATION_REQUIRED} policy.
 * It also initializes {@code auditLogDao} and {@code idempotentRepository},
 * and registers an {@code sql} component bound to the audit data source.
 */
@Override
protected CamelContext createCamelContext() throws Exception {
    SimpleRegistry registry = new SimpleRegistry();

    // data source and transaction manager
    auditDataSource = EmbeddedDataSourceFactory.getDataSource("sql/schema.sql");
    DataSourceTransactionManager txManager = new DataSourceTransactionManager(auditDataSource);

    // transaction policy referenced by name from the registry
    SpringTransactionPolicy requiredPolicy = new SpringTransactionPolicy();
    requiredPolicy.setTransactionManager(txManager);
    requiredPolicy.setPropagationBehaviorName("PROPAGATION_REQUIRED");

    registry.put("auditDataSource", auditDataSource);
    registry.put("transactionManager", txManager);
    registry.put("PROPAGATION_REQUIRED", requiredPolicy);

    auditLogDao = new AuditLogDao(auditDataSource);

    // the idempotent repository gets its own REQUIRES_NEW transaction template
    TransactionTemplate requiresNewTemplate = new TransactionTemplate();
    requiresNewTemplate.setTransactionManager(txManager);
    requiresNewTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRES_NEW");
    idempotentRepository = new JdbcMessageIdRepository(auditDataSource, requiresNewTemplate, "ws");

    CamelContext context = new DefaultCamelContext(registry);

    SqlComponent sql = new SqlComponent();
    sql.setDataSource(auditDataSource);
    context.addComponent("sql", sql);

    return context;
}
/**
 * Appends a {@code threads(...)} step to the given route using this object's
 * {@code poolsize}, {@code maxPoolSize} and {@code threadName}.
 *
 * @param context    the Camel context (not used here)
 * @param route      the definition to extend; checked with {@code notNull}
 * @param parameters additional parameters (not used here)
 * @return an {@code Optional} holding the definition returned by {@code threads(...)}
 */
@Override
public Optional<ProcessorDefinition<?>> configure(CamelContext context, ProcessorDefinition<?> route, Map<String, Object> parameters) {
    // Validate the route and the thread-pool settings before touching the route.
    ObjectHelper.notNull(route, "route");
    ObjectHelper.notNull(poolsize, "poolsize");
    ObjectHelper.notNull(maxPoolSize, "maxpoolsize");
    ObjectHelper.notNull(threadName, "threadname");

    final ProcessorDefinition<?> threaded = route.threads(poolsize, maxPoolSize, threadName);
    return Optional.of(threaded);
}
/**
 * Starts the source task: merges the given properties with the defaults,
 * reads the connector settings into fields, builds the remote URL when none is
 * configured, then starts the polling consumer and the Camel main support.
 *
 * @param props the task properties
 * @throws ConnectException if any step fails; the original exception is the cause
 */
@Override
public void start(Map<String, String> props) {
    try {
        LOG.info("Starting CamelSourceTask connector task");

        final Map<String, String> mergedProps = TaskHelper.mergeProperties(getDefaultConfig(), props);
        config = getCamelSourceConnectorConfig(mergedProps);

        // settings kept on the task
        maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
        maxPollDuration = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF);
        camelMessageHeaderKey = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF);

        String sourceUrl = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF);
        final String unmarshalFormat = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF);
        topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF);

        final String pollingUrl = getLocalUrlWithPollingOptions(config);

        final CamelContext camelContext = new DefaultCamelContext();
        if (sourceUrl == null) {
            // No explicit URL: build one from the component name and the prefixed properties.
            sourceUrl = TaskHelper.buildUrl(
                    camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
                    mergedProps,
                    config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF),
                    CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
                    CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
        }

        cms = new CamelMainSupport(mergedProps, sourceUrl, pollingUrl, null, unmarshalFormat, 10, 500, camelContext);

        // The polling consumer is created and started before the main support is started.
        Endpoint pollingEndpoint = cms.getEndpoint(pollingUrl);
        consumer = pollingEndpoint.createPollingConsumer();
        consumer.start();

        cms.start();
        LOG.info("CamelSourceTask connector task started");
    } catch (Exception e) {
        throw new ConnectException("Failed to create and start Camel context", e);
    }
}
/**
 * Looks up the {@code springwsContext} Camel context and prepares the two mock
 * endpoints and the producer template used by the tests.
 */
@Before
public void before() throws Exception {
    final CamelContext springWsContext = contextRegistry.getCamelContext("springwsContext");

    resultEndpoint = springWsContext.getEndpoint("mock:result", MockEndpoint.class);
    inOnlyEndpoint = springWsContext.getEndpoint("mock:inOnly", MockEndpoint.class);

    template = springWsContext.createProducerTemplate();
}
/**
 * Starts a ZooKeeper testing server on a free port and runs an application
 * context with the ZooKeeper cluster service enabled. The application must
 * expose a single {@code CamelContext} and a single {@code CamelClusterService},
 * and the context's cluster service must be a {@code ZooKeeperClusterService}.
 */
@Test
public void testClusterService() throws Exception {
    final int port = AvailablePortFinder.getNextAvailable();
    final File dataDir = temporaryFolder.newFolder();

    final TestingServer zookeeper = new TestingServer(port, dataDir);
    zookeeper.start();

    try {
        new ApplicationContextRunner()
            .withUserConfiguration(TestConfiguration.class)
            .withPropertyValues(
                "debug=false",
                "spring.main.banner-mode=OFF",
                "spring.application.name=" + UUID.randomUUID(),
                "camel.component.zookeeper.cluster.service.enabled=true",
                "camel.component.zookeeper.cluster.service.nodes=localhost:" + port,
                "camel.component.zookeeper.cluster.service.id=" + UUID.randomUUID(),
                "camel.component.zookeeper.cluster.service.base-path=" + SERVICE_PATH)
            .run(appContext -> {
                assertThat(appContext).hasSingleBean(CamelContext.class);
                assertThat(appContext).hasSingleBean(CamelClusterService.class);

                final CamelContext camel = appContext.getBean(CamelContext.class);
                final CamelClusterService service = camel.hasService(CamelClusterService.class);

                assertThat(service).isNotNull();
                assertThat(service).isInstanceOf(ZooKeeperClusterService.class);
            });
    } finally {
        zookeeper.stop();
    }
}
/**
 * Starts an MLLP receiver route on a free local port, sends one generated HL7
 * message through {@code mllpClient}, and expects {@code mock:result} to
 * receive exactly one message within ten seconds.
 */
@Test
public void testReceiveSingleMessage() throws Exception {
    mllpClient.setMllpHost("localhost");
    mllpClient.setMllpPort(AvailablePortFinder.getNextAvailable());

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        int connectTimeout = 500;
        int responseTimeout = 5000;

        @Override
        public void configure() throws Exception {
            String routeId = "mllp-test-receiver-route";

            fromF("mllp://%s:%d?autoAck=true&connectTimeout=%d&receiveTimeout=%d&reuseAddress=true",
                mllpClient.getMllpHost(), mllpClient.getMllpPort(), connectTimeout, responseTimeout)
                .routeId(routeId)
                .log(LoggingLevel.INFO, routeId, "Test route received message")
                .to("mock:result");
        }
    });

    MockEndpoint mock = camelctx.getEndpoint("mock:result", MockEndpoint.class);
    mock.expectedMessageCount(1);

    camelctx.start();
    try {
        mllpClient.connect();
        mllpClient.sendMessageAndWaitForAcknowledgement(Hl7MessageGenerator.generateMessage(), 10000);
        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock);
    } finally {
        // Nested so that a failing disconnect cannot skip closing the context.
        try {
            mllpClient.disconnect();
        } finally {
            camelctx.close();
        }
    }
}
/**
 * Logs an exchange whose IN body is {@code body} with the default body logger
 * and compares the outcome with {@code logResult}.
 */
@Test
public void testDefaultLogger() {
    CamelContext camel = new DefaultCamelContext();

    Exchange sample = new DefaultExchange(camel);
    sample.getIn().setBody(body);

    BodyLogger.Default defaultLogger = new BodyLogger.Default();
    Assert.assertEquals(logResult, defaultLogger.log(sample));
}
/**
 * Creates an annotation-config application context from
 * {@code CamelAutoConfiguration} and fetches the Camel context and the
 * consumer, producer and fluent producer templates from it.
 */
@Before
public void setupApplicationContext() {
    AnnotationConfigApplicationContext springContext =
            new AnnotationConfigApplicationContext(CamelAutoConfiguration.class);
    applicationContext = springContext;

    // Beans are fetched in the same order as before: context first, then templates.
    camelContext = springContext.getBean(CamelContext.class);
    consumerTemplate = springContext.getBean(ConsumerTemplate.class);
    producerTemplate = springContext.getBean(ProducerTemplate.class);
    fluentProducerTemplate = springContext.getBean(FluentProducerTemplate.class);
}
/**
 * Returns no routes, regardless of the arguments.
 *
 * @param camelContext   ignored
 * @param excludePattern ignored
 * @param includePattern ignored
 * @return an empty list
 */
@Override
public List<RoutesBuilder> collectRoutesFromRegistry(CamelContext camelContext, String excludePattern, String includePattern) {
    final List<RoutesBuilder> noRoutes = Collections.emptyList();
    return noRoutes;
}
/**
 * Creates a {@code SpringCamelContext} on top of an application context built
 * from {@code MockedKuduConfiguration}, with a properties component that
 * reads {@code classpath:test-options.properties}.
 */
@Override
protected CamelContext createCamelContext() {
    applicationContext = new AnnotationConfigApplicationContext(MockedKuduConfiguration.class);

    PropertiesComponent properties = new PropertiesComponent();
    properties.addLocation("classpath:test-options.properties");

    final CamelContext springCamelContext = new SpringCamelContext(applicationContext);
    springCamelContext.setPropertiesComponent(properties);
    return springCamelContext;
}
/**
 * Extends the parent's context with a {@code jms} component that uses client
 * acknowledgement against an ActiveMQ {@code vm://localhost} connection
 * factory with {@code broker.persistent=false}.
 *
 * @return the context created by the superclass with the {@code jms} component added
 * @throws Exception if the superclass fails to create the context
 */
@Override
protected CamelContext createCamelContext() throws Exception {
    CamelContext camelContext = super.createCamelContext();

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
    camelContext.addComponent("jms", jmsComponentClientAcknowledge(connectionFactory));

    return camelContext;
}