下面列出了io.fabric8.kubernetes.api.model.apps.StatefulSetSpecBuilder#io.fabric8.kubernetes.api.model.apps.StatefulSetSpec 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static boolean isStatefulSetReady(StatefulSet ss) {
Utils.checkNotNull(ss, "StatefulSet can't be null.");
StatefulSetSpec spec = ss.getSpec();
StatefulSetStatus status = ss.getStatus();
if (status == null || status.getReplicas() == null || status.getReadyReplicas() == null) {
return false;
}
//Can be true in testing, so handle it to make test writing easier.
if (spec == null || spec.getReplicas() == null) {
return false;
}
return spec.getReplicas().intValue() == status.getReplicas()
&& spec.getReplicas().intValue() == status.getReadyReplicas();
}
@Test
public void testStatefulSetReadinessNotEnoughReadyReplicas() {
StatefulSetStatus status = new StatefulSetStatus();
status.setReadyReplicas(1);
status.setReplicas(2);
StatefulSetSpec spec = new StatefulSetSpec();
spec.setReplicas(2);
StatefulSet statefulSet = new StatefulSet();
statefulSet.setStatus(status);
statefulSet.setSpec(spec);
assertFalse(Readiness.isReady(statefulSet));
assertFalse(Readiness.isStatefulSetReady(statefulSet));
}
@Test
public void testStatefulSetReadiness() {
StatefulSetStatus status = new StatefulSetStatus();
status.setReadyReplicas(2);
status.setReplicas(2);
StatefulSetSpec spec = new StatefulSetSpec();
spec.setReplicas(2);
StatefulSet statefulSet = new StatefulSet();
statefulSet.setStatus(status);
statefulSet.setSpec(spec);
assertTrue(Readiness.isReady(statefulSet));
assertTrue(Readiness.isStatefulSetReady(statefulSet));
}
private StatefulSetSpec createStatefulSetSpec(ResourceConfig config, List<ImageConfiguration> images) {
return new StatefulSetSpecBuilder()
.withReplicas(config.getReplicas())
.withServiceName(config.getControllerName())
.withTemplate(podTemplateHandler.getPodTemplate(config,images))
.build();
}
@Test
public void testStatefulSetReadinessNoStatus() {
StatefulSetSpec spec = new StatefulSetSpec();
spec.setReplicas(1);
StatefulSet statefulSet = new StatefulSet();
statefulSet.setSpec(spec);
assertFalse(Readiness.isReady(statefulSet));
assertFalse(Readiness.isStatefulSetReady(statefulSet));
}
private void mergeStatefulSetSpec(StatefulSetBuilder builder, StatefulSetSpec spec) {
StatefulSetFluent.SpecNested<StatefulSetBuilder> specBuilder = builder.editSpec();
KubernetesResourceUtil.mergeSimpleFields(specBuilder, spec);
specBuilder.endSpec();
}
@Test
public void testScaleStatefulSet() {
log.info("Testing {}...", "ScaleStatefulSet");
KubernetesDeployerProperties deployProperties = new KubernetesDeployerProperties();
ContainerFactory containerFactory = new DefaultContainerFactory(deployProperties);
KubernetesAppDeployer appDeployer = new KubernetesAppDeployer(deployProperties, kubernetesClient,
containerFactory);
AppDefinition definition = new AppDefinition(randomName(), null);
Resource resource = testApplication();
Map<String, String> props = new HashMap<>();
props.put(KubernetesAppDeployer.COUNT_PROPERTY_KEY, "3");
props.put(KubernetesAppDeployer.INDEXED_PROPERTY_KEY, "true");
AppDeploymentRequest request = new AppDeploymentRequest(definition, resource, props);
log.info("Deploying {}...", request.getDefinition().getName());
Timeout timeout = deploymentTimeout();
String deploymentId = appDeployer.deploy(request);
assertThat(deploymentId, eventually(hasStatusThat(
Matchers.hasProperty("state", is(deployed))), timeout.maxAttempts, timeout.pause));
assertThat(deploymentId, eventually(appInstanceCount(is(3))));
// Ensure that a StatefulSet is deployed
Map<String, String> selector = Collections.singletonMap(SPRING_APP_KEY, deploymentId);
List<StatefulSet> statefulSets = kubernetesClient.apps().statefulSets().withLabels(selector).list().getItems();
assertNotNull(statefulSets);
assertEquals(1, statefulSets.size());
StatefulSet statefulSet = statefulSets.get(0);
StatefulSetSpec statefulSetSpec = statefulSet.getSpec();
Assertions.assertThat(statefulSetSpec.getPodManagementPolicy()).isEqualTo("Parallel");
Assertions.assertThat(statefulSetSpec.getReplicas()).isEqualTo(3);
Assertions.assertThat(statefulSetSpec.getServiceName()).isEqualTo(deploymentId);
Assertions.assertThat(statefulSet.getMetadata().getName()).isEqualTo(deploymentId);
log.info("Scale Down {}...", request.getDefinition().getName());
appDeployer.scale(new AppScaleRequest(deploymentId, 1));
assertThat(deploymentId, eventually(appInstanceCount(is(1)), timeout.maxAttempts, timeout.pause));
statefulSets = kubernetesClient.apps().statefulSets().withLabels(selector).list().getItems();
assertEquals(1, statefulSets.size());
statefulSetSpec = statefulSets.get(0).getSpec();
Assertions.assertThat(statefulSetSpec.getReplicas()).isEqualTo(1);
Assertions.assertThat(statefulSetSpec.getServiceName()).isEqualTo(deploymentId);
Assertions.assertThat(statefulSet.getMetadata().getName()).isEqualTo(deploymentId);
appDeployer.undeploy(deploymentId);
}
@Test
public void testCreateStatefulSet() throws Exception {
Map<String, String> props = new HashMap<>();
props.put(KubernetesAppDeployer.COUNT_PROPERTY_KEY, "3");
props.put(KubernetesAppDeployer.INDEXED_PROPERTY_KEY, "true");
AppDefinition definition = new AppDefinition(randomName(), null);
AppDeploymentRequest appDeploymentRequest = new AppDeploymentRequest(definition, testApplication(), props);
KubernetesAppDeployer deployer = new KubernetesAppDeployer(new KubernetesDeployerProperties(),
kubernetesClient);
log.info("Deploying {}...", appDeploymentRequest.getDefinition().getName());
String deploymentId = deployer.deploy(appDeploymentRequest);
Map<String, String> idMap = deployer.createIdMap(deploymentId, appDeploymentRequest);
Timeout timeout = deploymentTimeout();
assertThat(deploymentId, eventually(hasStatusThat(
Matchers.hasProperty("state", is(deployed))), timeout.maxAttempts, timeout.pause));
Map<String, String> selector = Collections.singletonMap(SPRING_APP_KEY, deploymentId);
List<StatefulSet> statefulSets = kubernetesClient.apps().statefulSets().withLabels(selector).list().getItems();
assertNotNull(statefulSets);
assertEquals(1, statefulSets.size());
StatefulSet statefulSet = statefulSets.get(0);
StatefulSetSpec statefulSetSpec = statefulSet.getSpec();
List<Container> statefulSetInitContainers = statefulSetSpec.getTemplate().getSpec().getInitContainers();
assertEquals(1, statefulSetInitContainers.size());
Container statefulSetInitContainer = statefulSetInitContainers.get(0);
assertEquals(DeploymentPropertiesResolver.STATEFUL_SET_IMAGE_NAME, statefulSetInitContainer.getImage());
Assertions.assertThat(statefulSetSpec.getPodManagementPolicy()).isEqualTo("Parallel");
Assertions.assertThat(statefulSetSpec.getReplicas()).isEqualTo(3);
Assertions.assertThat(statefulSetSpec.getServiceName()).isEqualTo(deploymentId);
Assertions.assertThat(statefulSet.getMetadata().getName()).isEqualTo(deploymentId);
Assertions.assertThat(statefulSetSpec.getSelector().getMatchLabels())
.containsAllEntriesOf(deployer.createIdMap(deploymentId, appDeploymentRequest));
Assertions.assertThat(statefulSetSpec.getSelector().getMatchLabels())
.contains(entry(KubernetesAppDeployer.SPRING_MARKER_KEY, KubernetesAppDeployer.SPRING_MARKER_VALUE));
Assertions.assertThat(statefulSetSpec.getTemplate().getMetadata().getLabels()).containsAllEntriesOf(idMap);
Assertions.assertThat(statefulSetSpec.getTemplate().getMetadata().getLabels())
.contains(entry(KubernetesAppDeployer.SPRING_MARKER_KEY, KubernetesAppDeployer.SPRING_MARKER_VALUE));
Container container = statefulSetSpec.getTemplate().getSpec().getContainers().get(0);
Assertions.assertThat(container.getName()).isEqualTo(deploymentId);
Assertions.assertThat(container.getPorts().get(0).getContainerPort()).isEqualTo(8080);
Assertions.assertThat(container.getImage()).isEqualTo(testApplication().getURI().getSchemeSpecificPart());
PersistentVolumeClaim pvc = statefulSetSpec.getVolumeClaimTemplates().get(0);
Assertions.assertThat(pvc.getMetadata().getName()).isEqualTo(deploymentId);
PersistentVolumeClaimSpec pvcSpec = pvc.getSpec();
Assertions.assertThat(pvcSpec.getAccessModes()).containsOnly("ReadWriteOnce");
Assertions.assertThat(pvcSpec.getStorageClassName()).isNull();
Assertions.assertThat(pvcSpec.getResources().getLimits().get("storage").getAmount()).isEqualTo("10Mi");
Assertions.assertThat(pvcSpec.getResources().getRequests().get("storage").getAmount()).isEqualTo("10Mi");
log.info("Undeploying {}...", deploymentId);
timeout = undeploymentTimeout();
appDeployer.undeploy(deploymentId);
assertThat(deploymentId, eventually(hasStatusThat(
Matchers.hasProperty("state", is(unknown))), timeout.maxAttempts, timeout.pause));
}
@Test
public void testCreateStatefulSetInitContainerImageNamePropOverride() throws Exception {
Map<String, String> props = new HashMap<>();
props.put(KubernetesAppDeployer.COUNT_PROPERTY_KEY, "3");
props.put(KubernetesAppDeployer.INDEXED_PROPERTY_KEY, "true");
String imageName = testApplication().getURI().getSchemeSpecificPart();
props.put("spring.cloud.deployer.kubernetes.statefulSetInitContainerImageName", imageName);
AppDefinition definition = new AppDefinition(randomName(), null);
AppDeploymentRequest appDeploymentRequest = new AppDeploymentRequest(definition, testApplication(), props);
KubernetesAppDeployer deployer = new KubernetesAppDeployer(new KubernetesDeployerProperties(),
kubernetesClient);
log.info("Deploying {}...", appDeploymentRequest.getDefinition().getName());
String deploymentId = deployer.deploy(appDeploymentRequest);
Timeout timeout = deploymentTimeout();
assertThat(deploymentId, eventually(hasStatusThat(
Matchers.hasProperty("state", is(deployed))), timeout.maxAttempts, timeout.pause));
Map<String, String> selector = Collections.singletonMap(SPRING_APP_KEY, deploymentId);
List<StatefulSet> statefulSets = kubernetesClient.apps().statefulSets().withLabels(selector).list().getItems();
assertNotNull(statefulSets);
assertEquals(1, statefulSets.size());
StatefulSet statefulSet = statefulSets.get(0);
StatefulSetSpec statefulSetSpec = statefulSet.getSpec();
List<Container> statefulSetInitContainers = statefulSetSpec.getTemplate().getSpec().getInitContainers();
assertEquals(1, statefulSetInitContainers.size());
Container statefulSetInitContainer = statefulSetInitContainers.get(0);
assertEquals(imageName, statefulSetInitContainer.getImage());
log.info("Undeploying {}...", deploymentId);
timeout = undeploymentTimeout();
appDeployer.undeploy(deploymentId);
assertThat(deploymentId, eventually(hasStatusThat(
Matchers.hasProperty("state", is(unknown))), timeout.maxAttempts, timeout.pause));
}
@Test
public void createStatefulSetInitContainerImageNameGlobalOverride() throws Exception {
Map<String, String> props = new HashMap<>();
props.put(KubernetesAppDeployer.COUNT_PROPERTY_KEY, "3");
props.put(KubernetesAppDeployer.INDEXED_PROPERTY_KEY, "true");
AppDefinition definition = new AppDefinition(randomName(), null);
AppDeploymentRequest appDeploymentRequest = new AppDeploymentRequest(definition, testApplication(), props);
String imageName = testApplication().getURI().getSchemeSpecificPart();
KubernetesDeployerProperties kubernetesDeployerProperties = new KubernetesDeployerProperties();
kubernetesDeployerProperties.setStatefulSetInitContainerImageName(imageName);
KubernetesAppDeployer deployer = new KubernetesAppDeployer(kubernetesDeployerProperties, kubernetesClient);
log.info("Deploying {}...", appDeploymentRequest.getDefinition().getName());
String deploymentId = deployer.deploy(appDeploymentRequest);
Timeout timeout = deploymentTimeout();
assertThat(deploymentId, eventually(hasStatusThat(
Matchers.hasProperty("state", is(deployed))), timeout.maxAttempts, timeout.pause));
Map<String, String> selector = Collections.singletonMap(SPRING_APP_KEY, deploymentId);
List<StatefulSet> statefulSets = kubernetesClient.apps().statefulSets().withLabels(selector).list().getItems();
assertNotNull(statefulSets);
assertEquals(1, statefulSets.size());
StatefulSet statefulSet = statefulSets.get(0);
StatefulSetSpec statefulSetSpec = statefulSet.getSpec();
List<Container> statefulSetInitContainers = statefulSetSpec.getTemplate().getSpec().getInitContainers();
assertEquals(1, statefulSetInitContainers.size());
Container statefulSetInitContainer = statefulSetInitContainers.get(0);
assertEquals(imageName, statefulSetInitContainer.getImage());
log.info("Undeploying {}...", deploymentId);
timeout = undeploymentTimeout();
appDeployer.undeploy(deploymentId);
assertThat(deploymentId, eventually(hasStatusThat(
Matchers.hasProperty("state", is(unknown))), timeout.maxAttempts, timeout.pause));
}
@Test
public void createStatefulSetWithOverridingRequest() throws Exception {
Map<String, String> props = new HashMap<>();
props.put(KubernetesAppDeployer.COUNT_PROPERTY_KEY, "3");
props.put(KubernetesAppDeployer.INDEXED_PROPERTY_KEY, "true");
props.put("spring.cloud.deployer.kubernetes.statefulSet.volumeClaimTemplate.storage", "1g");
AppDefinition definition = new AppDefinition(randomName(), null);
AppDeploymentRequest appDeploymentRequest = new AppDeploymentRequest(definition, testApplication(), props);
KubernetesAppDeployer deployer = new KubernetesAppDeployer(new KubernetesDeployerProperties(),
kubernetesClient);
log.info("Deploying {}...", appDeploymentRequest.getDefinition().getName());
String deploymentId = deployer.deploy(appDeploymentRequest);
Map<String, String> idMap = deployer.createIdMap(deploymentId, appDeploymentRequest);
Timeout timeout = deploymentTimeout();
assertThat(deploymentId, eventually(hasStatusThat(
Matchers.hasProperty("state", is(deployed))), timeout.maxAttempts, timeout.pause));
Map<String, String> selector = Collections.singletonMap(SPRING_APP_KEY, deploymentId);
StatefulSet statefulSet = kubernetesClient.apps().statefulSets().withLabels(selector).list().getItems().get(0);
StatefulSetSpec statefulSetSpec = statefulSet.getSpec();
Assertions.assertThat(statefulSetSpec.getPodManagementPolicy()).isEqualTo("Parallel");
Assertions.assertThat(statefulSetSpec.getReplicas()).isEqualTo(3);
Assertions.assertThat(statefulSetSpec.getServiceName()).isEqualTo(deploymentId);
Assertions.assertThat(statefulSet.getMetadata().getName()).isEqualTo(deploymentId);
Assertions.assertThat(statefulSetSpec.getSelector().getMatchLabels())
.containsAllEntriesOf(deployer.createIdMap(deploymentId, appDeploymentRequest));
Assertions.assertThat(statefulSetSpec.getSelector().getMatchLabels())
.contains(entry(KubernetesAppDeployer.SPRING_MARKER_KEY, KubernetesAppDeployer.SPRING_MARKER_VALUE));
Assertions.assertThat(statefulSetSpec.getTemplate().getMetadata().getLabels()).containsAllEntriesOf(idMap);
Assertions.assertThat(statefulSetSpec.getTemplate().getMetadata().getLabels())
.contains(entry(KubernetesAppDeployer.SPRING_MARKER_KEY, KubernetesAppDeployer.SPRING_MARKER_VALUE));
Container container = statefulSetSpec.getTemplate().getSpec().getContainers().get(0);
Assertions.assertThat(container.getName()).isEqualTo(deploymentId);
Assertions.assertThat(container.getPorts().get(0).getContainerPort()).isEqualTo(8080);
Assertions.assertThat(container.getImage()).isEqualTo(testApplication().getURI().getSchemeSpecificPart());
PersistentVolumeClaim pvc = statefulSetSpec.getVolumeClaimTemplates().get(0);
Assertions.assertThat(pvc.getMetadata().getName()).isEqualTo(deploymentId);
PersistentVolumeClaimSpec pvcSpec = pvc.getSpec();
Assertions.assertThat(pvcSpec.getAccessModes()).containsOnly("ReadWriteOnce");
Assertions.assertThat(pvcSpec.getResources().getLimits().get("storage").getAmount()).isEqualTo("1Gi");
Assertions.assertThat(pvcSpec.getResources().getRequests().get("storage").getAmount()).isEqualTo("1Gi");
log.info("Undeploying {}...", deploymentId);
timeout = undeploymentTimeout();
appDeployer.undeploy(deploymentId);
assertThat(deploymentId, eventually(hasStatusThat(
Matchers.hasProperty("state", is(unknown))), timeout.maxAttempts, timeout.pause));
}
@Test
public void testDeploymentLabelsStatefulSet() {
log.info("Testing {}...", "DeploymentLabelsForStatefulSet");
Map<String, String> props = new HashMap<>();
props.put(KubernetesAppDeployer.COUNT_PROPERTY_KEY, "2");
props.put(KubernetesAppDeployer.INDEXED_PROPERTY_KEY, "true");
props.put("spring.cloud.deployer.kubernetes.deploymentLabels",
"stateful-label1:stateful-value1,stateful-label2:stateful-value2");
AppDefinition definition = new AppDefinition(randomName(), null);
AppDeploymentRequest appDeploymentRequest = new AppDeploymentRequest(definition, testApplication(), props);
KubernetesAppDeployer deployer = new KubernetesAppDeployer(new KubernetesDeployerProperties(),
kubernetesClient);
log.info("Deploying {}...", appDeploymentRequest.getDefinition().getName());
String deploymentId = deployer.deploy(appDeploymentRequest);
Timeout timeout = deploymentTimeout();
assertThat(deploymentId, eventually(hasStatusThat(
Matchers.hasProperty("state", is(deployed))), timeout.maxAttempts, timeout.pause));
Map<String, String> idMap = deployer.createIdMap(deploymentId, appDeploymentRequest);
Map<String, String> selector = Collections.singletonMap(SPRING_APP_KEY, deploymentId);
StatefulSet statefulSet = kubernetesClient.apps().statefulSets().withLabels(selector).list().getItems().get(0);
StatefulSetSpec statefulSetSpec = statefulSet.getSpec();
Assertions.assertThat(statefulSetSpec.getReplicas()).isEqualTo(2);
Assertions.assertThat(statefulSetSpec.getTemplate().getMetadata().getLabels()).containsAllEntriesOf(idMap);
//verify stateful set match labels
Map<String, String> setLabels = statefulSet.getMetadata().getLabels();
assertTrue("Label 'stateful-label1' not found in StatefulSet metadata", setLabels.containsKey("stateful-label1"));
assertEquals("Unexpected value in stateful-set metadata label for stateful-label1", "stateful-value1", setLabels.get("stateful-label1"));
assertTrue("Label 'stateful-label2' not found in StatefulSet metadata", setLabels.containsKey("stateful-label2"));
assertEquals("Unexpected value in stateful-set metadata label for stateful-label2","stateful-value2", setLabels.get("stateful-label2"));
//verify pod template labels
Map<String, String> specLabels = statefulSetSpec.getTemplate().getMetadata().getLabels();
assertTrue("Label 'stateful-label1' not found in template metadata", specLabels.containsKey("stateful-label1"));
assertEquals("Unexpected value for statefulSet metadata stateful-label1", "stateful-value1", specLabels.get("stateful-label1"));
assertTrue("Label 'stateful-label2' not found in statefulSet template", specLabels.containsKey("stateful-label2"));
assertEquals("Unexpected value for statefulSet metadata stateful-label2", "stateful-value2", specLabels.get("stateful-label2"));
//verify that labels got replicated to one of the deployments
List<Pod> pods = kubernetesClient.pods().withLabels(selector).list().getItems();
Map<String, String> podLabels = pods.get(0).getMetadata().getLabels();
assertTrue("Label 'stateful-label1' not found in podLabels", podLabels.containsKey("stateful-label1"));
assertEquals("Unexpected value for podLabels stateful-label1", "stateful-value1", podLabels.get("stateful-label1"));
assertTrue("Label 'stateful-label2' not found in podLabels", podLabels.containsKey("stateful-label2"));
assertEquals("Unexpected value for podLabels stateful-label2", "stateful-value2", podLabels.get("stateful-label2"));
log.info("Undeploying {}...", deploymentId);
timeout = undeploymentTimeout();
appDeployer.undeploy(deploymentId);
assertThat(deploymentId, eventually(hasStatusThat(
Matchers.hasProperty("state", is(unknown))), timeout.maxAttempts, timeout.pause));
}
@ParameterizedTest
@MethodSource("data")
public void testReconcileUpdatesKafkaStorageType(Params params, VertxTestContext context) {
init(params);
AtomicReference<List<PersistentVolumeClaim>> originalPVCs = new AtomicReference<>();
AtomicReference<List<Volume>> originalVolumes = new AtomicReference<>();
AtomicReference<List<Container>> originalInitContainers = new AtomicReference<>();
Checkpoint async = context.checkpoint();
initialReconcile(context)
.onComplete(context.succeeding(v -> context.verify(() -> {
originalPVCs.set(Optional.ofNullable(client.apps().statefulSets().inNamespace(NAMESPACE).withName(KafkaCluster.kafkaClusterName(CLUSTER_NAME)).get())
.map(StatefulSet::getSpec)
.map(StatefulSetSpec::getVolumeClaimTemplates)
.orElse(new ArrayList<>()));
originalVolumes.set(Optional.ofNullable(client.apps().statefulSets().inNamespace(NAMESPACE).withName(KafkaCluster.kafkaClusterName(CLUSTER_NAME)).get())
.map(StatefulSet::getSpec)
.map(StatefulSetSpec::getTemplate)
.map(PodTemplateSpec::getSpec)
.map(PodSpec::getVolumes)
.orElse(new ArrayList<>()));
originalInitContainers.set(Optional.ofNullable(client.apps().statefulSets().inNamespace(NAMESPACE).withName(KafkaCluster.kafkaClusterName(CLUSTER_NAME)).get())
.map(StatefulSet::getSpec)
.map(StatefulSetSpec::getTemplate)
.map(PodTemplateSpec::getSpec)
.map(PodSpec::getInitContainers)
.orElse(new ArrayList<>()));
// Update the storage type
// ephemeral -> persistent
// or
// persistent -> ephemeral
Kafka changedClusterCm = null;
if (kafkaStorage instanceof EphemeralStorage) {
changedClusterCm = new KafkaBuilder(cluster)
.editSpec()
.editKafka()
.withNewPersistentClaimStorage()
.withSize("123")
.endPersistentClaimStorage()
.endKafka()
.endSpec()
.build();
} else if (kafkaStorage instanceof PersistentClaimStorage) {
changedClusterCm = new KafkaBuilder(cluster)
.editSpec()
.editKafka()
.withNewEphemeralStorage()
.endEphemeralStorage()
.endKafka()
.endSpec()
.build();
} else {
context.failNow(new Exception("If storage is not ephemeral or persistent something has gone wrong"));
}
kafkaAssembly(NAMESPACE, CLUSTER_NAME).patch(changedClusterCm);
LOGGER.info("Updating with changed storage type");
})))
.compose(v -> operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME)))
.onComplete(context.succeeding(v -> context.verify(() -> {
// Check the Volumes and PVCs were not changed
assertPVCs(context, KafkaCluster.kafkaClusterName(CLUSTER_NAME), originalPVCs.get());
assertVolumes(context, KafkaCluster.kafkaClusterName(CLUSTER_NAME), originalVolumes.get());
assertInitContainers(context, KafkaCluster.kafkaClusterName(CLUSTER_NAME), originalInitContainers.get());
async.flag();
})));
}
/**
* Create a StatefulSet
*
* @param request the {@link AppDeploymentRequest}
*/
protected void createStatefulSet(AppDeploymentRequest request) {
String appId = createDeploymentId(request);
int externalPort = getExternalPort(request);
Map<String, String> idMap = createIdMap(appId, request);
int replicas = getCountFromRequest(request);
Map<String, String> kubernetesDeployerProperties = request.getDeploymentProperties();
logger.debug(String.format("Creating StatefulSet: %s on %d with %d replicas", appId, externalPort, replicas));
Map<String, Quantity> storageResource = Collections.singletonMap("storage",
new Quantity(this.deploymentPropertiesResolver.getStatefulSetStorage(kubernetesDeployerProperties)));
String storageClassName = this.deploymentPropertiesResolver.getStatefulSetStorageClassName(kubernetesDeployerProperties);
PersistentVolumeClaimBuilder persistentVolumeClaimBuilder = new PersistentVolumeClaimBuilder().withNewSpec().
withStorageClassName(storageClassName).withAccessModes(Collections.singletonList("ReadWriteOnce"))
.withNewResources().addToLimits(storageResource).addToRequests(storageResource).endResources()
.endSpec().withNewMetadata().withName(appId).withLabels(idMap)
.addToLabels(SPRING_MARKER_KEY, SPRING_MARKER_VALUE).endMetadata();
PodSpec podSpec = createPodSpec(request);
podSpec.getVolumes().add(new VolumeBuilder().withName("config").withNewEmptyDir().endEmptyDir().build());
podSpec.getContainers().get(0).getVolumeMounts()
.add(new VolumeMountBuilder().withName("config").withMountPath("/config").build());
String statefulSetInitContainerImageName = this.deploymentPropertiesResolver.getStatefulSetInitContainerImageName(kubernetesDeployerProperties);
podSpec.getInitContainers().add(createStatefulSetInitContainer(statefulSetInitContainerImageName));
Map<String, String> deploymentLabels= this.deploymentPropertiesResolver.getDeploymentLabels(request.getDeploymentProperties());
StatefulSetSpec spec = new StatefulSetSpecBuilder().withNewSelector().addToMatchLabels(idMap)
.addToMatchLabels(SPRING_MARKER_KEY, SPRING_MARKER_VALUE).endSelector()
.withVolumeClaimTemplates(persistentVolumeClaimBuilder.build()).withServiceName(appId)
.withPodManagementPolicy("Parallel").withReplicas(replicas).withNewTemplate().withNewMetadata()
.withLabels(idMap).addToLabels(SPRING_MARKER_KEY, SPRING_MARKER_VALUE).addToLabels(deploymentLabels)
.endMetadata().withSpec(podSpec).endTemplate().build();
StatefulSet statefulSet = new StatefulSetBuilder().withNewMetadata().withName(appId).withLabels(idMap)
.addToLabels(SPRING_MARKER_KEY, SPRING_MARKER_VALUE).addToLabels(deploymentLabels).endMetadata().withSpec(spec).build();
client.apps().statefulSets().create(statefulSet);
}