The following are example usages of io.fabric8.kubernetes.api.model.KeyToPath together with io.fabric8.kubernetes.api.model.SecretVolumeSourceBuilder and related fabric8 builders; the full source of each example is available on GitHub.
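Before the examples, here is a minimal, self-contained sketch of what KeyToPath expresses: it maps a key of a ConfigMap (or Secret) to a relative file path inside the mounted volume. The names "my-config", "app-config" and "app.properties" below are made up purely for illustration.

import io.fabric8.kubernetes.api.model.KeyToPath;
import io.fabric8.kubernetes.api.model.KeyToPathBuilder;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeBuilder;

import java.util.Collections;

public class KeyToPathSketch {
    public static void main(String[] args) {
        // Project the ConfigMap key "app.properties" to the file "conf/app.properties"
        // under the volume's mount point.
        KeyToPath item = new KeyToPathBuilder()
            .withKey("app.properties")
            .withPath("conf/app.properties")
            .build();

        // Only the keys listed in "items" are projected into the volume.
        Volume volume = new VolumeBuilder()
            .withName("app-config")
            .withNewConfigMap()
                .withName("my-config")
                .withItems(Collections.singletonList(item))
            .endConfigMap()
            .build();

        System.out.println(volume);
    }
}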
@Test
public void testPodWithHadoopConfVolume() throws IOException {
    setHadoopConfDirEnv();
    generateHadoopConfFileItems();
    final FlinkPod resultFlinkPod = hadoopConfMountDecorator.decorateFlinkPod(baseFlinkPod);

    final List<Volume> resultVolumes = resultFlinkPod.getPod().getSpec().getVolumes();
    assertEquals(1, resultVolumes.size());

    final Volume resultVolume = resultVolumes.get(0);
    assertEquals(Constants.HADOOP_CONF_VOLUME, resultVolume.getName());

    final ConfigMapVolumeSource resultVolumeConfigMap = resultVolume.getConfigMap();
    assertEquals(HadoopConfMountDecorator.getHadoopConfConfigMapName(CLUSTER_ID),
        resultVolumeConfigMap.getName());

    final Map<String, String> expectedKeyToPaths = new HashMap<String, String>() {
        {
            put("hdfs-site.xml", "hdfs-site.xml");
            put("core-site.xml", "core-site.xml");
        }
    };
    final Map<String, String> resultKeyToPaths = resultVolumeConfigMap.getItems().stream()
        .collect(Collectors.toMap(KeyToPath::getKey, KeyToPath::getPath));
    assertEquals(expectedKeyToPaths, resultKeyToPaths);
}
@Test
public void testDecoratedFlinkPodWithoutLog4jAndLogback() {
    final FlinkPod resultFlinkPod = flinkConfMountDecorator.decorateFlinkPod(baseFlinkPod);

    final List<KeyToPath> expectedKeyToPaths = Collections.singletonList(
        new KeyToPathBuilder()
            .withKey(FLINK_CONF_FILENAME)
            .withPath(FLINK_CONF_FILENAME)
            .build());
    final List<Volume> expectedVolumes = Collections.singletonList(
        new VolumeBuilder()
            .withName(Constants.FLINK_CONF_VOLUME)
            .withNewConfigMap()
                .withName(getFlinkConfConfigMapName(CLUSTER_ID))
                .withItems(expectedKeyToPaths)
            .endConfigMap()
            .build());
    assertEquals(expectedVolumes, resultFlinkPod.getPod().getSpec().getVolumes());

    final List<VolumeMount> expectedVolumeMounts = Collections.singletonList(
        new VolumeMountBuilder()
            .withName(Constants.FLINK_CONF_VOLUME)
            .withMountPath(FLINK_CONF_DIR_IN_POD)
            .build());
    assertEquals(expectedVolumeMounts, resultFlinkPod.getMainContainer().getVolumeMounts());
}
@Test
public void testDecoratedFlinkPodWithLog4j() throws IOException {
    KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, "log4j.properties");

    final FlinkPod resultFlinkPod = flinkConfMountDecorator.decorateFlinkPod(baseFlinkPod);

    final List<KeyToPath> expectedKeyToPaths = Arrays.asList(
        new KeyToPathBuilder()
            .withKey("log4j.properties")
            .withPath("log4j.properties")
            .build(),
        new KeyToPathBuilder()
            .withKey(FLINK_CONF_FILENAME)
            .withPath(FLINK_CONF_FILENAME)
            .build());
    final List<Volume> expectedVolumes = Collections.singletonList(
        new VolumeBuilder()
            .withName(Constants.FLINK_CONF_VOLUME)
            .withNewConfigMap()
                .withName(getFlinkConfConfigMapName(CLUSTER_ID))
                .withItems(expectedKeyToPaths)
            .endConfigMap()
            .build());
    assertEquals(expectedVolumes, resultFlinkPod.getPod().getSpec().getVolumes());
}
@Test
public void testDecoratedFlinkPodWithLogback() throws IOException {
    KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, "logback.xml");

    final FlinkPod resultFlinkPod = flinkConfMountDecorator.decorateFlinkPod(baseFlinkPod);

    final List<KeyToPath> expectedKeyToPaths = Arrays.asList(
        new KeyToPathBuilder()
            .withKey("logback.xml")
            .withPath("logback.xml")
            .build(),
        new KeyToPathBuilder()
            .withKey(FLINK_CONF_FILENAME)
            .withPath(FLINK_CONF_FILENAME)
            .build());
    final List<Volume> expectedVolumes = Collections.singletonList(
        new VolumeBuilder()
            .withName(Constants.FLINK_CONF_VOLUME)
            .withNewConfigMap()
                .withName(getFlinkConfConfigMapName(CLUSTER_ID))
                .withItems(expectedKeyToPaths)
            .endConfigMap()
            .build());
    assertEquals(expectedVolumes, resultFlinkPod.getPod().getSpec().getVolumes());
}
/**
 * Creates a secret volume with the given items.
 *
 * @param name        Name of the volume
 * @param secretName  Name of the Secret
 * @param items       Mapping of Secret keys to file paths inside the volume
 * @param isOpenshift true if the underlying cluster is OpenShift
 * @return The Volume created
 */
public static Volume createSecretVolume(String name, String secretName, Map<String, String> items, boolean isOpenshift) {
    String validName = getValidVolumeName(name);

    // Read-only default file mode; use a more restrictive mode on OpenShift.
    int mode = 0444;
    if (isOpenshift) {
        mode = 0440;
    }

    // Build one KeyToPath entry per Secret key that should be projected into the volume.
    List<KeyToPath> keysPaths = new ArrayList<>();
    for (Map.Entry<String, String> item : items.entrySet()) {
        KeyToPath keyPath = new KeyToPathBuilder()
            .withNewKey(item.getKey())
            .withNewPath(item.getValue())
            .build();
        keysPaths.add(keyPath);
    }

    SecretVolumeSource secretVolumeSource = new SecretVolumeSourceBuilder()
        .withDefaultMode(mode)
        .withSecretName(secretName)
        .withItems(keysPaths)
        .build();

    Volume volume = new VolumeBuilder()
        .withName(validName)
        .withSecret(secretVolumeSource)
        .build();
    log.trace("Created secret Volume named '{}' with source secret '{}'", validName, secretName);
    return volume;
}
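For context, a usage sketch of the helper above, assuming it is in scope; the volume name, Secret name and key mapping are hypothetical and only illustrate the shape of the call.

// Project two keys of the Secret "cluster-ca-cert" into files "ca.crt" and "ca.key"
// inside a read-only volume named "ca-certs" (non-OpenShift cluster).
Map<String, String> items = new HashMap<>();
items.put("ca.crt", "ca.crt");
items.put("ca.key", "ca.key");

Volume caCertsVolume = createSecretVolume("ca-certs", "cluster-ca-cert", items, false);

// The resulting volume can then be added to a pod spec, for example:
// new PodBuilder(pod).editOrNewSpec().addToVolumes(caCertsVolume).endSpec().build();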
private Pod decoratePod(Pod pod) {
    final List<KeyToPath> keyToPaths = getLocalLogConfFiles().stream()
        .map(file -> new KeyToPathBuilder()
            .withKey(file.getName())
            .withPath(file.getName())
            .build())
        .collect(Collectors.toList());
    keyToPaths.add(new KeyToPathBuilder()
        .withKey(FLINK_CONF_FILENAME)
        .withPath(FLINK_CONF_FILENAME)
        .build());

    final Volume flinkConfVolume = new VolumeBuilder()
        .withName(FLINK_CONF_VOLUME)
        .withNewConfigMap()
            .withName(getFlinkConfConfigMapName(kubernetesComponentConf.getClusterId()))
            .withItems(keyToPaths)
        .endConfigMap()
        .build();

    return new PodBuilder(pod)
        .editSpec()
            .addNewVolumeLike(flinkConfVolume)
            .endVolume()
        .endSpec()
        .build();
}
@Test
public void testDecoratedFlinkPodWithLog4jAndLogback() throws IOException {
    KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, "log4j.properties");
    KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, "logback.xml");

    final FlinkPod resultFlinkPod = flinkConfMountDecorator.decorateFlinkPod(baseFlinkPod);

    final List<KeyToPath> expectedKeyToPaths = Arrays.asList(
        new KeyToPathBuilder()
            .withKey("logback.xml")
            .withPath("logback.xml")
            .build(),
        new KeyToPathBuilder()
            .withKey("log4j.properties")
            .withPath("log4j.properties")
            .build(),
        new KeyToPathBuilder()
            .withKey(FLINK_CONF_FILENAME)
            .withPath(FLINK_CONF_FILENAME)
            .build());
    final List<Volume> expectedVolumes = Collections.singletonList(
        new VolumeBuilder()
            .withName(Constants.FLINK_CONF_VOLUME)
            .withNewConfigMap()
                .withName(getFlinkConfConfigMapName(CLUSTER_ID))
                .withItems(expectedKeyToPaths)
            .endConfigMap()
            .build());
    assertEquals(expectedVolumes, resultFlinkPod.getPod().getSpec().getVolumes());
}
@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
    Volume hadoopConfVolume;

    final Optional<String> existingConfigMap = kubernetesParameters.getExistingHadoopConfigurationConfigMap();
    if (existingConfigMap.isPresent()) {
        // Reuse an existing Hadoop configuration ConfigMap as the volume source.
        hadoopConfVolume = new VolumeBuilder()
            .withName(Constants.HADOOP_CONF_VOLUME)
            .withNewConfigMap()
                .withName(existingConfigMap.get())
            .endConfigMap()
            .build();
    } else {
        final Optional<String> localHadoopConfigurationDirectory = kubernetesParameters.getLocalHadoopConfigurationDirectory();
        if (!localHadoopConfigurationDirectory.isPresent()) {
            return flinkPod;
        }

        final List<File> hadoopConfigurationFileItems = getHadoopConfigurationFileItems(localHadoopConfigurationDirectory.get());
        if (hadoopConfigurationFileItems.isEmpty()) {
            LOG.warn("Found 0 files in directory {}, skip to mount the Hadoop Configuration ConfigMap.",
                localHadoopConfigurationDirectory.get());
            return flinkPod;
        }

        // Project each local Hadoop configuration file into the ConfigMap-backed volume.
        final List<KeyToPath> keyToPaths = hadoopConfigurationFileItems.stream()
            .map(file -> new KeyToPathBuilder()
                .withKey(file.getName())
                .withPath(file.getName())
                .build())
            .collect(Collectors.toList());

        hadoopConfVolume = new VolumeBuilder()
            .withName(Constants.HADOOP_CONF_VOLUME)
            .withNewConfigMap()
                .withName(getHadoopConfConfigMapName(kubernetesParameters.getClusterId()))
                .withItems(keyToPaths)
            .endConfigMap()
            .build();
    }

    final Pod podWithHadoopConf = new PodBuilder(flinkPod.getPod())
        .editOrNewSpec()
            .addNewVolumeLike(hadoopConfVolume)
            .endVolume()
        .endSpec()
        .build();

    // Mount the volume into the main container and export HADOOP_CONF_DIR accordingly.
    final Container containerWithHadoopConf = new ContainerBuilder(flinkPod.getMainContainer())
        .addNewVolumeMount()
            .withName(Constants.HADOOP_CONF_VOLUME)
            .withMountPath(Constants.HADOOP_CONF_DIR_IN_POD)
            .endVolumeMount()
        .addNewEnv()
            .withName(Constants.ENV_HADOOP_CONF_DIR)
            .withValue(Constants.HADOOP_CONF_DIR_IN_POD)
            .endEnv()
        .build();

    return new FlinkPod.Builder(flinkPod)
        .withPod(podWithHadoopConf)
        .withMainContainer(containerWithHadoopConf)
        .build();
}