io.fabric8.kubernetes.api.model.apps.StatefulSetSpecBuilder#io.fabric8.kubernetes.api.model.apps.StatefulSetSpec源码实例Demo

下面列出了io.fabric8.kubernetes.api.model.apps.StatefulSetSpecBuilder#io.fabric8.kubernetes.api.model.apps.StatefulSetSpec 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: kubernetes-client   文件: Readiness.java
/**
 * Tells whether a StatefulSet reports as many replicas, and as many ready
 * replicas, as its spec asks for.
 *
 * @param ss the StatefulSet to inspect; must not be null
 * @return {@code true} only when both the status replica count and the status
 *         ready-replica count equal the replica count in the spec;
 *         {@code false} when the status, the spec, or any of those counts is missing
 */
public static boolean isStatefulSetReady(StatefulSet ss) {
  Utils.checkNotNull(ss, "StatefulSet can't be null.");

  // Without a complete status there is nothing to compare against.
  StatefulSetStatus observed = ss.getStatus();
  if (observed == null || observed.getReplicas() == null || observed.getReadyReplicas() == null) {
    return false;
  }

  // A missing spec (or replica count) can occur in tests; treat it as "not ready".
  StatefulSetSpec desired = ss.getSpec();
  if (desired == null || desired.getReplicas() == null) {
    return false;
  }

  int wanted = desired.getReplicas().intValue();
  return wanted == observed.getReplicas() && wanted == observed.getReadyReplicas();
}
 
源代码2 项目: kubernetes-client   文件: ReadinessTest.java
/**
 * A StatefulSet whose status shows fewer ready replicas than the spec
 * requests must not be reported as ready.
 */
@Test
public void testStatefulSetReadinessNotEnoughReadyReplicas() {
  // Desired state: two replicas.
  StatefulSetSpec desired = new StatefulSetSpec();
  desired.setReplicas(2);

  // Observed state: two replicas exist, only one of them is ready.
  StatefulSetStatus observed = new StatefulSetStatus();
  observed.setReplicas(2);
  observed.setReadyReplicas(1);

  StatefulSet underTest = new StatefulSet();
  underTest.setSpec(desired);
  underTest.setStatus(observed);

  assertFalse(Readiness.isReady(underTest));
  assertFalse(Readiness.isStatefulSetReady(underTest));
}
 
源代码3 项目: kubernetes-client   文件: ReadinessTest.java
/**
 * A StatefulSet whose status replica and ready-replica counts both match the
 * spec must be reported as ready.
 */
@Test
public void testStatefulSetReadiness() {
  // Desired state: two replicas.
  StatefulSetSpec desired = new StatefulSetSpec();
  desired.setReplicas(2);

  // Observed state: two replicas exist and both are ready.
  StatefulSetStatus observed = new StatefulSetStatus();
  observed.setReplicas(2);
  observed.setReadyReplicas(2);

  StatefulSet underTest = new StatefulSet();
  underTest.setSpec(desired);
  underTest.setStatus(observed);

  assertTrue(Readiness.isReady(underTest));
  assertTrue(Readiness.isStatefulSetReady(underTest));
}
 
源代码4 项目: jkube   文件: StatefulSetHandler.java
/**
 * Assembles a StatefulSet spec from the resource configuration.
 *
 * @param config supplies the replica count and the controller name (used as the service name)
 * @param images image configurations handed to the pod template handler
 * @return a spec carrying the replicas, service name and pod template
 */
private StatefulSetSpec createStatefulSetSpec(ResourceConfig config, List<ImageConfiguration> images) {
    StatefulSetSpecBuilder specBuilder = new StatefulSetSpecBuilder()
            .withReplicas(config.getReplicas())
            .withServiceName(config.getControllerName());

    // The pod template is produced by the shared handler from the same config and images.
    specBuilder = specBuilder.withTemplate(podTemplateHandler.getPodTemplate(config, images));

    return specBuilder.build();
}
 
源代码5 项目: kubernetes-client   文件: ReadinessTest.java
/**
 * A StatefulSet that has a spec but no status at all must not be reported
 * as ready.
 */
@Test
public void testStatefulSetReadinessNoStatus() {
  StatefulSetSpec desired = new StatefulSetSpec();
  desired.setReplicas(1);

  // Deliberately leave the status unset.
  StatefulSet underTest = new StatefulSet();
  underTest.setSpec(desired);

  assertFalse(Readiness.isReady(underTest));
  assertFalse(Readiness.isStatefulSetReady(underTest));
}
 
/**
 * Merges the simple fields of {@code spec} into the spec currently held by
 * {@code builder}, editing the builder in place.
 *
 * @param builder the StatefulSet builder whose spec is edited
 * @param spec    the spec whose simple fields are merged in
 */
private void mergeStatefulSetSpec(StatefulSetBuilder builder, StatefulSetSpec spec) {
    // Open the nested spec editor, let the utility copy the fields, then close the editor.
    StatefulSetFluent.SpecNested<StatefulSetBuilder> nestedSpec = builder.editSpec();
    KubernetesResourceUtil.mergeSimpleFields(nestedSpec, spec);
    nestedSpec.endSpec();
}
 
/**
 * Deploys an indexed app with a count of 3, checks the resulting StatefulSet,
 * scales it down to 1 and checks the StatefulSet again before undeploying.
 */
@Test
public void testScaleStatefulSet() {
	log.info("Testing {}...", "ScaleStatefulSet");
	KubernetesDeployerProperties deployProperties = new KubernetesDeployerProperties();

	ContainerFactory containerFactory = new DefaultContainerFactory(deployProperties);
	KubernetesAppDeployer appDeployer = new KubernetesAppDeployer(deployProperties, kubernetesClient,
			containerFactory);

	AppDefinition definition = new AppDefinition(randomName(), null);
	Resource resource = testApplication();

	Map<String, String> props = new HashMap<>();
	props.put(KubernetesAppDeployer.COUNT_PROPERTY_KEY, "3");
	props.put(KubernetesAppDeployer.INDEXED_PROPERTY_KEY, "true");

	AppDeploymentRequest request = new AppDeploymentRequest(definition, resource, props);

	log.info("Deploying {}...", request.getDefinition().getName());
	Timeout timeout = deploymentTimeout();
	String deploymentId = appDeployer.deploy(request);
	assertThat(deploymentId, eventually(hasStatusThat(
			Matchers.hasProperty("state", is(deployed))), timeout.maxAttempts, timeout.pause));
	assertThat(deploymentId, eventually(appInstanceCount(is(3))));

	// Ensure that a StatefulSet is deployed
	Map<String, String> selector = Collections.singletonMap(SPRING_APP_KEY, deploymentId);
	List<StatefulSet> statefulSets = kubernetesClient.apps().statefulSets().withLabels(selector).list().getItems();
	assertNotNull(statefulSets);
	assertEquals(1, statefulSets.size());
	StatefulSet statefulSet = statefulSets.get(0);
	StatefulSetSpec statefulSetSpec = statefulSet.getSpec();
	Assertions.assertThat(statefulSetSpec.getPodManagementPolicy()).isEqualTo("Parallel");
	Assertions.assertThat(statefulSetSpec.getReplicas()).isEqualTo(3);
	Assertions.assertThat(statefulSetSpec.getServiceName()).isEqualTo(deploymentId);
	Assertions.assertThat(statefulSet.getMetadata().getName()).isEqualTo(deploymentId);

	log.info("Scale Down {}...", request.getDefinition().getName());
	appDeployer.scale(new AppScaleRequest(deploymentId, 1));
	assertThat(deploymentId, eventually(appInstanceCount(is(1)), timeout.maxAttempts, timeout.pause));

	// Re-list after scaling so that every assertion below inspects the current
	// StatefulSet rather than the object captured before the scale request.
	statefulSets = kubernetesClient.apps().statefulSets().withLabels(selector).list().getItems();
	assertEquals(1, statefulSets.size());
	statefulSet = statefulSets.get(0);
	statefulSetSpec = statefulSet.getSpec();
	Assertions.assertThat(statefulSetSpec.getReplicas()).isEqualTo(1);
	Assertions.assertThat(statefulSetSpec.getServiceName()).isEqualTo(deploymentId);
	Assertions.assertThat(statefulSet.getMetadata().getName()).isEqualTo(deploymentId);

	appDeployer.undeploy(deploymentId);
}
 
/**
 * Deploys an indexed app with a count of 3 and verifies the StatefulSet that
 * results: its init container image, management policy, replicas, service
 * name, selector and template labels, main container and volume claim
 * template. The app is undeployed at the end.
 */
@Test
public void testCreateStatefulSet() throws Exception {
	Map<String, String> deploymentProps = new HashMap<>();
	deploymentProps.put(KubernetesAppDeployer.COUNT_PROPERTY_KEY, "3");
	deploymentProps.put(KubernetesAppDeployer.INDEXED_PROPERTY_KEY, "true");

	AppDeploymentRequest deployRequest = new AppDeploymentRequest(
			new AppDefinition(randomName(), null), testApplication(), deploymentProps);

	KubernetesAppDeployer deployer = new KubernetesAppDeployer(new KubernetesDeployerProperties(),
			kubernetesClient);

	log.info("Deploying {}...", deployRequest.getDefinition().getName());
	String deploymentId = deployer.deploy(deployRequest);
	Map<String, String> idMap = deployer.createIdMap(deploymentId, deployRequest);

	Timeout deployTimeout = deploymentTimeout();
	assertThat(deploymentId, eventually(hasStatusThat(
			Matchers.hasProperty("state", is(deployed))), deployTimeout.maxAttempts, deployTimeout.pause));

	// Exactly one StatefulSet is expected for this deployment id.
	Map<String, String> selector = Collections.singletonMap(SPRING_APP_KEY, deploymentId);
	List<StatefulSet> matchingSets = kubernetesClient.apps().statefulSets().withLabels(selector).list().getItems();
	assertNotNull(matchingSets);
	assertEquals(1, matchingSets.size());

	StatefulSet statefulSet = matchingSets.get(0);
	StatefulSetSpec spec = statefulSet.getSpec();

	// Init container: a single one, using the default image name.
	List<Container> initContainers = spec.getTemplate().getSpec().getInitContainers();
	assertEquals(1, initContainers.size());
	Container initContainer = initContainers.get(0);
	assertEquals(DeploymentPropertiesResolver.STATEFUL_SET_IMAGE_NAME, initContainer.getImage());

	// Top-level spec fields and the resource name.
	Assertions.assertThat(spec.getPodManagementPolicy()).isEqualTo("Parallel");
	Assertions.assertThat(spec.getReplicas()).isEqualTo(3);
	Assertions.assertThat(spec.getServiceName()).isEqualTo(deploymentId);
	Assertions.assertThat(statefulSet.getMetadata().getName()).isEqualTo(deploymentId);

	// Selector labels.
	Assertions.assertThat(spec.getSelector().getMatchLabels())
			.containsAllEntriesOf(deployer.createIdMap(deploymentId, deployRequest));
	Assertions.assertThat(spec.getSelector().getMatchLabels())
			.contains(entry(KubernetesAppDeployer.SPRING_MARKER_KEY, KubernetesAppDeployer.SPRING_MARKER_VALUE));

	// Pod template labels.
	Assertions.assertThat(spec.getTemplate().getMetadata().getLabels()).containsAllEntriesOf(idMap);
	Assertions.assertThat(spec.getTemplate().getMetadata().getLabels())
			.contains(entry(KubernetesAppDeployer.SPRING_MARKER_KEY, KubernetesAppDeployer.SPRING_MARKER_VALUE));

	// Main application container.
	Container appContainer = spec.getTemplate().getSpec().getContainers().get(0);
	Assertions.assertThat(appContainer.getName()).isEqualTo(deploymentId);
	Assertions.assertThat(appContainer.getPorts().get(0).getContainerPort()).isEqualTo(8080);
	Assertions.assertThat(appContainer.getImage()).isEqualTo(testApplication().getURI().getSchemeSpecificPart());

	// Volume claim template.
	PersistentVolumeClaim claim = spec.getVolumeClaimTemplates().get(0);
	Assertions.assertThat(claim.getMetadata().getName()).isEqualTo(deploymentId);

	PersistentVolumeClaimSpec claimSpec = claim.getSpec();
	Assertions.assertThat(claimSpec.getAccessModes()).containsOnly("ReadWriteOnce");
	Assertions.assertThat(claimSpec.getStorageClassName()).isNull();
	Assertions.assertThat(claimSpec.getResources().getLimits().get("storage").getAmount()).isEqualTo("10Mi");
	Assertions.assertThat(claimSpec.getResources().getRequests().get("storage").getAmount()).isEqualTo("10Mi");

	log.info("Undeploying {}...", deploymentId);
	Timeout undeployTimeout = undeploymentTimeout();
	appDeployer.undeploy(deploymentId);
	assertThat(deploymentId, eventually(hasStatusThat(
			Matchers.hasProperty("state", is(unknown))), undeployTimeout.maxAttempts, undeployTimeout.pause));
}
 
/**
 * Verifies that the StatefulSet init container image can be overridden with
 * the {@code spring.cloud.deployer.kubernetes.statefulSetInitContainerImageName}
 * deployment property.
 */
@Test
public void testCreateStatefulSetInitContainerImageNamePropOverride() throws Exception {
	Map<String, String> deploymentProps = new HashMap<>();
	deploymentProps.put(KubernetesAppDeployer.COUNT_PROPERTY_KEY, "3");
	deploymentProps.put(KubernetesAppDeployer.INDEXED_PROPERTY_KEY, "true");

	// Use the test application's own image as the override value.
	String imageName = testApplication().getURI().getSchemeSpecificPart();
	deploymentProps.put("spring.cloud.deployer.kubernetes.statefulSetInitContainerImageName", imageName);

	AppDeploymentRequest deployRequest = new AppDeploymentRequest(
			new AppDefinition(randomName(), null), testApplication(), deploymentProps);

	KubernetesAppDeployer deployer = new KubernetesAppDeployer(new KubernetesDeployerProperties(),
			kubernetesClient);

	log.info("Deploying {}...", deployRequest.getDefinition().getName());
	String deploymentId = deployer.deploy(deployRequest);

	Timeout deployTimeout = deploymentTimeout();
	assertThat(deploymentId, eventually(hasStatusThat(
			Matchers.hasProperty("state", is(deployed))), deployTimeout.maxAttempts, deployTimeout.pause));

	Map<String, String> selector = Collections.singletonMap(SPRING_APP_KEY, deploymentId);
	List<StatefulSet> matchingSets = kubernetesClient.apps().statefulSets().withLabels(selector).list().getItems();
	assertNotNull(matchingSets);
	assertEquals(1, matchingSets.size());

	StatefulSet statefulSet = matchingSets.get(0);
	StatefulSetSpec spec = statefulSet.getSpec();

	// The single init container must carry the overridden image name.
	List<Container> initContainers = spec.getTemplate().getSpec().getInitContainers();
	assertEquals(1, initContainers.size());
	Container initContainer = initContainers.get(0);
	assertEquals(imageName, initContainer.getImage());

	log.info("Undeploying {}...", deploymentId);
	Timeout undeployTimeout = undeploymentTimeout();
	appDeployer.undeploy(deploymentId);
	assertThat(deploymentId, eventually(hasStatusThat(
			Matchers.hasProperty("state", is(unknown))), undeployTimeout.maxAttempts, undeployTimeout.pause));
}
 
/**
 * Verifies that the StatefulSet init container image can be overridden
 * globally through {@link KubernetesDeployerProperties}.
 */
@Test
public void createStatefulSetInitContainerImageNameGlobalOverride() throws Exception {
	Map<String, String> deploymentProps = new HashMap<>();
	deploymentProps.put(KubernetesAppDeployer.COUNT_PROPERTY_KEY, "3");
	deploymentProps.put(KubernetesAppDeployer.INDEXED_PROPERTY_KEY, "true");

	AppDeploymentRequest deployRequest = new AppDeploymentRequest(
			new AppDefinition(randomName(), null), testApplication(), deploymentProps);

	// Use the test application's own image as the override value.
	String imageName = testApplication().getURI().getSchemeSpecificPart();

	KubernetesDeployerProperties globalProperties = new KubernetesDeployerProperties();
	globalProperties.setStatefulSetInitContainerImageName(imageName);

	KubernetesAppDeployer deployer = new KubernetesAppDeployer(globalProperties, kubernetesClient);

	log.info("Deploying {}...", deployRequest.getDefinition().getName());
	String deploymentId = deployer.deploy(deployRequest);

	Timeout deployTimeout = deploymentTimeout();
	assertThat(deploymentId, eventually(hasStatusThat(
			Matchers.hasProperty("state", is(deployed))), deployTimeout.maxAttempts, deployTimeout.pause));

	Map<String, String> selector = Collections.singletonMap(SPRING_APP_KEY, deploymentId);
	List<StatefulSet> matchingSets = kubernetesClient.apps().statefulSets().withLabels(selector).list().getItems();
	assertNotNull(matchingSets);
	assertEquals(1, matchingSets.size());

	StatefulSet statefulSet = matchingSets.get(0);
	StatefulSetSpec spec = statefulSet.getSpec();

	// The single init container must carry the globally configured image name.
	List<Container> initContainers = spec.getTemplate().getSpec().getInitContainers();
	assertEquals(1, initContainers.size());
	Container initContainer = initContainers.get(0);
	assertEquals(imageName, initContainer.getImage());

	log.info("Undeploying {}...", deploymentId);
	Timeout undeployTimeout = undeploymentTimeout();
	appDeployer.undeploy(deploymentId);
	assertThat(deploymentId, eventually(hasStatusThat(
			Matchers.hasProperty("state", is(unknown))), undeployTimeout.maxAttempts, undeployTimeout.pause));
}
 
/**
 * Deploys an indexed app with a volume claim template storage override of
 * {@code "1g"} and verifies the resulting StatefulSet, expecting the claim's
 * storage limits and requests to read {@code "1Gi"}.
 */
@Test
public void createStatefulSetWithOverridingRequest() throws Exception {
	Map<String, String> deploymentProps = new HashMap<>();
	deploymentProps.put(KubernetesAppDeployer.COUNT_PROPERTY_KEY, "3");
	deploymentProps.put(KubernetesAppDeployer.INDEXED_PROPERTY_KEY, "true");
	deploymentProps.put("spring.cloud.deployer.kubernetes.statefulSet.volumeClaimTemplate.storage", "1g");

	AppDeploymentRequest deployRequest = new AppDeploymentRequest(
			new AppDefinition(randomName(), null), testApplication(), deploymentProps);

	KubernetesAppDeployer deployer = new KubernetesAppDeployer(new KubernetesDeployerProperties(),
			kubernetesClient);

	log.info("Deploying {}...", deployRequest.getDefinition().getName());
	String deploymentId = deployer.deploy(deployRequest);
	Map<String, String> idMap = deployer.createIdMap(deploymentId, deployRequest);

	Timeout deployTimeout = deploymentTimeout();
	assertThat(deploymentId, eventually(hasStatusThat(
			Matchers.hasProperty("state", is(deployed))), deployTimeout.maxAttempts, deployTimeout.pause));

	Map<String, String> selector = Collections.singletonMap(SPRING_APP_KEY, deploymentId);

	StatefulSet statefulSet = kubernetesClient.apps().statefulSets().withLabels(selector).list().getItems().get(0);
	StatefulSetSpec spec = statefulSet.getSpec();

	// Top-level spec fields and the resource name.
	Assertions.assertThat(spec.getPodManagementPolicy()).isEqualTo("Parallel");
	Assertions.assertThat(spec.getReplicas()).isEqualTo(3);
	Assertions.assertThat(spec.getServiceName()).isEqualTo(deploymentId);
	Assertions.assertThat(statefulSet.getMetadata().getName()).isEqualTo(deploymentId);

	// Selector labels.
	Assertions.assertThat(spec.getSelector().getMatchLabels())
			.containsAllEntriesOf(deployer.createIdMap(deploymentId, deployRequest));
	Assertions.assertThat(spec.getSelector().getMatchLabels())
			.contains(entry(KubernetesAppDeployer.SPRING_MARKER_KEY, KubernetesAppDeployer.SPRING_MARKER_VALUE));

	// Pod template labels.
	Assertions.assertThat(spec.getTemplate().getMetadata().getLabels()).containsAllEntriesOf(idMap);
	Assertions.assertThat(spec.getTemplate().getMetadata().getLabels())
			.contains(entry(KubernetesAppDeployer.SPRING_MARKER_KEY, KubernetesAppDeployer.SPRING_MARKER_VALUE));

	// Main application container.
	Container appContainer = spec.getTemplate().getSpec().getContainers().get(0);
	Assertions.assertThat(appContainer.getName()).isEqualTo(deploymentId);
	Assertions.assertThat(appContainer.getPorts().get(0).getContainerPort()).isEqualTo(8080);
	Assertions.assertThat(appContainer.getImage()).isEqualTo(testApplication().getURI().getSchemeSpecificPart());

	// Volume claim template, with the overridden storage size.
	PersistentVolumeClaim claim = spec.getVolumeClaimTemplates().get(0);
	Assertions.assertThat(claim.getMetadata().getName()).isEqualTo(deploymentId);

	PersistentVolumeClaimSpec claimSpec = claim.getSpec();
	Assertions.assertThat(claimSpec.getAccessModes()).containsOnly("ReadWriteOnce");
	Assertions.assertThat(claimSpec.getResources().getLimits().get("storage").getAmount()).isEqualTo("1Gi");
	Assertions.assertThat(claimSpec.getResources().getRequests().get("storage").getAmount()).isEqualTo("1Gi");

	log.info("Undeploying {}...", deploymentId);
	Timeout undeployTimeout = undeploymentTimeout();
	appDeployer.undeploy(deploymentId);
	assertThat(deploymentId, eventually(hasStatusThat(
			Matchers.hasProperty("state", is(unknown))), undeployTimeout.maxAttempts, undeployTimeout.pause));
}
 
/**
 * Deploys an indexed app with two custom deployment labels and verifies
 * that both labels appear on the StatefulSet metadata, on its pod template
 * and on the first pod returned for the deployment.
 */
@Test
public void testDeploymentLabelsStatefulSet() {
	log.info("Testing {}...", "DeploymentLabelsForStatefulSet");

	Map<String, String> deploymentProps = new HashMap<>();
	deploymentProps.put(KubernetesAppDeployer.COUNT_PROPERTY_KEY, "2");
	deploymentProps.put(KubernetesAppDeployer.INDEXED_PROPERTY_KEY, "true");
	deploymentProps.put("spring.cloud.deployer.kubernetes.deploymentLabels",
			"stateful-label1:stateful-value1,stateful-label2:stateful-value2");

	AppDeploymentRequest deployRequest = new AppDeploymentRequest(
			new AppDefinition(randomName(), null), testApplication(), deploymentProps);

	KubernetesAppDeployer deployer = new KubernetesAppDeployer(new KubernetesDeployerProperties(),
			kubernetesClient);

	log.info("Deploying {}...", deployRequest.getDefinition().getName());
	String deploymentId = deployer.deploy(deployRequest);

	Timeout deployTimeout = deploymentTimeout();
	assertThat(deploymentId, eventually(hasStatusThat(
			Matchers.hasProperty("state", is(deployed))), deployTimeout.maxAttempts, deployTimeout.pause));

	Map<String, String> idMap = deployer.createIdMap(deploymentId, deployRequest);
	Map<String, String> selector = Collections.singletonMap(SPRING_APP_KEY, deploymentId);

	StatefulSet statefulSet = kubernetesClient.apps().statefulSets().withLabels(selector).list().getItems().get(0);
	StatefulSetSpec spec = statefulSet.getSpec();
	Assertions.assertThat(spec.getReplicas()).isEqualTo(2);
	Assertions.assertThat(spec.getTemplate().getMetadata().getLabels()).containsAllEntriesOf(idMap);

	// Labels on the StatefulSet's own metadata.
	Map<String, String> setLabels = statefulSet.getMetadata().getLabels();
	assertTrue("Label 'stateful-label1' not found in StatefulSet metadata",
			setLabels.containsKey("stateful-label1"));
	assertEquals("Unexpected value in stateful-set metadata label for stateful-label1",
			"stateful-value1", setLabels.get("stateful-label1"));
	assertTrue("Label 'stateful-label2' not found in StatefulSet metadata",
			setLabels.containsKey("stateful-label2"));
	assertEquals("Unexpected value in stateful-set metadata label for stateful-label2",
			"stateful-value2", setLabels.get("stateful-label2"));

	// Labels on the pod template.
	Map<String, String> templateLabels = spec.getTemplate().getMetadata().getLabels();
	assertTrue("Label 'stateful-label1' not found in template metadata",
			templateLabels.containsKey("stateful-label1"));
	assertEquals("Unexpected value for statefulSet metadata stateful-label1",
			"stateful-value1", templateLabels.get("stateful-label1"));
	assertTrue("Label 'stateful-label2' not found in statefulSet template",
			templateLabels.containsKey("stateful-label2"));
	assertEquals("Unexpected value for statefulSet metadata stateful-label2",
			"stateful-value2", templateLabels.get("stateful-label2"));

	// Labels on the first pod listed for this deployment.
	List<Pod> pods = kubernetesClient.pods().withLabels(selector).list().getItems();
	Map<String, String> podLabels = pods.get(0).getMetadata().getLabels();
	assertTrue("Label 'stateful-label1' not found in podLabels",
			podLabels.containsKey("stateful-label1"));
	assertEquals("Unexpected value for podLabels stateful-label1",
			"stateful-value1", podLabels.get("stateful-label1"));
	assertTrue("Label 'stateful-label2' not found in podLabels",
			podLabels.containsKey("stateful-label2"));
	assertEquals("Unexpected value for podLabels stateful-label2",
			"stateful-value2", podLabels.get("stateful-label2"));

	log.info("Undeploying {}...", deploymentId);
	Timeout undeployTimeout = undeploymentTimeout();
	appDeployer.undeploy(deploymentId);
	assertThat(deploymentId, eventually(hasStatusThat(
			Matchers.hasProperty("state", is(unknown))), undeployTimeout.maxAttempts, undeployTimeout.pause));
}
 
/**
 * Reconciles a Kafka cluster, switches its storage type (ephemeral to
 * persistent-claim or the reverse), reconciles again, and asserts against
 * the volume claim templates, volumes and init containers recorded from the
 * Kafka StatefulSet before the change.
 *
 * @param params  the parameter set for this run, passed to {@code init}
 * @param context the Vert.x test context used for verification and completion
 */
@ParameterizedTest
@MethodSource("data")
public void testReconcileUpdatesKafkaStorageType(Params params, VertxTestContext context) {
    init(params);

    AtomicReference<List<PersistentVolumeClaim>> originalPVCs = new AtomicReference<>();
    AtomicReference<List<Volume>> originalVolumes = new AtomicReference<>();
    AtomicReference<List<Container>> originalInitContainers = new AtomicReference<>();

    Checkpoint async = context.checkpoint();
    initialReconcile(context)
        .onComplete(context.succeeding(v -> context.verify(() -> {
            // Record the state of the Kafka StatefulSet before the storage change;
            // each value falls back to an empty list when any link in the chain is absent.
            originalPVCs.set(Optional.ofNullable(client.apps().statefulSets().inNamespace(NAMESPACE).withName(KafkaCluster.kafkaClusterName(CLUSTER_NAME)).get())
                    .map(StatefulSet::getSpec)
                    .map(StatefulSetSpec::getVolumeClaimTemplates)
                    .orElse(new ArrayList<>()));
            originalVolumes.set(Optional.ofNullable(client.apps().statefulSets().inNamespace(NAMESPACE).withName(KafkaCluster.kafkaClusterName(CLUSTER_NAME)).get())
                    .map(StatefulSet::getSpec)
                    .map(StatefulSetSpec::getTemplate)
                    .map(PodTemplateSpec::getSpec)
                    .map(PodSpec::getVolumes)
                    .orElse(new ArrayList<>()));
            originalInitContainers.set(Optional.ofNullable(client.apps().statefulSets().inNamespace(NAMESPACE).withName(KafkaCluster.kafkaClusterName(CLUSTER_NAME)).get())
                    .map(StatefulSet::getSpec)
                    .map(StatefulSetSpec::getTemplate)
                    .map(PodTemplateSpec::getSpec)
                    .map(PodSpec::getInitContainers)
                    .orElse(new ArrayList<>()));

            // Update the storage type
            // ephemeral -> persistent
            // or
            // persistent -> ephemeral
            Kafka changedClusterCm = null;
            if (kafkaStorage instanceof EphemeralStorage) {
                changedClusterCm = new KafkaBuilder(cluster)
                        .editSpec()
                            .editKafka()
                                .withNewPersistentClaimStorage()
                                    .withSize("123")
                                .endPersistentClaimStorage()
                            .endKafka()
                        .endSpec()
                        .build();
            } else if (kafkaStorage instanceof PersistentClaimStorage) {
                changedClusterCm = new KafkaBuilder(cluster)
                        .editSpec()
                            .editKafka()
                                .withNewEphemeralStorage()
                                .endEphemeralStorage()
                            .endKafka()
                        .endSpec()
                        .build();
            } else {
                context.failNow(new Exception("If storage is not ephemeral or persistent something has gone wrong"));
                // There is no changed resource to apply; stop here instead of patching with null.
                return;
            }
            kafkaAssembly(NAMESPACE, CLUSTER_NAME).patch(changedClusterCm);

            LOGGER.info("Updating with changed storage type");
        })))
        .compose(v -> operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME)))
        .onComplete(context.succeeding(v -> context.verify(() -> {
            // Check the Volumes and PVCs were not changed
            assertPVCs(context, KafkaCluster.kafkaClusterName(CLUSTER_NAME), originalPVCs.get());
            assertVolumes(context, KafkaCluster.kafkaClusterName(CLUSTER_NAME), originalVolumes.get());
            assertInitContainers(context, KafkaCluster.kafkaClusterName(CLUSTER_NAME), originalInitContainers.get());
            async.flag();
        })));
}
 
/**
 * Builds a StatefulSet for the given request and creates it through the
 * Kubernetes client.
 * <p>
 * The StatefulSet, its service name and its volume claim template are all
 * named after the deployment id. The pod spec gets an extra {@code config}
 * emptyDir volume mounted at {@code /config} on the first container, plus a
 * StatefulSet init container.
 *
 * @param request the {@link AppDeploymentRequest}
 */
protected void createStatefulSet(AppDeploymentRequest request) {
	final String appId = createDeploymentId(request);
	final int externalPort = getExternalPort(request);
	final Map<String, String> idMap = createIdMap(appId, request);
	final int replicas = getCountFromRequest(request);
	final Map<String, String> deployerProps = request.getDeploymentProperties();

	logger.debug(String.format("Creating StatefulSet: %s on %d with %d replicas", appId, externalPort, replicas));

	// Volume claim template: the same storage quantity serves as limit and request.
	Map<String, Quantity> storage = Collections.singletonMap("storage",
			new Quantity(this.deploymentPropertiesResolver.getStatefulSetStorage(deployerProps)));
	String storageClassName = this.deploymentPropertiesResolver.getStatefulSetStorageClassName(deployerProps);

	PersistentVolumeClaimBuilder claimTemplate = new PersistentVolumeClaimBuilder()
			.withNewSpec()
			.withStorageClassName(storageClassName)
			.withAccessModes(Collections.singletonList("ReadWriteOnce"))
			.withNewResources()
			.addToLimits(storage)
			.addToRequests(storage)
			.endResources()
			.endSpec()
			.withNewMetadata()
			.withName(appId)
			.withLabels(idMap)
			.addToLabels(SPRING_MARKER_KEY, SPRING_MARKER_VALUE)
			.endMetadata();

	// Pod spec: add the "config" volume, mount it on the first container, add the init container.
	PodSpec podSpec = createPodSpec(request);
	podSpec.getVolumes().add(new VolumeBuilder().withName("config").withNewEmptyDir().endEmptyDir().build());
	podSpec.getContainers().get(0).getVolumeMounts()
			.add(new VolumeMountBuilder().withName("config").withMountPath("/config").build());

	String initContainerImage = this.deploymentPropertiesResolver.getStatefulSetInitContainerImageName(deployerProps);
	podSpec.getInitContainers().add(createStatefulSetInitContainer(initContainerImage));

	Map<String, String> deploymentLabels = this.deploymentPropertiesResolver
			.getDeploymentLabels(request.getDeploymentProperties());

	StatefulSetSpec spec = new StatefulSetSpecBuilder()
			.withNewSelector()
			.addToMatchLabels(idMap)
			.addToMatchLabels(SPRING_MARKER_KEY, SPRING_MARKER_VALUE)
			.endSelector()
			.withVolumeClaimTemplates(claimTemplate.build())
			.withServiceName(appId)
			.withPodManagementPolicy("Parallel")
			.withReplicas(replicas)
			.withNewTemplate()
			.withNewMetadata()
			.withLabels(idMap)
			.addToLabels(SPRING_MARKER_KEY, SPRING_MARKER_VALUE)
			.addToLabels(deploymentLabels)
			.endMetadata()
			.withSpec(podSpec)
			.endTemplate()
			.build();

	StatefulSet statefulSet = new StatefulSetBuilder()
			.withNewMetadata()
			.withName(appId)
			.withLabels(idMap)
			.addToLabels(SPRING_MARKER_KEY, SPRING_MARKER_VALUE)
			.addToLabels(deploymentLabels)
			.endMetadata()
			.withSpec(spec)
			.build();

	client.apps().statefulSets().create(statefulSet);
}